Protobuf ingestion #1391

Conversation
""" WalkthroughSupport for ingesting OpenTelemetry logs, metrics, and traces in both JSON and Protobuf formats was added. The ingestion handlers now branch on the "Content-Type" header, using new flattening functions for Protobuf payloads. Dependencies were updated to enable Protobuf support, and relevant flattening utilities were implemented for each OTEL data type. Common validation and stream setup logic were centralized in helper functions for consistency. Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant Server
    participant FlatteningUtil

    Client->>Server: POST /ingest/{logs,metrics,traces} (Content-Type: application/json or application/x-protobuf)
    Server->>Server: Extract stream name and validate log source
    Server->>Server: Check Content-Type header
    alt Content-Type is JSON
        Server->>FlatteningUtil: flatten_and_push_JSON()
    else Content-Type is Protobuf
        Server->>FlatteningUtil: decode_and_flatten_protobuf()
        FlatteningUtil->>FlatteningUtil: push_logs(flattened_records)
    else Invalid Content-Type
        Server->>Client: Return error response
    end
    Server->>Client: Return HTTP response
```
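As orientation for the review below, here is a minimal sketch of the branching the diagram describes, assuming an actix-web handler; the flattening calls are hypothetical placeholders mirroring the diagram's labels, not the actual Parseable functions:

```rust
use actix_web::{web, HttpRequest, HttpResponse};

// Sketch only: branch on the Content-Type header of the raw request body.
async fn ingest_otel(req: HttpRequest, body: web::Bytes) -> HttpResponse {
    match req
        .headers()
        .get("Content-Type")
        .and_then(|h| h.to_str().ok())
    {
        // JSON path: parse the raw bytes, then flatten and push
        Some(ct) if ct.starts_with("application/json") => {
            match serde_json::from_slice::<serde_json::Value>(&body) {
                Ok(_json) => HttpResponse::Ok().finish(), // flatten_and_push_JSON(_json) would go here
                Err(e) => HttpResponse::BadRequest().body(e.to_string()),
            }
        }
        // Protobuf path: decode with prost, then flatten and push each record
        Some(ct) if ct.starts_with("application/x-protobuf") => {
            // decode_and_flatten_protobuf(&body) would go here (hypothetical)
            HttpResponse::Ok().finish()
        }
        // Anything else hits the diagram's error branch
        _ => HttpResponse::UnsupportedMediaType().body("expected JSON or protobuf"),
    }
}
```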
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
Actionable comments posted: 3
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- `Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (6)
- Cargo.toml (3 hunks)
- src/handlers/http/ingest.rs (8 hunks)
- src/handlers/http/modal/utils/ingest_utils.rs (1 hunk)
- src/otel/logs.rs (2 hunks)
- src/otel/metrics.rs (2 hunks)
- src/otel/traces.rs (4 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1263
File: src/handlers/http/ingest.rs:300-310
Timestamp: 2025-03-26T06:44:53.362Z
Learning: In Parseable, every stream is always associated with a log_source - no stream can exist without a log_source. For otel-traces and otel-metrics, strict restrictions are implemented where ingestion is rejected if a stream already has a different log_source format. However, regular logs from multiple log_sources can coexist in a single stream.
src/handlers/http/modal/utils/ingest_utils.rs (1)
Learnt from: nikhilsinhaparseable
PR: #1346
File: src/handlers/http/health_check.rs:81-90
Timestamp: 2025-06-16T02:04:58.990Z
Learning: In the shutdown function in src/handlers/http/health_check.rs, the design approach is to log errors from sync operations rather than propagate them. This is intentional because the shutdown function is called on SIGTERM/SIGINT signals, and the goal is to perform best-effort cleanup (syncing pending files to object storage) while allowing the shutdown to proceed regardless of sync failures. Logging provides debugging information without blocking the shutdown process.
src/handlers/http/ingest.rs (2)
Learnt from: nikhilsinhaparseable
PR: #1263
File: src/handlers/http/ingest.rs:300-310
Timestamp: 2025-03-26T06:44:53.362Z
Learning: In Parseable, every stream is always associated with a log_source - no stream can exist without a log_source. For otel-traces and otel-metrics, strict restrictions are implemented where ingestion is rejected if a stream already has a different log_source format. However, regular logs from multiple log_sources can coexist in a single stream.
Learnt from: de-sh
PR: #1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: coverage
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
🔇 Additional comments (16)
Cargo.toml (3)
36-36: LGTM! Adding the "prost" feature to tonic is necessary for protobuf support and aligns with the PR objective.

79-85: LGTM! Moving from a git dependency to the versioned crate (0.30.0) improves stability. The enabled features ("gen-tonic", "with-serde", "logs", "metrics", "trace") are appropriate for the protobuf ingestion functionality.

142-142: LGTM! Adding prost as a direct dependency is necessary for protobuf message decoding in the application code.
src/handlers/http/modal/utils/ingest_utils.rs (1)
95-95: LGTM! Making `push_logs` public is appropriate to support the new protobuf ingestion handlers that need to process individual flattened records after protobuf decoding.

src/otel/logs.rs (2)

21-21: LGTM! Import is necessary for the new protobuf flattening functionality.

146-176: LGTM! The `flatten_otel_protobuf` function correctly implements protobuf support for OTEL logs. The implementation properly:

- Iterates over resource logs in the protobuf message
- Extracts resource attributes and metadata
- Reuses the existing `flatten_scope_log` for consistency
- Merges resource-level data into individual log records
- Follows the same pattern as the existing `flatten_otel_logs` function

src/otel/metrics.rs (2)
18-18: LGTM! Import is necessary for the new protobuf metrics flattening functionality.

607-661: LGTM! The `flatten_otel_metrics_protobuf` function correctly implements protobuf support for OTEL metrics. The implementation properly:

- Processes resource metrics from the protobuf message
- Extracts resource and scope-level metadata
- Reuses the existing `flatten_metrics_record` for consistency
- Merges hierarchical metadata into individual metric records
- Follows the same pattern as `flatten_otel_metrics` but for the protobuf format

src/otel/traces.rs (4)
18-18: LGTM! Import is necessary for the new protobuf traces flattening functionality.

343-343: LGTM! Adding the EntityRef import is necessary for the updated test data construction.

784-791: LGTM! Adding `entity_refs` to the test Resource provides more comprehensive test coverage for the complete Resource structure.

938-997: LGTM! The `flatten_otel_traces_protobuf` function correctly implements protobuf support for OTEL traces. The implementation properly:

- Processes resource spans from the protobuf message
- Extracts resource and scope-level metadata
- Reuses the existing `flatten_span_record` for consistency
- Merges hierarchical metadata (scope and resource) into individual span records
- Follows the same pattern as `flatten_otel_traces` but for the protobuf format

src/handlers/http/ingest.rs (4)
21-21: LGTM! Import changes support the new Protobuf functionality. The import changes are well aligned with the new functionality:

- Removing `Json` from the web import, since handlers now accept raw bytes
- Adding `push_logs` for individual record processing in the Protobuf path
- Adding the necessary OTEL flattening functions and Protobuf message types

Also applies to: 32-32, 36-46

165-165: LGTM! Parameter type change supports both JSON and Protobuf. Changing from `Json<StrictValue>` to `web::Bytes` allows the handler to inspect the raw payload and determine the format based on the Content-Type header.

264-264: LGTM! Parameter type change supports both JSON and Protobuf. Consistent with the logs handler, changing to `web::Bytes` enables content-type-based format detection.

351-351: LGTM! Parameter type change supports both JSON and Protobuf. Consistent with the other OTEL handlers, enabling content-type-based format detection.
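As context for these handler changes, a minimal sketch of the Protobuf decode step, assuming the opentelemetry-proto crate's gen-tonic types (which the Cargo.toml changes above enable); illustrative, not the PR's exact code:

```rust
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use prost::Message;

// prost::Message::decode accepts any bytes::Buf implementation, including
// &[u8], so the handler can pass the raw request body straight through.
fn decode_otel_logs(body: &[u8]) -> Result<ExportLogsServiceRequest, prost::DecodeError> {
    ExportLogsServiceRequest::decode(body)
}
```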
Actionable comments posted: 0
♻️ Duplicate comments (1)
src/handlers/http/ingest.rs (1)
249-290: Improve Content-Type handling for robustness. The Content-Type handling has several issues that were previously identified and remain unaddressed:

- Direct string equality won't handle parameters like `application/json; charset=utf-8`
- No explicit error for unsupported Content-Type values
- Size limit only enforced for Protobuf, not JSON
- Generic error for missing Content-Type header
Apply this improved implementation:
```diff
 match req
     .headers()
     .get("Content-Type")
     .and_then(|h| h.to_str().ok())
 {
     Some(content_type) => {
-        if content_type == CONTENT_TYPE_JSON {
+        if content_type.starts_with(CONTENT_TYPE_JSON) {
+            // Apply size limit to JSON as well
+            if body.len() > MAX_EVENT_PAYLOAD_SIZE {
+                return Err(PostError::Invalid(anyhow::anyhow!(
+                    "JSON payload size {} exceeds maximum allowed size of {} bytes",
+                    body.len(),
+                    MAX_EVENT_PAYLOAD_SIZE
+                )));
+            }
             flatten_and_push_logs(
                 serde_json::from_slice(&body)?,
                 stream_name,
                 log_source,
                 &p_custom_fields,
             )
             .await?;
-        } else if content_type == CONTENT_TYPE_PROTOBUF {
+        } else if content_type.starts_with(CONTENT_TYPE_PROTOBUF) {
             // 10MB limit
             if body.len() > MAX_EVENT_PAYLOAD_SIZE {
                 return Err(PostError::Invalid(anyhow::anyhow!(
                     "Protobuf message size {} exceeds maximum allowed size of {} bytes",
                     body.len(),
                     MAX_EVENT_PAYLOAD_SIZE
                 )));
             }
             match decode_protobuf(body) {
                 Ok(decoded) => {
                     for record in flatten_protobuf(&decoded) {
                         push_logs(stream_name, record, log_source, &p_custom_fields).await?;
                     }
                 }
                 Err(e) => {
                     return Err(PostError::Invalid(anyhow::anyhow!(
                         "Failed to decode protobuf message: {}",
                         e
                     )));
                 }
             }
+        } else {
+            return Err(PostError::Invalid(anyhow::anyhow!(
+                "Unsupported Content-Type: {}. Expected {} or {}",
+                content_type,
+                CONTENT_TYPE_JSON,
+                CONTENT_TYPE_PROTOBUF
+            )));
         }
     }
     None => {
-        return Err(PostError::Header(ParseHeaderError::InvalidValue));
+        return Err(PostError::Invalid(anyhow::anyhow!(
+            "Missing Content-Type header. Expected {} or {}",
+            CONTENT_TYPE_JSON,
+            CONTENT_TYPE_PROTOBUF
+        )));
     }
 }
```
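A small self-contained check of the prefix-matching behavior the diff relies on; the constant's value here is an assumption mirroring the diff, not the project's actual definition:

```rust
const CONTENT_TYPE_JSON: &str = "application/json"; // assumed value

fn main() {
    // Exact equality would reject a Content-Type carrying parameters;
    // prefix matching accepts it.
    assert!("application/json; charset=utf-8".starts_with(CONTENT_TYPE_JSON));
    assert!(!"application/x-protobuf".starts_with(CONTENT_TYPE_JSON));
}
```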
🧹 Nitpick comments (3)
src/otel/metrics.rs (2)
503-514: Consider reducing function parameters for better maintainability. While the generic approach is good, having 8 function parameters makes the function signature complex and harder to maintain. Consider grouping related functions into a trait or struct.
Consider defining a trait to encapsulate the accessor functions:
```diff
-#[allow(clippy::too_many_arguments)]
-fn process_resource_metrics<T, S, M>(
-    resource_metrics: &[T],
-    get_resource: fn(&T) -> Option<&opentelemetry_proto::tonic::resource::v1::Resource>,
-    get_scope_metrics: fn(&T) -> &[S],
-    get_schema_url: fn(&T) -> &str,
-    get_scope: fn(&S) -> Option<&opentelemetry_proto::tonic::common::v1::InstrumentationScope>,
-    get_scope_schema_url: fn(&S) -> &str,
-    get_metrics: fn(&S) -> &[M],
-    get_metric: fn(&M) -> &Metric,
-) -> Vec<Value> {
+trait ResourceMetricsAccessor<S, M> {
+    fn get_resource(&self) -> Option<&opentelemetry_proto::tonic::resource::v1::Resource>;
+    fn get_scope_metrics(&self) -> &[S];
+    fn get_schema_url(&self) -> &str;
+}
+
+trait ScopeMetricsAccessor<M> {
+    fn get_scope(&self) -> Option<&opentelemetry_proto::tonic::common::v1::InstrumentationScope>;
+    fn get_scope_schema_url(&self) -> &str;
+    fn get_metrics(&self) -> &[M];
+}
+
+trait MetricAccessor {
+    fn get_metric(&self) -> &Metric;
+}
+
+fn process_resource_metrics<T, S, M>(resource_metrics: &[T]) -> Vec<Value>
+where
+    T: ResourceMetricsAccessor<S, M>,
+    S: ScopeMetricsAccessor<M>,
+    M: MetricAccessor,
+{
```

This would make the function calls cleaner and more type-safe.
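To make the suggestion concrete, a sketch of how the first accessor trait could be implemented for the protobuf type, assuming the OTLP-generated field names (`resource`, `scope_metrics`, `schema_url`); illustrative only, not part of the PR:

```rust
use opentelemetry_proto::tonic::metrics::v1::{Metric, ResourceMetrics, ScopeMetrics};
use opentelemetry_proto::tonic::resource::v1::Resource;

trait ResourceMetricsAccessor<S, M> {
    fn get_resource(&self) -> Option<&Resource>;
    fn get_scope_metrics(&self) -> &[S];
    fn get_schema_url(&self) -> &str;
}

// Hypothetical impl for the protobuf type; field names follow the OTLP proto.
impl ResourceMetricsAccessor<ScopeMetrics, Metric> for ResourceMetrics {
    fn get_resource(&self) -> Option<&Resource> {
        self.resource.as_ref()
    }
    fn get_scope_metrics(&self) -> &[ScopeMetrics] {
        &self.scope_metrics
    }
    fn get_schema_url(&self) -> &str {
        &self.schema_url
    }
}
```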
571-577: Optimize cloning of resource metadata. The `clone()` operation on line 576 is performed for each metric, which could be inefficient when processing large numbers of metrics. Since resource metadata is the same for all metrics within a resource, consider a more efficient approach that pre-clones the resource metadata once per resource:
```diff
 for resource_metric_json in &mut vec_scope_metrics_json {
-    for (key, value) in &resource_metrics_json {
-        resource_metric_json.insert(key.clone(), value.clone());
-    }
-
-    vec_otel_json.push(Value::Object(resource_metric_json.clone()));
+    resource_metric_json.extend(resource_metrics_json.clone());
+    vec_otel_json.push(Value::Object(std::mem::take(resource_metric_json)));
 }
```

Alternatively, consider using `Rc` or `Arc` for shared metadata to avoid repeated cloning.
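To make the suggested `extend` + `mem::take` pattern concrete, a runnable toy example with made-up field names:

```rust
use serde_json::{json, Map, Value};

fn main() {
    // Shared resource metadata, bulk-cloned once per record
    let resource_meta = Map::from_iter([
        ("service.name".to_string(), json!("frontend")),
        ("host.name".to_string(), json!("node-1")),
    ]);

    let mut records: Vec<Map<String, Value>> = vec![
        Map::from_iter([("metric".to_string(), json!("latency_ms"))]),
        Map::from_iter([("metric".to_string(), json!("rps"))]),
    ];

    let mut out = Vec::with_capacity(records.len());
    for rec in &mut records {
        // Merge the shared metadata in one call, then move the map out
        // instead of cloning the whole record a second time
        rec.extend(resource_meta.clone());
        out.push(Value::Object(std::mem::take(rec)));
    }

    assert_eq!(out.len(), 2);
    assert_eq!(out[0]["service.name"], json!("frontend"));
}
```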
src/handlers/http/ingest.rs (1)

199-225: Stream compatibility validation logic is correct. The implementation correctly enforces the learned restrictions, where OTEL logs can coexist with other log types (except metrics/traces) while OTEL metrics/traces require exact type matching. Consider adding a comment to document these rules for future maintainers.

Add a clarifying comment:

```diff
 // Validate stream compatibility
 if let Ok(stream) = PARSEABLE.get_stream(&stream_name) {
     match log_source {
         LogSource::OtelLogs => {
-            // For logs, reject if stream is metrics or traces
+            // OTEL logs can coexist with other log types but not with OTEL metrics/traces
             stream
                 .get_log_source()
                 .iter()
                 .find(|&stream_log_source_entry| {
                     stream_log_source_entry.log_source_format != LogSource::OtelTraces
                         && stream_log_source_entry.log_source_format != LogSource::OtelMetrics
                 })
                 .ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
         }
         LogSource::OtelMetrics | LogSource::OtelTraces => {
-            // For metrics/traces, only allow same type
+            // OTEL metrics/traces require strict type matching - no mixing allowed
             stream
                 .get_log_source()
                 .iter()
                 .find(|&stream_log_source_entry| {
                     stream_log_source_entry.log_source_format == log_source
                 })
                 .ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
         }
         _ => {}
     }
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- src/handlers/http/ingest.rs (6 hunks)
- src/handlers/mod.rs (1 hunk)
- src/otel/logs.rs (3 hunks)
- src/otel/metrics.rs (6 hunks)
- src/otel/traces.rs (4 hunks)
✅ Files skipped from review due to trivial changes (1)
- src/handlers/mod.rs
🚧 Files skipped from review as they are similar to previous changes (2)
- src/otel/logs.rs
- src/otel/traces.rs
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1263
File: src/handlers/http/ingest.rs:300-310
Timestamp: 2025-03-26T06:44:53.362Z
Learning: In Parseable, every stream is always associated with a log_source - no stream can exist without a log_source. For otel-traces and otel-metrics, strict restrictions are implemented where ingestion is rejected if a stream already has a different log_source format. However, regular logs from multiple log_sources can coexist in a single stream.
src/handlers/http/ingest.rs (5)
Learnt from: nikhilsinhaparseable
PR: #1263
File: src/handlers/http/ingest.rs:300-310
Timestamp: 2025-03-26T06:44:53.362Z
Learning: In Parseable, every stream is always associated with a log_source - no stream can exist without a log_source. For otel-traces and otel-metrics, strict restrictions are implemented where ingestion is rejected if a stream already has a different log_source format. However, regular logs from multiple log_sources can coexist in a single stream.
Learnt from: de-sh
PR: parseablehq/parseable#0
File: :0-0
Timestamp: 2025-03-20T15:50:45.435Z
Learning: Pay close attention to code comments for typos and semantic clarity during reviews for the Parseable project.
Learnt from: nikhilsinhaparseable
PR: #1370
File: resources/ingest_demo_data.sh:12-14
Timestamp: 2025-07-14T11:29:22.552Z
Learning: In the Parseable codebase, the base64 encoding for Basic Auth headers in resources/ingest_demo_data.sh works correctly as implemented without needing to strip trailing newlines. The user has validated this works with their server environment.
Learnt from: de-sh
PR: #1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.
Learnt from: nikhilsinhaparseable
PR: #1392
File: src/migration/stream_metadata_migration.rs:303-322
Timestamp: 2025-07-28T17:10:39.395Z
Learning: In Parseable's migration system (src/migration/stream_metadata_migration.rs), each migration function updates the metadata to the current latest format using CURRENT_OBJECT_STORE_VERSION and CURRENT_SCHEMA_VERSION constants, rather than producing incremental versions. For example, v5_v6 function produces v7 format output when these constants are set to "v7", not v6 format.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: coverage
🔇 Additional comments (2)
src/otel/metrics.rs (1)
583-610: LGTM! Clean refactoring that promotes code reuse. The refactoring successfully extracts common logic while maintaining type safety through closures. Both `flatten_otel_metrics` and `flatten_otel_metrics_protobuf` cleanly delegate to the generic helper with the appropriate accessors.

src/handlers/http/ingest.rs (1)
295-362: Excellent refactoring of the OTEL handlers. The refactoring successfully eliminates code duplication by extracting common logic into the `setup_otel_stream` and `process_otel_content` helpers. All three handlers now follow a consistent pattern and properly support both JSON and Protobuf formats.