Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions crates/mofa-monitoring/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,39 @@ mofa-monitoring = "0.1"
- HTTP server for dashboard UI
- Static file embedding for frontend assets

## Quick Start

```rust
use std::sync::Arc;

use mofa_monitoring::tracing::{
    AgentTracer, ConsoleExporter, ExporterConfig, SamplingStrategy, SimpleSpanProcessor, Tracer,
    TracerConfig,
};
use mofa_monitoring::{DashboardConfig, DashboardServer};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // 1. Start the monitoring dashboard on port 8080.
    //    The server runs on a spawned task, so it only lives as long as `main` does.
    let server = DashboardServer::new(DashboardConfig::new().with_port(8080));
    tokio::spawn(async move { let _ = server.start().await; });

    // 2. Build the export pipeline: exporter -> span processor -> tracer.
    let exporter = Arc::new(ConsoleExporter::new(ExporterConfig::new("my-agent")));
    let processor = Arc::new(SimpleSpanProcessor::new(exporter));
    let config = TracerConfig::new("my-agent").with_sampling(SamplingStrategy::AlwaysOn);
    let tracer = Arc::new(Tracer::new(config, processor));

    // 3. Wrap the tracer in an agent-aware helper (it takes an `Arc<Tracer>`,
    //    not a `TracerConfig`).
    let agent_tracer = AgentTracer::new(tracer);

    // 4. Instrument an agent operation
    //    (see the API docs for the other `AgentTracer` methods).
    let span = agent_tracer.start_operation("agent-001", "llm.call", None).await;
    // ... perform work ...
    agent_tracer.end_span(&span).await;

    Ok(())
}
```

## Environment Variables (OTLP Export)

Enable the `otlp-metrics` feature and set these environment variables:

| Variable | Default | Description |
|---|---|---|
| `OTEL_EXPORTER_OTLP_ENDPOINT` | `http://localhost:4317` | OTLP gRPC collector endpoint |
| `OTEL_EXPORTER_OTLP_HEADERS` | — | Comma-separated `key=value` auth headers |
| `OTEL_SERVICE_NAME` | `mofa-agent` | Service name label in all exported spans |
| `OTEL_RESOURCE_ATTRIBUTES` | — | Additional resource attributes (e.g. `env=prod`) |

## Prometheus Export

`DashboardServer` now exposes a Prometheus-compatible endpoint at `/metrics`.
Expand Down
34 changes: 22 additions & 12 deletions crates/mofa-monitoring/src/tracing/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,21 +338,28 @@ impl JaegerExporter {
}
}

/// Configuration for the [`OtlpExporter`].
///
/// Defaults follow the standard OpenTelemetry Collector ports:
/// - gRPC: `http://localhost:4317`
/// - HTTP/JSON or HTTP/Protobuf: `http://localhost:4318`
///
/// You can override at runtime via standard OTLP environment variables:
/// - `OTEL_EXPORTER_OTLP_ENDPOINT` — base URL for all signals
/// - `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` — overrides endpoint for traces only
/// - `OTEL_EXPORTER_OTLP_HEADERS` — comma-separated key=value pairs,
///   e.g. `"Authorization=Bearer <token>,x-env=prod"`
/// - `OTEL_EXPORTER_OTLP_TIMEOUT` — timeout in milliseconds
// NOTE(review): the code that reads these environment variables is not part of
// this struct definition — confirm the exporter actually honors each of them.
#[derive(Debug, Clone)]
pub struct OtlpConfig {
/// OTLP collector endpoint URL. Default: `http://localhost:4317` (gRPC).
pub endpoint: String,
/// Wire protocol (gRPC or HTTP). Default: [`OtlpProtocol::Grpc`].
pub protocol: OtlpProtocol,
/// Additional headers sent with every export request.
/// Commonly used for bearer-token authentication.
pub headers: std::collections::HashMap<String, String>,
/// Per-request timeout in milliseconds. Default: 10 000 ms.
pub timeout_ms: u64,
}

Expand All @@ -374,8 +381,11 @@ impl Default for OtlpConfig {
}
}

/// OTLP 导出器
/// OTLP Exporter
/// Exports spans over the OpenTelemetry Protocol (OTLP).
///
/// Compatible with any OTLP-capable backend including OpenTelemetry Collector,
/// Grafana Tempo, and Jaeger ≥ 1.35 (send to its native OTLP ports 4317/4318
/// rather than through the legacy Thrift UDP agent, which Jaeger has deprecated).
pub struct OtlpExporter {
config: ExporterConfig,
otlp_config: OtlpConfig,
Expand Down
58 changes: 55 additions & 3 deletions crates/mofa-monitoring/src/tracing/instrumentation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,31 @@ use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

/// Agent Tracer
/// High-level tracing helper for MoFA agents.
///
/// Wraps a low-level [`Tracer`] with agent-aware span naming conventions
/// (`agent.<id>.<operation>`) and automatically attaches `agent.id` and
/// `agent.operation` attributes to every span.
///
/// # Examples
/// ```rust,no_run
/// use std::sync::Arc;
/// use mofa_monitoring::tracing::{
/// AgentTracer, ConsoleExporter, ExporterConfig, SimpleSpanProcessor,
/// TracerConfig, Tracer,
/// };
///
/// # async fn example() {
/// let exporter = Arc::new(ConsoleExporter::new(ExporterConfig::new("my-agent")));
/// let processor = Arc::new(SimpleSpanProcessor::new(exporter));
/// let tracer = Arc::new(Tracer::new(TracerConfig::new("my-agent"), processor));
/// let agent_tracer = AgentTracer::new(tracer);
///
/// let span = agent_tracer.start_operation("agent-001", "llm.call", None).await;
/// // ... perform work ...
/// agent_tracer.end_span(&span).await;
/// # }
/// ```
pub struct AgentTracer {
tracer: Arc<Tracer>,
propagator: Arc<dyn TracePropagator>,
Expand Down Expand Up @@ -377,7 +401,11 @@ impl MessageTracer {
}
}

/// Traced Agent Wrapper
/// Wraps any agent type `A` and provides a `traced_operation` method that
/// automatically starts/ends a span around the given async closure.
///
/// Prefer this over manual `start_operation`/`end_span` calls when you don't
/// need to keep a span reference beyond the operation's lifetime.
pub struct TracedAgent<A> {
agent: A,
tracer: Arc<AgentTracer>,
Expand Down Expand Up @@ -468,7 +496,31 @@ impl<W> TracedWorkflow<W> {
}
}

/// Helper function: trace Agent operation
/// Convenience wrapper: starts a span, calls `f` with it, then ends the span.
///
/// Use this instead of manually calling `start_operation`/`end_span` when
/// you don't need to access the span outside the closure.
///
/// # Examples
/// ```rust,no_run
/// use std::sync::Arc;
/// use mofa_monitoring::tracing::{
/// AgentTracer, ConsoleExporter, ExporterConfig, SimpleSpanProcessor,
/// TracerConfig, Tracer, trace_agent_operation,
/// };
///
/// # async fn example() {
/// # let exporter = Arc::new(ConsoleExporter::new(ExporterConfig::new("x")));
/// # let processor = Arc::new(SimpleSpanProcessor::new(exporter));
/// # let tracer = Arc::new(Tracer::new(TracerConfig::new("x"), processor));
/// let agent_tracer = AgentTracer::new(tracer);
///
/// let answer = trace_agent_operation(
/// &agent_tracer, "agent-001", "llm.call", None,
/// |_span| async move { "response".to_string() },
/// ).await;
/// # }
/// ```
pub async fn trace_agent_operation<F, Fut, R>(
tracer: &AgentTracer,
agent_id: &str,
Expand Down
30 changes: 18 additions & 12 deletions crates/mofa-monitoring/src/tracing/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,31 @@ use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

/// Describes the relationship between a span and the component that produced it.
///
/// Backends use `SpanKind` to build service dependency maps and correctly
/// attribute cross-service latency.
///
/// | Kind | Use for |
/// |------|---------|
/// | `Internal` | In-process work: LLM reasoning, tool selection, prompt building |
/// | `Server` | Handling an inbound network request (e.g. an HTTP handler in an agent gateway) |
/// | `Client` | Outbound call to an external service: REST API, database, vector store |
/// | `Producer` | Publishing a message to a queue or event bus |
/// | `Consumer` | Receiving and processing a message from a queue or event bus |
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum SpanKind {
/// In-process operation with no network boundary.
/// This is the default variant; use it for in-process agent work such as
/// LLM reasoning, tool selection, or prompt building. Work that crosses a
/// network boundary belongs under `Client` or `Server` instead.
#[default]
Internal,
/// Server side: this span handles an inbound synchronous request.
Server,
/// Client side: this span initiates a synchronous call to an external service.
Client,
/// Message producer: this span publishes a message to an asynchronous channel.
Producer,
/// Message consumer: this span processes a message received from an
/// asynchronous channel.
Consumer,
}

Expand Down
99 changes: 92 additions & 7 deletions crates/mofa-monitoring/src/tracing/tracer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,59 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::RwLock;

/// Determines which spans are recorded and exported to the backend.
///
/// Choose a strategy based on your traffic volume and observability needs:
///
/// | Strategy | Best for |
/// |---|---|
/// | `AlwaysOn` | Development, low-traffic services |
/// | `AlwaysOff` | Temporarily disabling tracing without code changes |
/// | `Probabilistic` | High-traffic production (e.g. 0.01 = 1%) |
/// | `RateLimiting` | Bursty traffic with a hard cap per second |
/// | `ParentBased` | Microservices — inherit the sampling decision from the caller |
///
/// 采样策略
/// Sampling strategy
#[derive(Debug, Clone, Default)]
pub enum SamplingStrategy {
/// Record every span. Use during development or for low-traffic services
/// where 100% visibility is acceptable.
///
/// 始终采样
/// Always sample
#[default]
AlwaysOn,
/// Record no spans. Useful for disabling tracing without changing code.
///
/// 从不采样
/// Never sample
AlwaysOff,
/// Record a random fraction of traces. The value must be in `[0.0, 1.0]`.
/// Sampling is stable: the same `trace_id` always produces the same decision.
///
/// # Example
/// ```rust,ignore
/// SamplingStrategy::Probabilistic(0.05) // sample 5% of traces
/// ```
///
/// 按概率采样
/// Probabilistic sampling
Probabilistic(f64),
/// Admit at most `traces_per_second` new root spans per second.
/// Excess spans are dropped. Thread-safe via atomic CAS.
///
/// 基于速率限制采样
/// Rate-limiting based sampling
RateLimiting {
traces_per_second: u64,
/// Holds (timestamp_secs << 32) | (count)
state: Arc<AtomicU64>,
},
/// Inherit the sampling decision from the parent span's context.
/// If there is no parent, fall back to the `root` strategy.
/// Use this in multi-service deployments so the caller controls sampling.
///
/// 父级决定
/// Parent-based decision
ParentBased { root: Box<SamplingStrategy> },
Expand Down Expand Up @@ -106,6 +138,20 @@ impl SamplingStrategy {
}
}

/// Configuration for the MoFA distributed tracer.
///
/// Use [`TracerConfig::new`] for a quick setup with sensible defaults,
/// or build the struct directly for full control.
///
/// # Example
/// ```rust,no_run
/// use mofa_monitoring::tracing::{TracerConfig, SamplingStrategy};
///
/// let config = TracerConfig::new("my-agent")
/// .with_version("1.2.3")
/// .with_sampling(SamplingStrategy::Probabilistic(0.1));
/// ```
///
/// Tracer 配置
/// Tracer configuration
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -189,8 +235,14 @@ pub trait SpanProcessor: Send + Sync {
async fn force_flush(&self) -> Result<(), String>;
}

/// Simple span processor: exports each span directly as soon as it ends,
/// with no buffering.
///
/// **When to use**: development, testing, or very low-throughput services where
/// you want immediate visibility and can tolerate the per-span export latency.
///
/// **Trade-off vs [`BatchSpanProcessor`]**: every span-end call waits until the
/// exporter round-trip completes (network I/O for remote exporters). For production
/// workloads prefer `BatchSpanProcessor`, which offloads exports to a background task.
pub struct SimpleSpanProcessor {
// Destination that finished spans are handed to.
exporter: Arc<dyn TracingExporter>,
}
Expand Down Expand Up @@ -223,8 +275,17 @@ impl SpanProcessor for SimpleSpanProcessor {
}
}

/// 批处理 Span 处理器
/// Batch Span Processor
/// Buffers completed spans and exports them in batches on a background Tokio task.
///
/// **When to use**: production or any service where exporter latency would otherwise
/// appear in your application's critical path.
///
/// Configure the trade-offs via [`ExporterConfig`]:
/// - `batch_size` — maximum spans per export call (default: 512)
/// - `export_interval_ms` — maximum time a span waits in the buffer (default: 5 000 ms)
/// - `max_queue_size` — spans are dropped when the queue exceeds this limit (default: 2 048)
///
/// Call `force_flush` before process exit to avoid losing buffered spans.
pub struct BatchSpanProcessor {
exporter: Arc<dyn TracingExporter>,
buffer: Arc<RwLock<Vec<SpanData>>>,
Expand Down Expand Up @@ -434,8 +495,28 @@ impl Tracer {
}
}

/// Tracer Provider - 管理多个 Tracer
/// Tracer Provider - Manages multiple Tracers
/// Factory and lifecycle manager for [`Tracer`] instances.
///
/// A single `TracerProvider` owns the [`SpanProcessor`] (and therefore the exporter),
/// so all tracers it creates share the same export pipeline. This matches the
/// OpenTelemetry spec's `TracerProvider` concept.
///
/// # Examples
/// ```rust,no_run
/// use std::sync::Arc;
/// use mofa_monitoring::tracing::{
/// ConsoleExporter, ExporterConfig, SimpleSpanProcessor, TracerConfig, TracerProvider,
/// };
///
/// # async fn example() {
/// let exporter = Arc::new(ConsoleExporter::new(ExporterConfig::new("my-agent")));
/// let processor = Arc::new(SimpleSpanProcessor::new(exporter));
/// let provider = TracerProvider::new(TracerConfig::new("my-agent"), processor);
///
/// // Obtain a tracer scoped to a specific component
/// let tracer = provider.tracer("rag-pipeline").await;
/// # }
/// ```
pub struct TracerProvider {
config: TracerConfig,
processor: Arc<dyn SpanProcessor>,
Expand Down Expand Up @@ -510,7 +591,11 @@ impl TracerProvider {
}

/// Global tracer: a process-wide singleton that holds a reference to the
/// active [`TracerProvider`].
///
/// Use `GlobalTracer` when library code needs to emit spans without requiring
/// callers to pass a tracer explicitly. Call [`GlobalTracer::set_provider`] once
/// at startup, then use [`global_tracer()`] anywhere to obtain a [`Tracer`].
pub struct GlobalTracer {
// `None` until a provider has been installed; the lock allows the provider
// to be swapped while other tasks hold a handle to this struct.
provider: Arc<RwLock<Option<Arc<TracerProvider>>>>,
}
Expand Down
Loading