Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions mcp-logging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,7 @@ pub use profiling::{
};
pub use sanitization::{LogSanitizer, SanitizationConfig};
pub use structured::{ErrorClass, StructuredContext, StructuredLogger};
pub use telemetry::{
BatchProcessingConfig, JaegerConfig, OtlpConfig, SamplingConfig, SamplingStrategy,
TelemetryConfig, TelemetryError, TelemetryManager, ZipkinConfig, propagation, spans,
};
pub use telemetry::spans;

/// Result type for logging operations
///
Expand Down
271 changes: 14 additions & 257 deletions mcp-logging/src/telemetry.rs
Original file line number Diff line number Diff line change
@@ -1,214 +1,7 @@
//! Simplified OpenTelemetry integration for distributed tracing
//! Tracing span utilities for MCP servers
//!
//! This module provides basic distributed tracing capabilities for MCP servers.
//! The full OpenTelemetry integration is complex and requires careful API matching.

use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tracing::info;

/// Telemetry configuration for OpenTelemetry
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TelemetryConfig {
/// Enable telemetry
pub enabled: bool,

/// Service name for traces
pub service_name: String,

/// Service version
pub service_version: String,

/// Service namespace (e.g., "mcp", "loxone")
pub service_namespace: Option<String>,

/// Deployment environment (dev, staging, prod)
pub environment: Option<String>,

/// OTLP exporter configuration
pub otlp: OtlpConfig,

/// Jaeger exporter configuration
pub jaeger: Option<JaegerConfig>,

/// Zipkin exporter configuration
pub zipkin: Option<ZipkinConfig>,

/// Sampling configuration
pub sampling: SamplingConfig,

/// Batch processing configuration
pub batch: BatchProcessingConfig,

/// Custom resource attributes
pub resource_attributes: HashMap<String, String>,

/// Enable console exporter for development
pub console_exporter: bool,
}

/// OTLP (OpenTelemetry Protocol) exporter configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OtlpConfig {
/// Enable OTLP exporter
pub enabled: bool,

/// OTLP endpoint URL
pub endpoint: String,

/// Optional headers for authentication
pub headers: HashMap<String, String>,

/// Timeout for exports
pub timeout_secs: u64,

/// Use TLS
pub tls_enabled: bool,
}

/// Jaeger exporter configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JaegerConfig {
/// Jaeger agent endpoint
pub agent_endpoint: String,

/// Jaeger collector endpoint
pub collector_endpoint: Option<String>,

/// Authentication username
pub username: Option<String>,

/// Authentication password
pub password: Option<String>,
}

/// Zipkin exporter configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ZipkinConfig {
/// Zipkin endpoint URL
pub endpoint: String,

/// Timeout for exports
pub timeout_secs: u64,
}

/// Sampling configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SamplingConfig {
/// Sampling strategy
pub strategy: SamplingStrategy,

/// Sampling rate (0.0 to 1.0) for ratio-based sampling
pub rate: f64,

/// Parent-based sampling configuration
pub parent_based: bool,
}

/// Sampling strategies
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SamplingStrategy {
/// Always sample
Always,
/// Never sample
Never,
/// Sample based on ratio
Ratio,
/// Parent-based sampling
ParentBased,
}

/// Batch processing configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchProcessingConfig {
/// Maximum batch size
pub max_batch_size: usize,

/// Batch timeout in milliseconds
pub batch_timeout_ms: u64,

/// Maximum queue size
pub max_queue_size: usize,

/// Export timeout in milliseconds
pub export_timeout_ms: u64,
}

impl Default for TelemetryConfig {
fn default() -> Self {
Self {
enabled: true,
service_name: "mcp-server".to_string(),
service_version: "1.0.0".to_string(),
service_namespace: Some("mcp".to_string()),
environment: Some("development".to_string()),
otlp: OtlpConfig {
enabled: true,
endpoint: "http://localhost:4317".to_string(),
headers: HashMap::new(),
timeout_secs: 10,
tls_enabled: false,
},
jaeger: None,
zipkin: None,
sampling: SamplingConfig {
strategy: SamplingStrategy::Ratio,
rate: 0.1, // 10% sampling by default
parent_based: true,
},
batch: BatchProcessingConfig {
max_batch_size: 512,
batch_timeout_ms: 1000,
max_queue_size: 2048,
export_timeout_ms: 30000,
},
resource_attributes: HashMap::new(),
console_exporter: false,
}
}
}

/// Telemetry manager for OpenTelemetry integration
pub struct TelemetryManager {
config: TelemetryConfig,
}

impl TelemetryManager {
/// Initialize telemetry with the given configuration
pub async fn new(config: TelemetryConfig) -> Result<Self, TelemetryError> {
let manager = Self { config };

if manager.config.enabled {
info!(
"Telemetry enabled for service: {} v{}",
manager.config.service_name, manager.config.service_version
);
// Note: Full OpenTelemetry integration requires matching API versions
// This is a simplified version that logs configuration
}

Ok(manager)
}

/// Shutdown telemetry
pub async fn shutdown(&self) -> Result<(), TelemetryError> {
if self.config.enabled {
info!("Shutting down telemetry");
}
Ok(())
}
}

/// Telemetry error types
#[derive(Debug, thiserror::Error)]
pub enum TelemetryError {
#[error("Initialization error: {0}")]
Initialization(String),

#[error("Configuration error: {0}")]
Configuration(String),
}
//! This module provides pre-configured tracing spans following semantic conventions
//! for common MCP operations. These work with any tracing subscriber.

/// Span utilities for common MCP operations
pub mod spans {
Expand Down Expand Up @@ -281,58 +74,22 @@ pub mod spans {
}
}

/// Context propagation utilities
pub mod propagation {
use std::collections::HashMap;

/// Extract OpenTelemetry context from headers (simplified)
pub fn extract_context_from_headers(_headers: &HashMap<String, String>) {
// Note: Full context propagation requires OpenTelemetry API
// This is a placeholder for the functionality
}

/// Inject context into headers (simplified)
pub fn inject_context_into_headers(_headers: &mut HashMap<String, String>) {
// Note: Full context injection requires OpenTelemetry API
// This is a placeholder for the functionality
}
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_telemetry_config_default() {
let config = TelemetryConfig::default();
assert!(config.enabled);
assert_eq!(config.service_name, "mcp-server");
assert!(config.otlp.enabled);
}

#[tokio::test]
async fn test_telemetry_manager_disabled() {
let config = TelemetryConfig {
enabled: false,
..Default::default()
};

let manager = TelemetryManager::new(config).await.unwrap();
assert!(manager.shutdown().await.is_ok());
}

#[test]
fn test_span_utilities() {
// Initialize tracing subscriber for test environment
let _guard = tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_test_writer()
.try_init();

let span = spans::mcp_request_span("tools/list", "req-123");
assert!(!span.is_disabled());

let span = spans::backend_operation_span("fetch_data", Some("users"));
assert!(!span.is_disabled());
// Note: Without a subscriber, spans are disabled by default.
// These tests verify the span creation functions work correctly.
// Span enablement depends on runtime subscriber configuration.
let _span = spans::mcp_request_span("tools/list", "req-123");
let _span = spans::backend_operation_span("fetch_data", Some("users"));
let _span = spans::backend_operation_span("fetch_data", None);
let _span = spans::auth_operation_span("login", Some("user-456"));
let _span = spans::auth_operation_span("login", None);
let _span = spans::external_api_span("api-service", "/endpoint", "GET");
let _span = spans::database_operation_span("SELECT", Some("users"));
let _span = spans::database_operation_span("SELECT", None);
}
}
29 changes: 1 addition & 28 deletions mcp-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ use crate::{backend::McpBackend, handler::GenericServerHandler, middleware::Midd
use pulseengine_mcp_auth::{AuthConfig, AuthenticationManager};
use pulseengine_mcp_logging::{
AlertConfig, AlertManager, DashboardConfig, DashboardManager, PerformanceProfiler,
PersistenceConfig, ProfilingConfig, SanitizationConfig, StructuredLogger, TelemetryConfig,
TelemetryManager,
PersistenceConfig, ProfilingConfig, SanitizationConfig, StructuredLogger,
};
use pulseengine_mcp_protocol::*;
use pulseengine_mcp_security::{SecurityConfig, SecurityMiddleware};
Expand Down Expand Up @@ -66,9 +65,6 @@ pub struct ServerConfig {
/// Metrics persistence configuration
pub persistence_config: Option<PersistenceConfig>,

/// Telemetry configuration
pub telemetry_config: TelemetryConfig,

/// Alert configuration
pub alert_config: AlertConfig,

Expand Down Expand Up @@ -100,7 +96,6 @@ impl Default for ServerConfig {
monitoring_config: crate::observability::default_config(),
sanitization_config: SanitizationConfig::default(),
persistence_config: None,
telemetry_config: TelemetryConfig::default(),
alert_config: AlertConfig::default(),
dashboard_config: DashboardConfig::default(),
profiling_config: ProfilingConfig::default(),
Expand All @@ -123,7 +118,6 @@ pub struct McpServer<B: McpBackend> {
logging_metrics: Arc<pulseengine_mcp_logging::MetricsCollector>,
#[allow(dead_code)]
logger: StructuredLogger,
telemetry: Option<TelemetryManager>,
alert_manager: Arc<AlertManager>,
dashboard_manager: Arc<DashboardManager>,
profiler: Option<Arc<PerformanceProfiler>>,
Expand All @@ -139,19 +133,6 @@ impl<B: McpBackend + 'static> McpServer<B> {

info!("Initializing MCP server with backend");

// Initialize telemetry
let telemetry = if config.telemetry_config.enabled {
let mut telemetry_config = config.telemetry_config.clone();
telemetry_config.service_name = config.server_info.server_info.name.clone();
telemetry_config.service_version = config.server_info.server_info.version.clone();

Some(TelemetryManager::new(telemetry_config).await.map_err(|e| {
ServerError::Configuration(format!("Failed to initialize telemetry: {e}"))
})?)
} else {
None
};

// Initialize authentication only if enabled
let auth_manager = if config.auth_config.enabled {
Arc::new(
Expand Down Expand Up @@ -226,7 +207,6 @@ impl<B: McpBackend + 'static> McpServer<B> {
monitoring_metrics,
logging_metrics,
logger,
telemetry,
alert_manager,
dashboard_manager,
profiler,
Expand Down Expand Up @@ -351,13 +331,6 @@ impl<B: McpBackend + 'static> McpServer<B> {
})?;
}

// Shutdown telemetry
if let Some(telemetry) = &self.telemetry {
telemetry.shutdown().await.map_err(|e| {
ServerError::Configuration(format!("Failed to shutdown telemetry: {e}"))
})?;
}

// Call backend shutdown hook
self.backend
.on_shutdown()
Expand Down