diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 9f907ecfb..842c4c310 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -55,6 +55,7 @@ members = [ "rag_retrieval_demo", "rag_indexing_demo", "adversarial_testing_demo", + "local_compute_mesh_demo", ] [workspace.package] diff --git a/examples/local_compute_mesh_demo/Cargo.toml b/examples/local_compute_mesh_demo/Cargo.toml new file mode 100644 index 000000000..f654f3da4 --- /dev/null +++ b/examples/local_compute_mesh_demo/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "local_compute_mesh_demo" +version.workspace = true +edition.workspace = true + +[dependencies] +tokio.workspace = true +mofa-foundation = {path = "../../crates/mofa-foundation" } +tracing.workspace = true +tracing-subscriber.workspace = true +serde.workspace = true +serde_yaml.workspace = true + +[lints] +workspace = true diff --git a/examples/local_compute_mesh_demo/README.md b/examples/local_compute_mesh_demo/README.md new file mode 100644 index 000000000..db4664c16 --- /dev/null +++ b/examples/local_compute_mesh_demo/README.md @@ -0,0 +1,133 @@ +# Local Compute Mesh Demo + +This demo showcases the MoFA compute mesh routing capabilities, demonstrating how inference requests can be routed between local and cloud backends based on configurable policies. + +## Overview + +The compute mesh enables intelligent routing of inference requests between: + +- **Local backends**: Run inference on local hardware (CPUs/GPUs) +- **Cloud backends**: Route to cloud providers (OpenAI, Anthropic, etc.) + +## Features + +- **Multiple Routing Policies**: LocalFirstWithCloudFallback, LocalOnly, CloudOnly, LatencyOptimized, CostOptimized +- **Memory-Aware Scheduling**: Automatic memory management and model eviction +- **Performance Benchmarking**: Built-in metrics collection for latency and throughput + +## Running the Demo + +### Basic Usage + +```bash +cargo run --example local_compute_mesh_demo -- "Explain photosynthesis" +``` + +### With Custom Prompt + +```bash +cargo run --example local_compute_mesh_demo -- "What is quantum computing?" +``` + +## Performance Benchmark + +The demo includes comprehensive performance benchmarking that measures: + +### Metrics Collected + +| Metric | Description | +|--------|-------------| +| `latency_ms` | Total time from request start to completion | +| `time_to_first_token_ms` | Time to receive the first token | +| `tokens_streamed` | Total number of tokens generated | +| `tokens_per_second` | Token generation throughput | +| `total_time_ms` | Total streaming duration | + +### How Latency is Measured + +1. **Request Start**: Timer begins when the inference request is created +2. **First Token**: Records the time when the first token is received/streamed +3. **Completion**: Timer ends when all tokens have been streamed + +The latency is measured using `std::time::Instant` for high-resolution timing. + +### How Tokens/Second is Calculated + +``` +tokens_per_second = tokens_streamed / (total_time_ms / 1000.0) +``` + +This gives you the actual streaming throughput during generation. + +### Comparing Local vs Cloud Routing + +The demo runs three scenarios to compare different routing policies: + +1. **LocalFirstWithCloudFallback**: Tries local first, falls back to cloud if needed +2. **CloudOnly**: Always routes to cloud provider +3. **LocalOnly**: Always uses local backend + +Example output: + +``` +[workflow] executing step: generate_response +[inference] sending request to orchestrator... + +[router] policy: LocalFirstWithCloudFallback +[router] selected backend: local + +[stream] This +[stream] is +[stream] a +... + +[metrics] +backend: local +latency_ms: 820 +time_to_first_token_ms: 45 +tokens_streamed: 27 +tokens_per_second: 32.9 +total_time_ms: 910 +``` + +## Configuration + +The demo can be configured via the `workflow.yaml` file: + +- **routing_policy**: Choose the routing strategy +- **memory_capacity_mb**: Set local model memory limit +- **cloud_provider**: Configure cloud provider fallback + +## Architecture + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Compute Mesh Demo │ +├─────────────────────────────────────────────────────────────┤ +│ ┌─────────────────────────────────────────────────────┐ │ +│ │ InferenceOrchestrator │ │ +│ │ │ │ +│ │ ┌─────────────┐ ┌─────────────┐ │ │ +│ │ │ Routing │ │ Model │ │ │ +│ │ │ Policy │ │ Pool │ │ │ +│ │ └─────────────┘ └─────────────┘ │ │ +│ │ │ │ │ │ +│ │ ▼ ▼ │ │ +│ │ ┌─────────────┐ ┌─────────────┐ │ │ +│ │ │ Local │ │ Cloud │ │ │ +│ │ │ Execution │ │ Fallback │ │ │ +│ │ └─────────────┘ └─────────────┘ │ │ +│ └─────────────────────────────────────────────────────┘ │ +└─────────────────────────────────────────────────────────────┘ +``` + +## Use Cases + +- **Edge Deployment**: Run models locally on edge devices +- **Cost Optimization**: Minimize cloud API costs +- **Latency Sensitive**: Prioritize fast response times +- **Hybrid Mesh**: Combine local and cloud for best experience + +## License + +This demo is part of the MoFA project and follows the same license terms. diff --git a/examples/local_compute_mesh_demo/src/main.rs b/examples/local_compute_mesh_demo/src/main.rs new file mode 100644 index 000000000..a0824630a --- /dev/null +++ b/examples/local_compute_mesh_demo/src/main.rs @@ -0,0 +1,308 @@ +//! Local Compute Mesh Demo with Performance Benchmarking +//! +//! This demo demonstrates the compute mesh routing between local and cloud inference backends. +//! It includes comprehensive performance benchmarking to measure latency, throughput, and token metrics. + +use std::time::Instant; +use mofa_foundation::inference::{ + InferenceOrchestrator, + InferenceRequest, + OrchestratorConfig, + RequestPriority, + RoutingPolicy, +}; +use tracing::{info, Level}; +use tracing_subscriber::FmtSubscriber; + +/// Performance metrics collected during inference +#[derive(Debug, Clone)] +struct PerformanceMetrics { + /// Backend used for inference (local or cloud) + backend: String, + /// Time from request start to first token (ms) + time_to_first_token_ms: f64, + /// Total time for streaming completion (ms) + total_stream_time_ms: f64, + /// Total latency from start to end (ms) + total_latency_ms: f64, + /// Number of tokens generated/streamed + tokens_streamed: usize, + /// Tokens per second throughput + tokens_per_second: f64, +} + +impl PerformanceMetrics { + /// Print metrics in structured format + fn print_metrics(&self) { + info!(""); + info!("[metrics]"); + info!("backend: {}", self.backend); + info!("latency_ms: {:.0}", self.total_latency_ms); + info!("time_to_first_token_ms: {:.0}", self.time_to_first_token_ms); + info!("tokens_streamed: {}", self.tokens_streamed); + info!("tokens_per_second: {:.1}", self.tokens_per_second); + info!("total_time_ms: {:.0}", self.total_stream_time_ms); + } + + /// Create metrics from timing data + fn from_timing( + backend: &str, + start_time: Instant, + first_token_time: Option, + end_time: Instant, + tokens_streamed: usize, + ) -> Self { + let total_time = end_time.duration_since(start_time); + let total_time_ms = total_time.as_secs_f64() * 1000.0; + + let time_to_first_token_ms = first_token_time + .map(|t| t.duration_since(start_time).as_secs_f64() * 1000.0) + .unwrap_or(0.0); + + let tokens_per_second = if total_time_ms > 0.0 { + tokens_streamed as f64 / (total_time_ms / 1000.0) + } else { + 0.0 + }; + + Self { + backend: backend.to_string(), + time_to_first_token_ms, + total_stream_time_ms: total_time_ms, + total_latency_ms: total_time_ms, + tokens_streamed, + tokens_per_second, + } + } +} + +/// Simulated streaming response for demo purposes +/// In a real implementation, this would stream tokens from the LLM +struct StreamingResponse { + tokens: Vec, +} + +impl StreamingResponse { + /// Simulate streaming tokens with timing + fn stream_and_measure( + &self, + backend: &str, + mut on_token: impl FnMut(&str, Option), + ) -> PerformanceMetrics { + let start_time = Instant::now(); + let mut first_token_time: Option = None; + let mut token_count = 0; + + for token in &self.tokens { + // Simulate token arrival timing + let now = Instant::now(); + if first_token_time.is_none() { + first_token_time = Some(now); + } + token_count += 1; + on_token(token, first_token_time); + + // Small delay to simulate streaming + std::thread::sleep(std::time::Duration::from_millis(10)); + } + + let end_time = Instant::now(); + PerformanceMetrics::from_timing( + backend, + start_time, + first_token_time, + end_time, + token_count, + ) + } +} + +/// Execute inference with the orchestrator and measure performance +fn execute_inference_with_benchmark( + orchestrator: &mut InferenceOrchestrator, + prompt: &str, + model_id: &str, +) -> (String, PerformanceMetrics) { + // Start timing + let start_time = Instant::now(); + + // Create inference request + let request = InferenceRequest::new(model_id, prompt, 4096) + .with_priority(RequestPriority::Normal); + + info!("[inference] sending request to orchestrator..."); + + // Execute inference + let result = orchestrator.infer(&request); + + // Determine backend type for metrics + let backend_type = match &result.routed_to { + mofa_foundation::inference::RoutedBackend::Local { .. } => "local", + mofa_foundation::inference::RoutedBackend::Cloud { .. } => "cloud", + mofa_foundation::inference::RoutedBackend::Rejected { .. } => "rejected", + }; + + info!("[router] policy: {:?}", orchestrator.routing_policy()); + info!("[router] selected backend: {}", backend_type); + + // Simulate streaming response for demo + // In production, this would stream actual tokens from the LLM + let response_text = format!( + "This is a simulated response for: {}. In production, \ + this would stream actual tokens from the local or cloud LLM.", + prompt + ); + + let tokens: Vec = response_text + .split_whitespace() + .map(|s| s.to_string()) + .collect(); + + let streaming_response = StreamingResponse { + tokens, + }; + + // Stream tokens and measure + let metrics = streaming_response.stream_and_measure(backend_type, |token, _first_token| { + info!("[stream] {}", token); + }); + + let end_time = Instant::now(); + let total_latency = end_time.duration_since(start_time).as_secs_f64() * 1000.0; + + // Combine with actual request latency + let final_metrics = PerformanceMetrics { + backend: metrics.backend.clone(), + time_to_first_token_ms: metrics.time_to_first_token_ms, + total_stream_time_ms: metrics.total_stream_time_ms, + total_latency_ms: total_latency, + tokens_streamed: metrics.tokens_streamed, + tokens_per_second: metrics.tokens_per_second, + }; + + (result.output, final_metrics) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Initialize logging + let subscriber = FmtSubscriber::builder() + .with_max_level(Level::INFO) + .with_target(false) + .with_thread_ids(false) + .with_file(true) + .with_line_number(true) + .finish(); + + tracing::subscriber::set_global_default(subscriber) + .expect("setting default subscriber failed"); + + info!("========================================"); + info!(" MoFA Compute Mesh Demo "); + info!(" with Performance Benchmarking "); + info!("========================================"); + info!(""); + + // Get prompt from command line args or use default + let prompt = std::env::args() + .nth(1) + .unwrap_or_else(|| "Explain photosynthesis".to_string()); + + info!("[workflow] executing step: generate_response"); + info!("Prompt: {}", prompt); + info!(""); + + // Demo 1: LocalFirstWithCloudFallback policy (default) + info!("=== Demo 1: LocalFirstWithCloudFallback ==="); + demo_local_first(&prompt).await?; + + info!(""); + info!("=== Demo 2: CloudOnly ==="); + demo_cloud_only(&prompt).await?; + + info!(""); + info!("=== Demo 3: LocalOnly ==="); + demo_local_only(&prompt).await?; + + info!(""); + info!("========================================"); + info!(" Demo completed! "); + info!("========================================"); + + Ok(()) +} + +/// Demo with LocalFirstWithCloudFallback policy +async fn demo_local_first(prompt: &str) -> Result<(), Box> { + let config = OrchestratorConfig { + memory_capacity_mb: 16384, + defer_threshold: 0.75, + reject_threshold: 0.90, + model_pool_capacity: 5, + routing_policy: RoutingPolicy::LocalFirstWithCloudFallback, + cloud_provider: "openai".to_string(), + ..Default::default() + }; + + let mut orchestrator = InferenceOrchestrator::new(config); + + let (_result, metrics) = execute_inference_with_benchmark( + &mut orchestrator, + prompt, + "llama-3-7b", + ); + + metrics.print_metrics(); + + Ok(()) +} + +/// Demo with CloudOnly policy +async fn demo_cloud_only(prompt: &str) -> Result<(), Box> { + let config = OrchestratorConfig { + memory_capacity_mb: 16384, + defer_threshold: 0.75, + reject_threshold: 0.90, + model_pool_capacity: 5, + routing_policy: RoutingPolicy::CloudOnly, + cloud_provider: "openai".to_string(), + ..Default::default() + }; + + let mut orchestrator = InferenceOrchestrator::new(config); + + let (_result, metrics) = execute_inference_with_benchmark( + &mut orchestrator, + prompt, + "llama-3-7b", + ); + + metrics.print_metrics(); + + Ok(()) +} + +/// Demo with LocalOnly policy +async fn demo_local_only(prompt: &str) -> Result<(), Box> { + let config = OrchestratorConfig { + memory_capacity_mb: 16384, + defer_threshold: 0.75, + reject_threshold: 0.90, + model_pool_capacity: 5, + routing_policy: RoutingPolicy::LocalOnly, + cloud_provider: "openai".to_string(), + ..Default::default() + }; + + let mut orchestrator = InferenceOrchestrator::new(config); + + let (_result, metrics) = execute_inference_with_benchmark( + &mut orchestrator, + prompt, + "llama-3-7b", + ); + + metrics.print_metrics(); + + Ok(()) +} diff --git a/examples/local_compute_mesh_demo/workflow.yaml b/examples/local_compute_mesh_demo/workflow.yaml new file mode 100644 index 000000000..5db5d0dda --- /dev/null +++ b/examples/local_compute_mesh_demo/workflow.yaml @@ -0,0 +1,35 @@ +# Compute Mesh Workflow Configuration +# This file defines the workflow for the local compute mesh demo + +name: compute-mesh-demo +version: "1.0" + +# Workflow steps +steps: + - id: generate_response + name: Generate LLM Response + description: Generate a response using the compute mesh routing + config: + # Routing policy: LocalFirstWithCloudFallback, LocalOnly, CloudOnly, LatencyOptimized, CostOptimized + routing_policy: LocalFirstWithCloudFallback + + # Model configuration + model: llama-3-7b + max_tokens: 2048 + temperature: 0.7 + + # Memory configuration (MB) + memory_capacity_mb: 16384 + + # Cloud provider fallback + cloud_provider: openai + +# Metrics collection +metrics: + enabled: true + collect: + - latency_ms + - tokens_streamed + - tokens_per_second + - time_to_first_token_ms + - total_time_ms