diff --git a/crates/mofa-monitoring/README.md b/crates/mofa-monitoring/README.md index 6375acfaf..87a77c30b 100644 --- a/crates/mofa-monitoring/README.md +++ b/crates/mofa-monitoring/README.md @@ -20,6 +20,39 @@ mofa-monitoring = "0.1" - HTTP server for dashboard UI - Static file embedding for frontend assets +## Quick Start + +```rust +use mofa_monitoring::tracing::{AgentTracer, TracerConfig, SamplingStrategy}; +use mofa_monitoring::{DashboardServer, DashboardConfig}; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + // 1. Start the monitoring dashboard on port 8080 + let server = DashboardServer::new(DashboardConfig::new().with_port(8080)); + tokio::spawn(async move { let _ = server.start().await; }); + + // 2. Create a tracer for your agent + let _tracer = AgentTracer::new(TracerConfig::new("my-agent") + .with_sampling(SamplingStrategy::AlwaysOn)); + + // 3. Use the tracer to instrument agent operations + // (see API docs for AgentTracer methods) + Ok(()) +} +``` + +## Environment Variables (OTLP Export) + +Enable the `otlp-metrics` feature and set these environment variables: + +| Variable | Default | Description | +|---|---|---| +| `OTEL_EXPORTER_OTLP_ENDPOINT` | `http://localhost:4317` | OTLP gRPC collector endpoint | +| `OTEL_EXPORTER_OTLP_HEADERS` | — | Comma-separated `key=value` auth headers | +| `OTEL_SERVICE_NAME` | `mofa-agent` | Service name label in all exported spans | +| `OTEL_RESOURCE_ATTRIBUTES` | — | Additional resource attributes (e.g. `env=prod`) | + ## Prometheus Export `DashboardServer` now exposes a Prometheus-compatible endpoint at `/metrics`. diff --git a/crates/mofa-monitoring/src/tracing/exporter.rs b/crates/mofa-monitoring/src/tracing/exporter.rs index 97a6f7292..4c70f64e3 100644 --- a/crates/mofa-monitoring/src/tracing/exporter.rs +++ b/crates/mofa-monitoring/src/tracing/exporter.rs @@ -338,21 +338,28 @@ impl JaegerExporter { } } -/// OTLP 导出器配置 -/// OTLP exporter configuration +/// Configuration for the [`OtlpExporter`]. +/// +/// Defaults follow the standard OpenTelemetry Collector ports: +/// - gRPC: `http://localhost:4317` +/// - HTTP/JSON or HTTP/Protobuf: `http://localhost:4318` +/// +/// You can override at runtime via standard OTLP environment variables: +/// - `OTEL_EXPORTER_OTLP_ENDPOINT` — base URL for all signals +/// - `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` — overrides endpoint for traces only +/// - `OTEL_EXPORTER_OTLP_HEADERS` — comma-separated key=value pairs, +/// e.g. `"Authorization=Bearer ,x-env=prod"` +/// - `OTEL_EXPORTER_OTLP_TIMEOUT` — timeout in milliseconds #[derive(Debug, Clone)] pub struct OtlpConfig { - /// Endpoint - /// Endpoint + /// OTLP collector endpoint URL. Default: `http://localhost:4317` (gRPC). pub endpoint: String, - /// 协议 (grpc 或 http) - /// Protocol (grpc or http) + /// Wire protocol. Default: [`OtlpProtocol::Grpc`]. pub protocol: OtlpProtocol, - /// Headers - /// Headers + /// Additional headers sent with every export request. + /// Commonly used for bearer-token authentication. pub headers: std::collections::HashMap, - /// 超时(毫秒) - /// Timeout (milliseconds) + /// Per-request timeout in milliseconds. Default: 10 000 ms. pub timeout_ms: u64, } @@ -374,8 +381,11 @@ impl Default for OtlpConfig { } } -/// OTLP 导出器 -/// OTLP Exporter +/// Exports spans over the OpenTelemetry Protocol (OTLP). +/// +/// Compatible with any OTLP-capable backend including OpenTelemetry Collector, +/// Grafana Tempo, and Jaeger ≥ 1.35 (use its OTLP port 4317/4318, not the +/// legacy Thrift UDP agent which is deprecated as of Jaeger 1.35). pub struct OtlpExporter { config: ExporterConfig, otlp_config: OtlpConfig, diff --git a/crates/mofa-monitoring/src/tracing/instrumentation.rs b/crates/mofa-monitoring/src/tracing/instrumentation.rs index 34d93ebca..fe6dabcaf 100644 --- a/crates/mofa-monitoring/src/tracing/instrumentation.rs +++ b/crates/mofa-monitoring/src/tracing/instrumentation.rs @@ -11,7 +11,31 @@ use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; -/// Agent Tracer +/// High-level tracing helper for MoFA agents. +/// +/// Wraps a low-level [`Tracer`] with agent-aware span naming conventions +/// (`agent..`) and automatically attaches `agent.id` and +/// `agent.operation` attributes to every span. +/// +/// # Examples +/// ```rust,no_run +/// use std::sync::Arc; +/// use mofa_monitoring::tracing::{ +/// AgentTracer, ConsoleExporter, ExporterConfig, SimpleSpanProcessor, +/// TracerConfig, Tracer, +/// }; +/// +/// # async fn example() { +/// let exporter = Arc::new(ConsoleExporter::new(ExporterConfig::new("my-agent"))); +/// let processor = Arc::new(SimpleSpanProcessor::new(exporter)); +/// let tracer = Arc::new(Tracer::new(TracerConfig::new("my-agent"), processor)); +/// let agent_tracer = AgentTracer::new(tracer); +/// +/// let span = agent_tracer.start_operation("agent-001", "llm.call", None).await; +/// // ... perform work ... +/// agent_tracer.end_span(&span).await; +/// # } +/// ``` pub struct AgentTracer { tracer: Arc, propagator: Arc, @@ -377,7 +401,11 @@ impl MessageTracer { } } -/// Traced Agent Wrapper +/// Wraps any agent type `A` and provides a `traced_operation` method that +/// automatically starts/ends a span around the given async closure. +/// +/// Prefer this over manual `start_operation`/`end_span` calls when you don't +/// need to keep a span reference beyond the operation's lifetime. pub struct TracedAgent { agent: A, tracer: Arc, @@ -468,7 +496,31 @@ impl TracedWorkflow { } } -/// Helper function: trace Agent operation +/// Convenience wrapper: starts a span, calls `f` with it, then ends the span. +/// +/// Use this instead of manually calling `start_operation`/`end_span` when +/// you don't need to access the span outside the closure. +/// +/// # Examples +/// ```rust,no_run +/// use std::sync::Arc; +/// use mofa_monitoring::tracing::{ +/// AgentTracer, ConsoleExporter, ExporterConfig, SimpleSpanProcessor, +/// TracerConfig, Tracer, trace_agent_operation, +/// }; +/// +/// # async fn example() { +/// # let exporter = Arc::new(ConsoleExporter::new(ExporterConfig::new("x"))); +/// # let processor = Arc::new(SimpleSpanProcessor::new(exporter)); +/// # let tracer = Arc::new(Tracer::new(TracerConfig::new("x"), processor)); +/// let agent_tracer = AgentTracer::new(tracer); +/// +/// let answer = trace_agent_operation( +/// &agent_tracer, "agent-001", "llm.call", None, +/// |_span| async move { "response".to_string() }, +/// ).await; +/// # } +/// ``` pub async fn trace_agent_operation( tracer: &AgentTracer, agent_id: &str, diff --git a/crates/mofa-monitoring/src/tracing/span.rs b/crates/mofa-monitoring/src/tracing/span.rs index 3193f4399..db9273aa0 100644 --- a/crates/mofa-monitoring/src/tracing/span.rs +++ b/crates/mofa-monitoring/src/tracing/span.rs @@ -11,25 +11,31 @@ use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; -/// Span 类型 -/// Span types +/// Describes the relationship between a span and the component that produced it. +/// +/// Backends use `SpanKind` to build service dependency maps and correctly +/// attribute cross-service latency. +/// +/// | Kind | Use for | +/// |------|---------| +/// | `Internal` | In-process work: LLM reasoning, tool selection, prompt building | +/// | `Server` | Handling an inbound network request (e.g. an HTTP handler in an agent gateway) | +/// | `Client` | Outbound call to an external service: REST API, database, vector store | +/// | `Producer` | Publishing a message to a queue or event bus | +/// | `Consumer` | Receiving and processing a message from a queue or event bus | #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] pub enum SpanKind { - /// 内部操作 - /// Internal operation + /// In-process operation with no network boundary. + /// Default for most MoFA agent work: LLM calls, tool execution, memory lookup. #[default] Internal, - /// 服务器端(处理请求) - /// Server side (handling request) + /// This span handles an inbound synchronous request. Server, - /// 客户端(发起请求) - /// Client side (initiating request) + /// This span initiates a synchronous call to an external service. Client, - /// 消息生产者 - /// Message producer + /// This span publishes a message to an asynchronous channel. Producer, - /// 消息消费者 - /// Message consumer + /// This span processes a message received from an asynchronous channel. Consumer, } diff --git a/crates/mofa-monitoring/src/tracing/tracer.rs b/crates/mofa-monitoring/src/tracing/tracer.rs index f2f4e6a5a..3ec071552 100644 --- a/crates/mofa-monitoring/src/tracing/tracer.rs +++ b/crates/mofa-monitoring/src/tracing/tracer.rs @@ -13,20 +13,48 @@ use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use tokio::sync::RwLock; +/// Determines which spans are recorded and exported to the backend. +/// +/// Choose a strategy based on your traffic volume and observability needs: +/// +/// | Strategy | Best for | +/// |---|---| +/// | `AlwaysOn` | Development, low-traffic services | +/// | `AlwaysOff` | Temporarily disabling tracing without code changes | +/// | `Probabilistic` | High-traffic production (e.g. 0.01 = 1%) | +/// | `RateLimiting` | Bursty traffic with a hard cap per second | +/// | `ParentBased` | Microservices — inherit the sampling decision from the caller | +/// /// 采样策略 /// Sampling strategy #[derive(Debug, Clone, Default)] pub enum SamplingStrategy { + /// Record every span. Use during development or for low-traffic services + /// where 100% visibility is acceptable. + /// /// 始终采样 /// Always sample #[default] AlwaysOn, + /// Record no spans. Useful for disabling tracing without changing code. + /// /// 从不采样 /// Never sample AlwaysOff, + /// Record a random fraction of traces. The value must be in `[0.0, 1.0]`. + /// Sampling is stable: the same `trace_id` always produces the same decision. + /// + /// # Example + /// ```rust,ignore + /// SamplingStrategy::Probabilistic(0.05) // sample 5% of traces + /// ``` + /// /// 按概率采样 /// Probabilistic sampling Probabilistic(f64), + /// Admit at most `traces_per_second` new root spans per second. + /// Excess spans are dropped. Thread-safe via atomic CAS. + /// /// 基于速率限制采样 /// Rate-limiting based sampling RateLimiting { @@ -34,6 +62,10 @@ pub enum SamplingStrategy { /// Holds (timestamp_secs << 32) | (count) state: Arc, }, + /// Inherit the sampling decision from the parent span's context. + /// If there is no parent, fall back to the `root` strategy. + /// Use this in multi-service deployments so the caller controls sampling. + /// /// 父级决定 /// Parent-based decision ParentBased { root: Box }, @@ -106,6 +138,20 @@ impl SamplingStrategy { } } +/// Configuration for the MoFA distributed tracer. +/// +/// Use [`TracerConfig::new`] for a quick setup with sensible defaults, +/// or build the struct directly for full control. +/// +/// # Example +/// ```rust,no_run +/// use mofa_monitoring::tracing::{TracerConfig, SamplingStrategy}; +/// +/// let config = TracerConfig::new("my-agent") +/// .with_version("1.2.3") +/// .with_sampling(SamplingStrategy::Probabilistic(0.1)); +/// ``` +/// /// Tracer 配置 /// Tracer configuration #[derive(Debug, Clone)] @@ -189,8 +235,14 @@ pub trait SpanProcessor: Send + Sync { async fn force_flush(&self) -> Result<(), String>; } -/// 简单 Span 处理器 - 直接导出 -/// Simple Span Processor - Export directly +/// Exports each span synchronously as soon as it ends. +/// +/// **When to use**: development, testing, or very low-throughput services where +/// you want immediate visibility and can tolerate the per-span export latency. +/// +/// **Trade-off vs [`BatchSpanProcessor`]**: every span end call blocks until the +/// exporter round-trip completes (network I/O for remote exporters). For production +/// workloads prefer `BatchSpanProcessor` which offloads exports to a background task. pub struct SimpleSpanProcessor { exporter: Arc, } @@ -223,8 +275,17 @@ impl SpanProcessor for SimpleSpanProcessor { } } -/// 批处理 Span 处理器 -/// Batch Span Processor +/// Buffers completed spans and exports them in batches on a background Tokio task. +/// +/// **When to use**: production or any service where exporter latency would otherwise +/// appear in your application's critical path. +/// +/// Configure the trade-offs via [`ExporterConfig`]: +/// - `batch_size` — maximum spans per export call (default: 512) +/// - `export_interval_ms` — maximum time a span waits in the buffer (default: 5 000 ms) +/// - `max_queue_size` — spans are dropped when the queue exceeds this limit (default: 2 048) +/// +/// Call `force_flush` before process exit to avoid losing buffered spans. pub struct BatchSpanProcessor { exporter: Arc, buffer: Arc>>, @@ -434,8 +495,28 @@ impl Tracer { } } -/// Tracer Provider - 管理多个 Tracer -/// Tracer Provider - Manages multiple Tracers +/// Factory and lifecycle manager for [`Tracer`] instances. +/// +/// A single `TracerProvider` owns the [`SpanProcessor`] (and therefore the exporter), +/// so all tracers it creates share the same export pipeline. This matches the +/// OpenTelemetry spec's `TracerProvider` concept. +/// +/// # Examples +/// ```rust,no_run +/// use std::sync::Arc; +/// use mofa_monitoring::tracing::{ +/// ConsoleExporter, ExporterConfig, SimpleSpanProcessor, TracerConfig, TracerProvider, +/// }; +/// +/// # async fn example() { +/// let exporter = Arc::new(ConsoleExporter::new(ExporterConfig::new("my-agent"))); +/// let processor = Arc::new(SimpleSpanProcessor::new(exporter)); +/// let provider = TracerProvider::new(TracerConfig::new("my-agent"), processor); +/// +/// // Obtain a tracer scoped to a specific component +/// let tracer = provider.tracer("rag-pipeline").await; +/// # } +/// ``` pub struct TracerProvider { config: TracerConfig, processor: Arc, @@ -510,7 +591,11 @@ impl TracerProvider { } /// 全局 Tracer -/// Global Tracer +/// Process-wide singleton that holds a reference to the active [`TracerProvider`]. +/// +/// Use `GlobalTracer` when library code needs to emit spans without requiring +/// callers to pass a tracer explicitly. Call [`GlobalTracer::set_provider`] once +/// at startup, then use [`global_tracer()`] anywhere to obtain a [`Tracer`]. pub struct GlobalTracer { provider: Arc>>>, }