From 03e67b410a7e7a0d5ea224a69c104b052e45fed3 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sun, 13 Jul 2025 09:05:25 -0700 Subject: [PATCH 1/4] add protobuf support for otel logs --- Cargo.lock | 116 ++++++++++++++---- Cargo.toml | 11 +- src/handlers/http/ingest.rs | 44 +++++-- src/handlers/http/modal/utils/ingest_utils.rs | 2 +- src/otel/logs.rs | 33 +++++ src/otel/traces.rs | 10 +- 6 files changed, 175 insertions(+), 41 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 02a373ca0..619112b81 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -65,7 +65,7 @@ dependencies = [ "mime", "percent-encoding", "pin-project-lite", - "rand", + "rand 0.8.5", "sha1", "smallvec", "tokio", @@ -557,7 +557,7 @@ dependencies = [ "futures", "prost", "prost-types", - "tonic", + "tonic 0.12.3", ] [[package]] @@ -1456,7 +1456,7 @@ dependencies = [ "object_store", "parking_lot", "parquet", - "rand", + "rand 0.8.5", "regex", "sqlparser", "tempfile", @@ -1546,7 +1546,7 @@ dependencies = [ "log", "object_store", "parking_lot", - "rand", + "rand 0.8.5", "tempfile", "url", ] @@ -1607,7 +1607,7 @@ dependencies = [ "itertools 0.14.0", "log", "md-5", - "rand", + "rand 0.8.5", "regex", "sha2", "unicode-segmentation", @@ -3243,7 +3243,7 @@ dependencies = [ "parking_lot", "percent-encoding", "quick-xml", - "rand", + "rand 0.8.5", "reqwest 0.12.12", "ring", "rustls-pemfile 2.2.0", @@ -3311,8 +3311,9 @@ dependencies = [ [[package]] name = "opentelemetry" -version = "0.27.1" -source = "git+https://github.com/parseablehq/opentelemetry-rust?branch=fix-metrics-u64-serialization#7e84c98d75ae16993a37bd5ff75a9768d652fe8f" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6" dependencies = [ "futures-core", "futures-sink", @@ -3324,30 +3325,31 @@ dependencies = [ [[package]] name = "opentelemetry-proto" -version = "0.27.0" -source = "git+https://github.com/parseablehq/opentelemetry-rust?branch=fix-metrics-u64-serialization#7e84c98d75ae16993a37bd5ff75a9768d652fe8f" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e046fd7660710fe5a05e8748e70d9058dc15c94ba914e7c4faa7c728f0e8ddc" dependencies = [ + "base64 0.22.1", "hex", "opentelemetry", "opentelemetry_sdk", "prost", "serde", - "tonic", + "tonic 0.13.1", ] [[package]] name = "opentelemetry_sdk" -version = "0.27.1" -source = "git+https://github.com/parseablehq/opentelemetry-rust?branch=fix-metrics-u64-serialization#7e84c98d75ae16993a37bd5ff75a9768d652fe8f" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b" dependencies = [ - "async-trait", "futures-channel", "futures-executor", "futures-util", - "glob", "opentelemetry", "percent-encoding", - "rand", + "rand 0.9.1", "serde_json", "thiserror 2.0.11", ] @@ -3505,7 +3507,8 @@ dependencies = [ "path-clean", "prometheus", "prometheus-parse", - "rand", + "prost", + "rand 0.8.5", "rdkafka", "regex", "relative-path", @@ -3528,7 +3531,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", - "tonic", + "tonic 0.12.3", "tonic-web", "tower-http 0.6.2", "tracing", @@ -3549,7 +3552,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "346f04948ba92c43e8469c1ee6736c7563d71012b17d40745260fe106aac2166" dependencies = [ "base64ct", - "rand_core", + "rand_core 0.6.4", "subtle", ] @@ -3622,7 +3625,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c80231409c20246a13fddb31776fb942c38553c51e871f8cbd687a4cfb5843d" dependencies = [ "phf_shared", - "rand", + "rand 0.8.5", ] [[package]] @@ -3880,7 +3883,7 @@ checksum = "a2fe5ef3495d7d2e377ff17b1a8ce2ee2ec2a18cde8b6ad6619d65d0701c135d" dependencies = [ "bytes", "getrandom 0.2.15", - "rand", + "rand 0.8.5", "ring", "rustc-hash", "rustls 0.23.22", @@ -3922,8 +3925,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + +[[package]] +name = "rand" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.3", ] [[package]] @@ -3933,7 +3946,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.3", ] [[package]] @@ -3945,6 +3968,15 @@ dependencies = [ "getrandom 0.2.15", ] +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom 0.3.1", +] + [[package]] name = "raw-cpuid" version = "10.7.0" @@ -5161,6 +5193,32 @@ dependencies = [ "zstd", ] +[[package]] +name = "tonic" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9" +dependencies = [ + "async-trait", + "base64 0.22.1", + "bytes", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.6.0", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower 0.5.2", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tonic-web" version = "0.12.3" @@ -5174,7 +5232,7 @@ dependencies = [ "http-body-util", "pin-project", "tokio-stream", - "tonic", + "tonic 0.12.3", "tower-http 0.5.2", "tower-layer", "tower-service", @@ -5192,7 +5250,7 @@ dependencies = [ "indexmap 1.9.3", "pin-project", "pin-project-lite", - "rand", + "rand 0.8.5", "slab", "tokio", "tokio-util", @@ -5209,11 +5267,15 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" dependencies = [ "futures-core", "futures-util", + "indexmap 2.7.1", "pin-project-lite", + "slab", "sync_wrapper 1.0.2", "tokio", + "tokio-util", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -5349,7 +5411,7 @@ version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f294bff79170ed1c5633812aff1e565c35d993a36e757f9bc0accf5eec4e6045" dependencies = [ - "rand", + "rand 0.8.5", "serde", "web-time", ] diff --git a/Cargo.toml b/Cargo.toml index 2b44af323..512bdf7bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,7 @@ actix-web-prometheus = { version = "0.1" } actix-web-static-files = "4.0" http = "0.2.7" http-auth-basic = "0.3.3" -tonic = { version = "0.12.3", features = ["tls", "transport", "gzip", "zstd"] } +tonic = { version = "0.12.3", features = ["tls", "transport", "gzip", "zstd", "prost"] } tonic-web = "0.12.3" tower-http = { version = "0.6.1", features = ["cors"] } url = "2.4.0" @@ -76,7 +76,13 @@ tokio-stream = { version = "0.1", features = ["fs"] } tokio-util = { version = "0.7" } # Logging and Metrics -opentelemetry-proto = { git = "https://github.com/parseablehq/opentelemetry-rust", branch = "fix-metrics-u64-serialization" } +opentelemetry-proto = { version = "0.30.0", features = [ + "gen-tonic", + "with-serde", + "logs", + "metrics", + "trace", +] } prometheus = { version = "0.13", features = ["process"] } prometheus-parse = "0.2.5" tracing = "0.1" @@ -133,6 +139,7 @@ xxhash-rust = { version = "0.8", features = ["xxh3"] } futures-core = "0.3.31" tempfile = "3.20.0" lazy_static = "1.4.0" +prost = "0.13.1" [build-dependencies] cargo_toml = "0.21" diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 09ad09fbd..c0c9cfeec 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -18,7 +18,7 @@ use std::collections::{HashMap, HashSet}; -use actix_web::web::{Json, Path}; +use actix_web::web::{self, Json, Path}; use actix_web::{HttpRequest, HttpResponse, http::header::ContentType}; use arrow_array::RecordBatch; use bytes::Bytes; @@ -29,18 +29,21 @@ use crate::event::error::EventError; use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST}; use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry}; use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY}; +use crate::handlers::http::modal::utils::ingest_utils::push_logs; use crate::handlers::{ EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY, TELEMETRY_TYPE_KEY, TelemetryType, }; use crate::metadata::SchemaVersion; use crate::option::Mode; -use crate::otel::logs::OTEL_LOG_KNOWN_FIELD_LIST; +use crate::otel::logs::{OTEL_LOG_KNOWN_FIELD_LIST, flatten_otel_protobuf}; use crate::otel::metrics::OTEL_METRICS_KNOWN_FIELD_LIST; use crate::otel::traces::OTEL_TRACES_KNOWN_FIELD_LIST; use crate::parseable::{PARSEABLE, StreamNotFound}; use crate::storage::{ObjectStorageError, StreamType}; use crate::utils::header_parsing::ParseHeaderError; use crate::utils::json::{flatten::JsonFlattenError, strict::StrictValue}; +use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; +use prost::Message; use super::logstream::error::{CreateStreamError, StreamError}; use super::modal::utils::ingest_utils::{flatten_and_push_logs, get_custom_fields_from_header}; @@ -166,7 +169,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< // creates if stream does not exist pub async fn handle_otel_logs_ingestion( req: HttpRequest, - Json(json): Json, + body: web::Bytes, ) -> Result { let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); @@ -217,14 +220,35 @@ pub async fn handle_otel_logs_ingestion( .await?; let p_custom_fields = get_custom_fields_from_header(&req); + match req.headers().get("Content-Type") { + Some(content_type) => { + if content_type == "application/json" { + flatten_and_push_logs( + serde_json::from_slice(&body)?, + &stream_name, + &log_source, + &p_custom_fields, + ) + .await?; + } - flatten_and_push_logs( - json.into_inner(), - &stream_name, - &log_source, - &p_custom_fields, - ) - .await?; + if content_type == "application/x-protobuf" { + match ExportLogsServiceRequest::decode(body) { + Ok(json) => { + for record in flatten_otel_protobuf(&json) { + push_logs(&stream_name, record, &log_source, &p_custom_fields).await?; + } + } + Err(e) => { + return Err(PostError::Invalid(e.into())); + } + } + } + } + None => { + return Err(PostError::Header(ParseHeaderError::InvalidValue)); + } + } Ok(HttpResponse::Ok().finish()) } diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index bffd7f882..2e84cbbf9 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -92,7 +92,7 @@ pub async fn flatten_and_push_logs( Ok(()) } -async fn push_logs( +pub async fn push_logs( stream_name: &str, json: Value, log_source: &LogSource, diff --git a/src/otel/logs.rs b/src/otel/logs.rs index f301869d2..582e1b594 100644 --- a/src/otel/logs.rs +++ b/src/otel/logs.rs @@ -18,6 +18,7 @@ use super::otel_utils::collect_json_from_values; use super::otel_utils::convert_epoch_nano_to_timestamp; use super::otel_utils::insert_attributes; +use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; use opentelemetry_proto::tonic::logs::v1::LogRecord; use opentelemetry_proto::tonic::logs::v1::LogsData; use opentelemetry_proto::tonic::logs::v1::ScopeLogs; @@ -142,6 +143,38 @@ fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec> { vec_scope_log_json } +pub fn flatten_otel_protobuf(message: &ExportLogsServiceRequest) -> Vec { + let mut vec_otel_json = Vec::new(); + for resource_logs in &message.resource_logs { + let mut resource_log_json = Map::new(); + if let Some(resource) = &resource_logs.resource { + insert_attributes(&mut resource_log_json, &resource.attributes); + resource_log_json.insert( + "resource_dropped_attributes_count".to_string(), + Value::Number(resource.dropped_attributes_count.into()), + ); + } + + let mut vec_resource_logs_json = Vec::new(); + for scope_log in &resource_logs.scope_logs { + vec_resource_logs_json.extend(flatten_scope_log(scope_log)); + } + + resource_log_json.insert( + "schema_url".to_string(), + Value::String(resource_logs.schema_url.clone()), + ); + + for resource_logs_json in &mut vec_resource_logs_json { + resource_logs_json.extend(resource_log_json.clone()); + + vec_otel_json.push(Value::Object(resource_logs_json.clone())); + } + } + + vec_otel_json +} + /// this function performs the custom flattening of the otel logs /// and returns a `Vec` of `Value::Object` of the flattened json pub fn flatten_otel_logs(message: &LogsData) -> Vec { diff --git a/src/otel/traces.rs b/src/otel/traces.rs index 1d209e97b..cf71a2e28 100644 --- a/src/otel/traces.rs +++ b/src/otel/traces.rs @@ -339,7 +339,7 @@ fn flatten_span_record(span_record: &Span) -> Vec> { #[cfg(test)] mod tests { use super::*; - use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue}; + use opentelemetry_proto::tonic::common::v1::{AnyValue, EntityRef, KeyValue}; use opentelemetry_proto::tonic::resource::v1::Resource; use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, TracesData}; @@ -780,6 +780,14 @@ mod tests { }), }], dropped_attributes_count: 0, + entity_refs: vec![ + EntityRef{ + schema_url: "https://opentelemetry.io/schemas/1.21.0".to_string(), + r#type: "service".to_string(), + id_keys: vec!["service.name".to_string()], + description_keys: vec!["service.name".to_string()], + } + ] }), scope_spans: vec![ScopeSpans { scope: Some(opentelemetry_proto::tonic::common::v1::InstrumentationScope { From ce66c19de77a82acaaa71dcecab451348d7bbf08 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 28 Jul 2025 08:04:12 -0700 Subject: [PATCH 2/4] add protobuf support for otel metrics and traces --- src/handlers/http/ingest.rs | 95 +++++++++++++++++++++++++++++-------- src/otel/metrics.rs | 58 ++++++++++++++++++++++ src/otel/traces.rs | 63 ++++++++++++++++++++++++ 3 files changed, 197 insertions(+), 19 deletions(-) diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index c0c9cfeec..702388f50 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -36,13 +36,15 @@ use crate::handlers::{ use crate::metadata::SchemaVersion; use crate::option::Mode; use crate::otel::logs::{OTEL_LOG_KNOWN_FIELD_LIST, flatten_otel_protobuf}; -use crate::otel::metrics::OTEL_METRICS_KNOWN_FIELD_LIST; -use crate::otel::traces::OTEL_TRACES_KNOWN_FIELD_LIST; +use crate::otel::metrics::{OTEL_METRICS_KNOWN_FIELD_LIST, flatten_otel_metrics_protobuf}; +use crate::otel::traces::{OTEL_TRACES_KNOWN_FIELD_LIST, flatten_otel_traces_protobuf}; use crate::parseable::{PARSEABLE, StreamNotFound}; use crate::storage::{ObjectStorageError, StreamType}; use crate::utils::header_parsing::ParseHeaderError; use crate::utils::json::{flatten::JsonFlattenError, strict::StrictValue}; use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; +use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; +use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use prost::Message; use super::logstream::error::{CreateStreamError, StreamError}; @@ -233,6 +235,14 @@ pub async fn handle_otel_logs_ingestion( } if content_type == "application/x-protobuf" { + const MAX_PROTOBUF_SIZE: usize = 10 * 1024 * 1024; // 10MB limit + if body.len() > MAX_PROTOBUF_SIZE { + return Err(PostError::Invalid(anyhow::anyhow!( + "Protobuf message size {} exceeds maximum allowed size of {} bytes", + body.len(), + MAX_PROTOBUF_SIZE + ))); + } match ExportLogsServiceRequest::decode(body) { Ok(json) => { for record in flatten_otel_protobuf(&json) { @@ -240,7 +250,10 @@ pub async fn handle_otel_logs_ingestion( } } Err(e) => { - return Err(PostError::Invalid(e.into())); + return Err(PostError::Invalid(anyhow::anyhow!( + "Failed to decode protobuf message: {}", + e + ))); } } } @@ -258,7 +271,7 @@ pub async fn handle_otel_logs_ingestion( // creates if stream does not exist pub async fn handle_otel_metrics_ingestion( req: HttpRequest, - Json(json): Json, + body: web::Bytes, ) -> Result { let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); @@ -308,13 +321,35 @@ pub async fn handle_otel_metrics_ingestion( let p_custom_fields = get_custom_fields_from_header(&req); - flatten_and_push_logs( - json.into_inner(), - &stream_name, - &log_source, - &p_custom_fields, - ) - .await?; + match req.headers().get("Content-Type") { + Some(content_type) => { + if content_type == "application/json" { + flatten_and_push_logs( + serde_json::from_slice(&body)?, + &stream_name, + &log_source, + &p_custom_fields, + ) + .await?; + } + + if content_type == "application/x-protobuf" { + match ExportMetricsServiceRequest::decode(body) { + Ok(json) => { + for record in flatten_otel_metrics_protobuf(&json) { + push_logs(&stream_name, record, &log_source, &p_custom_fields).await?; + } + } + Err(e) => { + return Err(PostError::Invalid(e.into())); + } + } + } + } + None => { + return Err(PostError::Header(ParseHeaderError::InvalidValue)); + } + } Ok(HttpResponse::Ok().finish()) } @@ -324,7 +359,7 @@ pub async fn handle_otel_metrics_ingestion( // creates if stream does not exist pub async fn handle_otel_traces_ingestion( req: HttpRequest, - Json(json): Json, + body: web::Bytes, ) -> Result { let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); @@ -375,13 +410,35 @@ pub async fn handle_otel_traces_ingestion( let p_custom_fields = get_custom_fields_from_header(&req); - flatten_and_push_logs( - json.into_inner(), - &stream_name, - &log_source, - &p_custom_fields, - ) - .await?; + match req.headers().get("Content-Type") { + Some(content_type) => { + if content_type == "application/json" { + flatten_and_push_logs( + serde_json::from_slice(&body)?, + &stream_name, + &log_source, + &p_custom_fields, + ) + .await?; + } + + if content_type == "application/x-protobuf" { + match ExportTraceServiceRequest::decode(body) { + Ok(json) => { + for record in flatten_otel_traces_protobuf(&json) { + push_logs(&stream_name, record, &log_source, &p_custom_fields).await?; + } + } + Err(e) => { + return Err(PostError::Invalid(e.into())); + } + } + } + } + None => { + return Err(PostError::Header(ParseHeaderError::InvalidValue)); + } + } Ok(HttpResponse::Ok().finish()) } diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index 824b3a10b..537e9f43b 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -15,6 +15,7 @@ * along with this program. If not, see . * */ +use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value as NumberDataPointValue; use opentelemetry_proto::tonic::metrics::v1::{ Exemplar, ExponentialHistogram, Gauge, Histogram, Metric, MetricsData, NumberDataPoint, Sum, @@ -601,3 +602,60 @@ fn flatten_data_point_flags(flags: u32) -> Map { ); data_point_flags_json } + +/// Flattens OpenTelemetry metrics from protobuf format +pub fn flatten_otel_metrics_protobuf(message: &ExportMetricsServiceRequest) -> Vec { + let mut vec_otel_json = Vec::new(); + for resource_metrics in &message.resource_metrics { + let mut resource_metrics_json = Map::new(); + if let Some(resource) = &resource_metrics.resource { + insert_attributes(&mut resource_metrics_json, &resource.attributes); + resource_metrics_json.insert( + "resource_dropped_attributes_count".to_string(), + Value::Number(resource.dropped_attributes_count.into()), + ); + } + + let mut vec_resource_metrics_json = Vec::new(); + for scope_metrics in &resource_metrics.scope_metrics { + // scope-level metadata + let mut scope_metrics_json = Map::new(); + if let Some(scope) = &scope_metrics.scope { + scope_metrics_json + .insert("scope_name".to_string(), Value::String(scope.name.clone())); + scope_metrics_json.insert( + "scope_version".to_string(), + Value::String(scope.version.clone()), + ); + insert_attributes(&mut scope_metrics_json, &scope.attributes); + scope_metrics_json.insert( + "scope_dropped_attributes_count".to_string(), + Value::Number(scope.dropped_attributes_count.into()), + ); + } + scope_metrics_json.insert( + "scope_schema_url".to_string(), + Value::String(scope_metrics.schema_url.clone()), + ); + + for metric in &scope_metrics.metrics { + vec_resource_metrics_json.extend(flatten_metrics_record(metric)); + } + + for resource_metrics_json_item in &mut vec_resource_metrics_json { + resource_metrics_json_item.extend(scope_metrics_json.clone()); + } + } + + resource_metrics_json.insert( + "resource_schema_url".to_string(), + Value::String(resource_metrics.schema_url.clone()), + ); + + for resource_metrics_json_item in &mut vec_resource_metrics_json { + resource_metrics_json_item.extend(resource_metrics_json.clone()); + vec_otel_json.push(Value::Object(resource_metrics_json_item.clone())); + } + } + vec_otel_json +} diff --git a/src/otel/traces.rs b/src/otel/traces.rs index cf71a2e28..05e9dcfc8 100644 --- a/src/otel/traces.rs +++ b/src/otel/traces.rs @@ -15,6 +15,7 @@ * along with this program. If not, see . * */ +use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use opentelemetry_proto::tonic::trace::v1::ScopeSpans; use opentelemetry_proto::tonic::trace::v1::Span; use opentelemetry_proto::tonic::trace::v1::Status; @@ -932,3 +933,65 @@ mod tests { } } } + +/// Flattens OpenTelemetry traces from protobuf format +pub fn flatten_otel_traces_protobuf(message: &ExportTraceServiceRequest) -> Vec { + let mut vec_otel_json = Vec::new(); + for resource_spans in &message.resource_spans { + let mut resource_spans_json = Map::new(); + if let Some(resource) = &resource_spans.resource { + insert_attributes(&mut resource_spans_json, &resource.attributes); + resource_spans_json.insert( + "resource_dropped_attributes_count".to_string(), + Value::Number(resource.dropped_attributes_count.into()), + ); + } + + let mut vec_resource_spans_json: Vec> = Vec::new(); + for scope_spans in &resource_spans.scope_spans { + // 1) collect all span JSON under this scope + let mut scope_spans_json = Vec::new(); + for span in &scope_spans.spans { + scope_spans_json.extend(flatten_span_record(span)); + } + + // 2) build a Map of scope‐level fields + let mut scope_span_json = Map::new(); + if let Some(scope) = &scope_spans.scope { + scope_span_json.insert("scope_name".to_string(), Value::String(scope.name.clone())); + scope_span_json.insert( + "scope_version".to_string(), + Value::String(scope.version.clone()), + ); + insert_attributes(&mut scope_span_json, &scope.attributes); + scope_span_json.insert( + "scope_dropped_attributes_count".to_string(), + Value::Number(scope.dropped_attributes_count.into()), + ); + } + scope_span_json.insert( + "scope_schema_url".to_string(), + Value::String(scope_spans.schema_url.clone()), + ); + + // 3) merge scope fields into each span record + for span_json in &mut scope_spans_json { + span_json.extend(scope_span_json.clone()); + } + + // 4) append to the resource‐level accumulator + vec_resource_spans_json.extend(scope_spans_json); + } + + resource_spans_json.insert( + "resource_schema_url".to_string(), + Value::String(resource_spans.schema_url.clone()), + ); + + for resource_spans_json_item in &mut vec_resource_spans_json { + resource_spans_json_item.extend(resource_spans_json.clone()); + vec_otel_json.push(Value::Object(resource_spans_json_item.clone())); + } + } + vec_otel_json +} From 6f21cba17d67d3b7c8cd5211acd2065074ab56d8 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 28 Jul 2025 22:24:22 -0700 Subject: [PATCH 3/4] refactor protobuf ingestion code --- src/handlers/http/ingest.rs | 334 +++++++++++++++--------------------- src/handlers/mod.rs | 4 + src/otel/logs.rs | 69 ++++---- src/otel/metrics.rs | 124 ++++++------- src/otel/traces.rs | 114 +++++------- 5 files changed, 273 insertions(+), 372 deletions(-) diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 702388f50..0daa8687c 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -29,9 +29,11 @@ use crate::event::error::EventError; use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST}; use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry}; use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY}; +use crate::handlers::http::MAX_EVENT_PAYLOAD_SIZE; use crate::handlers::http::modal::utils::ingest_utils::push_logs; use crate::handlers::{ - EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY, TELEMETRY_TYPE_KEY, TelemetryType, + CONTENT_TYPE_JSON, CONTENT_TYPE_PROTOBUF, EXTRACT_LOG_KEY, LOG_SOURCE_KEY, + STREAM_NAME_HEADER_KEY, TELEMETRY_TYPE_KEY, TelemetryType, }; use crate::metadata::SchemaVersion; use crate::option::Mode; @@ -166,13 +168,13 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< Ok(()) } -// Handler for POST /v1/logs to ingest OTEL logs -// ingests events by extracting stream name from header -// creates if stream does not exist -pub async fn handle_otel_logs_ingestion( - req: HttpRequest, - body: web::Bytes, -) -> Result { +// Common validation and setup for OTEL ingestion +async fn setup_otel_stream( + req: &HttpRequest, + expected_log_source: LogSource, + known_fields: &[&str], + telemetry_type: TelemetryType, +) -> Result<(String, LogSource, LogSourceEntry), PostError> { let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); }; @@ -180,73 +182,106 @@ pub async fn handle_otel_logs_ingestion( let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingLogSource)); }; + let log_source = LogSource::from(log_source.to_str().unwrap()); - if log_source != LogSource::OtelLogs { - return Err(PostError::IncorrectLogSource(LogSource::OtelLogs)); + if log_source != expected_log_source { + return Err(PostError::IncorrectLogSource(expected_log_source)); } let stream_name = stream_name.to_str().unwrap().to_owned(); let log_source_entry = LogSourceEntry::new( log_source.clone(), - OTEL_LOG_KNOWN_FIELD_LIST - .iter() - .map(|&s| s.to_string()) - .collect(), + known_fields.iter().map(|&s| s.to_string()).collect(), ); + PARSEABLE .create_stream_if_not_exists( &stream_name, StreamType::UserDefined, None, vec![log_source_entry.clone()], - TelemetryType::Logs, + telemetry_type, ) .await?; - //if stream exists, fetch the stream log source - //return error if the stream log source is otel traces or otel metrics + // Validate stream compatibility if let Ok(stream) = PARSEABLE.get_stream(&stream_name) { - stream - .get_log_source() - .iter() - .find(|&stream_log_source_entry| { - stream_log_source_entry.log_source_format != LogSource::OtelTraces - && stream_log_source_entry.log_source_format != LogSource::OtelMetrics - }) - .ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?; + match log_source { + LogSource::OtelLogs => { + // For logs, reject if stream is metrics or traces + stream + .get_log_source() + .iter() + .find(|&stream_log_source_entry| { + stream_log_source_entry.log_source_format != LogSource::OtelTraces + && stream_log_source_entry.log_source_format != LogSource::OtelMetrics + }) + .ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?; + } + LogSource::OtelMetrics | LogSource::OtelTraces => { + // For metrics/traces, only allow same type + stream + .get_log_source() + .iter() + .find(|&stream_log_source_entry| { + stream_log_source_entry.log_source_format == log_source + }) + .ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?; + } + _ => {} + } } PARSEABLE - .add_update_log_source(&stream_name, log_source_entry) + .add_update_log_source(&stream_name, log_source_entry.clone()) .await?; - let p_custom_fields = get_custom_fields_from_header(&req); - match req.headers().get("Content-Type") { + Ok((stream_name, log_source, log_source_entry)) +} + +// Common content processing for OTEL ingestion +async fn process_otel_content( + req: &HttpRequest, + body: web::Bytes, + stream_name: &str, + log_source: &LogSource, + decode_protobuf: F, + flatten_protobuf: fn(&T) -> Vec, +) -> Result<(), PostError> +where + T: prost::Message + Default, + F: FnOnce(web::Bytes) -> Result, +{ + let p_custom_fields = get_custom_fields_from_header(req); + + match req + .headers() + .get("Content-Type") + .and_then(|h| h.to_str().ok()) + { Some(content_type) => { - if content_type == "application/json" { + if content_type == CONTENT_TYPE_JSON { flatten_and_push_logs( serde_json::from_slice(&body)?, - &stream_name, - &log_source, + stream_name, + log_source, &p_custom_fields, ) .await?; - } - - if content_type == "application/x-protobuf" { - const MAX_PROTOBUF_SIZE: usize = 10 * 1024 * 1024; // 10MB limit - if body.len() > MAX_PROTOBUF_SIZE { + } else if content_type == CONTENT_TYPE_PROTOBUF { + // 10MB limit + if body.len() > MAX_EVENT_PAYLOAD_SIZE { return Err(PostError::Invalid(anyhow::anyhow!( "Protobuf message size {} exceeds maximum allowed size of {} bytes", body.len(), - MAX_PROTOBUF_SIZE + MAX_EVENT_PAYLOAD_SIZE ))); } - match ExportLogsServiceRequest::decode(body) { - Ok(json) => { - for record in flatten_otel_protobuf(&json) { - push_logs(&stream_name, record, &log_source, &p_custom_fields).await?; + match decode_protobuf(body) { + Ok(decoded) => { + for record in flatten_protobuf(&decoded) { + push_logs(stream_name, record, log_source, &p_custom_fields).await?; } } Err(e) => { @@ -263,6 +298,34 @@ pub async fn handle_otel_logs_ingestion( } } + Ok(()) +} + +// Handler for POST /v1/logs to ingest OTEL logs +// ingests events by extracting stream name from header +// creates if stream does not exist +pub async fn handle_otel_logs_ingestion( + req: HttpRequest, + body: web::Bytes, +) -> Result { + let (stream_name, log_source, _) = setup_otel_stream( + &req, + LogSource::OtelLogs, + &OTEL_LOG_KNOWN_FIELD_LIST, + TelemetryType::Logs, + ) + .await?; + + process_otel_content( + &req, + body, + &stream_name, + &log_source, + ExportLogsServiceRequest::decode, + flatten_otel_protobuf, + ) + .await?; + Ok(HttpResponse::Ok().finish()) } @@ -273,83 +336,23 @@ pub async fn handle_otel_metrics_ingestion( req: HttpRequest, body: web::Bytes, ) -> Result { - let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { - return Err(PostError::Header(ParseHeaderError::MissingStreamName)); - }; - let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else { - return Err(PostError::Header(ParseHeaderError::MissingLogSource)); - }; - let log_source = LogSource::from(log_source.to_str().unwrap()); - if log_source != LogSource::OtelMetrics { - return Err(PostError::IncorrectLogSource(LogSource::OtelMetrics)); - } - - let stream_name = stream_name.to_str().unwrap().to_owned(); - - let log_source_entry = LogSourceEntry::new( - log_source.clone(), - OTEL_METRICS_KNOWN_FIELD_LIST - .iter() - .map(|&s| s.to_string()) - .collect(), - ); - PARSEABLE - .create_stream_if_not_exists( - &stream_name, - StreamType::UserDefined, - None, - vec![log_source_entry.clone()], - TelemetryType::Metrics, - ) - .await?; - - //if stream exists, fetch the stream log source - //return error if the stream log source is not otel metrics - if let Ok(stream) = PARSEABLE.get_stream(&stream_name) { - stream - .get_log_source() - .iter() - .find(|&stream_log_source_entry| { - stream_log_source_entry.log_source_format == log_source.clone() - }) - .ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?; - } - - PARSEABLE - .add_update_log_source(&stream_name, log_source_entry) - .await?; - - let p_custom_fields = get_custom_fields_from_header(&req); - - match req.headers().get("Content-Type") { - Some(content_type) => { - if content_type == "application/json" { - flatten_and_push_logs( - serde_json::from_slice(&body)?, - &stream_name, - &log_source, - &p_custom_fields, - ) - .await?; - } - - if content_type == "application/x-protobuf" { - match ExportMetricsServiceRequest::decode(body) { - Ok(json) => { - for record in flatten_otel_metrics_protobuf(&json) { - push_logs(&stream_name, record, &log_source, &p_custom_fields).await?; - } - } - Err(e) => { - return Err(PostError::Invalid(e.into())); - } - } - } - } - None => { - return Err(PostError::Header(ParseHeaderError::InvalidValue)); - } - } + let (stream_name, log_source, _) = setup_otel_stream( + &req, + LogSource::OtelMetrics, + &OTEL_METRICS_KNOWN_FIELD_LIST, + TelemetryType::Metrics, + ) + .await?; + + process_otel_content( + &req, + body, + &stream_name, + &log_source, + ExportMetricsServiceRequest::decode, + flatten_otel_metrics_protobuf, + ) + .await?; Ok(HttpResponse::Ok().finish()) } @@ -361,84 +364,23 @@ pub async fn handle_otel_traces_ingestion( req: HttpRequest, body: web::Bytes, ) -> Result { - let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { - return Err(PostError::Header(ParseHeaderError::MissingStreamName)); - }; - - let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else { - return Err(PostError::Header(ParseHeaderError::MissingLogSource)); - }; - let log_source = LogSource::from(log_source.to_str().unwrap()); - if log_source != LogSource::OtelTraces { - return Err(PostError::IncorrectLogSource(LogSource::OtelTraces)); - } - let stream_name = stream_name.to_str().unwrap().to_owned(); - - let log_source_entry = LogSourceEntry::new( - log_source.clone(), - OTEL_TRACES_KNOWN_FIELD_LIST - .iter() - .map(|&s| s.to_string()) - .collect(), - ); - - PARSEABLE - .create_stream_if_not_exists( - &stream_name, - StreamType::UserDefined, - None, - vec![log_source_entry.clone()], - TelemetryType::Traces, - ) - .await?; - - //if stream exists, fetch the stream log source - //return error if the stream log source is not otel traces - if let Ok(stream) = PARSEABLE.get_stream(&stream_name) { - stream - .get_log_source() - .iter() - .find(|&stream_log_source_entry| { - stream_log_source_entry.log_source_format == log_source.clone() - }) - .ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?; - } - - PARSEABLE - .add_update_log_source(&stream_name, log_source_entry) - .await?; - - let p_custom_fields = get_custom_fields_from_header(&req); - - match req.headers().get("Content-Type") { - Some(content_type) => { - if content_type == "application/json" { - flatten_and_push_logs( - serde_json::from_slice(&body)?, - &stream_name, - &log_source, - &p_custom_fields, - ) - .await?; - } - - if content_type == "application/x-protobuf" { - match ExportTraceServiceRequest::decode(body) { - Ok(json) => { - for record in flatten_otel_traces_protobuf(&json) { - push_logs(&stream_name, record, &log_source, &p_custom_fields).await?; - } - } - Err(e) => { - return Err(PostError::Invalid(e.into())); - } - } - } - } - None => { - return Err(PostError::Header(ParseHeaderError::InvalidValue)); - } - } + let (stream_name, log_source, _) = setup_otel_stream( + &req, + LogSource::OtelTraces, + &OTEL_TRACES_KNOWN_FIELD_LIST, + TelemetryType::Traces, + ) + .await?; + + process_otel_content( + &req, + body, + &stream_name, + &log_source, + ExportTraceServiceRequest::decode, + flatten_otel_traces_protobuf, + ) + .await?; Ok(HttpResponse::Ok().finish()) } diff --git a/src/handlers/mod.rs b/src/handlers/mod.rs index c1e9cbd40..b1263c775 100644 --- a/src/handlers/mod.rs +++ b/src/handlers/mod.rs @@ -45,6 +45,10 @@ const LOG_SOURCE_KINESIS: &str = "kinesis"; // AWS Kinesis constants const KINESIS_COMMON_ATTRIBUTES_KEY: &str = "x-amz-firehose-common-attributes"; +// constants for content type values +pub const CONTENT_TYPE_JSON: &str = "application/json"; +pub const CONTENT_TYPE_PROTOBUF: &str = "application/x-protobuf"; + #[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq)] #[serde(rename_all = "lowercase")] pub enum TelemetryType { diff --git a/src/otel/logs.rs b/src/otel/logs.rs index 582e1b594..9f6ab14d6 100644 --- a/src/otel/logs.rs +++ b/src/otel/logs.rs @@ -143,11 +143,23 @@ fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec> { vec_scope_log_json } -pub fn flatten_otel_protobuf(message: &ExportLogsServiceRequest) -> Vec { +/// Common function to process resource logs and merge resource-level fields +fn process_resource_logs( + resource_logs: &[T], + get_resource: fn(&T) -> Option<&opentelemetry_proto::tonic::resource::v1::Resource>, + get_scope_logs: fn(&T) -> &[ScopeLogs], + get_schema_url: fn(&T) -> &str, +) -> Vec +where + T: std::fmt::Debug, +{ let mut vec_otel_json = Vec::new(); - for resource_logs in &message.resource_logs { + + for resource_log in resource_logs { let mut resource_log_json = Map::new(); - if let Some(resource) = &resource_logs.resource { + + // Process resource attributes if present + if let Some(resource) = get_resource(resource_log) { insert_attributes(&mut resource_log_json, &resource.attributes); resource_log_json.insert( "resource_dropped_attributes_count".to_string(), @@ -156,18 +168,19 @@ pub fn flatten_otel_protobuf(message: &ExportLogsServiceRequest) -> Vec { } let mut vec_resource_logs_json = Vec::new(); - for scope_log in &resource_logs.scope_logs { + let scope_logs = get_scope_logs(resource_log); + + for scope_log in scope_logs { vec_resource_logs_json.extend(flatten_scope_log(scope_log)); } resource_log_json.insert( "schema_url".to_string(), - Value::String(resource_logs.schema_url.clone()), + Value::String(get_schema_url(resource_log).to_string()), ); for resource_logs_json in &mut vec_resource_logs_json { resource_logs_json.extend(resource_log_json.clone()); - vec_otel_json.push(Value::Object(resource_logs_json.clone())); } } @@ -175,36 +188,22 @@ pub fn flatten_otel_protobuf(message: &ExportLogsServiceRequest) -> Vec { vec_otel_json } +pub fn flatten_otel_protobuf(message: &ExportLogsServiceRequest) -> Vec { + process_resource_logs( + &message.resource_logs, + |record| record.resource.as_ref(), + |record| &record.scope_logs, + |record| &record.schema_url, + ) +} + /// this function performs the custom flattening of the otel logs /// and returns a `Vec` of `Value::Object` of the flattened json pub fn flatten_otel_logs(message: &LogsData) -> Vec { - let mut vec_otel_json = Vec::new(); - for record in &message.resource_logs { - let mut resource_log_json = Map::new(); - if let Some(resource) = &record.resource { - insert_attributes(&mut resource_log_json, &resource.attributes); - resource_log_json.insert( - "resource_dropped_attributes_count".to_string(), - Value::Number(resource.dropped_attributes_count.into()), - ); - } - - let mut vec_resource_logs_json = Vec::new(); - for scope_log in &record.scope_logs { - vec_resource_logs_json.extend(flatten_scope_log(scope_log)); - } - - resource_log_json.insert( - "schema_url".to_string(), - Value::String(record.schema_url.clone()), - ); - - for resource_logs_json in &mut vec_resource_logs_json { - resource_logs_json.extend(resource_log_json.clone()); - - vec_otel_json.push(Value::Object(resource_logs_json.clone())); - } - } - - vec_otel_json + process_resource_logs( + &message.resource_logs, + |record| record.resource.as_ref(), + |record| &record.scope_logs, + |record| &record.schema_url, + ) } diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index 537e9f43b..6974af565 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -500,13 +500,25 @@ pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec data_points_json } -/// this function performs the custom flattening of the otel metrics -/// and returns a `Vec` of `Value::Object` of the flattened json -pub fn flatten_otel_metrics(message: MetricsData) -> Vec { +/// Common function to process resource metrics and merge resource-level fields +#[allow(clippy::too_many_arguments)] +fn process_resource_metrics( + resource_metrics: &[T], + get_resource: fn(&T) -> Option<&opentelemetry_proto::tonic::resource::v1::Resource>, + get_scope_metrics: fn(&T) -> &[S], + get_schema_url: fn(&T) -> &str, + get_scope: fn(&S) -> Option<&opentelemetry_proto::tonic::common::v1::InstrumentationScope>, + get_scope_schema_url: fn(&S) -> &str, + get_metrics: fn(&S) -> &[M], + get_metric: fn(&M) -> &Metric, +) -> Vec { let mut vec_otel_json = Vec::new(); - for record in &message.resource_metrics { + + for resource_metric in resource_metrics { let mut resource_metrics_json = Map::new(); - if let Some(resource) = &record.resource { + + // Process resource attributes if present + if let Some(resource) = get_resource(resource_metric) { insert_attributes(&mut resource_metrics_json, &resource.attributes); resource_metrics_json.insert( "resource_dropped_attributes_count".to_string(), @@ -515,13 +527,17 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Vec { } let mut vec_scope_metrics_json = Vec::new(); - for scope_metric in &record.scope_metrics { + let scope_metrics = get_scope_metrics(resource_metric); + + for scope_metric in scope_metrics { let mut scope_metrics_json = Map::new(); - for metrics_record in &scope_metric.metrics { - vec_scope_metrics_json.extend(flatten_metrics_record(metrics_record)); + + let metrics = get_metrics(scope_metric); + for metric in metrics { + vec_scope_metrics_json.extend(flatten_metrics_record(get_metric(metric))); } - if let Some(scope) = &scope_metric.scope { + if let Some(scope) = get_scope(scope_metric) { scope_metrics_json .insert("scope_name".to_string(), Value::String(scope.name.clone())); scope_metrics_json.insert( @@ -537,7 +553,7 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Vec { scope_metrics_json.insert( "scope_schema_url".to_string(), - Value::String(scope_metric.schema_url.clone()), + Value::String(get_scope_schema_url(scope_metric).to_string()), ); for scope_metric_json in &mut vec_scope_metrics_json { @@ -549,7 +565,7 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Vec { resource_metrics_json.insert( "resource_schema_url".to_string(), - Value::String(record.schema_url.clone()), + Value::String(get_schema_url(resource_metric).to_string()), ); for resource_metric_json in &mut vec_scope_metrics_json { @@ -564,6 +580,35 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Vec { vec_otel_json } +/// this function performs the custom flattening of the otel metrics +/// and returns a `Vec` of `Value::Object` of the flattened json +pub fn flatten_otel_metrics(message: MetricsData) -> Vec { + process_resource_metrics( + &message.resource_metrics, + |record| record.resource.as_ref(), + |record| &record.scope_metrics, + |record| &record.schema_url, + |scope_metric| scope_metric.scope.as_ref(), + |scope_metric| &scope_metric.schema_url, + |scope_metric| &scope_metric.metrics, + |metric| metric, + ) +} + +/// Flattens OpenTelemetry metrics from protobuf format +pub fn flatten_otel_metrics_protobuf(message: &ExportMetricsServiceRequest) -> Vec { + process_resource_metrics( + &message.resource_metrics, + |record| record.resource.as_ref(), + |record| &record.scope_metrics, + |record| &record.schema_url, + |scope_metric| scope_metric.scope.as_ref(), + |scope_metric| &scope_metric.schema_url, + |scope_metric| &scope_metric.metrics, + |metric| metric, + ) +} + /// otel metrics event has json object for aggregation temporality /// there is a mapping of aggregation temporality to its description provided in proto /// this function fetches the description from the aggregation temporality @@ -602,60 +647,3 @@ fn flatten_data_point_flags(flags: u32) -> Map { ); data_point_flags_json } - -/// Flattens OpenTelemetry metrics from protobuf format -pub fn flatten_otel_metrics_protobuf(message: &ExportMetricsServiceRequest) -> Vec { - let mut vec_otel_json = Vec::new(); - for resource_metrics in &message.resource_metrics { - let mut resource_metrics_json = Map::new(); - if let Some(resource) = &resource_metrics.resource { - insert_attributes(&mut resource_metrics_json, &resource.attributes); - resource_metrics_json.insert( - "resource_dropped_attributes_count".to_string(), - Value::Number(resource.dropped_attributes_count.into()), - ); - } - - let mut vec_resource_metrics_json = Vec::new(); - for scope_metrics in &resource_metrics.scope_metrics { - // scope-level metadata - let mut scope_metrics_json = Map::new(); - if let Some(scope) = &scope_metrics.scope { - scope_metrics_json - .insert("scope_name".to_string(), Value::String(scope.name.clone())); - scope_metrics_json.insert( - "scope_version".to_string(), - Value::String(scope.version.clone()), - ); - insert_attributes(&mut scope_metrics_json, &scope.attributes); - scope_metrics_json.insert( - "scope_dropped_attributes_count".to_string(), - Value::Number(scope.dropped_attributes_count.into()), - ); - } - scope_metrics_json.insert( - "scope_schema_url".to_string(), - Value::String(scope_metrics.schema_url.clone()), - ); - - for metric in &scope_metrics.metrics { - vec_resource_metrics_json.extend(flatten_metrics_record(metric)); - } - - for resource_metrics_json_item in &mut vec_resource_metrics_json { - resource_metrics_json_item.extend(scope_metrics_json.clone()); - } - } - - resource_metrics_json.insert( - "resource_schema_url".to_string(), - Value::String(resource_metrics.schema_url.clone()), - ); - - for resource_metrics_json_item in &mut vec_resource_metrics_json { - resource_metrics_json_item.extend(resource_metrics_json.clone()); - vec_otel_json.push(Value::Object(resource_metrics_json_item.clone())); - } - } - vec_otel_json -} diff --git a/src/otel/traces.rs b/src/otel/traces.rs index 05e9dcfc8..2fe7743c3 100644 --- a/src/otel/traces.rs +++ b/src/otel/traces.rs @@ -98,14 +98,23 @@ fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec> { vec_scope_span_json } -/// this function performs the custom flattening of the otel traces event -/// and returns a `Vec` of `Value::Object` of the flattened json -pub fn flatten_otel_traces(message: &TracesData) -> Vec { +/// Common function to process resource spans and merge resource-level fields +fn process_resource_spans( + resource_spans: &[T], + get_resource: fn(&T) -> Option<&opentelemetry_proto::tonic::resource::v1::Resource>, + get_scope_spans: fn(&T) -> &[ScopeSpans], + get_schema_url: fn(&T) -> &str, +) -> Vec +where + T: std::fmt::Debug, +{ let mut vec_otel_json = Vec::new(); - for record in &message.resource_spans { + for resource_span in resource_spans { let mut resource_span_json = Map::new(); - if let Some(resource) = &record.resource { + + // Process resource attributes if present + if let Some(resource) = get_resource(resource_span) { insert_attributes(&mut resource_span_json, &resource.attributes); resource_span_json.insert( "resource_dropped_attributes_count".to_string(), @@ -113,22 +122,22 @@ pub fn flatten_otel_traces(message: &TracesData) -> Vec { ); } + // Process scope spans let mut vec_resource_spans_json = Vec::new(); - for scope_span in &record.scope_spans { + for scope_span in get_scope_spans(resource_span) { let scope_span_json = flatten_scope_span(scope_span); vec_resource_spans_json.extend(scope_span_json); } + // Add resource schema URL resource_span_json.insert( "resource_schema_url".to_string(), - Value::String(record.schema_url.clone()), + Value::String(get_schema_url(resource_span).to_string()), ); + // Merge resource-level fields into each span record for resource_spans_json in &mut vec_resource_spans_json { - for (key, value) in &resource_span_json { - resource_spans_json.insert(key.clone(), value.clone()); - } - + resource_spans_json.extend(resource_span_json.clone()); vec_otel_json.push(Value::Object(resource_spans_json.clone())); } } @@ -136,6 +145,27 @@ pub fn flatten_otel_traces(message: &TracesData) -> Vec { vec_otel_json } +/// Flattens OpenTelemetry traces from protobuf format +pub fn flatten_otel_traces_protobuf(message: &ExportTraceServiceRequest) -> Vec { + process_resource_spans( + &message.resource_spans, + |rs| rs.resource.as_ref(), + |rs| &rs.scope_spans, + |rs| &rs.schema_url, + ) +} + +/// this function performs the custom flattening of the otel traces event +/// and returns a `Vec` of `Value::Object` of the flattened json +pub fn flatten_otel_traces(message: &TracesData) -> Vec { + process_resource_spans( + &message.resource_spans, + |rs| rs.resource.as_ref(), + |rs| &rs.scope_spans, + |rs| &rs.schema_url, + ) +} + /// otel traces has json array of events /// this function flattens the `Event` object /// and returns a `Vec` of `Map` of the flattened json @@ -933,65 +963,3 @@ mod tests { } } } - -/// Flattens OpenTelemetry traces from protobuf format -pub fn flatten_otel_traces_protobuf(message: &ExportTraceServiceRequest) -> Vec { - let mut vec_otel_json = Vec::new(); - for resource_spans in &message.resource_spans { - let mut resource_spans_json = Map::new(); - if let Some(resource) = &resource_spans.resource { - insert_attributes(&mut resource_spans_json, &resource.attributes); - resource_spans_json.insert( - "resource_dropped_attributes_count".to_string(), - Value::Number(resource.dropped_attributes_count.into()), - ); - } - - let mut vec_resource_spans_json: Vec> = Vec::new(); - for scope_spans in &resource_spans.scope_spans { - // 1) collect all span JSON under this scope - let mut scope_spans_json = Vec::new(); - for span in &scope_spans.spans { - scope_spans_json.extend(flatten_span_record(span)); - } - - // 2) build a Map of scope‐level fields - let mut scope_span_json = Map::new(); - if let Some(scope) = &scope_spans.scope { - scope_span_json.insert("scope_name".to_string(), Value::String(scope.name.clone())); - scope_span_json.insert( - "scope_version".to_string(), - Value::String(scope.version.clone()), - ); - insert_attributes(&mut scope_span_json, &scope.attributes); - scope_span_json.insert( - "scope_dropped_attributes_count".to_string(), - Value::Number(scope.dropped_attributes_count.into()), - ); - } - scope_span_json.insert( - "scope_schema_url".to_string(), - Value::String(scope_spans.schema_url.clone()), - ); - - // 3) merge scope fields into each span record - for span_json in &mut scope_spans_json { - span_json.extend(scope_span_json.clone()); - } - - // 4) append to the resource‐level accumulator - vec_resource_spans_json.extend(scope_spans_json); - } - - resource_spans_json.insert( - "resource_schema_url".to_string(), - Value::String(resource_spans.schema_url.clone()), - ); - - for resource_spans_json_item in &mut vec_resource_spans_json { - resource_spans_json_item.extend(resource_spans_json.clone()); - vec_otel_json.push(Value::Object(resource_spans_json_item.clone())); - } - } - vec_otel_json -} From b582d9d0b4ccd9f003e7c61f03b59df3ee82ade5 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 28 Jul 2025 22:45:17 -0700 Subject: [PATCH 4/4] error handling --- src/handlers/http/ingest.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 0daa8687c..89ba947b9 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -291,10 +291,17 @@ where ))); } } + } else { + return Err(PostError::Invalid(anyhow::anyhow!( + "Unsupported Content-Type: {}. Expected application/json or application/x-protobuf", + content_type + ))); } } None => { - return Err(PostError::Header(ParseHeaderError::InvalidValue)); + return Err(PostError::Invalid(anyhow::anyhow!( + "Missing Content-Type header. Expected application/json or application/x-protobuf" + ))); } }