Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/cli/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ impl TracerOptions {
/// Get the tracer configuration based on the options
pub fn config(&self) -> Option<TracerConfig> {
if self.tracer_gcloud {
Some(TracerConfig::Gcloud(gcloud::GcloudConfig {
Some(TracerConfig::GCloud(gcloud::GcloudConfig {
project_id: self.gcloud_project_id.clone(),
}))
} else if self.tracer_otlp {
Expand Down
3 changes: 3 additions & 0 deletions crates/tracing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,6 @@ tracing-opentelemetry.workspace = true
bytes.workspace = true
chrono.workspace = true
http-body-util = "0.1.3"

[dev-dependencies]
tokio = { workspace = true, features = [ "macros", "rt-multi-thread" ] }
125 changes: 125 additions & 0 deletions crates/tracing/src/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
use std::fmt::Debug;

use tracing_subscriber::fmt::format::{DefaultFields, Format, Full, Json, JsonFields};
use tracing_subscriber::layer::{Layered, SubscriberExt};
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{fmt, EnvFilter, Registry};

use crate::fmt::{FmtLayer, LocalTime};
use crate::{Error, LogFormat, TelemetryTracer};

const DEFAULT_LOG_FILTER: &str = "katana_db::mdbx=trace,cairo_native::compiler=off,pipeline=debug,\
stage=debug,tasks=debug,executor=trace,forking::backend=trace,\
blockifier=off,jsonrpsee_server=off,hyper=off,messaging=debug,\
node=error,explorer=info,rpc=trace,pool=trace,info";

pub type NoopTracer = opentelemetry::trace::noop::NoopTracer;

#[derive(Debug)]
pub struct TracingBuilder<Telemetry = NoopTracer> {
filter: Option<EnvFilter>,
log_format: LogFormat,
tracer: Telemetry,
}

impl TracingBuilder {
/// Create a new tracing builder
pub fn new() -> Self {
Self { filter: None, log_format: LogFormat::Full, tracer: NoopTracer::new() }
}
}

impl TracingBuilder<NoopTracer> {
pub fn with_telemetry<T: TelemetryTracer>(self, tracer: T) -> TracingBuilder<T> {
TracingBuilder { filter: self.filter, log_format: self.log_format, tracer }
}
}

impl<Telemetry> TracingBuilder<Telemetry> {
/// Set the log format to JSON
pub fn json(self) -> TracingBuilder<Telemetry> {
TracingBuilder { log_format: LogFormat::Json, tracer: self.tracer, filter: self.filter }
}

/// Set a custom filter from a string
pub fn with_filter(mut self, filter: &str) -> Result<Self, Error> {
self.filter = Some(EnvFilter::try_new(filter)?);
Ok(self)
}

/// Use the default filter
pub fn with_default_filter(mut self) -> Result<Self, Error> {
self.filter = Some(EnvFilter::try_new(DEFAULT_LOG_FILTER)?);
Ok(self)
}

/// Use filter from environment variable (RUST_LOG)
pub fn with_env_filter(mut self) -> Result<Self, Error> {
self.filter = Some(EnvFilter::try_from_default_env()?);
Ok(self)
}

/// Use filter from environment with fallback to default
pub fn with_env_filter_or_default(mut self) -> Result<Self, Error> {
let default_filter = EnvFilter::try_new(DEFAULT_LOG_FILTER);
self.filter = Some(EnvFilter::try_from_default_env().or(default_filter)?);
Ok(self)
}
}

impl<Telemetry: TelemetryTracer> TracingBuilder<Telemetry> {
/// Try to initialize the tracing subscriber without telemetry
pub fn build(self) -> Result<TracingSubscriber<Telemetry>, Error> {
let filter = self.filter.unwrap_or_else(|| {
EnvFilter::try_new(DEFAULT_LOG_FILTER).expect("default filter should be valid")
});

let base_layer = fmt::layer().with_timer(LocalTime::new());

let fmt_layer = match self.log_format {
LogFormat::Full => FmtLayer::Full(base_layer),
LogFormat::Json => FmtLayer::Json(base_layer.json()),
};

Ok(TracingSubscriber {
tracer: self.tracer,
subscriber: tracing_subscriber::registry().with(filter).with(fmt_layer),
})
}
}

impl Default for TracingBuilder {
fn default() -> Self {
Self::new()
}
}

/// The base subscribe type created by [`TracingBuilder`] and used by [`TracingSubscriber`].
type BaseSubscriber = Layered<
FmtLayer<
fmt::Layer<Layered<EnvFilter, Registry>, DefaultFields, Format<Full, LocalTime>>,
fmt::Layer<Layered<EnvFilter, Registry>, JsonFields, Format<Json, LocalTime>>,
>,
Layered<EnvFilter, Registry>,
>;

pub struct TracingSubscriber<Telemetry> {
subscriber: BaseSubscriber,
tracer: Telemetry,
}

impl<Telemetry: TelemetryTracer> TracingSubscriber<Telemetry> {
pub fn init(self) {
self.tracer.init().unwrap();
self.subscriber.with(tracing_opentelemetry::layer().with_tracer(self.tracer)).init();
}
}

impl<Telemetry: Debug> Debug for TracingSubscriber<Telemetry> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TracingSubscriber")
.field("subscriber", &"..")
.field("tracer", &self.tracer)
.finish()
}
}
21 changes: 21 additions & 0 deletions crates/tracing/src/fmt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::fmt::Display;
use serde::{Deserialize, Serialize};
use tracing_subscriber::fmt::format::Writer;
use tracing_subscriber::fmt::time::{self};
use tracing_subscriber::Layer;

/// Format for logging output.
#[derive(Debug, Copy, Clone, PartialEq, Deserialize, Serialize, Default)]
Expand Down Expand Up @@ -66,3 +67,23 @@ impl time::FormatTime for LocalTime {
write!(w, "{}", time.format(DEFAULT_TIMESTAMP_FORMAT))
}
}

// Use an enum to preserve type information instead of Box<dyn>
pub enum FmtLayer<F, J> {
Full(F),
Json(J),
}

impl<S, F, J> Layer<S> for FmtLayer<F, J>
where
S: tracing::Subscriber,
F: Layer<S>,
J: Layer<S>,
{
fn on_layer(&mut self, subscriber: &mut S) {
match self {
FmtLayer::Full(layer) => layer.on_layer(subscriber),
FmtLayer::Json(layer) => layer.on_layer(subscriber),
}
}
}
178 changes: 141 additions & 37 deletions crates/tracing/src/gcloud.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
use http::Request;
use opentelemetry::trace::Tracer;
use opentelemetry_gcloud_trace::{GcpCloudTraceExporterBuilder, SdkTracer};
use opentelemetry_http::HeaderExtractor;
use opentelemetry_sdk::trace::SdkTracerProvider;
use opentelemetry_sdk::Resource;
use opentelemetry_stackdriver::google_trace_context_propagator::GoogleTraceContextPropagator;
use tower_http::trace::MakeSpan;
use tracing::Span;
use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing_opentelemetry::{OpenTelemetrySpanExt, PreSampledTracer};

use crate::Error;
use crate::{Error, TelemetryTracer};

#[derive(Debug, Clone, Default)]
pub struct GoogleStackDriverMakeSpan;

impl<B> MakeSpan<B> for GoogleStackDriverMakeSpan {
fn make_span(&mut self, request: &Request<B>) -> Span {
fn make_span(&mut self, request: &Request<B>) -> tracing::Span {
// Extract trace context from HTTP headers
let cx = opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.extract(&HeaderExtractor(request.headers()))
Expand All @@ -36,37 +37,140 @@ pub struct GcloudConfig {
pub project_id: Option<String>,
}

/// Initialize Google Cloud Trace exporter and OpenTelemetry propagators for Google Cloud trace
/// context support.
///
/// Make sure to set `GOOGLE_APPLICATION_CREDENTIALS` env var to authenticate to gcloud
pub(crate) async fn init_tracer(gcloud_config: &GcloudConfig) -> Result<SdkTracer, Error> {
rustls::crypto::ring::default_provider()
.install_default()
.map_err(|_| Error::InstallCryptoFailed)?;

let resource = Resource::builder().with_service_name("katana").build();

let mut trace_exporter = if let Some(project_id) = &gcloud_config.project_id {
GcpCloudTraceExporterBuilder::new(project_id.clone())
} else {
// Default will attempt to find project ID from environment variables in the following
// order:
// - GCP_PROJECT
// - PROJECT_ID
// - GCP_PROJECT_ID
GcpCloudTraceExporterBuilder::for_default_project_id().await?
};

trace_exporter = trace_exporter.with_resource(resource);

let tracer_provider = trace_exporter.create_provider().await?;
let tracer = trace_exporter.install(&tracer_provider).await?;

// Set the Google Cloud trace context propagator globally
// This will handle both extraction and injection of X-Cloud-Trace-Context headers
opentelemetry::global::set_text_map_propagator(GoogleTraceContextPropagator::default());
opentelemetry::global::set_tracer_provider(tracer_provider.clone());

Ok(tracer)
/// Builder for creating an OpenTelemetry layer with Google Cloud Trace exporter
#[derive(Debug, Clone)]
pub struct GCloudTracerBuilder {
service_name: String,
project_id: Option<String>,
resource: Option<Resource>,
}

/////////////////////////////////////////////////////////////////////////////////
// GCloudTracerBuilder implementations
/////////////////////////////////////////////////////////////////////////////////

impl GCloudTracerBuilder {
/// Create a new Google Cloud tracing builder
pub fn new() -> Self {
Self { service_name: "katana".to_string(), project_id: None, resource: None }
}

/// Set the service name
pub fn service_name(mut self, name: impl Into<String>) -> Self {
self.service_name = name.into();
self
}

/// Set the Google Cloud project ID
pub fn project_id(mut self, project_id: impl Into<String>) -> Self {
self.project_id = Some(project_id.into());
self
}

/// Set a custom resource
pub fn resource(mut self, resource: Resource) -> Self {
self.resource = Some(resource);
self
}

/// Build the OpenTelemetry layer (async because GCloud SDK requires it)
pub async fn build(self) -> Result<GCloudTracer, Error> {
// Install crypto provider
rustls::crypto::ring::default_provider()
.install_default()
.map_err(|_| Error::InstallCryptoFailed)?;

// Build resource with service name
let resource = self.resource.unwrap_or_else(|| {
Resource::builder().with_service_name(self.service_name.clone()).build()
});

// Create trace exporter
let mut trace_exporter = if let Some(project_id) = self.project_id {
GcpCloudTraceExporterBuilder::new(project_id)
} else {
// Default will attempt to find project ID from environment variables in the following
// order:
// - GCP_PROJECT
// - PROJECT_ID
// - GCP_PROJECT_ID
GcpCloudTraceExporterBuilder::for_default_project_id().await?
};

trace_exporter = trace_exporter.with_resource(resource);

// Create provider and install
let tracer_provider = trace_exporter.create_provider().await?;
let tracer = trace_exporter.install(&tracer_provider).await?;

Ok(GCloudTracer { tracer, tracer_provider })
}
}

impl Default for GCloudTracerBuilder {
fn default() -> Self {
Self::new()
}
}

/// Wrapper type for SdkTracer that implements the Tracer trait
#[derive(Debug, Clone)]
pub struct GCloudTracer {
tracer: SdkTracer,
tracer_provider: SdkTracerProvider,
}

/////////////////////////////////////////////////////////////////////////////////
// GCloudTracer implementations
/////////////////////////////////////////////////////////////////////////////////

impl GCloudTracer {
pub fn builder() -> GCloudTracerBuilder {
GCloudTracerBuilder::new()
}
}

impl Tracer for GCloudTracer {
type Span = <SdkTracer as Tracer>::Span;

#[inline]
fn build_with_context(
&self,
builder: opentelemetry::trace::SpanBuilder,
parent_cx: &opentelemetry::Context,
) -> Self::Span {
self.tracer.build_with_context(builder, parent_cx)
}
}

impl PreSampledTracer for GCloudTracer {
#[inline]
fn new_span_id(&self) -> opentelemetry::trace::SpanId {
self.tracer.new_span_id()
}

#[inline]
fn new_trace_id(&self) -> opentelemetry::trace::TraceId {
self.tracer.new_trace_id()
}

#[inline]
fn sampled_context(
&self,
data: &mut tracing_opentelemetry::OtelData,
) -> opentelemetry::Context {
self.tracer.sampled_context(data)
}
}

impl TelemetryTracer for GCloudTracer {
fn init(&self) -> Result<(), Error> {
// Set the Google Cloud trace context propagator globally
// This will handle both extraction and injection of X-Cloud-Trace-Context headers
opentelemetry::global::set_text_map_propagator(GoogleTraceContextPropagator::default());
opentelemetry::global::set_tracer_provider(self.tracer_provider.clone());
Ok(())
}
}

impl crate::__private::Sealed for GCloudTracer {}
Loading