diff --git a/opentelemetry-otlp/Cargo.toml b/opentelemetry-otlp/Cargo.toml index 9987d233a8..7dc81f991b 100644 --- a/opentelemetry-otlp/Cargo.toml +++ b/opentelemetry-otlp/Cargo.toml @@ -56,6 +56,8 @@ tonic = { workspace = true, features = ["router", "server"] } trace = ["opentelemetry/trace", "opentelemetry_sdk/trace", "opentelemetry-proto/trace"] metrics = ["opentelemetry/metrics", "opentelemetry_sdk/metrics", "opentelemetry-proto/metrics"] logs = ["opentelemetry/logs", "opentelemetry_sdk/logs", "opentelemetry-proto/logs"] +zpages = ["trace", "opentelemetry-proto/zpages"] +profiles = ["opentelemetry-proto/profiles"] internal-logs = ["tracing", "opentelemetry/internal-logs"] # add ons @@ -71,9 +73,9 @@ tls = ["tonic/tls-ring"] tls-roots = ["tls", "tonic/tls-native-roots"] tls-webpki-roots = ["tls", "tonic/tls-webpki-roots"] -# http binary -http-proto = ["prost", "opentelemetry-http", "opentelemetry-proto/gen-tonic-messages", "http", "trace", "metrics"] -http-json = ["serde_json", "prost", "opentelemetry-http", "opentelemetry-proto/gen-tonic-messages", "opentelemetry-proto/with-serde", "http", "trace", "metrics"] +# http binary +http-proto = ["prost", "opentelemetry-http", "opentelemetry-proto/prost", "http", "trace", "metrics"] +http-json = ["serde_json", "prost", "opentelemetry-http", "opentelemetry-proto/prost", "opentelemetry-proto/with-serde", "http", "trace", "metrics"] reqwest-blocking-client = ["reqwest/blocking", "opentelemetry-http/reqwest"] reqwest-client = ["reqwest", "opentelemetry-http/reqwest"] reqwest-rustls = ["reqwest", "opentelemetry-http/reqwest-rustls"] diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index 5d31660ab6..ceab00808d 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -7,11 +7,13 @@ use http::{HeaderName, HeaderValue, Uri}; #[cfg(feature = "http-json")] use opentelemetry::otel_debug; use opentelemetry_http::HttpClient; 
-use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema; +use crate::transform::common::tonic::ResourceAttributesWithSchema; #[cfg(feature = "logs")] -use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope; +use crate::transform::logs::tonic::group_logs_by_resource_and_scope; +#[cfg(feature = "metrics")] +use crate::transform::metrics::tonic::resource_metrics_to_export_request; #[cfg(feature = "trace")] -use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope; +use crate::transform::trace::tonic::group_spans_by_resource_and_scope; #[cfg(feature = "logs")] use opentelemetry_sdk::logs::LogBatch; #[cfg(feature = "trace")] @@ -263,7 +265,7 @@ pub(crate) struct OtlpHttpClient { _timeout: Duration, #[allow(dead_code)] // would be removed once we support set_resource for metrics and traces. - resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, + resource: crate::transform::common::tonic::ResourceAttributesWithSchema, } impl OtlpHttpClient { @@ -330,7 +332,7 @@ impl OtlpHttpClient { ) -> Option<(Vec, &'static str)> { use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; - let req: ExportMetricsServiceRequest = metrics.into(); + let req: ExportMetricsServiceRequest = resource_metrics_to_export_request(metrics); match self.protocol { #[cfg(feature = "http-json")] diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index 9f8b9d8a6d..8d298e9fa2 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -9,7 +9,7 @@ use std::time; use tokio::sync::Mutex; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; -use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope; +use crate::transform::logs::tonic::group_logs_by_resource_and_scope; use super::BoxInterceptor; @@ 
-17,7 +17,7 @@ pub(crate) struct TonicLogsClient { inner: Mutex>, #[allow(dead_code)] // would be removed once we support set_resource for metrics. - resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, + resource: crate::transform::common::tonic::ResourceAttributesWithSchema, } struct ClientInner { diff --git a/opentelemetry-otlp/src/exporter/tonic/metrics.rs b/opentelemetry-otlp/src/exporter/tonic/metrics.rs index 13813c7305..d857bc21e2 100644 --- a/opentelemetry-otlp/src/exporter/tonic/metrics.rs +++ b/opentelemetry-otlp/src/exporter/tonic/metrics.rs @@ -3,7 +3,7 @@ use std::sync::Mutex; use opentelemetry::otel_debug; use opentelemetry_proto::tonic::collector::metrics::v1::{ - metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest, + metrics_service_client::MetricsServiceClient, }; use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult}; use opentelemetry_sdk::metrics::data::ResourceMetrics; @@ -11,6 +11,7 @@ use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Chann use super::BoxInterceptor; use crate::metric::MetricsClient; +use crate::transform::metrics::tonic::resource_metrics_to_export_request; pub(crate) struct TonicMetricsClient { inner: Mutex>, @@ -81,7 +82,7 @@ impl MetricsClient for TonicMetricsClient { .export(Request::from_parts( metadata, extensions, - ExportMetricsServiceRequest::from(metrics), + resource_metrics_to_export_request(metrics), )) .await; diff --git a/opentelemetry-otlp/src/exporter/tonic/trace.rs b/opentelemetry-otlp/src/exporter/tonic/trace.rs index 4378c37a04..6d84b40b2e 100644 --- a/opentelemetry-otlp/src/exporter/tonic/trace.rs +++ b/opentelemetry-otlp/src/exporter/tonic/trace.rs @@ -5,7 +5,7 @@ use opentelemetry::otel_debug; use opentelemetry_proto::tonic::collector::trace::v1::{ trace_service_client::TraceServiceClient, ExportTraceServiceRequest, }; -use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope; +use 
crate::transform::trace::tonic::group_spans_by_resource_and_scope; use opentelemetry_sdk::error::OTelSdkError; use opentelemetry_sdk::{ error::OTelSdkResult, @@ -19,7 +19,7 @@ pub(crate) struct TonicTracesClient { inner: Option, #[allow(dead_code)] // would be removed once we support set_resource for metrics. - resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, + resource: crate::transform::common::tonic::ResourceAttributesWithSchema, } struct ClientInner { diff --git a/opentelemetry-otlp/src/lib.rs b/opentelemetry-otlp/src/lib.rs index 5f79aa99f4..c8d2449a04 100644 --- a/opentelemetry-otlp/src/lib.rs +++ b/opentelemetry-otlp/src/lib.rs @@ -368,6 +368,10 @@ mod metric; #[cfg(any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic"))] mod span; +// Transform logic moved from opentelemetry-proto for SDK decoupling +/// Transformation utilities for converting SDK types to protobuf types. +pub mod transform; + pub use crate::exporter::Compression; pub use crate::exporter::ExportConfig; pub use crate::exporter::ExporterBuildError; diff --git a/opentelemetry-otlp/src/transform/common.rs b/opentelemetry-otlp/src/transform/common.rs new file mode 100644 index 0000000000..3a629b5ec2 --- /dev/null +++ b/opentelemetry-otlp/src/transform/common.rs @@ -0,0 +1,164 @@ +#[cfg(all( + any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic"), + any(feature = "trace", feature = "metrics", feature = "logs") +))] +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +#[cfg(all( + any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic"), + any(feature = "trace", feature = "metrics", feature = "logs") +))] +pub(crate) fn to_nanos(time: SystemTime) -> u64 { + time.duration_since(UNIX_EPOCH) + .unwrap_or_else(|_| Duration::from_secs(0)) + .as_nanos() as u64 +} + +#[cfg(any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic"))] +/// Tonic-specific common transformation utilities. 
+pub mod tonic { + use opentelemetry_proto::tonic::common::v1::{ + any_value, AnyValue, ArrayValue, InstrumentationScope, KeyValue, + }; + use opentelemetry::{Array, Value}; + use std::borrow::Cow; + + #[cfg(any(feature = "trace", feature = "logs"))] + #[derive(Debug, Default)] + /// Resource attributes with schema information. + pub struct ResourceAttributesWithSchema { + /// Resource attributes. + pub attributes: Attributes, + /// Schema URL for the resource. + pub schema_url: Option, + } + + #[cfg(any(feature = "trace", feature = "logs"))] + impl From<&opentelemetry_sdk::Resource> for ResourceAttributesWithSchema { + fn from(resource: &opentelemetry_sdk::Resource) -> Self { + ResourceAttributesWithSchema { + attributes: resource_attributes(resource), + schema_url: resource.schema_url().map(ToString::to_string), + } + } + } + + #[cfg(any(feature = "trace", feature = "logs"))] + use opentelemetry_sdk::Resource; + + /// Creates instrumentation scope from scope and target. + pub fn instrumentation_scope_from_scope_and_target( + scope: opentelemetry::InstrumentationScope, + target: Option>, + ) -> InstrumentationScope { + if let Some(t) = target { + InstrumentationScope { + name: t.to_string(), + version: String::new(), + attributes: vec![], + ..Default::default() + } + } else { + InstrumentationScope { + name: scope.name().to_owned(), + version: scope.version().map(ToOwned::to_owned).unwrap_or_default(), + attributes: Attributes::from(scope.attributes().cloned()).0, + ..Default::default() + } + } + } + + /// Creates instrumentation scope from scope reference and target. 
+ pub fn instrumentation_scope_from_scope_ref_and_target( + scope: &opentelemetry::InstrumentationScope, + target: Option>, + ) -> InstrumentationScope { + if let Some(t) = target { + InstrumentationScope { + name: t.to_string(), + version: String::new(), + attributes: vec![], + ..Default::default() + } + } else { + InstrumentationScope { + name: scope.name().to_owned(), + version: scope.version().map(ToOwned::to_owned).unwrap_or_default(), + attributes: Attributes::from(scope.attributes().cloned()).0, + ..Default::default() + } + } + } + + /// Wrapper type for Vec<`KeyValue`> + #[derive(Default, Debug)] + pub struct Attributes(pub ::std::vec::Vec); + + impl> From for Attributes { + fn from(kvs: I) -> Self { + Attributes( + kvs.into_iter() + .map(|api_kv| KeyValue { + key: api_kv.key.as_str().to_string(), + value: Some(value_to_any_value(api_kv.value)), + }) + .collect(), + ) + } + } + + #[cfg(feature = "logs")] + impl, V: Into> FromIterator<(K, V)> for Attributes { + fn from_iter>(iter: T) -> Self { + Attributes( + iter.into_iter() + .map(|(k, v)| KeyValue { + key: k.into(), + value: Some(v.into()), + }) + .collect(), + ) + } + } + + /// Converts OpenTelemetry value to protobuf any value. 
+ pub fn value_to_any_value(value: Value) -> AnyValue { + AnyValue { + value: match value { + Value::Bool(val) => Some(any_value::Value::BoolValue(val)), + Value::I64(val) => Some(any_value::Value::IntValue(val)), + Value::F64(val) => Some(any_value::Value::DoubleValue(val)), + Value::String(val) => Some(any_value::Value::StringValue(val.to_string())), + Value::Array(array) => Some(any_value::Value::ArrayValue(match array { + Array::Bool(vals) => array_into_proto(vals), + Array::I64(vals) => array_into_proto(vals), + Array::F64(vals) => array_into_proto(vals), + Array::String(vals) => array_into_proto(vals), + _ => unreachable!("Nonexistent array type"), // Needs to be updated when new array types are added + })), + _ => unreachable!("Nonexistent value type"), // Needs to be updated when new value types are added + }, + } + } + + fn array_into_proto(vals: Vec) -> ArrayValue + where + Value: From, + { + let values = vals + .into_iter() + .map(|val| value_to_any_value(Value::from(val))) + .collect(); + + ArrayValue { values } + } + + #[cfg(any(feature = "trace", feature = "logs"))] + pub(crate) fn resource_attributes(resource: &Resource) -> Attributes { + resource + .iter() + .map(|(k, v)| opentelemetry::KeyValue::new(k.clone(), v.clone())) + .collect::>() + .into() + } +} diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-otlp/src/transform/logs.rs similarity index 52% rename from opentelemetry-proto/src/transform/logs.rs rename to opentelemetry-otlp/src/transform/logs.rs index 65d0598216..e88304360c 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-otlp/src/transform/logs.rs @@ -1,31 +1,30 @@ -#[cfg(feature = "gen-tonic-messages")] +#[cfg(any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic"))] +/// Tonic-specific transformation utilities for logs. 
pub mod tonic { - use crate::{ - tonic::{ - common::v1::{ - any_value::Value, AnyValue, ArrayValue, InstrumentationScope, KeyValue, - KeyValueList, - }, - logs::v1::{LogRecord, ResourceLogs, ScopeLogs, SeverityNumber}, - resource::v1::Resource, + use opentelemetry_proto::tonic::{ + common::v1::{ + any_value::Value, AnyValue, ArrayValue, KeyValue, + KeyValueList, }, - transform::common::{to_nanos, tonic::ResourceAttributesWithSchema}, + logs::v1::{LogRecord, ResourceLogs, ScopeLogs, SeverityNumber}, + resource::v1::Resource, + }; + use crate::transform::common::{ + to_nanos, + tonic::{ResourceAttributesWithSchema, instrumentation_scope_from_scope_ref_and_target}, }; use opentelemetry::logs::{AnyValue as LogsAnyValue, Severity}; use opentelemetry_sdk::logs::LogBatch; use std::borrow::Cow; use std::collections::HashMap; - impl From for AnyValue { - fn from(value: LogsAnyValue) -> Self { - AnyValue { - value: Some(value.into()), - } + fn any_value_from_logs_any_value(value: LogsAnyValue) -> AnyValue { + AnyValue { + value: Some(value_from_logs_any_value(value)), } } - impl From for Value { - fn from(value: LogsAnyValue) -> Self { + fn value_from_logs_any_value(value: LogsAnyValue) -> Value { match value { LogsAnyValue::Double(f) => Value::DoubleValue(f), LogsAnyValue::Int(i) => Value::IntValue(i), @@ -34,9 +33,7 @@ pub mod tonic { LogsAnyValue::ListAny(v) => Value::ArrayValue(ArrayValue { values: v .into_iter() - .map(|v| AnyValue { - value: Some(v.into()), - }) + .map(|v| any_value_from_logs_any_value(v)) .collect(), }), LogsAnyValue::Map(m) => Value::KvlistValue(KeyValueList { @@ -44,127 +41,107 @@ pub mod tonic { .into_iter() .map(|(key, value)| KeyValue { key: key.into(), - value: Some(AnyValue { - value: Some(value.into()), - }), + value: Some(any_value_from_logs_any_value(value)), }) .collect(), }), LogsAnyValue::Bytes(v) => Value::BytesValue(*v), _ => unreachable!("Nonexistent value type"), } - } } - impl From<&opentelemetry_sdk::logs::SdkLogRecord> for 
LogRecord { - fn from(log_record: &opentelemetry_sdk::logs::SdkLogRecord) -> Self { - let trace_context = log_record.trace_context(); - let severity_number = match log_record.severity_number() { - Some(Severity::Trace) => SeverityNumber::Trace, - Some(Severity::Trace2) => SeverityNumber::Trace2, - Some(Severity::Trace3) => SeverityNumber::Trace3, - Some(Severity::Trace4) => SeverityNumber::Trace4, - Some(Severity::Debug) => SeverityNumber::Debug, - Some(Severity::Debug2) => SeverityNumber::Debug2, - Some(Severity::Debug3) => SeverityNumber::Debug3, - Some(Severity::Debug4) => SeverityNumber::Debug4, - Some(Severity::Info) => SeverityNumber::Info, - Some(Severity::Info2) => SeverityNumber::Info2, - Some(Severity::Info3) => SeverityNumber::Info3, - Some(Severity::Info4) => SeverityNumber::Info4, - Some(Severity::Warn) => SeverityNumber::Warn, - Some(Severity::Warn2) => SeverityNumber::Warn2, - Some(Severity::Warn3) => SeverityNumber::Warn3, - Some(Severity::Warn4) => SeverityNumber::Warn4, - Some(Severity::Error) => SeverityNumber::Error, - Some(Severity::Error2) => SeverityNumber::Error2, - Some(Severity::Error3) => SeverityNumber::Error3, - Some(Severity::Error4) => SeverityNumber::Error4, - Some(Severity::Fatal) => SeverityNumber::Fatal, - Some(Severity::Fatal2) => SeverityNumber::Fatal2, - Some(Severity::Fatal3) => SeverityNumber::Fatal3, - Some(Severity::Fatal4) => SeverityNumber::Fatal4, - None => SeverityNumber::Unspecified, - }; + /// Converts SDK log record to protobuf log record. 
+ pub fn sdk_log_record_to_proto_log_record(log_record: &opentelemetry_sdk::logs::SdkLogRecord) -> LogRecord { + let trace_context = log_record.trace_context(); + let severity_number = match log_record.severity_number() { + Some(Severity::Trace) => SeverityNumber::Trace, + Some(Severity::Trace2) => SeverityNumber::Trace2, + Some(Severity::Trace3) => SeverityNumber::Trace3, + Some(Severity::Trace4) => SeverityNumber::Trace4, + Some(Severity::Debug) => SeverityNumber::Debug, + Some(Severity::Debug2) => SeverityNumber::Debug2, + Some(Severity::Debug3) => SeverityNumber::Debug3, + Some(Severity::Debug4) => SeverityNumber::Debug4, + Some(Severity::Info) => SeverityNumber::Info, + Some(Severity::Info2) => SeverityNumber::Info2, + Some(Severity::Info3) => SeverityNumber::Info3, + Some(Severity::Info4) => SeverityNumber::Info4, + Some(Severity::Warn) => SeverityNumber::Warn, + Some(Severity::Warn2) => SeverityNumber::Warn2, + Some(Severity::Warn3) => SeverityNumber::Warn3, + Some(Severity::Warn4) => SeverityNumber::Warn4, + Some(Severity::Error) => SeverityNumber::Error, + Some(Severity::Error2) => SeverityNumber::Error2, + Some(Severity::Error3) => SeverityNumber::Error3, + Some(Severity::Error4) => SeverityNumber::Error4, + Some(Severity::Fatal) => SeverityNumber::Fatal, + Some(Severity::Fatal2) => SeverityNumber::Fatal2, + Some(Severity::Fatal3) => SeverityNumber::Fatal3, + Some(Severity::Fatal4) => SeverityNumber::Fatal4, + None => SeverityNumber::Unspecified, + }; - LogRecord { - time_unix_nano: log_record.timestamp().map(to_nanos).unwrap_or_default(), - observed_time_unix_nano: to_nanos(log_record.observed_timestamp().unwrap()), - attributes: { - log_record - .attributes_iter() - .map(|kv| KeyValue { - key: kv.0.to_string(), - value: Some(AnyValue { - value: Some(kv.1.clone().into()), - }), - }) - .collect() - }, - event_name: log_record.event_name().unwrap_or_default().into(), - severity_number: severity_number.into(), - severity_text: log_record - .severity_text() 
- .map(Into::into) - .unwrap_or_default(), - body: log_record.body().cloned().map(Into::into), - dropped_attributes_count: 0, - flags: trace_context - .map(|ctx| { - ctx.trace_flags - .map(|flags| flags.to_u8() as u32) - .unwrap_or_default() + LogRecord { + time_unix_nano: log_record.timestamp().map(to_nanos).unwrap_or_default(), + observed_time_unix_nano: to_nanos(log_record.observed_timestamp().unwrap()), + attributes: { + log_record + .attributes_iter() + .map(|kv| KeyValue { + key: kv.0.to_string(), + value: Some(any_value_from_logs_any_value(kv.1.clone())), }) - .unwrap_or_default(), - span_id: trace_context - .map(|ctx| ctx.span_id.to_bytes().to_vec()) - .unwrap_or_default(), - trace_id: trace_context - .map(|ctx| ctx.trace_id.to_bytes().to_vec()) - .unwrap_or_default(), - } + .collect() + }, + event_name: log_record.event_name().unwrap_or_default().into(), + severity_number: severity_number.into(), + severity_text: log_record + .severity_text() + .map(Into::into) + .unwrap_or_default(), + body: log_record.body().cloned().map(any_value_from_logs_any_value), + dropped_attributes_count: 0, + flags: trace_context + .map(|ctx| { + ctx.trace_flags + .map(|flags| flags.to_u8() as u32) + .unwrap_or_default() + }) + .unwrap_or_default(), + span_id: trace_context + .map(|ctx| ctx.span_id.to_bytes().to_vec()) + .unwrap_or_default(), + trace_id: trace_context + .map(|ctx| ctx.trace_id.to_bytes().to_vec()) + .unwrap_or_default(), } } - impl - From<( - ( - &opentelemetry_sdk::logs::SdkLogRecord, - &opentelemetry::InstrumentationScope, - ), - &ResourceAttributesWithSchema, - )> for ResourceLogs - { - fn from( - data: ( - ( - &opentelemetry_sdk::logs::SdkLogRecord, - &opentelemetry::InstrumentationScope, - ), - &ResourceAttributesWithSchema, - ), - ) -> Self { - let ((log_record, instrumentation), resource) = data; - - ResourceLogs { - resource: Some(Resource { - attributes: resource.attributes.0.clone(), - dropped_attributes_count: 0, - entity_refs: vec![], - }), - 
schema_url: resource.schema_url.clone().unwrap_or_default(), - scope_logs: vec![ScopeLogs { - schema_url: instrumentation - .schema_url() - .map(ToOwned::to_owned) - .unwrap_or_default(), - scope: Some((instrumentation, log_record.target().cloned()).into()), - log_records: vec![log_record.into()], - }], - } + /// Converts log data to resource logs. + pub fn log_data_to_resource_logs( + log_record: &opentelemetry_sdk::logs::SdkLogRecord, + instrumentation: &opentelemetry::InstrumentationScope, + resource: &ResourceAttributesWithSchema, + ) -> ResourceLogs { + ResourceLogs { + resource: Some(Resource { + attributes: resource.attributes.0.clone(), + dropped_attributes_count: 0, + entity_refs: vec![], + }), + schema_url: resource.schema_url.clone().unwrap_or_default(), + scope_logs: vec![ScopeLogs { + schema_url: instrumentation + .schema_url() + .map(ToOwned::to_owned) + .unwrap_or_default(), + scope: Some(instrumentation_scope_from_scope_ref_and_target(instrumentation, log_record.target().cloned())), + log_records: vec![sdk_log_record_to_proto_log_record(log_record)], + }], } } + /// Groups logs by resource and instrumentation scope. 
pub fn group_logs_by_resource_and_scope( logs: LogBatch<'_>, resource: &ResourceAttributesWithSchema, @@ -195,14 +172,14 @@ pub mod tonic { let scope_logs = scope_map .into_iter() .map(|(key, log_data)| ScopeLogs { - scope: Some(InstrumentationScope::from(( + scope: Some(instrumentation_scope_from_scope_ref_and_target( log_data.first().unwrap().1, Some(key.into_owned().into()), - ))), + )), schema_url: resource.schema_url.clone().unwrap_or_default(), log_records: log_data .into_iter() - .map(|(log_record, _)| log_record.into()) + .map(|(log_record, _)| sdk_log_record_to_proto_log_record(log_record)) .collect(), }) .collect(); @@ -271,7 +248,7 @@ mod tests { let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema let grouped_logs = - crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource); + super::tonic::group_logs_by_resource_and_scope(log_batch, &resource); assert_eq!(grouped_logs.len(), 1); let resource_logs = &grouped_logs[0]; @@ -291,7 +268,7 @@ mod tests { let log_batch = LogBatch::new(&logs); let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema let grouped_logs = - crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource); + super::tonic::group_logs_by_resource_and_scope(log_batch, &resource); assert_eq!(grouped_logs.len(), 1); let resource_logs = &grouped_logs[0]; diff --git a/opentelemetry-otlp/src/transform/metrics.rs b/opentelemetry-otlp/src/transform/metrics.rs new file mode 100644 index 0000000000..ab84d693cd --- /dev/null +++ b/opentelemetry-otlp/src/transform/metrics.rs @@ -0,0 +1,347 @@ +// The prost currently will generate a non optional deprecated field for labels. +// We cannot assign value to it otherwise clippy will complain. +// We cannot ignore it as it's not an optional field. +// We can remove this after we removed the labels field from proto. 
+#[allow(deprecated)] +#[cfg(any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic"))] +/// Tonic-specific transformation utilities for metrics. +pub mod tonic { + use std::fmt::Debug; + + use opentelemetry::{otel_debug, Key, Value}; + use opentelemetry_sdk::metrics::data::{ + AggregatedMetrics, Exemplar as SdkExemplar, + ExponentialHistogram as SdkExponentialHistogram, Gauge as SdkGauge, + Histogram as SdkHistogram, Metric as SdkMetric, MetricData, ResourceMetrics, + ScopeMetrics as SdkScopeMetrics, Sum as SdkSum, + }; + use opentelemetry_sdk::metrics::Temporality; + use opentelemetry_sdk::Resource as SdkResource; + + use opentelemetry_proto::tonic::{ + collector::metrics::v1::ExportMetricsServiceRequest, + common::v1::KeyValue, + metrics::v1::{ + exemplar, exemplar::Value as TonicExemplarValue, + exponential_histogram_data_point::Buckets as TonicBuckets, + metric::Data as TonicMetricData, number_data_point, + number_data_point::Value as TonicDataPointValue, + AggregationTemporality, + DataPointFlags as TonicDataPointFlags, Exemplar as TonicExemplar, + ExponentialHistogram as TonicExponentialHistogram, + ExponentialHistogramDataPoint as TonicExponentialHistogramDataPoint, + Gauge as TonicGauge, Histogram as TonicHistogram, + HistogramDataPoint as TonicHistogramDataPoint, Metric as TonicMetric, + NumberDataPoint as TonicNumberDataPoint, ResourceMetrics as TonicResourceMetrics, + ScopeMetrics as TonicScopeMetrics, Sum as TonicSum, + }, + resource::v1::Resource as TonicResource, + }; + use crate::transform::common::{ + to_nanos, + tonic::{instrumentation_scope_from_scope_ref_and_target, value_to_any_value}, + }; + + /// Converts u64 value to exemplar value. + pub fn exemplar_value_from_u64(value: u64) -> exemplar::Value { + exemplar::Value::AsInt(i64::try_from(value).unwrap_or_default()) + } + + /// Converts i64 value to exemplar value. 
+ pub fn exemplar_value_from_i64(value: i64) -> exemplar::Value { + exemplar::Value::AsInt(value) + } + + /// Converts f64 value to exemplar value. + pub fn exemplar_value_from_f64(value: f64) -> exemplar::Value { + exemplar::Value::AsDouble(value) + } + + /// Converts u64 value to number data point value. + pub fn number_data_point_value_from_u64(value: u64) -> number_data_point::Value { + number_data_point::Value::AsInt(i64::try_from(value).unwrap_or_default()) + } + + /// Converts i64 value to number data point value. + pub fn number_data_point_value_from_i64(value: i64) -> number_data_point::Value { + number_data_point::Value::AsInt(value) + } + + /// Converts f64 value to number data point value. + pub fn number_data_point_value_from_f64(value: f64) -> number_data_point::Value { + number_data_point::Value::AsDouble(value) + } + + /// Converts key-value reference to protobuf key-value. + pub fn key_value_from_key_value_ref(kv: (&Key, &Value)) -> KeyValue { + KeyValue { + key: kv.0.to_string(), + value: Some(value_to_any_value(kv.1.clone())), + } + } + + /// Converts OpenTelemetry key-value to protobuf key-value. + pub fn key_value_from_otel_key_value(kv: &opentelemetry::KeyValue) -> KeyValue { + KeyValue { + key: kv.key.to_string(), + value: Some(value_to_any_value(kv.value.clone())), + } + } + + /// Converts SDK temporality to protobuf aggregation temporality. + pub fn temporality_to_aggregation_temporality(temporality: Temporality) -> AggregationTemporality { + match temporality { + Temporality::Cumulative => AggregationTemporality::Cumulative, + Temporality::Delta => AggregationTemporality::Delta, + other => { + otel_debug!( + name: "AggregationTemporality::Unknown", + message = "Unknown temporality,using default instead.", + unknown_temporality = format!("{:?}", other), + default_temporality = format!("{:?}", Temporality::Cumulative) + ); + AggregationTemporality::Cumulative + } + } + } + + /// Converts resource metrics to export request. 
+ pub fn resource_metrics_to_export_request(rm: &ResourceMetrics) -> ExportMetricsServiceRequest { + ExportMetricsServiceRequest { + resource_metrics: vec![TonicResourceMetrics { + resource: Some(sdk_resource_to_tonic_resource(rm.resource())), + scope_metrics: rm.scope_metrics().map(sdk_scope_metrics_to_tonic_scope_metrics).collect(), + schema_url: rm + .resource() + .schema_url() + .map(|s| s.to_string()) + .unwrap_or_default(), + }], + } + } + + /// Converts SDK resource to protobuf resource. + pub fn sdk_resource_to_tonic_resource(resource: &SdkResource) -> TonicResource { + TonicResource { + attributes: resource.iter().map(key_value_from_key_value_ref).collect(), + dropped_attributes_count: 0, + entity_refs: vec![], // internal and currently unused + } + } + + /// Converts SDK scope metrics to protobuf scope metrics. + pub fn sdk_scope_metrics_to_tonic_scope_metrics(sm: &SdkScopeMetrics) -> TonicScopeMetrics { + TonicScopeMetrics { + scope: Some(instrumentation_scope_from_scope_ref_and_target(sm.scope(), None)), + metrics: sm.metrics().map(sdk_metric_to_tonic_metric).collect(), + schema_url: sm + .scope() + .schema_url() + .map(ToOwned::to_owned) + .unwrap_or_default(), + } + } + + /// Converts SDK metric to protobuf metric. + pub fn sdk_metric_to_tonic_metric(metric: &SdkMetric) -> TonicMetric { + TonicMetric { + name: metric.name().to_string(), + description: metric.description().to_string(), + unit: metric.unit().to_string(), + metadata: vec![], // internal and currently unused + data: Some(match metric.data() { + AggregatedMetrics::F64(data) => metric_data_to_tonic_metric_data(data), + AggregatedMetrics::U64(data) => metric_data_to_tonic_metric_data(data), + AggregatedMetrics::I64(data) => metric_data_to_tonic_metric_data(data), + }), + } + } + + /// Converts SDK metric data to protobuf metric data. 
+ pub fn metric_data_to_tonic_metric_data(data: &MetricData) -> TonicMetricData + where + T: Numeric + Debug, + { + match data { + MetricData::Gauge(gauge) => TonicMetricData::Gauge(sdk_gauge_to_tonic_gauge(gauge)), + MetricData::Sum(sum) => TonicMetricData::Sum(sdk_sum_to_tonic_sum(sum)), + MetricData::Histogram(hist) => TonicMetricData::Histogram(sdk_histogram_to_tonic_histogram(hist)), + MetricData::ExponentialHistogram(hist) => { + TonicMetricData::ExponentialHistogram(sdk_exponential_histogram_to_tonic_exponential_histogram(hist)) + } + } + } + + /// Trait for numeric values that can be converted to protobuf values. + pub trait Numeric: Copy { + // lossy at large values for u64 and i64 but otlp histograms only handle float values + /// Converts the numeric value to f64. + fn into_f64(self) -> f64; + /// Converts the numeric value to protobuf data point value. + fn to_data_point_value(self) -> TonicDataPointValue; + /// Converts the numeric value to protobuf exemplar value. + fn to_exemplar_value(self) -> TonicExemplarValue; + } + + impl Numeric for u64 { + fn into_f64(self) -> f64 { + self as f64 + } + + fn to_data_point_value(self) -> TonicDataPointValue { + number_data_point_value_from_u64(self) + } + + fn to_exemplar_value(self) -> TonicExemplarValue { + exemplar_value_from_u64(self) + } + } + + impl Numeric for i64 { + fn into_f64(self) -> f64 { + self as f64 + } + + fn to_data_point_value(self) -> TonicDataPointValue { + number_data_point_value_from_i64(self) + } + + fn to_exemplar_value(self) -> TonicExemplarValue { + exemplar_value_from_i64(self) + } + } + + impl Numeric for f64 { + fn into_f64(self) -> f64 { + self + } + + fn to_data_point_value(self) -> TonicDataPointValue { + number_data_point_value_from_f64(self) + } + + fn to_exemplar_value(self) -> TonicExemplarValue { + exemplar_value_from_f64(self) + } + } + + /// Converts SDK histogram to protobuf histogram. 
+    pub fn sdk_histogram_to_tonic_histogram<T>(hist: &SdkHistogram<T>) -> TonicHistogram
+    where
+        T: Numeric,
+    {
+        TonicHistogram {
+            data_points: hist
+                .data_points()
+                .map(|dp| TonicHistogramDataPoint {
+                    attributes: dp.attributes().map(|kv| key_value_from_otel_key_value(kv)).collect(),
+                    start_time_unix_nano: to_nanos(hist.start_time()),
+                    time_unix_nano: to_nanos(hist.time()),
+                    count: dp.count(),
+                    sum: Some(dp.sum().into_f64()),
+                    bucket_counts: dp.bucket_counts().collect(),
+                    explicit_bounds: dp.bounds().collect(),
+                    exemplars: dp.exemplars().map(|ex| sdk_exemplar_to_tonic_exemplar(ex)).collect(),
+                    flags: TonicDataPointFlags::default() as u32,
+                    min: dp.min().map(Numeric::into_f64),
+                    max: dp.max().map(Numeric::into_f64),
+                })
+                .collect(),
+            aggregation_temporality: temporality_to_aggregation_temporality(hist.temporality()).into(),
+        }
+    }
+
+    /// Converts SDK exponential histogram to protobuf exponential histogram.
+    pub fn sdk_exponential_histogram_to_tonic_exponential_histogram<T>(hist: &SdkExponentialHistogram<T>) -> TonicExponentialHistogram
+    where
+        T: Numeric,
+    {
+        TonicExponentialHistogram {
+            data_points: hist
+                .data_points()
+                .map(|dp| TonicExponentialHistogramDataPoint {
+                    attributes: dp.attributes().map(|kv| key_value_from_otel_key_value(kv)).collect(),
+                    start_time_unix_nano: to_nanos(hist.start_time()),
+                    time_unix_nano: to_nanos(hist.time()),
+                    count: dp.count() as u64,
+                    sum: Some(dp.sum().into_f64()),
+                    scale: dp.scale().into(),
+                    zero_count: dp.zero_count(),
+                    positive: Some(TonicBuckets {
+                        offset: dp.positive_bucket().offset(),
+                        bucket_counts: dp.positive_bucket().counts().collect(),
+                    }),
+                    negative: Some(TonicBuckets {
+                        offset: dp.negative_bucket().offset(),
+                        bucket_counts: dp.negative_bucket().counts().collect(),
+                    }),
+                    flags: TonicDataPointFlags::default() as u32,
+                    exemplars: dp.exemplars().map(|ex| sdk_exemplar_to_tonic_exemplar(ex)).collect(),
+                    min: dp.min().map(Numeric::into_f64),
+                    max: dp.max().map(Numeric::into_f64),
+                    zero_threshold: dp.zero_threshold(),
+                })
+                .collect(),
+            aggregation_temporality: temporality_to_aggregation_temporality(hist.temporality()).into(),
+        }
+    }
+
+    /// Converts SDK sum to protobuf sum.
+    pub fn sdk_sum_to_tonic_sum<T>(sum: &SdkSum<T>) -> TonicSum
+    where
+        T: Numeric + Debug,
+    {
+        TonicSum {
+            data_points: sum
+                .data_points()
+                .map(|dp| TonicNumberDataPoint {
+                    attributes: dp.attributes().map(|kv| key_value_from_otel_key_value(kv)).collect(),
+                    start_time_unix_nano: to_nanos(sum.start_time()),
+                    time_unix_nano: to_nanos(sum.time()),
+                    exemplars: dp.exemplars().map(|ex| sdk_exemplar_to_tonic_exemplar(ex)).collect(),
+                    flags: TonicDataPointFlags::default() as u32,
+                    value: Some(dp.value().to_data_point_value()),
+                })
+                .collect(),
+            aggregation_temporality: temporality_to_aggregation_temporality(sum.temporality()).into(),
+            is_monotonic: sum.is_monotonic(),
+        }
+    }
+
+    /// Converts SDK gauge to protobuf gauge.
+    pub fn sdk_gauge_to_tonic_gauge<T>(gauge: &SdkGauge<T>) -> TonicGauge
+    where
+        T: Numeric + Debug,
+    {
+        TonicGauge {
+            data_points: gauge
+                .data_points()
+                .map(|dp| TonicNumberDataPoint {
+                    attributes: dp.attributes().map(|kv| key_value_from_otel_key_value(kv)).collect(),
+                    start_time_unix_nano: gauge.start_time().map(to_nanos).unwrap_or_default(),
+                    time_unix_nano: to_nanos(gauge.time()),
+                    exemplars: dp.exemplars().map(|ex| sdk_exemplar_to_tonic_exemplar(ex)).collect(),
+                    flags: TonicDataPointFlags::default() as u32,
+                    value: Some(dp.value().to_data_point_value()),
+                })
+                .collect(),
+        }
+    }
+
+    /// Converts SDK exemplar to protobuf exemplar.
+    pub fn sdk_exemplar_to_tonic_exemplar<T>(ex: &SdkExemplar<T>) -> TonicExemplar
+    where
+        T: Numeric,
+    {
+        TonicExemplar {
+            filtered_attributes: ex
+                .filtered_attributes()
+                .map(|kv| key_value_from_key_value_ref((&kv.key, &kv.value)))
+                .collect(),
+            time_unix_nano: to_nanos(ex.time()),
+            span_id: ex.span_id().into(),
+            trace_id: ex.trace_id().into(),
+            value: Some(ex.value.to_exemplar_value()),
+        }
+    }
+}
diff --git a/opentelemetry-proto/src/transform/mod.rs b/opentelemetry-otlp/src/transform/mod.rs
similarity index 50%
rename from opentelemetry-proto/src/transform/mod.rs
rename to opentelemetry-otlp/src/transform/mod.rs
index f0b7b86d3d..8f6fdb431b 100644
--- a/opentelemetry-proto/src/transform/mod.rs
+++ b/opentelemetry-otlp/src/transform/mod.rs
@@ -1,12 +1,18 @@
+//! Common transformation utilities for converting SDK types to protobuf types.
+
+/// Common transformation utilities.
 pub mod common;
 
 #[cfg(feature = "metrics")]
+/// Metrics transformation utilities.
 pub mod metrics;
 
 #[cfg(feature = "trace")]
+/// Trace transformation utilities.
 pub mod trace;
 
 #[cfg(feature = "logs")]
+/// Logs transformation utilities.
 pub mod logs;
 
 #[cfg(feature = "zpages")]
diff --git a/opentelemetry-proto/src/transform/profiles.rs b/opentelemetry-otlp/src/transform/profiles.rs
similarity index 100%
rename from opentelemetry-proto/src/transform/profiles.rs
rename to opentelemetry-otlp/src/transform/profiles.rs
diff --git a/opentelemetry-otlp/src/transform/trace.rs b/opentelemetry-otlp/src/transform/trace.rs
new file mode 100644
index 0000000000..cfcb67ebcc
--- /dev/null
+++ b/opentelemetry-otlp/src/transform/trace.rs
@@ -0,0 +1,315 @@
+#[cfg(any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic"))]
+/// Tonic-specific transformation utilities for traces.
+pub mod tonic { + use opentelemetry_proto::tonic::resource::v1::Resource; + use opentelemetry_proto::tonic::trace::v1::{span, status, ResourceSpans, ScopeSpans, Span, Status}; + use crate::transform::common::{ + to_nanos, + tonic::{Attributes, ResourceAttributesWithSchema, instrumentation_scope_from_scope_and_target, instrumentation_scope_from_scope_ref_and_target}, + }; + use opentelemetry::trace; + use opentelemetry::trace::{Link, SpanId, SpanKind}; + use opentelemetry_sdk::trace::SpanData; + use std::collections::HashMap; + + /// Converts SDK span kind to protobuf span kind. + pub fn span_kind_to_proto_span_kind(span_kind: SpanKind) -> span::SpanKind { + match span_kind { + SpanKind::Client => span::SpanKind::Client, + SpanKind::Consumer => span::SpanKind::Consumer, + SpanKind::Internal => span::SpanKind::Internal, + SpanKind::Producer => span::SpanKind::Producer, + SpanKind::Server => span::SpanKind::Server, + } + } + + /// Converts SDK trace status to protobuf status code. + pub fn trace_status_to_proto_status_code(status: &trace::Status) -> status::StatusCode { + match status { + trace::Status::Ok => status::StatusCode::Ok, + trace::Status::Unset => status::StatusCode::Unset, + trace::Status::Error { .. } => status::StatusCode::Error, + } + } + + /// Converts SDK link to protobuf link. + pub fn link_to_proto_link(link: Link) -> span::Link { + span::Link { + trace_id: link.span_context.trace_id().to_bytes().to_vec(), + span_id: link.span_context.span_id().to_bytes().to_vec(), + trace_state: link.span_context.trace_state().header(), + attributes: Attributes::from(link.attributes).0, + dropped_attributes_count: link.dropped_attributes_count, + flags: link.span_context.trace_flags().to_u8() as u32, + } + } + + /// Converts SDK span data to protobuf span. 
+ pub fn span_data_to_proto_span(source_span: opentelemetry_sdk::trace::SpanData) -> Span { + let span_kind: span::SpanKind = span_kind_to_proto_span_kind(source_span.span_kind); + Span { + trace_id: source_span.span_context.trace_id().to_bytes().to_vec(), + span_id: source_span.span_context.span_id().to_bytes().to_vec(), + trace_state: source_span.span_context.trace_state().header(), + parent_span_id: { + if source_span.parent_span_id != SpanId::INVALID { + source_span.parent_span_id.to_bytes().to_vec() + } else { + vec![] + } + }, + flags: source_span.span_context.trace_flags().to_u8() as u32, + name: source_span.name.into_owned(), + kind: span_kind as i32, + start_time_unix_nano: to_nanos(source_span.start_time), + end_time_unix_nano: to_nanos(source_span.end_time), + dropped_attributes_count: source_span.dropped_attributes_count, + attributes: Attributes::from(source_span.attributes).0, + dropped_events_count: source_span.events.dropped_count, + events: source_span + .events + .into_iter() + .map(|event| span::Event { + time_unix_nano: to_nanos(event.timestamp), + name: event.name.into(), + attributes: Attributes::from(event.attributes).0, + dropped_attributes_count: event.dropped_attributes_count, + }) + .collect(), + dropped_links_count: source_span.links.dropped_count, + links: source_span.links.into_iter().map(link_to_proto_link).collect(), + status: Some(Status { + code: trace_status_to_proto_status_code(&source_span.status).into(), + message: match source_span.status { + trace::Status::Error { description } => description.to_string(), + _ => Default::default(), + }, + }), + } + } + + /// Creates a new resource spans from span data and resource. 
+ pub fn new_resource_spans(source_span: SpanData, resource: &ResourceAttributesWithSchema) -> ResourceSpans { + ResourceSpans { + resource: Some(Resource { + attributes: resource.attributes.0.clone(), + dropped_attributes_count: 0, + entity_refs: vec![], + }), + schema_url: resource.schema_url.clone().unwrap_or_default(), + scope_spans: vec![ScopeSpans { + schema_url: source_span + .instrumentation_scope + .schema_url() + .map(ToOwned::to_owned) + .unwrap_or_default(), + scope: Some(instrumentation_scope_from_scope_and_target(source_span.instrumentation_scope.clone(), None)), + spans: vec![span_data_to_proto_span(source_span)], + }], + } + } + + /// Groups spans by resource and instrumentation scope. + pub fn group_spans_by_resource_and_scope( + spans: Vec, + resource: &ResourceAttributesWithSchema, + ) -> Vec { + // Group spans by their instrumentation scope + let scope_map = spans.iter().fold( + HashMap::new(), + |mut scope_map: HashMap<&opentelemetry::InstrumentationScope, Vec<&SpanData>>, span| { + let instrumentation = &span.instrumentation_scope; + scope_map.entry(instrumentation).or_default().push(span); + scope_map + }, + ); + + // Convert the grouped spans into ScopeSpans + let scope_spans = scope_map + .into_iter() + .map(|(instrumentation, span_records)| ScopeSpans { + scope: Some(instrumentation_scope_from_scope_ref_and_target(instrumentation, None)), + schema_url: resource.schema_url.clone().unwrap_or_default(), + spans: span_records + .into_iter() + .map(|span_data| span_data_to_proto_span(span_data.clone())) + .collect(), + }) + .collect(); + + // Wrap ScopeSpans into a single ResourceSpans + vec![ResourceSpans { + resource: Some(Resource { + attributes: resource.attributes.0.clone(), + dropped_attributes_count: 0, + entity_refs: vec![], + }), + scope_spans, + schema_url: resource.schema_url.clone().unwrap_or_default(), + }] + } + +} // End of tonic module + +#[cfg(test)] +mod tests { + use opentelemetry_proto::tonic::common::v1::any_value::Value; + 
use crate::transform::common::tonic::ResourceAttributesWithSchema; + use opentelemetry::time::now; + use opentelemetry::trace::{ + SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState, + }; + use opentelemetry::InstrumentationScope; + use opentelemetry::KeyValue; + use opentelemetry_sdk::resource::Resource; + use opentelemetry_sdk::trace::SpanData; + use opentelemetry_sdk::trace::{SpanEvents, SpanLinks}; + use std::borrow::Cow; + use std::time::Duration; + + fn create_test_span_data(instrumentation_name: &'static str) -> SpanData { + let span_context = SpanContext::new( + TraceId::from_u128(123), + SpanId::from_u64(456), + TraceFlags::default(), + false, + TraceState::default(), + ); + + SpanData { + span_context, + parent_span_id: SpanId::from_u64(0), + span_kind: SpanKind::Internal, + name: Cow::Borrowed("test_span"), + start_time: now(), + end_time: now() + Duration::from_secs(1), + attributes: vec![KeyValue::new("key", "value")], + dropped_attributes_count: 0, + events: SpanEvents::default(), + links: SpanLinks::default(), + status: Status::Unset, + instrumentation_scope: InstrumentationScope::builder(instrumentation_name).build(), + } + } + + #[test] + fn test_group_spans_by_resource_and_scope_single_scope() { + let resource = Resource::builder_empty() + .with_attribute(KeyValue::new("resource_key", "resource_value")) + .build(); + let span_data = create_test_span_data("lib1"); + + let spans = vec![span_data.clone()]; + let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema + + let grouped_spans = + super::tonic::group_spans_by_resource_and_scope(spans, &resource); + + assert_eq!(grouped_spans.len(), 1); + + let resource_spans = &grouped_spans[0]; + assert_eq!( + resource_spans.resource.as_ref().unwrap().attributes.len(), + 1 + ); + assert_eq!( + resource_spans.resource.as_ref().unwrap().attributes[0].key, + "resource_key" + ); + assert_eq!( + 
resource_spans.resource.as_ref().unwrap().attributes[0] + .value + .clone() + .unwrap() + .value + .unwrap(), + Value::StringValue("resource_value".to_string()) + ); + + let scope_spans = &resource_spans.scope_spans; + assert_eq!(scope_spans.len(), 1); + + let scope_span = &scope_spans[0]; + assert_eq!(scope_span.scope.as_ref().unwrap().name, "lib1"); + assert_eq!(scope_span.spans.len(), 1); + + assert_eq!( + scope_span.spans[0].trace_id, + span_data.span_context.trace_id().to_bytes().to_vec() + ); + } + + #[test] + fn test_group_spans_by_resource_and_scope_multiple_scopes() { + let resource = Resource::builder_empty() + .with_attribute(KeyValue::new("resource_key", "resource_value")) + .build(); + let span_data1 = create_test_span_data("lib1"); + let span_data2 = create_test_span_data("lib1"); + let span_data3 = create_test_span_data("lib2"); + + let spans = vec![span_data1.clone(), span_data2.clone(), span_data3.clone()]; + let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema + + let grouped_spans = + super::tonic::group_spans_by_resource_and_scope(spans, &resource); + + assert_eq!(grouped_spans.len(), 1); + + let resource_spans = &grouped_spans[0]; + assert_eq!( + resource_spans.resource.as_ref().unwrap().attributes.len(), + 1 + ); + assert_eq!( + resource_spans.resource.as_ref().unwrap().attributes[0].key, + "resource_key" + ); + assert_eq!( + resource_spans.resource.as_ref().unwrap().attributes[0] + .value + .clone() + .unwrap() + .value + .unwrap(), + Value::StringValue("resource_value".to_string()) + ); + + let scope_spans = &resource_spans.scope_spans; + assert_eq!(scope_spans.len(), 2); + + // Check the scope spans for both lib1 and lib2 + let mut lib1_scope_span = None; + let mut lib2_scope_span = None; + + for scope_span in scope_spans { + match scope_span.scope.as_ref().unwrap().name.as_str() { + "lib1" => lib1_scope_span = Some(scope_span), + "lib2" => lib2_scope_span = Some(scope_span), + 
_ => {} + } + } + + let lib1_scope_span = lib1_scope_span.expect("lib1 scope span not found"); + let lib2_scope_span = lib2_scope_span.expect("lib2 scope span not found"); + + assert_eq!(lib1_scope_span.scope.as_ref().unwrap().name, "lib1"); + assert_eq!(lib2_scope_span.scope.as_ref().unwrap().name, "lib2"); + + assert_eq!(lib1_scope_span.spans.len(), 2); + assert_eq!(lib2_scope_span.spans.len(), 1); + + assert_eq!( + lib1_scope_span.spans[0].trace_id, + span_data1.span_context.trace_id().to_bytes().to_vec() + ); + assert_eq!( + lib1_scope_span.spans[1].trace_id, + span_data2.span_context.trace_id().to_bytes().to_vec() + ); + assert_eq!( + lib2_scope_span.spans[0].trace_id, + span_data3.span_context.trace_id().to_bytes().to_vec() + ); + } +} diff --git a/opentelemetry-otlp/src/transform/tracez.rs b/opentelemetry-otlp/src/transform/tracez.rs new file mode 100644 index 0000000000..f6f1d718f4 --- /dev/null +++ b/opentelemetry-otlp/src/transform/tracez.rs @@ -0,0 +1,65 @@ +#[cfg(all(feature = "grpc-tonic", feature = "zpages"))] +mod tonic { + use opentelemetry::trace::{Event, Status}; + use opentelemetry_sdk::trace::SpanData; + + use opentelemetry_proto::tonic::{ + trace::v1::{span::Event as SpanEvent, Status as SpanStatus}, + tracez::v1::{ErrorData, LatencyData, RunningData}, + }; + use crate::transform::common::{to_nanos, tonic::Attributes}; + use crate::transform::trace::tonic::link_to_proto_link; + + pub fn span_data_to_latency_data(span_data: SpanData) -> LatencyData { + LatencyData { + traceid: span_data.span_context.trace_id().to_bytes().to_vec(), + spanid: span_data.span_context.span_id().to_bytes().to_vec(), + parentid: span_data.parent_span_id.to_bytes().to_vec(), + starttime: to_nanos(span_data.start_time), + endtime: to_nanos(span_data.end_time), + attributes: Attributes::from(span_data.attributes).0, + events: span_data.events.iter().cloned().map(event_to_span_event).collect(), + links: span_data.links.iter().cloned().map(link_to_proto_link).collect(), + } 
+ } + + pub fn span_data_to_error_data(span_data: SpanData) -> ErrorData { + ErrorData { + traceid: span_data.span_context.trace_id().to_bytes().to_vec(), + spanid: span_data.span_context.span_id().to_bytes().to_vec(), + parentid: span_data.parent_span_id.to_bytes().to_vec(), + starttime: to_nanos(span_data.start_time), + attributes: Attributes::from(span_data.attributes).0, + events: span_data.events.iter().cloned().map(event_to_span_event).collect(), + links: span_data.links.iter().cloned().map(link_to_proto_link).collect(), + status: match span_data.status { + Status::Error { description } => Some(SpanStatus { + message: description.to_string(), + code: 2, + }), + _ => None, + }, + } + } + + pub fn span_data_to_running_data(span_data: SpanData) -> RunningData { + RunningData { + traceid: span_data.span_context.trace_id().to_bytes().to_vec(), + spanid: span_data.span_context.span_id().to_bytes().to_vec(), + parentid: span_data.parent_span_id.to_bytes().to_vec(), + starttime: to_nanos(span_data.start_time), + attributes: Attributes::from(span_data.attributes).0, + events: span_data.events.iter().cloned().map(event_to_span_event).collect(), + links: span_data.links.iter().cloned().map(link_to_proto_link).collect(), + } + } + + pub fn event_to_span_event(event: Event) -> SpanEvent { + SpanEvent { + time_unix_nano: to_nanos(event.timestamp), + name: event.name.to_string(), + attributes: Attributes::from(event.attributes).0, + dropped_attributes_count: event.dropped_attributes_count, + } + } +} diff --git a/opentelemetry-otlp/tests/integration_test/Cargo.toml b/opentelemetry-otlp/tests/integration_test/Cargo.toml index 6b156d18fc..b78e84a90e 100644 --- a/opentelemetry-otlp/tests/integration_test/Cargo.toml +++ b/opentelemetry-otlp/tests/integration_test/Cargo.toml @@ -10,7 +10,7 @@ autobenches = false [dependencies] opentelemetry = { path = "../../../opentelemetry", features = [] } opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", 
"testing"] } -opentelemetry-proto = { path = "../../../opentelemetry-proto", features = ["gen-tonic-messages", "trace", "logs", "metrics", "with-serde"] } +opentelemetry-proto = { path = "../../../opentelemetry-proto", features = ["prost", "trace", "logs", "metrics", "with-serde"] } tokio = { workspace = true, features = ["full"] } serde_json = { workspace = true } testcontainers = { workspace = true, features = ["http_wait"] } diff --git a/opentelemetry-proto/Cargo.toml b/opentelemetry-proto/Cargo.toml index d9b00bfffc..4950893921 100644 --- a/opentelemetry-proto/Cargo.toml +++ b/opentelemetry-proto/Cargo.toml @@ -34,14 +34,13 @@ path = "tests/json_serde.rs" default = ["full"] full = ["gen-tonic", "trace", "logs", "metrics", "zpages", "with-serde", "internal-logs"] -# crates used to generate rs files -gen-tonic = ["gen-tonic-messages", "tonic/channel"] -gen-tonic-messages = ["tonic", "prost"] +# crates used to generate rs files +gen-tonic = ["tonic", "prost", "tonic/channel"] # telemetry pillars and functions -trace = ["opentelemetry/trace", "opentelemetry_sdk/trace"] -metrics = ["opentelemetry/metrics", "opentelemetry_sdk/metrics"] -logs = ["opentelemetry/logs", "opentelemetry_sdk/logs"] +trace = ["opentelemetry/trace"] +metrics = ["opentelemetry/metrics"] +logs = ["opentelemetry/logs"] zpages = ["trace"] profiles = [] testing = ["opentelemetry/testing"] @@ -55,9 +54,8 @@ with-serde = ["serde", "const-hex", "base64"] tonic = { workspace = true, optional = true, features = ["codegen", "prost"] } prost = { workspace = true, optional = true } opentelemetry = { version = "0.30", default-features = false, path = "../opentelemetry" } -opentelemetry_sdk = { version = "0.30", default-features = false, path = "../opentelemetry-sdk" } schemars = { workspace = true, optional = true } -serde = { workspace = true, optional = true, features = ["serde_derive"] } +serde = { workspace = true, optional = true, features = ["serde_derive", "std"] } const-hex = { workspace = true, 
optional = true } base64 = { workspace = true, optional = true } diff --git a/opentelemetry-proto/src/lib.rs b/opentelemetry-proto/src/lib.rs index 209df125a6..3e087dc19e 100644 --- a/opentelemetry-proto/src/lib.rs +++ b/opentelemetry-proto/src/lib.rs @@ -32,7 +32,6 @@ #[doc(hidden)] mod proto; -#[cfg(feature = "gen-tonic-messages")] +// Enable tonic module when prost is available for protobuf types +#[cfg(feature = "prost")] pub use proto::tonic; - -pub mod transform; diff --git a/opentelemetry-proto/src/proto.rs b/opentelemetry-proto/src/proto.rs index 3055f02a40..1f0937a4c0 100644 --- a/opentelemetry-proto/src/proto.rs +++ b/opentelemetry-proto/src/proto.rs @@ -1,7 +1,7 @@ /// provide serde support for proto traceIds and spanIds. /// Those are hex encoded strings in the jsons but they are byte arrays in the proto. /// See https://opentelemetry.io/docs/specs/otlp/#json-protobuf-encoding for more details -#[cfg(all(feature = "with-serde", feature = "gen-tonic-messages"))] +#[cfg(all(feature = "with-serde", feature = "prost"))] pub(crate) mod serializers { use crate::tonic::common::v1::any_value::{self, Value}; use crate::tonic::common::v1::AnyValue; @@ -215,7 +215,7 @@ pub(crate) mod serializers { } } -#[cfg(feature = "gen-tonic-messages")] +#[cfg(feature = "prost")] #[path = "proto/tonic"] /// Generated files using [`tonic`](https://docs.rs/crate/tonic) and [`prost`](https://docs.rs/crate/prost) pub mod tonic { @@ -297,6 +297,4 @@ pub mod tonic { #[path = "opentelemetry.proto.profiles.v1development.rs"] pub mod v1; } - - pub use crate::transform::common::tonic::Attributes; } diff --git a/opentelemetry-proto/src/transform/common.rs b/opentelemetry-proto/src/transform/common.rs deleted file mode 100644 index 37efc27199..0000000000 --- a/opentelemetry-proto/src/transform/common.rs +++ /dev/null @@ -1,179 +0,0 @@ -#[cfg(all( - feature = "gen-tonic-messages", - any(feature = "trace", feature = "metrics", feature = "logs") -))] -use std::time::{Duration, SystemTime, 
UNIX_EPOCH}; - -#[cfg(all( - feature = "gen-tonic-messages", - any(feature = "trace", feature = "metrics", feature = "logs") -))] -pub(crate) fn to_nanos(time: SystemTime) -> u64 { - time.duration_since(UNIX_EPOCH) - .unwrap_or_else(|_| Duration::from_secs(0)) - .as_nanos() as u64 -} - -#[cfg(feature = "gen-tonic-messages")] -pub mod tonic { - use crate::proto::tonic::common::v1::{ - any_value, AnyValue, ArrayValue, InstrumentationScope, KeyValue, - }; - use opentelemetry::{Array, Value}; - use std::borrow::Cow; - - #[cfg(any(feature = "trace", feature = "logs"))] - #[derive(Debug, Default)] - pub struct ResourceAttributesWithSchema { - pub attributes: Attributes, - pub schema_url: Option, - } - - #[cfg(any(feature = "trace", feature = "logs"))] - impl From<&opentelemetry_sdk::Resource> for ResourceAttributesWithSchema { - fn from(resource: &opentelemetry_sdk::Resource) -> Self { - ResourceAttributesWithSchema { - attributes: resource_attributes(resource), - schema_url: resource.schema_url().map(ToString::to_string), - } - } - } - - #[cfg(any(feature = "trace", feature = "logs"))] - use opentelemetry_sdk::Resource; - - impl - From<( - opentelemetry::InstrumentationScope, - Option>, - )> for InstrumentationScope - { - fn from( - data: ( - opentelemetry::InstrumentationScope, - Option>, - ), - ) -> Self { - let (library, target) = data; - if let Some(t) = target { - InstrumentationScope { - name: t.to_string(), - version: String::new(), - attributes: vec![], - ..Default::default() - } - } else { - InstrumentationScope { - name: library.name().to_owned(), - version: library.version().map(ToOwned::to_owned).unwrap_or_default(), - attributes: Attributes::from(library.attributes().cloned()).0, - ..Default::default() - } - } - } - } - - impl - From<( - &opentelemetry::InstrumentationScope, - Option>, - )> for InstrumentationScope - { - fn from( - data: ( - &opentelemetry::InstrumentationScope, - Option>, - ), - ) -> Self { - let (library, target) = data; - if let Some(t) 
= target { - InstrumentationScope { - name: t.to_string(), - version: String::new(), - attributes: vec![], - ..Default::default() - } - } else { - InstrumentationScope { - name: library.name().to_owned(), - version: library.version().map(ToOwned::to_owned).unwrap_or_default(), - attributes: Attributes::from(library.attributes().cloned()).0, - ..Default::default() - } - } - } - } - - /// Wrapper type for Vec<`KeyValue`> - #[derive(Default, Debug)] - pub struct Attributes(pub ::std::vec::Vec); - - impl> From for Attributes { - fn from(kvs: I) -> Self { - Attributes( - kvs.into_iter() - .map(|api_kv| KeyValue { - key: api_kv.key.as_str().to_string(), - value: Some(api_kv.value.into()), - }) - .collect(), - ) - } - } - - #[cfg(feature = "logs")] - impl, V: Into> FromIterator<(K, V)> for Attributes { - fn from_iter>(iter: T) -> Self { - Attributes( - iter.into_iter() - .map(|(k, v)| KeyValue { - key: k.into(), - value: Some(v.into()), - }) - .collect(), - ) - } - } - - impl From for AnyValue { - fn from(value: Value) -> Self { - AnyValue { - value: match value { - Value::Bool(val) => Some(any_value::Value::BoolValue(val)), - Value::I64(val) => Some(any_value::Value::IntValue(val)), - Value::F64(val) => Some(any_value::Value::DoubleValue(val)), - Value::String(val) => Some(any_value::Value::StringValue(val.to_string())), - Value::Array(array) => Some(any_value::Value::ArrayValue(match array { - Array::Bool(vals) => array_into_proto(vals), - Array::I64(vals) => array_into_proto(vals), - Array::F64(vals) => array_into_proto(vals), - Array::String(vals) => array_into_proto(vals), - _ => unreachable!("Nonexistent array type"), // Needs to be updated when new array types are added - })), - _ => unreachable!("Nonexistent value type"), // Needs to be updated when new value types are added - }, - } - } - } - - fn array_into_proto(vals: Vec) -> ArrayValue - where - Value: From, - { - let values = vals - .into_iter() - .map(|val| AnyValue::from(Value::from(val))) - .collect(); - - 
ArrayValue { values } - } - - #[cfg(any(feature = "trace", feature = "logs"))] - pub(crate) fn resource_attributes(resource: &Resource) -> Attributes { - resource - .iter() - .map(|(k, v)| opentelemetry::KeyValue::new(k.clone(), v.clone())) - .collect::>() - .into() - } -} diff --git a/opentelemetry-proto/src/transform/metrics.rs b/opentelemetry-proto/src/transform/metrics.rs deleted file mode 100644 index a9370d3d80..0000000000 --- a/opentelemetry-proto/src/transform/metrics.rs +++ /dev/null @@ -1,332 +0,0 @@ -// The prost currently will generate a non optional deprecated field for labels. -// We cannot assign value to it otherwise clippy will complain. -// We cannot ignore it as it's not an optional field. -// We can remove this after we removed the labels field from proto. -#[allow(deprecated)] -#[cfg(feature = "gen-tonic-messages")] -pub mod tonic { - use std::fmt::Debug; - - use opentelemetry::{otel_debug, Key, Value}; - use opentelemetry_sdk::metrics::data::{ - AggregatedMetrics, Exemplar as SdkExemplar, - ExponentialHistogram as SdkExponentialHistogram, Gauge as SdkGauge, - Histogram as SdkHistogram, Metric as SdkMetric, MetricData, ResourceMetrics, - ScopeMetrics as SdkScopeMetrics, Sum as SdkSum, - }; - use opentelemetry_sdk::metrics::Temporality; - use opentelemetry_sdk::Resource as SdkResource; - - use crate::proto::tonic::{ - collector::metrics::v1::ExportMetricsServiceRequest, - common::v1::KeyValue, - metrics::v1::{ - exemplar, exemplar::Value as TonicExemplarValue, - exponential_histogram_data_point::Buckets as TonicBuckets, - metric::Data as TonicMetricData, number_data_point, - number_data_point::Value as TonicDataPointValue, - AggregationTemporality as TonicTemporality, AggregationTemporality, - DataPointFlags as TonicDataPointFlags, Exemplar as TonicExemplar, - ExponentialHistogram as TonicExponentialHistogram, - ExponentialHistogramDataPoint as TonicExponentialHistogramDataPoint, - Gauge as TonicGauge, Histogram as TonicHistogram, - 
HistogramDataPoint as TonicHistogramDataPoint, Metric as TonicMetric, - NumberDataPoint as TonicNumberDataPoint, ResourceMetrics as TonicResourceMetrics, - ScopeMetrics as TonicScopeMetrics, Sum as TonicSum, - }, - resource::v1::Resource as TonicResource, - }; - use crate::transform::common::to_nanos; - - impl From for exemplar::Value { - fn from(value: u64) -> Self { - exemplar::Value::AsInt(i64::try_from(value).unwrap_or_default()) - } - } - - impl From for exemplar::Value { - fn from(value: i64) -> Self { - exemplar::Value::AsInt(value) - } - } - - impl From for exemplar::Value { - fn from(value: f64) -> Self { - exemplar::Value::AsDouble(value) - } - } - - impl From for number_data_point::Value { - fn from(value: u64) -> Self { - number_data_point::Value::AsInt(i64::try_from(value).unwrap_or_default()) - } - } - - impl From for number_data_point::Value { - fn from(value: i64) -> Self { - number_data_point::Value::AsInt(value) - } - } - - impl From for number_data_point::Value { - fn from(value: f64) -> Self { - number_data_point::Value::AsDouble(value) - } - } - - impl From<(&Key, &Value)> for KeyValue { - fn from(kv: (&Key, &Value)) -> Self { - KeyValue { - key: kv.0.to_string(), - value: Some(kv.1.clone().into()), - } - } - } - - impl From<&opentelemetry::KeyValue> for KeyValue { - fn from(kv: &opentelemetry::KeyValue) -> Self { - KeyValue { - key: kv.key.to_string(), - value: Some(kv.value.clone().into()), - } - } - } - - impl From for AggregationTemporality { - fn from(temporality: Temporality) -> Self { - match temporality { - Temporality::Cumulative => AggregationTemporality::Cumulative, - Temporality::Delta => AggregationTemporality::Delta, - other => { - otel_debug!( - name: "AggregationTemporality::Unknown", - message = "Unknown temporality,using default instead.", - unknown_temporality = format!("{:?}", other), - default_temporality = format!("{:?}", Temporality::Cumulative) - ); - AggregationTemporality::Cumulative - } - } - } - } - - impl 
From<&ResourceMetrics> for ExportMetricsServiceRequest { - fn from(rm: &ResourceMetrics) -> Self { - ExportMetricsServiceRequest { - resource_metrics: vec![TonicResourceMetrics { - resource: Some((rm.resource()).into()), - scope_metrics: rm.scope_metrics().map(Into::into).collect(), - schema_url: rm - .resource() - .schema_url() - .map(Into::into) - .unwrap_or_default(), - }], - } - } - } - - impl From<&SdkResource> for TonicResource { - fn from(resource: &SdkResource) -> Self { - TonicResource { - attributes: resource.iter().map(Into::into).collect(), - dropped_attributes_count: 0, - entity_refs: vec![], // internal and currently unused - } - } - } - - impl From<&SdkScopeMetrics> for TonicScopeMetrics { - fn from(sm: &SdkScopeMetrics) -> Self { - TonicScopeMetrics { - scope: Some((sm.scope(), None).into()), - metrics: sm.metrics().map(Into::into).collect(), - schema_url: sm - .scope() - .schema_url() - .map(ToOwned::to_owned) - .unwrap_or_default(), - } - } - } - - impl From<&SdkMetric> for TonicMetric { - fn from(metric: &SdkMetric) -> Self { - TonicMetric { - name: metric.name().to_string(), - description: metric.description().to_string(), - unit: metric.unit().to_string(), - metadata: vec![], // internal and currently unused - data: Some(match metric.data() { - AggregatedMetrics::F64(data) => data.into(), - AggregatedMetrics::U64(data) => data.into(), - AggregatedMetrics::I64(data) => data.into(), - }), - } - } - } - - impl From<&MetricData> for TonicMetricData - where - T: Numeric + Debug, - { - fn from(data: &MetricData) -> Self { - match data { - MetricData::Gauge(gauge) => TonicMetricData::Gauge(gauge.into()), - MetricData::Sum(sum) => TonicMetricData::Sum(sum.into()), - MetricData::Histogram(hist) => TonicMetricData::Histogram(hist.into()), - MetricData::ExponentialHistogram(hist) => { - TonicMetricData::ExponentialHistogram(hist.into()) - } - } - } - } - - trait Numeric: Into + Into + Copy { - // lossy at large values for u64 and i64 but otlp histograms 
only handle float values - fn into_f64(self) -> f64; - } - - impl Numeric for u64 { - fn into_f64(self) -> f64 { - self as f64 - } - } - - impl Numeric for i64 { - fn into_f64(self) -> f64 { - self as f64 - } - } - - impl Numeric for f64 { - fn into_f64(self) -> f64 { - self - } - } - - impl From<&SdkHistogram> for TonicHistogram - where - T: Numeric, - { - fn from(hist: &SdkHistogram) -> Self { - TonicHistogram { - data_points: hist - .data_points() - .map(|dp| TonicHistogramDataPoint { - attributes: dp.attributes().map(Into::into).collect(), - start_time_unix_nano: to_nanos(hist.start_time()), - time_unix_nano: to_nanos(hist.time()), - count: dp.count(), - sum: Some(dp.sum().into_f64()), - bucket_counts: dp.bucket_counts().collect(), - explicit_bounds: dp.bounds().collect(), - exemplars: dp.exemplars().map(Into::into).collect(), - flags: TonicDataPointFlags::default() as u32, - min: dp.min().map(Numeric::into_f64), - max: dp.max().map(Numeric::into_f64), - }) - .collect(), - aggregation_temporality: TonicTemporality::from(hist.temporality()).into(), - } - } - } - - impl From<&SdkExponentialHistogram> for TonicExponentialHistogram - where - T: Numeric, - { - fn from(hist: &SdkExponentialHistogram) -> Self { - TonicExponentialHistogram { - data_points: hist - .data_points() - .map(|dp| TonicExponentialHistogramDataPoint { - attributes: dp.attributes().map(Into::into).collect(), - start_time_unix_nano: to_nanos(hist.start_time()), - time_unix_nano: to_nanos(hist.time()), - count: dp.count() as u64, - sum: Some(dp.sum().into_f64()), - scale: dp.scale().into(), - zero_count: dp.zero_count(), - positive: Some(TonicBuckets { - offset: dp.positive_bucket().offset(), - bucket_counts: dp.positive_bucket().counts().collect(), - }), - negative: Some(TonicBuckets { - offset: dp.negative_bucket().offset(), - bucket_counts: dp.negative_bucket().counts().collect(), - }), - flags: TonicDataPointFlags::default() as u32, - exemplars: dp.exemplars().map(Into::into).collect(), - min: 
dp.min().map(Numeric::into_f64), - max: dp.max().map(Numeric::into_f64), - zero_threshold: dp.zero_threshold(), - }) - .collect(), - aggregation_temporality: TonicTemporality::from(hist.temporality()).into(), - } - } - } - - impl From<&SdkSum> for TonicSum - where - T: Debug + Into + Into + Copy, - { - fn from(sum: &SdkSum) -> Self { - TonicSum { - data_points: sum - .data_points() - .map(|dp| TonicNumberDataPoint { - attributes: dp.attributes().map(Into::into).collect(), - start_time_unix_nano: to_nanos(sum.start_time()), - time_unix_nano: to_nanos(sum.time()), - exemplars: dp.exemplars().map(Into::into).collect(), - flags: TonicDataPointFlags::default() as u32, - value: Some(dp.value().into()), - }) - .collect(), - aggregation_temporality: TonicTemporality::from(sum.temporality()).into(), - is_monotonic: sum.is_monotonic(), - } - } - } - - impl From<&SdkGauge> for TonicGauge - where - T: Debug + Into + Into + Copy, - { - fn from(gauge: &SdkGauge) -> Self { - TonicGauge { - data_points: gauge - .data_points() - .map(|dp| TonicNumberDataPoint { - attributes: dp.attributes().map(Into::into).collect(), - start_time_unix_nano: gauge.start_time().map(to_nanos).unwrap_or_default(), - time_unix_nano: to_nanos(gauge.time()), - exemplars: dp.exemplars().map(Into::into).collect(), - flags: TonicDataPointFlags::default() as u32, - value: Some(dp.value().into()), - }) - .collect(), - } - } - } - - impl From<&SdkExemplar> for TonicExemplar - where - T: Into + Copy, - { - fn from(ex: &SdkExemplar) -> Self { - TonicExemplar { - filtered_attributes: ex - .filtered_attributes() - .map(|kv| (&kv.key, &kv.value).into()) - .collect(), - time_unix_nano: to_nanos(ex.time()), - span_id: ex.span_id().into(), - trace_id: ex.trace_id().into(), - value: Some(ex.value.into()), - } - } - } -} diff --git a/opentelemetry-proto/src/transform/trace.rs b/opentelemetry-proto/src/transform/trace.rs deleted file mode 100644 index 231834a30d..0000000000 --- a/opentelemetry-proto/src/transform/trace.rs 
+++ /dev/null @@ -1,355 +0,0 @@ -#[cfg(feature = "gen-tonic-messages")] -pub mod tonic { - use crate::proto::tonic::resource::v1::Resource; - use crate::proto::tonic::trace::v1::{span, status, ResourceSpans, ScopeSpans, Span, Status}; - use crate::transform::common::{ - to_nanos, - tonic::{Attributes, ResourceAttributesWithSchema}, - }; - use opentelemetry::trace; - use opentelemetry::trace::{Link, SpanId, SpanKind}; - use opentelemetry_sdk::trace::SpanData; - use std::collections::HashMap; - - impl From for span::SpanKind { - fn from(span_kind: SpanKind) -> Self { - match span_kind { - SpanKind::Client => span::SpanKind::Client, - SpanKind::Consumer => span::SpanKind::Consumer, - SpanKind::Internal => span::SpanKind::Internal, - SpanKind::Producer => span::SpanKind::Producer, - SpanKind::Server => span::SpanKind::Server, - } - } - } - - impl From<&trace::Status> for status::StatusCode { - fn from(status: &trace::Status) -> Self { - match status { - trace::Status::Ok => status::StatusCode::Ok, - trace::Status::Unset => status::StatusCode::Unset, - trace::Status::Error { .. 
} => status::StatusCode::Error, - } - } - } - - impl From for span::Link { - fn from(link: Link) -> Self { - span::Link { - trace_id: link.span_context.trace_id().to_bytes().to_vec(), - span_id: link.span_context.span_id().to_bytes().to_vec(), - trace_state: link.span_context.trace_state().header(), - attributes: Attributes::from(link.attributes).0, - dropped_attributes_count: link.dropped_attributes_count, - flags: link.span_context.trace_flags().to_u8() as u32, - } - } - } - impl From for Span { - fn from(source_span: opentelemetry_sdk::trace::SpanData) -> Self { - let span_kind: span::SpanKind = source_span.span_kind.into(); - Span { - trace_id: source_span.span_context.trace_id().to_bytes().to_vec(), - span_id: source_span.span_context.span_id().to_bytes().to_vec(), - trace_state: source_span.span_context.trace_state().header(), - parent_span_id: { - if source_span.parent_span_id != SpanId::INVALID { - source_span.parent_span_id.to_bytes().to_vec() - } else { - vec![] - } - }, - flags: source_span.span_context.trace_flags().to_u8() as u32, - name: source_span.name.into_owned(), - kind: span_kind as i32, - start_time_unix_nano: to_nanos(source_span.start_time), - end_time_unix_nano: to_nanos(source_span.end_time), - dropped_attributes_count: source_span.dropped_attributes_count, - attributes: Attributes::from(source_span.attributes).0, - dropped_events_count: source_span.events.dropped_count, - events: source_span - .events - .into_iter() - .map(|event| span::Event { - time_unix_nano: to_nanos(event.timestamp), - name: event.name.into(), - attributes: Attributes::from(event.attributes).0, - dropped_attributes_count: event.dropped_attributes_count, - }) - .collect(), - dropped_links_count: source_span.links.dropped_count, - links: source_span.links.into_iter().map(Into::into).collect(), - status: Some(Status { - code: status::StatusCode::from(&source_span.status).into(), - message: match source_span.status { - trace::Status::Error { description } => 
description.to_string(), - _ => Default::default(), - }, - }), - } - } - } - - impl ResourceSpans { - pub fn new(source_span: SpanData, resource: &ResourceAttributesWithSchema) -> Self { - let span_kind: span::SpanKind = source_span.span_kind.into(); - ResourceSpans { - resource: Some(Resource { - attributes: resource.attributes.0.clone(), - dropped_attributes_count: 0, - entity_refs: vec![], - }), - schema_url: resource.schema_url.clone().unwrap_or_default(), - scope_spans: vec![ScopeSpans { - schema_url: source_span - .instrumentation_scope - .schema_url() - .map(ToOwned::to_owned) - .unwrap_or_default(), - scope: Some((source_span.instrumentation_scope, None).into()), - spans: vec![Span { - trace_id: source_span.span_context.trace_id().to_bytes().to_vec(), - span_id: source_span.span_context.span_id().to_bytes().to_vec(), - trace_state: source_span.span_context.trace_state().header(), - parent_span_id: { - if source_span.parent_span_id != SpanId::INVALID { - source_span.parent_span_id.to_bytes().to_vec() - } else { - vec![] - } - }, - flags: source_span.span_context.trace_flags().to_u8() as u32, - name: source_span.name.into_owned(), - kind: span_kind as i32, - start_time_unix_nano: to_nanos(source_span.start_time), - end_time_unix_nano: to_nanos(source_span.end_time), - dropped_attributes_count: source_span.dropped_attributes_count, - attributes: Attributes::from(source_span.attributes).0, - dropped_events_count: source_span.events.dropped_count, - events: source_span - .events - .into_iter() - .map(|event| span::Event { - time_unix_nano: to_nanos(event.timestamp), - name: event.name.into(), - attributes: Attributes::from(event.attributes).0, - dropped_attributes_count: event.dropped_attributes_count, - }) - .collect(), - dropped_links_count: source_span.links.dropped_count, - links: source_span.links.into_iter().map(Into::into).collect(), - status: Some(Status { - code: status::StatusCode::from(&source_span.status).into(), - message: match source_span.status { 
- trace::Status::Error { description } => description.to_string(), - _ => Default::default(), - }, - }), - }], - }], - } - } - } - - pub fn group_spans_by_resource_and_scope( - spans: Vec, - resource: &ResourceAttributesWithSchema, - ) -> Vec { - // Group spans by their instrumentation scope - let scope_map = spans.iter().fold( - HashMap::new(), - |mut scope_map: HashMap<&opentelemetry::InstrumentationScope, Vec<&SpanData>>, span| { - let instrumentation = &span.instrumentation_scope; - scope_map.entry(instrumentation).or_default().push(span); - scope_map - }, - ); - - // Convert the grouped spans into ScopeSpans - let scope_spans = scope_map - .into_iter() - .map(|(instrumentation, span_records)| ScopeSpans { - scope: Some((instrumentation, None).into()), - schema_url: resource.schema_url.clone().unwrap_or_default(), - spans: span_records - .into_iter() - .map(|span_data| span_data.clone().into()) - .collect(), - }) - .collect(); - - // Wrap ScopeSpans into a single ResourceSpans - vec![ResourceSpans { - resource: Some(Resource { - attributes: resource.attributes.0.clone(), - dropped_attributes_count: 0, - entity_refs: vec![], - }), - scope_spans, - schema_url: resource.schema_url.clone().unwrap_or_default(), - }] - } -} - -#[cfg(test)] -mod tests { - use crate::tonic::common::v1::any_value::Value; - use crate::transform::common::tonic::ResourceAttributesWithSchema; - use opentelemetry::time::now; - use opentelemetry::trace::{ - SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState, - }; - use opentelemetry::InstrumentationScope; - use opentelemetry::KeyValue; - use opentelemetry_sdk::resource::Resource; - use opentelemetry_sdk::trace::SpanData; - use opentelemetry_sdk::trace::{SpanEvents, SpanLinks}; - use std::borrow::Cow; - use std::time::Duration; - - fn create_test_span_data(instrumentation_name: &'static str) -> SpanData { - let span_context = SpanContext::new( - TraceId::from_u128(123), - SpanId::from_u64(456), - TraceFlags::default(), - 
false, - TraceState::default(), - ); - - SpanData { - span_context, - parent_span_id: SpanId::from_u64(0), - span_kind: SpanKind::Internal, - name: Cow::Borrowed("test_span"), - start_time: now(), - end_time: now() + Duration::from_secs(1), - attributes: vec![KeyValue::new("key", "value")], - dropped_attributes_count: 0, - events: SpanEvents::default(), - links: SpanLinks::default(), - status: Status::Unset, - instrumentation_scope: InstrumentationScope::builder(instrumentation_name).build(), - } - } - - #[test] - fn test_group_spans_by_resource_and_scope_single_scope() { - let resource = Resource::builder_empty() - .with_attribute(KeyValue::new("resource_key", "resource_value")) - .build(); - let span_data = create_test_span_data("lib1"); - - let spans = vec![span_data.clone()]; - let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema - - let grouped_spans = - crate::transform::trace::tonic::group_spans_by_resource_and_scope(spans, &resource); - - assert_eq!(grouped_spans.len(), 1); - - let resource_spans = &grouped_spans[0]; - assert_eq!( - resource_spans.resource.as_ref().unwrap().attributes.len(), - 1 - ); - assert_eq!( - resource_spans.resource.as_ref().unwrap().attributes[0].key, - "resource_key" - ); - assert_eq!( - resource_spans.resource.as_ref().unwrap().attributes[0] - .value - .clone() - .unwrap() - .value - .unwrap(), - Value::StringValue("resource_value".to_string()) - ); - - let scope_spans = &resource_spans.scope_spans; - assert_eq!(scope_spans.len(), 1); - - let scope_span = &scope_spans[0]; - assert_eq!(scope_span.scope.as_ref().unwrap().name, "lib1"); - assert_eq!(scope_span.spans.len(), 1); - - assert_eq!( - scope_span.spans[0].trace_id, - span_data.span_context.trace_id().to_bytes().to_vec() - ); - } - - #[test] - fn test_group_spans_by_resource_and_scope_multiple_scopes() { - let resource = Resource::builder_empty() - .with_attribute(KeyValue::new("resource_key", "resource_value")) 
- .build(); - let span_data1 = create_test_span_data("lib1"); - let span_data2 = create_test_span_data("lib1"); - let span_data3 = create_test_span_data("lib2"); - - let spans = vec![span_data1.clone(), span_data2.clone(), span_data3.clone()]; - let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema - - let grouped_spans = - crate::transform::trace::tonic::group_spans_by_resource_and_scope(spans, &resource); - - assert_eq!(grouped_spans.len(), 1); - - let resource_spans = &grouped_spans[0]; - assert_eq!( - resource_spans.resource.as_ref().unwrap().attributes.len(), - 1 - ); - assert_eq!( - resource_spans.resource.as_ref().unwrap().attributes[0].key, - "resource_key" - ); - assert_eq!( - resource_spans.resource.as_ref().unwrap().attributes[0] - .value - .clone() - .unwrap() - .value - .unwrap(), - Value::StringValue("resource_value".to_string()) - ); - - let scope_spans = &resource_spans.scope_spans; - assert_eq!(scope_spans.len(), 2); - - // Check the scope spans for both lib1 and lib2 - let mut lib1_scope_span = None; - let mut lib2_scope_span = None; - - for scope_span in scope_spans { - match scope_span.scope.as_ref().unwrap().name.as_str() { - "lib1" => lib1_scope_span = Some(scope_span), - "lib2" => lib2_scope_span = Some(scope_span), - _ => {} - } - } - - let lib1_scope_span = lib1_scope_span.expect("lib1 scope span not found"); - let lib2_scope_span = lib2_scope_span.expect("lib2 scope span not found"); - - assert_eq!(lib1_scope_span.scope.as_ref().unwrap().name, "lib1"); - assert_eq!(lib2_scope_span.scope.as_ref().unwrap().name, "lib2"); - - assert_eq!(lib1_scope_span.spans.len(), 2); - assert_eq!(lib2_scope_span.spans.len(), 1); - - assert_eq!( - lib1_scope_span.spans[0].trace_id, - span_data1.span_context.trace_id().to_bytes().to_vec() - ); - assert_eq!( - lib1_scope_span.spans[1].trace_id, - span_data2.span_context.trace_id().to_bytes().to_vec() - ); - assert_eq!( - 
lib2_scope_span.spans[0].trace_id, - span_data3.span_context.trace_id().to_bytes().to_vec() - ); - } -} diff --git a/opentelemetry-proto/src/transform/tracez.rs b/opentelemetry-proto/src/transform/tracez.rs deleted file mode 100644 index 3147a82cbb..0000000000 --- a/opentelemetry-proto/src/transform/tracez.rs +++ /dev/null @@ -1,72 +0,0 @@ -#[cfg(all(feature = "gen-tonic-messages", feature = "zpages"))] -mod tonic { - use opentelemetry::trace::{Event, Status}; - use opentelemetry_sdk::trace::SpanData; - - use crate::proto::tonic::{ - trace::v1::{span::Event as SpanEvent, Status as SpanStatus}, - tracez::v1::{ErrorData, LatencyData, RunningData}, - }; - use crate::transform::common::{to_nanos, tonic::Attributes}; - - impl From for LatencyData { - fn from(span_data: SpanData) -> Self { - LatencyData { - traceid: span_data.span_context.trace_id().to_bytes().to_vec(), - spanid: span_data.span_context.span_id().to_bytes().to_vec(), - parentid: span_data.parent_span_id.to_bytes().to_vec(), - starttime: to_nanos(span_data.start_time), - endtime: to_nanos(span_data.end_time), - attributes: Attributes::from(span_data.attributes).0, - events: span_data.events.iter().cloned().map(Into::into).collect(), - links: span_data.links.iter().cloned().map(Into::into).collect(), - } - } - } - - impl From for ErrorData { - fn from(span_data: SpanData) -> Self { - ErrorData { - traceid: span_data.span_context.trace_id().to_bytes().to_vec(), - spanid: span_data.span_context.span_id().to_bytes().to_vec(), - parentid: span_data.parent_span_id.to_bytes().to_vec(), - starttime: to_nanos(span_data.start_time), - attributes: Attributes::from(span_data.attributes).0, - events: span_data.events.iter().cloned().map(Into::into).collect(), - links: span_data.links.iter().cloned().map(Into::into).collect(), - status: match span_data.status { - Status::Error { description } => Some(SpanStatus { - message: description.to_string(), - code: 2, - }), - _ => None, - }, - } - } - } - - impl From for 
RunningData { - fn from(span_data: SpanData) -> Self { - RunningData { - traceid: span_data.span_context.trace_id().to_bytes().to_vec(), - spanid: span_data.span_context.span_id().to_bytes().to_vec(), - parentid: span_data.parent_span_id.to_bytes().to_vec(), - starttime: to_nanos(span_data.start_time), - attributes: Attributes::from(span_data.attributes).0, - events: span_data.events.iter().cloned().map(Into::into).collect(), - links: span_data.links.iter().cloned().map(Into::into).collect(), - } - } - } - - impl From for SpanEvent { - fn from(event: Event) -> Self { - SpanEvent { - time_unix_nano: to_nanos(event.timestamp), - name: event.name.to_string(), - attributes: Attributes::from(event.attributes).0, - dropped_attributes_count: event.dropped_attributes_count, - } - } - } -} diff --git a/opentelemetry-proto/tests/json_serde.rs b/opentelemetry-proto/tests/json_serde.rs index 982ef2798a..8a620c77bf 100644 --- a/opentelemetry-proto/tests/json_serde.rs +++ b/opentelemetry-proto/tests/json_serde.rs @@ -1,4 +1,4 @@ -#[cfg(all(feature = "with-serde", feature = "gen-tonic-messages"))] +#[cfg(all(feature = "with-serde", feature = "gen-tonic"))] mod json_serde { #[cfg(feature = "logs")] use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;