diff --git a/.gitignore b/.gitignore index 7f5e57bb..55bfeb44 100644 --- a/.gitignore +++ b/.gitignore @@ -47,4 +47,6 @@ Thumbs.db # Project specific target -TODO.md \ No newline at end of file +TODO.md + +*.wav diff --git a/Cargo.lock b/Cargo.lock index 06e10894..54f7e7dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -559,6 +559,20 @@ dependencies = [ "typenum", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.9.0" @@ -1014,6 +1028,12 @@ dependencies = [ "ahash", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.4" @@ -2998,13 +3018,18 @@ dependencies = [ name = "stepflow-plugin" version = "0.1.0" dependencies = [ + "dashmap", "dynosaur", "error-stack", "futures", + "once_cell", "serde", + "serde_json", "stepflow-core", "stepflow-state", "thiserror 2.0.12", + "tokio", + "tracing", "trait-variant", "uuid", ] @@ -3020,6 +3045,7 @@ dependencies = [ "serde", "serde_json", "stepflow-core", + "stepflow-execution", "stepflow-plugin", "thiserror 2.0.12", "tokio", diff --git a/crates/stepflow-analysis/src/tracker.rs b/crates/stepflow-analysis/src/tracker.rs index d0a3aa87..81bd307f 100644 --- a/crates/stepflow-analysis/src/tracker.rs +++ b/crates/stepflow-analysis/src/tracker.rs @@ -87,6 +87,16 @@ pub struct DependencyTracker { completed: BitSet, } +impl Clone for DependencyTracker { + fn clone(&self) -> Self { + Self { + dependencies: self.dependencies.clone(), + blocking: self.blocking.clone(), + completed: self.completed.clone(), + } + } +} + impl DependencyTracker { pub fn new(dependencies: Arc) -> Self { let blocking = dependencies diff --git a/crates/stepflow-builtins/src/eval.rs b/crates/stepflow-builtins/src/eval.rs index 9a4b10b5..89c925d5 100644 --- a/crates/stepflow-builtins/src/eval.rs +++ b/crates/stepflow-builtins/src/eval.rs @@ -84,6 +84,16 @@ impl BuiltinComponent for EvalComponent { let result_value = match nested_result { FlowResult::Success { result } => result.as_ref().clone(), FlowResult::Skipped => serde_json::Value::Null, + FlowResult::Streaming { stream_id, metadata, chunk, chunk_index, is_final } => { + // For streaming results, return the metadata and chunk info + serde_json::json!({ + "stream_id": stream_id, + "metadata": metadata.as_ref(), + "chunk": chunk, + "chunk_index": chunk_index, + "is_final": is_final + }) + } FlowResult::Failed { error } => { // Propagate the failure from the nested workflow return Ok(FlowResult::Failed { error }); diff --git a/crates/stepflow-core/src/flow_result.rs b/crates/stepflow-core/src/flow_result.rs index 55625d56..8d564b80 100644 --- a/crates/stepflow-core/src/flow_result.rs +++ b/crates/stepflow-core/src/flow_result.rs @@ -49,6 +49,19 @@ impl FlowError { pub enum FlowResult { /// The step execution was successful. Success { result: ValueRef }, + /// The step is streaming data. 
+ Streaming { + /// Stream identifier + stream_id: String, + /// Metadata about the stream + metadata: ValueRef, + /// Base64 encoded chunk data + chunk: String, + /// Chunk index + chunk_index: usize, + /// Whether this is the final chunk + is_final: bool, + }, /// The step was skipped. Skipped, /// The step failed with the given error. @@ -57,6 +70,12 @@ pub enum FlowResult { impl From for FlowResult { fn from(value: serde_json::Value) -> Self { + // First try to deserialize as a proper FlowResult + if let Ok(flow_result) = serde_json::from_value::(value.clone()) { + return flow_result; + } + + // If that fails, wrap in Success as fallback let result = ValueRef::new(value); Self::Success { result } } @@ -70,6 +89,15 @@ impl FlowResult { } } + pub fn streaming(&self) -> Option<(String, ValueRef, String, usize, bool)> { + match self { + Self::Streaming { stream_id, metadata, chunk, chunk_index, is_final } => { + Some((stream_id.clone(), metadata.clone(), chunk.clone(), *chunk_index, *is_final)) + } + _ => None, + } + } + pub fn skipped(&self) -> bool { matches!(self, Self::Skipped) } diff --git a/crates/stepflow-core/src/workflow/step.rs b/crates/stepflow-core/src/workflow/step.rs index 31da8829..14829f67 100644 --- a/crates/stepflow-core/src/workflow/step.rs +++ b/crates/stepflow-core/src/workflow/step.rs @@ -3,7 +3,7 @@ use crate::schema::SchemaRef; use schemars::JsonSchema; /// A step in a workflow that executes a component with specific arguments. -#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, JsonSchema, utoipa::ToSchema)] +#[derive(Clone, serde::Serialize, serde::Deserialize, Debug, PartialEq, JsonSchema, utoipa::ToSchema)] #[serde(rename_all = "camelCase")] pub struct Step { /// Optional identifier for the step @@ -28,6 +28,10 @@ pub struct Step { /// Arguments to pass to the component for this step #[serde(default, skip_serializing_if = "ValueRef::is_null")] pub input: ValueRef, + + /// Whether this step is a streaming step (doesn't persist results) + #[serde(default, skip_serializing_if = "std::ops::Not::not")] + pub streaming: bool, } #[derive( @@ -134,6 +138,7 @@ mod tests { default_value: Some(ValueRef::from("fallback")), }, input: serde_json::Value::Null.into(), + streaming: false, }; let yaml = serde_yaml_ng::to_string(&step).unwrap(); @@ -152,6 +157,7 @@ mod tests { skip_if: None, on_error: ErrorAction::Fail, input: serde_json::Value::Null.into(), + streaming: false, }; let yaml = serde_yaml_ng::to_string(&step).unwrap(); diff --git a/crates/stepflow-execution/src/error.rs b/crates/stepflow-execution/src/error.rs index be169897..7cf7c294 100644 --- a/crates/stepflow-execution/src/error.rs +++ b/crates/stepflow-execution/src/error.rs @@ -42,6 +42,8 @@ pub enum ExecutionError { ExecutionNotFound(Uuid), #[error("workflow '{0}' not found")] WorkflowNotFound(FlowHash), + #[error("streaming operation failed")] + StreamingError, } pub type Result> = std::result::Result; diff --git a/crates/stepflow-execution/src/executor.rs b/crates/stepflow-execution/src/executor.rs index 7610bf93..a4becd33 100644 --- a/crates/stepflow-execution/src/executor.rs +++ b/crates/stepflow-execution/src/executor.rs @@ -11,7 +11,7 @@ use stepflow_core::{ }; use stepflow_plugin::{Context, DynPlugin, ExecutionContext, Plugin as _}; use stepflow_state::{InMemoryStateStore, StateStore}; -use tokio::sync::{RwLock, oneshot}; +use tokio::sync::{RwLock, oneshot, Mutex}; use uuid::Uuid; type FutureFlowResult = futures::future::Shared>; @@ -25,7 +25,7 @@ pub struct StepFlowExecutor { // TODO: 
Should write execution state to the state store for persistence. pending: Arc>>, /// Active debug sessions for step-by-step execution control - debug_sessions: Arc>>, + debug_sessions: Arc>>>>, // Keep a weak reference to self for spawning tasks without circular references self_weak: std::sync::Weak, } @@ -108,13 +108,13 @@ impl StepFlowExecutor { } /// Get or create a debug session for step-by-step execution control - pub async fn debug_session(&self, execution_id: Uuid) -> Result { + pub async fn debug_session(&self, execution_id: Uuid) -> Result>> { // Check if session already exists { let sessions = self.debug_sessions.read().await; - if let Some(_session) = sessions.get(&execution_id) { - // Return a clone of the session (WorkflowExecutor should implement Clone if needed) - // For now, we'll create a new session each time since WorkflowExecutor is not Clone + if let Some(session) = sessions.get(&execution_id) { + tracing::info!("Reusing existing WorkflowExecutor for debug session: {}", execution_id); + return Ok(session.clone()); } } @@ -148,7 +148,36 @@ impl StepFlowExecutor { self.state_store.clone(), )?; - Ok(workflow_executor) + // Store the executor for future reuse + let executor_arc = Arc::new(tokio::sync::Mutex::new(workflow_executor)); + { + let mut sessions = self.debug_sessions.write().await; + sessions.insert(execution_id, executor_arc.clone()); + } + + Ok(executor_arc) + } + + /// Get a workflow executor for debug sessions + pub async fn get_workflow_executor( + &self, + execution_id: Uuid, + ) -> Result>>> { + let debug_sessions = self.debug_sessions.read().await; + Ok(debug_sessions.get(&execution_id).cloned()) + } + + /// Get the flow for a specific execution (for streaming pipeline coordinator) + pub fn flow(&self) -> Option> { + // This is a placeholder - in a real implementation, we'd need to store flows + // For now, return None since we don't have access to the flow + None + } + + /// Get an existing debug session without creating a new one + pub async fn get_debug_session(&self, execution_id: Uuid) -> Option>> { + let sessions = self.debug_sessions.read().await; + sessions.get(&execution_id).cloned() } } @@ -183,18 +212,38 @@ impl Context for StepFlowExecutor { pending.insert(execution_id, rx.shared()); } + // Create the WorkflowExecutor and store it in debug_sessions for streaming access + // TODO: Consider using a separate `active_executions` map instead of `debug_sessions` + // for normal execution, to keep debug sessions separate from streaming access + let workflow_executor = WorkflowExecutor::new( + executor.clone(), + flow.clone(), + workflow_hash.clone(), + execution_id, + input.clone(), + executor.state_store().clone(), + ).map_err(|e| stepflow_plugin::PluginError::new(format!("Failed to create workflow executor: {:?}", e))) + .change_context(stepflow_plugin::PluginError::new("Failed to create workflow executor"))?; + + // Store in debug_sessions for streaming chunk access + { + let mut debug_sessions = self.debug_sessions.write().await; + debug_sessions.insert(execution_id, Arc::new(Mutex::new(workflow_executor))); + tracing::info!("Stored WorkflowExecutor in debug_sessions for execution ID: {}", execution_id); + } + // Spawn the execution tokio::spawn(async move { tracing::info!("Executing workflow using tracker-based execution"); let state_store = executor.state_store.clone(); let result = execute_workflow( - executor, - flow, + executor.clone(), + flow.clone(), workflow_hash, execution_id, input, - state_store, + state_store.clone(), ) .await; @@ 
-217,6 +266,14 @@ impl Context for StepFlowExecutor { // Send the result back let _ = tx.send(flow_result); + + // Clean up the debug session immediately after execution completes + // This prevents unbounded growth of the debug_sessions map + { + let mut debug_sessions = executor.debug_sessions.write().await; + debug_sessions.remove(&execution_id); + tracing::debug!("Cleaned up debug session for execution {}", execution_id); + } }); Ok(execution_id) @@ -237,38 +294,29 @@ impl Context for StepFlowExecutor { &self, execution_id: Uuid, ) -> BoxFuture<'_, stepflow_plugin::Result> { + let pending = self.pending.clone(); + async move { - // Remove and get the receiver for this execution - let receiver = { - let pending = self.pending.read().await; - pending.get(&execution_id).cloned() + let future = { + let pending = pending.read().await; + pending + .get(&execution_id) + .ok_or_else(|| stepflow_plugin::PluginError::new("Execution not found")) + .change_context(stepflow_plugin::PluginError::new("Execution not found"))? + .clone() }; - match receiver { - Some(rx) => { - match rx.await { - Ok(result) => Ok(result), - Err(_) => { - // The sender was dropped, indicating the execution was cancelled or failed - Ok(FlowResult::Failed { - error: stepflow_core::FlowError::new( - 410, - "Nested flow execution was cancelled", - ), - }) - } - } - } - None => { - // Execution ID not found - Ok(FlowResult::Failed { - error: stepflow_core::FlowError::new( - 404, - format!("No execution found for ID: {}", execution_id), - ), - }) - } + let result = future.await.map_err(|_| stepflow_plugin::PluginError::new("Execution failed")) + .change_context(stepflow_plugin::PluginError::new("Execution failed")); + + // Clean up the pending entry after getting the result to prevent memory leaks + { + let mut pending_write = pending.write().await; + pending_write.remove(&execution_id); + tracing::debug!("Cleaned up pending execution {}", execution_id); } + + result } .boxed() } @@ -276,6 +324,38 @@ impl Context for StepFlowExecutor { fn state_store(&self) -> &Arc { &self.state_store } + + fn executor(&self) -> Option> { + Some(Arc::new(StepFlowExecutorWrapper(self.self_weak.clone()))) + } +} + +/// Wrapper to provide Executor trait implementation for StepFlowExecutor +struct StepFlowExecutorWrapper(std::sync::Weak); + +impl stepflow_plugin::Executor for StepFlowExecutorWrapper { + fn get_workflow_executor( + &self, + execution_id: Uuid, + ) -> BoxFuture<'_, stepflow_plugin::Result>>> { + let weak = self.0.clone(); + + async move { + if let Some(executor) = weak.upgrade() { + match executor.get_workflow_executor(execution_id).await { + Ok(Some(workflow_executor)) => { + Ok(Some(Box::new(workflow_executor) as Box)) + } + Ok(None) => Ok(None), + Err(e) => Err(stepflow_plugin::PluginError::new(format!("Failed to get workflow executor: {:?}", e))) + .change_context(stepflow_plugin::PluginError::new("Failed to get workflow executor")), + } + } else { + Ok(None) + } + } + .boxed() + } } #[cfg(test)] diff --git a/crates/stepflow-execution/src/value_resolver.rs b/crates/stepflow-execution/src/value_resolver.rs index c505e1c5..f11af293 100644 --- a/crates/stepflow-execution/src/value_resolver.rs +++ b/crates/stepflow-execution/src/value_resolver.rs @@ -10,6 +10,7 @@ use uuid::Uuid; use crate::{ExecutionError, Result}; /// Value resolver for handling expression and JSON value resolution +#[derive(Clone)] pub struct ValueResolver { /// Execution ID of the workflow we are resolving for. 
/// @@ -34,6 +35,11 @@ impl ValueResolver { } } + /// Get the workflow input for this resolver. + pub fn workflow_input(&self) -> &ValueRef { + &self.input + } + /// Resolve a ValueRef, returning a FlowResult. /// This is the main entry point for value resolution. pub async fn resolve(&self, value: &ValueRef) -> Result { @@ -97,6 +103,10 @@ impl ValueResolver { // NOTE: Skip actions are applied after path resolution. match path_result { FlowResult::Success { result } => Ok(FlowResult::Success { result }), + FlowResult::Streaming { stream_id, metadata, chunk, chunk_index, is_final } => { + // For streaming results, we can't apply skip actions, so just pass through + Ok(FlowResult::Streaming { stream_id, metadata, chunk, chunk_index, is_final }) + } FlowResult::Skipped => { match expr.on_skip() { Some(SkipAction::UseDefault { default_value }) => { @@ -151,6 +161,11 @@ impl ValueResolver { FlowResult::Success { result } => { result_map.insert(k.clone(), result.as_ref().clone()); } + FlowResult::Streaming { stream_id, metadata, chunk, chunk_index, is_final } => { + // For streaming results in objects, we can't handle them properly + // Return the streaming result as-is + return Ok(FlowResult::Streaming { stream_id, metadata, chunk, chunk_index, is_final }); + } FlowResult::Skipped => { return Ok(FlowResult::Skipped); } @@ -171,6 +186,11 @@ impl ValueResolver { FlowResult::Success { result } => { result_array.push(result.as_ref().clone()); } + FlowResult::Streaming { stream_id, metadata, chunk, chunk_index, is_final } => { + // For streaming results in arrays, we can't handle them properly + // Return the streaming result as-is + return Ok(FlowResult::Streaming { stream_id, metadata, chunk, chunk_index, is_final }); + } FlowResult::Skipped => { return Ok(FlowResult::Skipped); } diff --git a/crates/stepflow-execution/src/workflow_executor.rs b/crates/stepflow-execution/src/workflow_executor.rs index a3d7d628..6c6c58cc 100644 --- a/crates/stepflow-execution/src/workflow_executor.rs +++ b/crates/stepflow-execution/src/workflow_executor.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::sync::Arc; use bit_set::BitSet; @@ -12,9 +13,17 @@ use stepflow_core::{ use stepflow_plugin::{DynPlugin, ExecutionContext, Plugin as _}; use stepflow_state::{StateStore, StepResult}; use uuid::Uuid; +use tokio::sync::mpsc; use crate::{ExecutionError, Result, StepFlowExecutor, value_resolver::ValueResolver}; +/// Helper macro for streaming step logging +macro_rules! stream_log { + ($level:ident, $step_id:expr, $($arg:tt)*) => { + tracing::$level!("[STREAM {}] {}", $step_id, format!($($arg)*)) + }; +} + /// Execute a workflow and return the result. 
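/// Note (annotation, not part of the patch): with this change, if a debug session has
/// already been registered for this execution id it is reused; otherwise a fresh
/// WorkflowExecutor is created for the run.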
pub(crate) async fn execute_workflow( executor: Arc, @@ -24,16 +33,28 @@ pub(crate) async fn execute_workflow( input: ValueRef, state_store: Arc, ) -> Result { - let mut workflow_executor = WorkflowExecutor::new( - executor, - flow, - workflow_hash, - execution_id, - input, - state_store, - )?; - - workflow_executor.execute_to_completion().await + // Check if there's already a debug session for this execution ID + let existing_debug_session = executor.get_debug_session(execution_id).await; + + if let Some(debug_session) = existing_debug_session { + // Use the existing debug session + tracing::info!("Using existing debug session for execution ID: {}", execution_id); + let mut workflow_executor = debug_session.lock().await; + workflow_executor.execute_to_completion().await + } else { + // Create a new workflow executor + tracing::info!("Executing workflow using tracker-based execution"); + let mut workflow_executor = WorkflowExecutor::new( + executor, + flow, + workflow_hash, + execution_id, + input, + state_store, + )?; + + workflow_executor.execute_to_completion().await + } } /// Workflow executor that manages the execution of a single workflow. @@ -53,6 +74,8 @@ pub struct WorkflowExecutor { flow: Arc, /// Execution context for this session context: ExecutionContext, + /// Optional streaming pipeline coordinator + streaming_coordinator: Option>>, } impl WorkflowExecutor { @@ -79,6 +102,55 @@ impl WorkflowExecutor { // Create execution context let context = executor.execution_context(execution_id); + // Initialize streaming coordinator if workflow has streaming steps + let streaming_coordinator = if flow.steps.iter().any(|step| step.streaming) { + let mut pipeline_steps: Vec = flow.steps.iter() + .enumerate() + .filter(|(_, step)| step.streaming) + .map(|(index, _)| index) + .collect(); + + if !pipeline_steps.is_empty() { + // Log the initial order (source order) + tracing::info!("Initial pipeline order (source order): {:?}", + pipeline_steps.iter().map(|i| &flow.steps[*i].id).collect::>() + ); + + // Sort pipeline steps by dependencies using a topological sort + pipeline_steps = sort_streaming_steps_by_dependencies(&flow, pipeline_steps)?; + + // Log the final pipeline order to verify it's correct + tracing::info!("Final pipeline will run in this order: {:?}", + pipeline_steps.iter().map(|i| &flow.steps[*i].id).collect::>() + ); + tracing::info!("Pipeline step indices and components: {:?}", + pipeline_steps.iter().map(|i| (*i, &flow.steps[*i].id, &flow.steps[*i].component)).collect::>() + ); + + tracing::info!("[DEBUG-INIT] Creating streaming coordinator in WorkflowExecutor::new"); + + // Create chunk channel for this execution + let (chunk_tx, chunk_rx) = mpsc::channel::(100); + + // Register the chunk sender in the global registry + stepflow_plugin::streaming::register_chunk_sender(execution_id, chunk_tx); + + let coordinator = StreamingPipelineCoordinator::new( + executor.clone(), + flow.clone(), + pipeline_steps, + context.clone(), + resolver.clone(), + chunk_rx, + ); + Some(Arc::new(tokio::sync::Mutex::new(coordinator))) + } else { + None + } + } else { + None + }; + Ok(Self { tracker, resolver, @@ -86,6 +158,7 @@ impl WorkflowExecutor { executor, flow, context, + streaming_coordinator, }) } @@ -99,6 +172,17 @@ impl WorkflowExecutor { &self.flow } + /// Check if the streaming pipeline is still active (has active receivers) + pub fn is_streaming_pipeline_active(&self) -> bool { + if let Some(_coord_arc) = &self.streaming_coordinator { + // For now, just check if coordinator exists - 
we can't easily check receivers without async + true + } else { + // No coordinator means no streaming pipeline + false + } + } + /// Get currently runnable step indices. pub fn get_runnable_step_indices(&self) -> BitSet { self.tracker.unblocked_steps() @@ -111,6 +195,19 @@ impl WorkflowExecutor { tracing::debug!("Starting execution of {} steps", self.flow.steps.len()); + // Start streaming pipeline coordinator concurrently if it exists + let streaming_task = if let Some(coordinator_arc) = &self.streaming_coordinator { + tracing::info!("Starting streaming pipeline coordinator concurrently with main execution"); + + let coord = coordinator_arc.clone(); + // Start the pipeline execution in a separate task (single-phase, no setup needed) + Some(tokio::spawn(async move { + StreamingPipelineCoordinator::run_pipeline_without_lock(coord).await + })) + } else { + None + }; + // Start initial unblocked steps let initial_unblocked = self.tracker.unblocked_steps(); tracing::debug!( @@ -132,6 +229,23 @@ impl WorkflowExecutor { // Update tracker and store result let newly_unblocked = self.tracker.complete_step(completed_step_index); + // Update step status based on result + let final_status = match &step_result { + FlowResult::Success { .. } => stepflow_core::status::StepStatus::Completed, + FlowResult::Failed { .. } => stepflow_core::status::StepStatus::Failed, + FlowResult::Skipped => stepflow_core::status::StepStatus::Skipped, + FlowResult::Streaming { .. } => stepflow_core::status::StepStatus::Running, // Keep as running for streaming + }; + + self.state_store + .update_step_status( + self.context.execution_id(), + completed_step_index, + final_status, + ) + .await + .change_context_lazy(|| ExecutionError::StateError)?; + // Record the completed result in the state store let step_id = &self.flow.steps[completed_step_index].id; tracing::debug!( @@ -156,7 +270,23 @@ impl WorkflowExecutor { .await?; } - // All tasks completed - try to complete the workflow + // Wait for streaming pipeline to complete if it was started + if let Some(streaming_task) = streaming_task { + tracing::info!("Waiting for streaming pipeline coordinator to complete"); + match streaming_task.await { + Ok(result) => { + if let Err(e) = result { + tracing::warn!("Streaming pipeline coordinator completed with error: {:?}", e); + } else { + tracing::info!("Streaming pipeline coordinator completed successfully"); + } + } + Err(e) => { + tracing::warn!("Streaming pipeline coordinator task panicked: {:?}", e); + } + } + } + self.resolve_workflow_output().await } @@ -283,7 +413,18 @@ impl WorkflowExecutor { })?; // Keep executing until the target step is runnable or completed + let max_iterations = 1000; // Safety limit to prevent infinite loops + let mut iteration_count = 0; + loop { + iteration_count += 1; + if iteration_count > max_iterations { + tracing::error!("execute_until_runnable exceeded maximum iterations ({}), stopping execution", max_iterations); + return Err(ExecutionError::StepFailed { + step: format!("execute_until_runnable for {}", target_step_id) + }.into()); + } + let runnable = self.tracker.unblocked_steps(); // Check if target step is runnable @@ -369,6 +510,7 @@ impl WorkflowExecutor { FlowResult::Success { .. } => CoreStepStatus::Completed, FlowResult::Skipped => CoreStepStatus::Skipped, FlowResult::Failed { .. } => CoreStepStatus::Failed, + FlowResult::Streaming { .. 
} => CoreStepStatus::Running, // Streaming steps are considered running }, Err(_) => CoreStepStatus::Blocked, } @@ -404,10 +546,31 @@ impl WorkflowExecutor { .into()); } + // Update step status to Running + self.state_store + .update_step_status( + self.context.execution_id(), + step_index, + stepflow_core::status::StepStatus::Running, + ) + .await + .change_context_lazy(|| ExecutionError::StateError)?; + // Check skip condition if present if let Some(skip_if) = &step.skip_if { if self.should_skip_step(skip_if).await? { let result = FlowResult::Skipped; + + // Update step status to Skipped + self.state_store + .update_step_status( + self.context.execution_id(), + step_index, + stepflow_core::status::StepStatus::Skipped, + ) + .await + .change_context_lazy(|| ExecutionError::StateError)?; + self.record_step_completion(step_index, &result).await?; return Ok(StepExecutionResult::new( step_index, @@ -421,34 +584,52 @@ impl WorkflowExecutor { // Resolve step inputs let step_input = match self.resolver.resolve(&step.input).await? { FlowResult::Success { result } => result, + FlowResult::Streaming { stream_id: _, metadata, chunk: _, chunk_index: _, is_final: _ } => { + // For streaming steps, we can handle streaming inputs + // For now, just return the metadata as the input + metadata + } FlowResult::Skipped => { - // Step inputs contain skipped values - skip this step - let result = FlowResult::Skipped; - self.record_step_completion(step_index, &result).await?; - return Ok(StepExecutionResult::new( - step_index, - step_id, - component_string, - result, - )); + return Err(ExecutionError::StepNotRunnable { + step: step_id.clone(), + } + .into()); } - FlowResult::Failed { error } => { - return Ok(StepExecutionResult::new( - step_index, - step_id, - component_string, - FlowResult::Failed { error }, - )); + FlowResult::Failed { error: _ } => { + return Err(ExecutionError::StepFailed { step: step_id }.into()); } }; - // Get plugin and execute the step - let plugin = self.executor.get_plugin(&step.component).await?; - let result = execute_step_async(plugin, step, step_input, self.context.clone()).await?; - - // Record the result - self.record_step_completion(step_index, &result).await?; + // Check if this is a streaming step + if step.streaming { + tracing::info!("Step {} is a streaming step, using streaming execution", step.id); + // For streaming steps, return a StepExecutionResult with a placeholder Streaming result + let streaming_result = FlowResult::Streaming { + stream_id: format!("stream_{}", step.id), + metadata: stepflow_core::workflow::ValueRef::new(serde_json::json!({ + "step_id": step.id, + "step_index": step_index, + "streaming": true + })), + chunk: "".to_string(), + chunk_index: 0, + is_final: false, + }; + return Ok(StepExecutionResult::new( + step_index, + step_id, + component_string, + streaming_result, + )); + } + // Regular non-streaming step execution + let plugin = self.executor.get_plugin(&step.component).await?; + let flow = self.flow.clone(); + let context = self.context.clone() + .with_step(self.flow.steps[step_index].id.clone()); + let step = &flow.steps[step_index]; + let result = execute_step_async(plugin, step, step_input, context).await?; Ok(StepExecutionResult::new( step_index, step_id, @@ -502,6 +683,7 @@ impl WorkflowExecutor { FlowResult::Success { result } => Ok(result.is_truthy()), FlowResult::Skipped => Ok(false), // Don't skip if condition references skipped values FlowResult::Failed { .. 
} => Ok(false), // Don't skip if condition evaluation failed + FlowResult::Streaming { .. } => Ok(false), // Don't skip if condition references streaming values } } @@ -553,6 +735,14 @@ impl WorkflowExecutor { .union_with(&self.skip_step(&step_id, step_index).await?); continue; } + Ok(FlowResult::Streaming { .. }) => { + // Step inputs contain streaming values - this is not supported for regular steps + tracing::error!( + "Step {} has streaming inputs which is not supported for non-streaming steps", + step_id + ); + return Err(ExecutionError::StepFailed { step: step_id }.into()); + } Ok(FlowResult::Failed { error }) => { tracing::error!( "Failed to resolve inputs for step {} - input resolution failed: {:?}", @@ -587,6 +777,16 @@ impl WorkflowExecutor { let newly_unblocked_from_skip = self.tracker.complete_step(step_index); let skip_result = FlowResult::Skipped; + // Update step status to Skipped + self.state_store + .update_step_status( + self.context.execution_id(), + step_index, + stepflow_core::status::StepStatus::Skipped, + ) + .await + .change_context_lazy(|| ExecutionError::StateError)?; + // Record the skipped result in the state store self.state_store .record_step_result( @@ -619,17 +819,35 @@ impl WorkflowExecutor { let step = &self.flow.steps[step_index]; tracing::debug!("Starting execution of step {}", step.id); - // Get plugin for this step + // Update step status to Running + self.state_store + .update_step_status( + self.context.execution_id(), + step_index, + stepflow_core::status::StepStatus::Running, + ) + .await + .change_context_lazy(|| ExecutionError::StateError)?; + + // Check if this is a streaming step + if step.streaming { + tracing::info!("Step {} is a streaming step, using streaming execution", step.id); + // For streaming steps, just mark as running; the coordinator will handle execution + return Ok(()); + } + + // Regular non-streaming step execution let plugin = self.executor.get_plugin(&step.component).await?; // Clone necessary data for the async task let flow = self.flow.clone(); - let context = self.context.clone(); + let context = self.context.clone() + .with_step(self.flow.steps[step_index].id.clone()); // Create the async task let task_future: BoxFuture<'static, (usize, Result)> = Box::pin(async move { let step = &flow.steps[step_index]; - let result = execute_step_async(plugin, step, step_input, context).await; + let result = execute_step_async(plugin.clone(), step, step_input, context).await; (step_index, result) }); @@ -637,6 +855,136 @@ impl WorkflowExecutor { Ok(()) } + + /// Execute a streaming step continuously. + /// This method runs the step in a loop, processing chunks as they arrive. 
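    /// Note (annotation): in the current implementation the step must belong to a
    /// streaming pipeline; execution is delegated to the pipeline coordinator, and a
    /// standalone streaming step is rejected with `StepNotRunnable`.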
+ pub async fn execute_streaming_step( + &mut self, + step_index: usize, + ) -> Result<()> { + let step = &self.flow.steps[step_index]; + let step_id = step.id.clone(); + + // Check if the step is runnable + if !self.tracker.unblocked_steps().contains(step_index) { + return Err(ExecutionError::StepNotRunnable { + step: step.id.clone(), + } + .into()); + } + + // Check if this is actually a streaming step + if !step.streaming { + return Err(ExecutionError::StepNotRunnable { + step: step.id.clone(), + } + .into()); + } + + // Check if this is part of a streaming pipeline + if self.is_streaming_pipeline_step(step_index) { + return self.execute_streaming_pipeline_step(step_index).await; + } + + // Log error: Individual streaming step execution (not part of a pipeline) + tracing::error!("Streaming step {} is not part of a streaming pipeline, cannot execute individually", step_id); + Err(ExecutionError::StepNotRunnable { + step: step.id.clone(), + } + .into()) + + + } + + /// Check if a step is part of a streaming pipeline (has streaming inputs/outputs) + fn is_streaming_pipeline_step(&self, step_index: usize) -> bool { + let step = &self.flow.steps[step_index]; + + // Check if this step has streaming inputs from other streaming steps + for (other_index, other_step) in self.flow.steps.iter().enumerate() { + if other_index != step_index && other_step.streaming { + // Check if this step references the other streaming step + if self.step_references_other_step(step, other_step) { + return true; + } + } + } + + false + } + + /// Check if a step references another step in its inputs + fn step_references_other_step(&self, step: &stepflow_core::workflow::Step, other_step: &stepflow_core::workflow::Step) -> bool { + // Simple check: look for step references in the input + let input_str = serde_json::to_string(&step.input).unwrap_or_default(); + input_str.contains(&format!("step: {}", other_step.id)) + } + + /// Execute a step that's part of a streaming pipeline + async fn execute_streaming_pipeline_step(&mut self, step_index: usize) -> Result<()> { + let step = &self.flow.steps[step_index]; + let step_id = step.id.clone(); + + tracing::info!("Executing streaming pipeline step: {}", step_id); + + // Update step status to Running + self.state_store + .update_step_status( + self.context.execution_id(), + step_index, + stepflow_core::status::StepStatus::Running, + ) + .await + .change_context_lazy(|| ExecutionError::StateError)?; + + // Reuse the coordinator created in WorkflowExecutor::new + let pipeline_result = if let Some(coord_arc) = &self.streaming_coordinator { + StreamingPipelineCoordinator::run_pipeline_without_lock(coord_arc.clone()).await + } else { + return Err(ExecutionError::Internal.into()); + }; + + match pipeline_result { + Ok(_) => { + // Update step status to Completed + self.state_store + .update_step_status( + self.context.execution_id(), + step_index, + stepflow_core::status::StepStatus::Completed, + ) + .await + .change_context_lazy(|| ExecutionError::StateError)?; + } + Err(e) => { + // Update step status to Failed + self.state_store + .update_step_status( + self.context.execution_id(), + step_index, + stepflow_core::status::StepStatus::Failed, + ) + .await + .change_context_lazy(|| ExecutionError::StateError)?; + return Err(e); + } + } + + // Update dependency tracker + self.tracker.complete_step(step_index); + + Ok(()) + } + +} + +impl Drop for WorkflowExecutor { + fn drop(&mut self) { + // Clean up the global chunk sender registry when the executor is dropped + let execution_id = 
self.execution_id(); + stepflow_plugin::streaming::unregister_chunk_sender(execution_id); + tracing::debug!("Cleaned up chunk sender for execution {}", execution_id); + } } /// Execute a single step asynchronously. @@ -732,6 +1080,710 @@ pub struct StepInspection { pub state: CoreStepStatus, } +/// Coordinates streaming execution between multiple steps in a pipeline +struct StreamingPipelineCoordinator { + executor: Arc, + flow: Arc, + pipeline_steps: Vec, + context: ExecutionContext, + resolver: ValueResolver, + step_downstream_senders: HashMap)>>, + step_receivers: HashMap>, + step_senders: HashMap>, + incoming_chunks: mpsc::Receiver, +} + +impl StreamingPipelineCoordinator { + + + fn new( + executor: Arc, + flow: Arc, + pipeline_steps: Vec, + context: ExecutionContext, + resolver: ValueResolver, + incoming_chunks: mpsc::Receiver, + ) -> Self { + let mut step_receivers = std::collections::HashMap::new(); + let mut step_downstream_senders = std::collections::HashMap::new(); + let mut step_senders = std::collections::HashMap::new(); + + tracing::info!("[DEBUG-CHANNEL-SETUP] Creating channels for pipeline steps: {:?}", pipeline_steps); + + // Create input receivers for each step + for &step_index in &pipeline_steps { + let step_id = flow.steps[step_index].id.clone(); + let (input_tx, input_rx) = tokio::sync::mpsc::channel(100); + let sender_clone = input_tx.clone(); + + // Log channel creation + tracing::info!( + "[CHANNEL-DEBUG] Created channel for step {} (index {})", + step_id, + step_index + ); + + step_senders.insert(step_id.clone(), sender_clone); + step_receivers.insert(step_id.clone(), input_rx); + } + + // Set up the pipeline connections + tracing::info!("[DEBUG-CHANNEL-SETUP] Setting up pipeline connections for {} steps", pipeline_steps.len()); + for i in 0..pipeline_steps.len() { + let step_index = pipeline_steps[i]; + let step_id = flow.steps[step_index].id.clone(); + tracing::info!("[DEBUG-CHANNEL-SETUP] Processing step {} ({}) at position {}", step_id, step_index, i); + + // Set up downstream senders for this step + let mut downstream_connections = Vec::new(); + if i < pipeline_steps.len() - 1 { + // This step sends to the next step's input + let next_step_index = pipeline_steps[i + 1]; + let next_step_id = flow.steps[next_step_index].id.clone(); + if let Some(next_step_sender) = step_senders.get(&next_step_id).cloned() { + downstream_connections.push((next_step_id.clone(), next_step_sender)); + } + } + step_downstream_senders.insert(step_id, downstream_connections); + } + + Self { + executor, + flow, + pipeline_steps, + context, + resolver, + step_receivers, + step_downstream_senders, + step_senders, + incoming_chunks, + } + } + + + + + + /// Run the pipeline without holding the mutex lock + /// This allows route_streaming_chunk to acquire the lock while the pipeline is running + async fn run_pipeline_without_lock(coord_arc: Arc>) -> Result<()> { + tracing::info!("[DEBUG-PIPELINE] Starting pipeline execution without lock"); + + // Resolve step inputs and spawn tasks while holding the lock to avoid race conditions + let mut handles = Vec::new(); + { + let mut guard = coord_arc.lock().await; + let pipeline_steps = guard.pipeline_steps.clone(); + let executor = guard.executor.clone(); + let flow = guard.flow.clone(); + let context = guard.context.clone(); + + // Resolve step inputs first + let mut step_inputs = std::collections::HashMap::new(); + for &step_idx in &pipeline_steps { + let input = guard.resolve_step_input(step_idx).await?; + step_inputs.insert(step_idx, input); + } 
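            // Note (annotation): resolving every pipeline input before any receiver is
            // taken out of `step_receivers` means a resolution failure returns here while
            // the coordinator still owns all of its channels.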
+ + // Now spawn all tasks while still holding the lock to prevent race conditions + for &step_idx in &pipeline_steps { + let step_id = flow.steps[step_idx].id.clone(); + + // Take the receiver for this step - this is the correct approach + // The sender stays in the coordinator so route_chunk_to_running_pipeline can send to it + let rx = guard.step_receivers.remove(&step_id).ok_or_else(|| { + tracing::error!("[DEBUG-CHANNEL] No receiver found for step {}", step_id); + ExecutionError::Internal + })?; + tracing::info!("[DEBUG-CHANNEL] Moved receiver for step {} to task", step_id); + let _sender = guard.step_senders.get(&step_id).unwrap().clone(); // Keep sender in map for handle_chunk + let downstream = guard.step_downstream_senders + .get(&step_id).cloned().unwrap_or_default(); + + tracing::info!("[DEBUG-CHANNEL] Step {} spawning with {} downstream channels", step_id, downstream.len()); + let input = step_inputs.remove(&step_idx).ok_or_else(|| { + ExecutionError::Internal + })?; + + tracing::info!("Starting step task for {} with receiver while holding lock", step_id); + let plugin = executor.get_plugin(&flow.steps[step_idx].component).await?; + let context = context.clone() + .with_step(step_id.clone()); + let step = flow.steps[step_idx].clone(); + let is_source = step_idx == pipeline_steps[0]; + + // Spawn while still holding the pieces and the lock + let h = tokio::spawn(async move { + if is_source { + tracing::info!("Step task {} is source - waiting for generator to complete", step_id); + // For source steps, don't run the generator here (it's run separately) + // Just wait a bit and then exit - the generator runs independently + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + tracing::info!("Step task {} source step exiting (generator runs separately)", step_id); + Ok(()) + } else { + tracing::info!("Step task {} about to call run_streaming_step_simple", step_id); + let result = run_streaming_step_simple(plugin, step, input, context, rx, downstream, is_source).await; + tracing::info!("Step task {} finished run_streaming_step_simple: {:?}", step_id, result.is_ok()); + result + } + }); + handles.push((step_idx, h)); + } + } // Lock dropped NOW - after all tasks are spawned with their receivers + + // Capture flow and pipeline_steps outside of the lock for later use + let (flow, pipeline_steps) = { + let guard = coord_arc.lock().await; + (guard.flow.clone(), guard.pipeline_steps.clone()) + }; + + // Give all tasks a moment to start + tracing::info!("[DEBUG-PIPELINE] Giving tasks 500ms to start up"); + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + + // Now trigger the source component to start generating chunks + if let Some(source_step_idx) = pipeline_steps.first() { + let source_step_id = flow.steps[*source_step_idx].id.clone(); + tracing::info!("[DEBUG-PIPELINE] Triggering source component {} to start generating", source_step_id); + + // Get the step input for the source step + let source_input = { + let guard = coord_arc.lock().await; + guard.resolve_step_input(*source_step_idx).await? + }; + + // Get the plugin for the source step + let source_plugin = { + let guard = coord_arc.lock().await; + guard.executor.get_plugin(&flow.steps[*source_step_idx].component).await? 
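                // Note (annotation): each of these lookups takes the coordinator lock only
                // briefly, so the lock is not held while the source generator task spawned
                // below is running.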
+ }; + + // Create execution context for the source step + let source_context = { + let guard = coord_arc.lock().await; + guard.context.clone().with_step(source_step_id.clone()) + }; + + // Trigger the source component in a separate task (fire and forget) + let source_step = flow.steps[*source_step_idx].clone(); + tokio::spawn(async move { + tracing::info!("[DEBUG-GENERATOR] Starting source generator for {}", source_step_id); + match execute_step_async(source_plugin, &source_step, source_input, source_context).await { + Ok(result) => { + tracing::info!("[DEBUG-GENERATOR] Source generator {} completed: {:?}", source_step_id, result); + } + Err(e) => { + tracing::error!("[DEBUG-GENERATOR] Source generator {} failed: {:?}", source_step_id, e); + } + } + }); + } + + // Main loop: handle both incoming chunks and step completion using tokio::select + tracing::info!("[DEBUG-PIPELINE] Starting main loop to handle chunks and step completion"); + let mut remaining_handles = handles; + + // Extract incoming chunks receiver from coordinator + let mut incoming_chunks_rx = { + let mut guard = coord_arc.lock().await; + // Move the receiver out of the coordinator for the main loop + let (_dummy_tx, dummy_rx) = mpsc::channel::(1); + std::mem::replace(&mut guard.incoming_chunks, dummy_rx) + }; + + loop { + tokio::select! { + // Handle incoming chunks from the global registry + Some(chunk_json) = incoming_chunks_rx.recv() => { + tracing::info!("Main loop received chunk from global registry"); + + // Route the chunk to the appropriate step without locking the coordinator + match Self::route_chunk_to_steps(&coord_arc, chunk_json).await { + Ok(_) => { + tracing::info!("Successfully routed chunk to step"); + } + Err(e) => { + tracing::error!("Failed to route chunk to step: {:?}", e); + } + } + } + + // Handle step completions + _ = async { + // Check if any handles are ready + let mut i = 0; + while i < remaining_handles.len() { + let (_step_idx, handle) = &mut remaining_handles[i]; + if handle.is_finished() { + let (step_idx, handle) = remaining_handles.remove(i); + let step_id = &flow.steps[step_idx].id; + + match handle.await { + Ok(result) => { + if let Err(e) = result { + tracing::warn!("Step {} completed with error: {:?}", step_id, e); + return Err(e); + } else { + tracing::info!("Step {} completed successfully", step_id); + } + } + Err(e) => { + tracing::warn!("Step {} task panicked: {:?}", step_id, e); + return Err(ExecutionError::Internal.into()); + } + } + } else { + i += 1; + } + } + + // If no handles completed, sleep briefly to avoid busy loop + if !remaining_handles.is_empty() { + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } + + Ok::<(), error_stack::Report>(()) + } => { + // Handle step completion result + } + } + + // Exit when all handles are done + if remaining_handles.is_empty() { + tracing::info!("[DEBUG-PIPELINE] All step handles completed"); + break; + } + } + + tracing::info!("[DEBUG-PIPELINE] run_pipeline_without_lock completed successfully"); + Ok(()) + } + + /// Route a chunk to the appropriate step without requiring a full coordinator lock + async fn route_chunk_to_steps( + coord_arc: &Arc>, + chunk_json: serde_json::Value, + ) -> Result<()> { + let map = serde_json::from_value::>(chunk_json) + .map_err(|e| ExecutionError::MalformedReference { message: e.to_string() })?; + + // Extract chunk metadata from top-level map first + let chunk_index = map.get("chunk_index").and_then(|v| v.as_u64()).unwrap_or(0) as usize; + let mut is_final = 
map.get("is_final").and_then(|v| v.as_bool()).unwrap_or(false); + let source_step_id = map.get("step_id").and_then(|v| v.as_str()).map(|s| s.to_string()); + + // If we have a nested chunk object, also check its is_final flag (it takes precedence) + if let Some(chunk_obj) = map.get("chunk").and_then(|v| v.as_object()) { + if let Some(nested_is_final) = chunk_obj.get("is_final").and_then(|v| v.as_bool()) { + is_final = nested_is_final; + } + } + + tracing::debug!("Routing chunk {} from step {:?} (is_final={})", chunk_index, source_step_id, is_final); + + // Get the information we need from the coordinator briefly + let (target_step_id, step_senders) = { + let coord = coord_arc.lock().await; + + let source_step_id = source_step_id.unwrap_or_else(|| { + let first_step_idx = coord.pipeline_steps[0]; + coord.flow.steps[first_step_idx].id.clone() + }); + + // Find the target step (next in pipeline after source) + let source_step_pipeline_index = coord.pipeline_steps.iter() + .enumerate() + .find_map(|(i, &step_idx)| { + if coord.flow.steps[step_idx].id == source_step_id { + Some(i) + } else { + None + } + }); + + let target_step_id = if let Some(source_idx) = source_step_pipeline_index { + if source_idx + 1 < coord.pipeline_steps.len() { + let target_step_idx = coord.pipeline_steps[source_idx + 1]; + coord.flow.steps[target_step_idx].id.clone() + } else { + // This is the last step, no target + return Ok(()); + } + } else { + // Source step not found in pipeline + return Ok(()); + }; + + (target_step_id, coord.step_senders.clone()) + }; + + // Send to the target step's channel + if let Some(tx) = step_senders.get(&target_step_id) { + // Extract chunk data - handle nested component response format + let chunk_str = if let Some(chunk_value) = map.get("chunk") { + if let Some(chunk_str) = chunk_value.as_str() { + // Direct string chunk + chunk_str + } else if let Some(chunk_obj) = chunk_value.as_object() { + // Nested object - extract the inner "chunk" field + if let Some(inner_chunk) = chunk_obj.get("chunk").and_then(|v| v.as_str()) { + inner_chunk + } else { + "" + } + } else { + "" + } + } else { + "" + }; + + // Create a FlowResult from the chunk data + let fr = FlowResult::Streaming { + stream_id: map.get("stream_id").and_then(|v| v.as_str()).unwrap_or("unknown").to_string(), + metadata: stepflow_core::workflow::ValueRef::new(serde_json::json!(map)), + chunk: chunk_str.to_string(), + chunk_index, + is_final, + }; + + match tx.send(fr).await { + Ok(_) => { + tracing::debug!("Successfully routed chunk {} to step {}", chunk_index, target_step_id); + } + Err(e) => { + if is_final { + tracing::info!("Ignoring send error for final chunk as step might have completed"); + return Ok(()); + } + tracing::error!("Failed to send chunk {} to step {}: {:?}", chunk_index, target_step_id, e); + return Err(ExecutionError::StepFailed { step: target_step_id }.into()); + } + } + } else { + tracing::warn!("No channel for target step {}", target_step_id); + } + + Ok(()) + } + + async fn resolve_step_input(&self, step_index: usize) -> Result { + // For streaming steps, we need simpler input resolution + // since they don't depend on other steps' outputs + let step = &self.flow.steps[step_index]; + + // For streaming steps, resolve the input expression directly + // If it fails, fall back to the workflow input + if step.streaming { + match self.resolver.resolve(&step.input).await { + Ok(FlowResult::Success { result }) => Ok(result), + Ok(FlowResult::Streaming { metadata, .. 
}) => Ok(metadata), + _ => { + // Fall back to workflow input for streaming steps + tracing::info!("[DEBUG-RESOLVE] Falling back to workflow input for streaming step {}", step.id); + Ok(self.resolver.workflow_input().clone()) + } + } + } else { + // For non-streaming steps, use the full resolver + let step_input = match self.resolver.resolve(&step.input).await? { + FlowResult::Success { result } => result, + FlowResult::Streaming { metadata, .. } => metadata, + FlowResult::Skipped => { + return Err(ExecutionError::StepNotRunnable { + step: step.id.clone(), + } + .into()); + } + FlowResult::Failed { error: _ } => { + return Err(ExecutionError::StepFailed { step: step.id.clone() }.into()); + } + }; + Ok(step_input) + } + } + + +} + +/// A per-step streaming loop: receive chunks, call your component, forward every chunk downstream, +/// exit only when `is_final == true`. +async fn run_streaming_step_simple( + plugin: Arc>, + step: stepflow_core::workflow::Step, + _input: stepflow_core::workflow::ValueRef, + context: ExecutionContext, + mut rx: mpsc::Receiver, + downstream: Vec<(String, mpsc::Sender)>, + is_source: bool, +) -> Result<()> { + let step_id = step.id.clone(); + stream_log!(info, &step_id, "starting (is_source={}, downstream={})", is_source, downstream.len()); + + // Log channel details with step names + stream_log!(info, &step_id, "receiver ready"); + for (i, (downstream_id, tx)) in downstream.iter().enumerate() { + stream_log!(info, &step_id, "downstream[{}] to step {} is_closed: {}", i, downstream_id, tx.is_closed()); + } + + // For source steps, we now rely on the notification system to start the generator + // The generator will be triggered when the first chunk request comes in + if is_source { + stream_log!(info, &step_id, "source step entering receiver loop, generator will start via notifications"); + } else { + stream_log!(info, &step_id, "sink/middle step entering receiver loop"); + } + + // Now loop for all chunks coming through the coordinator's routing system + let mut _last_stream_id = String::new(); + let mut _last_metadata = stepflow_core::workflow::ValueRef::new(serde_json::Value::Null); + let mut _last_chunk = String::new(); + let mut _last_chunk_index = 0; + let mut _last_is_final = false; + + loop { + stream_log!(debug, &step_id, "waiting for chunk via receiver"); + + // Check if the channel has been closed already + if rx.is_closed() { + stream_log!(warn, &step_id, "channel is already closed before receiving any data"); + break; + } + + // Add timeout to prevent indefinite blocking + let recv_result = match tokio::time::timeout( + std::time::Duration::from_secs(10), // 10 second timeout + rx.recv() + ).await { + Ok(result) => { + stream_log!(info, &step_id, "received data from channel: is_some={}", result.is_some()); + result + }, + Err(_) => { + stream_log!(warn, &step_id, "TIMEOUT waiting for chunk after 10 seconds"); + // Continue with loop to try again + continue; + } + }; + + match recv_result { + Some(FlowResult::Streaming { stream_id, metadata, chunk, chunk_index, is_final }) => { + stream_log!(info, &step_id, "processing chunk #{} (is_final={})", chunk_index, is_final); + + // Store the streaming metadata for potential use in non-streaming case + _last_stream_id = stream_id.clone(); + _last_metadata = metadata.clone(); + _last_chunk = chunk.clone(); + _last_chunk_index = chunk_index; + _last_is_final = is_final; + + // Process the chunk with the component (for non-source steps) + let (final_stream_id, final_metadata, final_chunk, final_chunk_index, 
final_is_final) = + if !is_source { + // For non-source steps, process the chunk with the component + // Create input for the component from the chunk + let chunk_input_data = serde_json::json!({ + "stream_id": stream_id, + "chunk": chunk, + "chunk_index": chunk_index, + "is_final": is_final, + "metadata": metadata.as_ref() + }); + + // Call the component with the chunk + let chunk_input = stepflow_core::workflow::ValueRef::new(chunk_input_data); + + let component_result = execute_step_async( + plugin.clone(), + &step, + chunk_input, + context.clone().with_step(step.id.clone()) + ).await; + + match component_result { + Ok(FlowResult::Success { result: _ }) => { + // For success results, forward the original chunk + (stream_id, metadata, chunk, chunk_index, is_final) + } + Ok(FlowResult::Streaming { stream_id: new_stream_id, metadata: new_metadata, chunk: new_chunk, chunk_index: new_chunk_index, is_final: new_is_final }) => { + // Component returned a streaming result, use it + (new_stream_id, new_metadata, new_chunk, new_chunk_index, new_is_final) + } + Ok(other) => { + stream_log!(warn, &step_id, "component returned unexpected result for chunk #{}: {:?}", chunk_index, other); + (stream_id, metadata, chunk, chunk_index, is_final) + } + Err(e) => { + stream_log!(error, &step_id, "component failed processing chunk #{}: {:?}", chunk_index, e); + // On error, still forward the original chunk but log the error + (stream_id, metadata, chunk, chunk_index, is_final) + } + } + } else { + // Source step just forwards the chunk as-is + (stream_id, metadata, chunk, chunk_index, is_final) + }; + + // Forward to downstream steps + + if downstream.is_empty() { + stream_log!(warn, &step_id, "no downstream channels to forward to!"); + } + + // Check if any downstream channels are closed + let closed_channels = downstream.iter() + .enumerate() + //.filter(|pair| pair.1.1.is_closed()) + .filter(|(_, (_, sender))| sender.is_closed()) + .map(|(i, _)| i) + .collect::>(); + + if !closed_channels.is_empty() { + stream_log!(warn, &step_id, "downstream channels closed: {:?}", closed_channels); + } + + // Forward to all downstream steps + for (i, (downstream_id, tx)) in downstream.iter().enumerate() { + if tx.is_closed() { + stream_log!(warn, &step_id, "skipping downstream[{}] to step {} - channel is closed", i, downstream_id); + continue; + } + + let fr = FlowResult::Streaming { + stream_id: final_stream_id.clone(), + metadata: final_metadata.clone(), + chunk: final_chunk.clone(), + chunk_index: final_chunk_index, + is_final: final_is_final, + }; + + match tx.send(fr).await { + Ok(_) => { + stream_log!(debug, &step_id, "forwarded chunk #{} to step {}", final_chunk_index, downstream_id); + } + Err(e) => { + stream_log!(error, &step_id, "failed to forward chunk #{} to step {}: {:?}", + final_chunk_index, downstream_id, e); + } + } + } + + // If this is the final chunk, exit the loop + if final_is_final { + stream_log!(info, &step_id, "received final chunk, exiting streaming loop"); + break; + } + } + Some(other) => { + stream_log!(warn, &step_id, "received non-streaming result: {:?}", other); + // Handle non-streaming results... 
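                // Note (annotation): as written, a non-streaming result arriving on this
                // channel is only logged; the loop neither forwards it nor exits, and keeps
                // waiting for the next chunk or for the channel to close.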
+ } + None => { + stream_log!(warn, &step_id, "receiver channel closed, exiting streaming loop"); + break; + } + } + } + + stream_log!(info, &step_id, "streaming loop completed"); + Ok(()) +} + +/// Sort streaming steps by their dependencies using a topological sort +/// This ensures that source steps come before steps that depend on them +fn sort_streaming_steps_by_dependencies( + flow: &Flow, + streaming_steps: Vec, +) -> Result> { + use std::collections::{HashMap, HashSet, VecDeque}; + + // Create a map of step ID to index for quick lookup + let _step_id_to_index: HashMap = streaming_steps + .iter() + .map(|&idx| (flow.steps[idx].id.clone(), idx)) + .collect(); + + // Build dependency graph for streaming steps only + let mut dependencies: HashMap> = HashMap::new(); + let mut dependents: HashMap> = HashMap::new(); + + for &step_idx in &streaming_steps { + dependencies.insert(step_idx, HashSet::new()); + dependents.insert(step_idx, HashSet::new()); + } + + // Analyze dependencies between streaming steps + for &step_idx in &streaming_steps { + let step = &flow.steps[step_idx]; + + // Check if this step's input references other streaming steps + let input_str = serde_json::to_string(&step.input).unwrap_or_default(); + + for &other_step_idx in &streaming_steps { + if step_idx != other_step_idx { + let other_step_id = &flow.steps[other_step_idx].id; + + // Check if step references other_step in its input + if input_str.contains(&format!("step: {}", other_step_id)) || + input_str.contains(&format!("\"step\": \"{}\"", other_step_id)) { + // step_idx depends on other_step_idx + dependencies.get_mut(&step_idx).unwrap().insert(other_step_idx); + dependents.get_mut(&other_step_idx).unwrap().insert(step_idx); + + tracing::info!("Detected dependency: {} depends on {}", + step.id, other_step_id); + } + } + } + } + + // Topological sort using Kahn's algorithm + let mut result = Vec::new(); + let mut queue = VecDeque::new(); + let mut remaining_deps = dependencies.clone(); + + // Find steps with no dependencies (source steps) + for &step_idx in &streaming_steps { + if remaining_deps[&step_idx].is_empty() { + queue.push_back(step_idx); + tracing::info!("Found source streaming step: {}", flow.steps[step_idx].id); + } + } + + while let Some(current_step) = queue.pop_front() { + result.push(current_step); + + // Remove this step from its dependents' dependency lists + for &dependent_step in &dependents[¤t_step] { + remaining_deps.get_mut(&dependent_step).unwrap().remove(¤t_step); + + // If the dependent now has no dependencies, add it to the queue + if remaining_deps[&dependent_step].is_empty() { + queue.push_back(dependent_step); + } + } + } + + // Check for circular dependencies + if result.len() != streaming_steps.len() { + let remaining: Vec = streaming_steps + .iter() + .filter(|&&idx| !result.contains(&idx)) + .map(|&idx| flow.steps[idx].id.clone()) + .collect(); + + tracing::error!("Circular dependency detected in streaming steps: {:?}", remaining); + return Err(ExecutionError::Internal.into()); + } + + tracing::info!("Topological sort result: {:?}", + result.iter().map(|i| &flow.steps[*i].id).collect::>() + ); // Add the closing parenthesis here + + Ok(result) +} + #[cfg(test)] mod tests { use super::*; @@ -745,7 +1797,7 @@ mod tests { pub async fn create_workflow_from_yaml_simple( yaml_str: &str, mock_behaviors: Vec<(&str, FlowResult)>, - ) -> (Arc, Arc, FlowHash) { + ) { // Parse the YAML workflow let flow: Flow = serde_yaml_ng::from_str(yaml_str).expect("Failed to parse YAML workflow"); let flow 
= Arc::new(flow); @@ -795,7 +1847,6 @@ mod tests { ) -> Result { let (executor, flow, workflow_hash) = create_workflow_from_yaml_simple(yaml_str, mock_behaviors).await; - let execution_id = Uuid::new_v4(); let state_store: Arc = Arc::new(InMemoryStateStore::new()); let input_ref = ValueRef::new(input); @@ -962,6 +2013,7 @@ output: .await .unwrap(); + // Check the final result match result { FlowResult::Success { result } => { assert_eq!(result.as_ref(), &json!({"final": 30})); @@ -1013,7 +2065,6 @@ output: .await .unwrap(); - // Initially, only step1 should be runnable let runnable = workflow_executor.get_runnable_step_indices(); assert_eq!(runnable.len(), 1); assert!(runnable.contains(0)); // step1 @@ -1063,8 +2114,6 @@ steps: component: mock://parallel1 input: $from: - workflow: input - - id: step2 component: mock://parallel2 input: $from: @@ -1074,7 +2123,6 @@ steps: input: step1: $from: - step: step1 step2: $from: step: step2 @@ -1084,7 +2132,6 @@ output: "#; let workflow_input = json!({"value": 42}); - let step1_output = json!({"step1": "done"}); let step2_output = json!({"step2": "done"}); let final_output = json!({"both": "completed"}); @@ -1099,7 +2146,6 @@ output: ( "mock://parallel2", FlowResult::Success { - result: ValueRef::new(step2_output.clone()), }, ), ( @@ -1251,7 +2297,6 @@ steps: input: mode: error output: - $from: step: failing_step "#; @@ -1394,3 +2439,10 @@ output: } } } + + + + + + + diff --git a/crates/stepflow-main/src/error.rs b/crates/stepflow-main/src/error.rs index a0f3f38b..c98ed978 100644 --- a/crates/stepflow-main/src/error.rs +++ b/crates/stepflow-main/src/error.rs @@ -36,6 +36,8 @@ pub enum MainError { ReplCommand(String), #[error("Configuration error")] Configuration, + #[error("No active debug session")] + NoDebugSession, } pub type Result> = std::result::Result; diff --git a/crates/stepflow-main/src/repl.rs b/crates/stepflow-main/src/repl.rs index 6332d93f..421ede39 100644 --- a/crates/stepflow-main/src/repl.rs +++ b/crates/stepflow-main/src/repl.rs @@ -4,7 +4,7 @@ use error_stack::ResultExt as _; use rustyline::{DefaultEditor, error::ReadlineError}; use std::{path::PathBuf, sync::Arc}; use stepflow_core::workflow::{Flow, FlowHash, ValueRef}; -use stepflow_execution::{StepFlowExecutor, WorkflowExecutor}; +use stepflow_execution::{StepExecutionResult, StepFlowExecutor, WorkflowExecutor}; use stepflow_plugin::Context as _; use crate::{ @@ -19,7 +19,8 @@ pub struct LastRun { pub workflow_hash: FlowHash, pub workflow_path: PathBuf, pub input: ValueRef, - pub last_execution: Option, + pub last_execution: Option>>, + pub execution_id: Option, } impl LastRun { @@ -35,6 +36,7 @@ impl LastRun { workflow_path, input, last_execution: None, + execution_id: None, } } @@ -66,32 +68,46 @@ impl LastRun { pub async fn create_debug_execution( &mut self, executor: &Arc, - ) -> Result<&mut WorkflowExecutor> { - let state_store = executor.state_store(); + ) -> Result<()> { let execution_id = uuid::Uuid::new_v4(); - let workflow_executor = WorkflowExecutor::new( - executor.clone(), + + // Submit the workflow to get it registered + executor.submit_flow( self.workflow.clone(), self.workflow_hash.clone(), - execution_id, self.input.clone(), - state_store.clone(), - ) - .change_context(MainError::FlowExecution)?; - - self.last_execution = Some(workflow_executor); - Ok(self.last_execution.as_mut().unwrap()) + ).await.change_context(MainError::FlowExecution)?; + + // Get the debug session executor + let debug_executor = executor.debug_session(execution_id) + .await + 
.change_context(MainError::FlowExecution)?; + + // Store the execution ID for later reference + self.execution_id = Some(execution_id); + + // Store the executor + self.last_execution = Some(debug_executor); + + Ok(()) } /// Get the current debug execution, if any - pub fn debug_execution(&mut self) -> Option<&mut WorkflowExecutor> { - self.last_execution.as_mut() + pub async fn execute_step(&mut self, step_id: &str) -> Result { + if let Some(executor) = &self.last_execution { + let mut workflow_executor = executor.lock().await; + workflow_executor.execute_step_by_id(step_id).await + .change_context(MainError::FlowExecution) + } else { + Err(error_stack::report!(MainError::NoDebugSession)) + } } /// Update input and clear any existing execution pub fn update_input(&mut self, input: ValueRef) { self.input = input; - self.last_execution = None; // Clear execution since input changed + self.last_execution = None; + self.execution_id = None; } } @@ -386,7 +402,10 @@ async fn handle_steps_command(state: &ReplState) -> Result<()> { if let Some(last_run) = &state.last_run { if let Some(debug_session) = &last_run.last_execution { - let all_steps = debug_session.list_all_steps().await; + // Lock the mutex to access the WorkflowExecutor + let workflow_executor = debug_session.lock().await; + + let all_steps = workflow_executor.list_all_steps().await; println!("Workflow steps ({} total):", all_steps.len()); for step_status in &all_steps { println!( @@ -421,7 +440,10 @@ async fn handle_runnable_command(state: &ReplState) -> Result<()> { if let Some(last_run) = &state.last_run { if let Some(debug_session) = &last_run.last_execution { - let runnable_steps = debug_session.get_runnable_steps().await; + // Lock the mutex to access the WorkflowExecutor + let workflow_executor = debug_session.lock().await; + + let runnable_steps = workflow_executor.get_runnable_steps().await; if runnable_steps.is_empty() { println!( "No steps are currently runnable. All dependencies may be satisfied or workflow is complete." 
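
Every debug-oriented REPL handler below repeats the same access pattern: `last_execution` now holds the session behind a shared handle (the generic parameters were lost above, but the `lock().await` calls imply `Option<Arc<tokio::sync::Mutex<WorkflowExecutor>>>`), and each command takes the async lock before calling into the executor. A minimal, self-contained sketch of that pattern, using a stand-in type rather than the real `WorkflowExecutor`:

    use std::sync::Arc;
    use tokio::sync::Mutex;

    // Stand-in for the real WorkflowExecutor, only here so the sketch compiles on its own.
    struct ExecutorStub;
    impl ExecutorStub {
        async fn execute_step_by_id(&mut self, step_id: &str) -> Result<String, String> {
            Ok(format!("ran {step_id}"))
        }
    }

    #[tokio::main]
    async fn main() {
        // One shared session: the REPL and the HTTP debug API each hold a clone of the Arc.
        let session: Option<Arc<Mutex<ExecutorStub>>> = Some(Arc::new(Mutex::new(ExecutorStub)));

        if let Some(debug_session) = &session {
            // Take the async lock for the duration of the call, mirroring
            // `debug_session.lock().await` in the handlers below.
            let mut executor = debug_session.lock().await;
            let result = executor.execute_step_by_id("step1").await;
            println!("{result:?}");
        }
    }

The same shared handle is what `executor.debug_session(execution_id)` hands to the HTTP debug API in stepflow-server, which is how the REPL and the server can drive one execution without duplicating its state.
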
@@ -458,8 +480,11 @@ async fn handle_run_step_command(step_id: String, state: &mut ReplState) -> Resu } if let Some(last_run) = &mut state.last_run { - if let Some(debug_session) = last_run.debug_execution() { - match debug_session.execute_step_by_id(&step_id).await { + if let Some(debug_session) = &last_run.last_execution { + // Lock the mutex to access the WorkflowExecutor + let mut workflow_executor = debug_session.lock().await; + + match workflow_executor.execute_step_by_id(&step_id).await { Ok(result) => { print_step_result(&step_id, &result.result)?; } @@ -490,8 +515,11 @@ async fn handle_run_steps_command(step_ids: Vec, state: &mut ReplState) } if let Some(last_run) = &mut state.last_run { - if let Some(debug_session) = last_run.debug_execution() { - match debug_session.execute_steps(&step_ids).await { + if let Some(debug_session) = &last_run.last_execution { + // Lock the mutex to access the WorkflowExecutor + let mut workflow_executor = debug_session.lock().await; + + match workflow_executor.execute_steps(&step_ids).await { Ok(results) => { println!("Executed {} steps:", results.len()); for result in results { @@ -525,8 +553,11 @@ async fn handle_run_all_command(state: &mut ReplState) -> Result<()> { } if let Some(last_run) = &mut state.last_run { - if let Some(debug_session) = last_run.debug_execution() { - match debug_session.execute_all_runnable().await { + if let Some(debug_session) = &last_run.last_execution { + // Lock the mutex to access the WorkflowExecutor + let mut workflow_executor = debug_session.lock().await; + + match workflow_executor.execute_all_runnable().await { Ok(results) => { if results.is_empty() { println!("No runnable steps to execute."); @@ -564,9 +595,12 @@ async fn handle_continue_command(state: &mut ReplState) -> Result<()> { } if let Some(last_run) = &mut state.last_run { - if let Some(debug_session) = last_run.debug_execution() { + if let Some(debug_session) = &last_run.last_execution { + // Lock the mutex to access the WorkflowExecutor + let mut workflow_executor = debug_session.lock().await; + println!("Continuing workflow execution to completion..."); - match debug_session.execute_to_completion().await { + match workflow_executor.execute_to_completion().await { Ok(final_result) => { // Print final result let result_json = serde_json::to_string_pretty(&final_result) @@ -616,7 +650,10 @@ async fn handle_inspect_command(step_id: String, state: &ReplState) -> Result<() if let Some(last_run) = &state.last_run { if let Some(debug_session) = &last_run.last_execution { - match debug_session.inspect_step(&step_id).await { + // Lock the mutex to access the WorkflowExecutor + let workflow_executor = debug_session.lock().await; + + match workflow_executor.inspect_step(&step_id).await { Ok(inspection) => { println!("Step '{}' inspection:", step_id); println!(" Index: {}", inspection.metadata.step_index); @@ -663,7 +700,10 @@ async fn handle_completed_command(state: &ReplState) -> Result<()> { if let Some(last_run) = &state.last_run { if let Some(debug_session) = &last_run.last_execution { - match debug_session.get_completed_steps().await { + // Lock the mutex to access the WorkflowExecutor + let workflow_executor = debug_session.lock().await; + + match workflow_executor.get_completed_steps().await { Ok(completed_steps) => { if completed_steps.is_empty() { println!("No steps have been completed yet."); @@ -674,6 +714,7 @@ async fn handle_completed_command(state: &ReplState) -> Result<()> { stepflow_core::FlowResult::Success { .. 
} => "SUCCESS", stepflow_core::FlowResult::Skipped => "SKIPPED", stepflow_core::FlowResult::Failed { .. } => "FAILED", + stepflow_core::FlowResult::Streaming { .. } => "RUNNING", }; println!( " [{}] {} ({}): {}", @@ -713,7 +754,10 @@ async fn handle_output_command(step_id: String, state: &ReplState) -> Result<()> if let Some(last_run) = &state.last_run { if let Some(debug_session) = &last_run.last_execution { - match debug_session.get_step_output(&step_id).await { + // Lock the mutex to access the WorkflowExecutor + let workflow_executor = debug_session.lock().await; + + match workflow_executor.get_step_output(&step_id).await { Ok(result) => { println!("Output of step '{}':", step_id); print_flow_result(&result)?; @@ -755,6 +799,14 @@ fn print_flow_result(result: &stepflow_core::FlowResult) -> Result<()> { stepflow_core::FlowResult::Failed { error } => { println!("Result: FAILED - {}", error); } + stepflow_core::FlowResult::Streaming { stream_id, metadata, chunk, chunk_index, is_final } => { + println!("Result: STREAMING"); + println!(" Stream ID: {}", stream_id); + println!(" Metadata: {}", serde_json::to_string_pretty(metadata).unwrap_or_else(|_| "".to_string())); + println!(" Chunk: {}... ({} bytes base64)", &chunk[..chunk.len().min(32)], chunk.len()); + println!(" Chunk Index: {}", chunk_index); + println!(" Final: {}", is_final); + } } Ok(()) } diff --git a/crates/stepflow-plugin/Cargo.toml b/crates/stepflow-plugin/Cargo.toml index b0966026..f3c00682 100644 --- a/crates/stepflow-plugin/Cargo.toml +++ b/crates/stepflow-plugin/Cargo.toml @@ -23,7 +23,12 @@ serde.workspace = true stepflow-core.workspace = true stepflow-state.workspace = true thiserror.workspace = true +tokio.workspace = true trait-variant.workspace = true +tracing.workspace = true uuid.workspace = true +dashmap = "6.0" +once_cell = "1.19" +serde_json.workspace = true [dev-dependencies] \ No newline at end of file diff --git a/crates/stepflow-plugin/src/context.rs b/crates/stepflow-plugin/src/context.rs index 5a9e0b9f..1c0e5ebc 100644 --- a/crates/stepflow-plugin/src/context.rs +++ b/crates/stepflow-plugin/src/context.rs @@ -40,6 +40,20 @@ pub trait Context: Send + Sync { /// Get the state store for this executor. fn state_store(&self) -> &Arc; + + /// Get a reference to the executor for advanced operations. + fn executor(&self) -> Option> { + None + } +} + +/// Trait for executor operations that require access to the StepFlowExecutor +pub trait Executor: Send + Sync { + /// Get a workflow executor for debug sessions + fn get_workflow_executor( + &self, + execution_id: Uuid, + ) -> BoxFuture<'_, crate::Result>>>; } /// Execution context that combines a Context with an execution ID. @@ -47,6 +61,8 @@ pub trait Context: Send + Sync { pub struct ExecutionContext { context: Arc, execution_id: Uuid, + /// Optional step ID for streaming pipelines + step_id: Option, } impl ExecutionContext { @@ -55,14 +71,26 @@ impl ExecutionContext { Self { context, execution_id, + step_id: None, } } + /// Add step ID to this context (builder pattern) + pub fn with_step(mut self, step_id: String) -> Self { + self.step_id = Some(step_id); + self + } + /// Get the execution ID for this context. pub fn execution_id(&self) -> Uuid { self.execution_id } + /// Get the step ID for this context (if set). + pub fn step_id(&self) -> Option<&str> { + self.step_id.as_deref() + } + /// Get a reference to the state store. 
pub fn state_store(&self) -> &Arc { self.context.state_store() diff --git a/crates/stepflow-plugin/src/error.rs b/crates/stepflow-plugin/src/error.rs index a2343fc5..0a1b647a 100644 --- a/crates/stepflow-plugin/src/error.rs +++ b/crates/stepflow-plugin/src/error.rs @@ -27,6 +27,14 @@ pub enum PluginError { InvalidInput, #[error("error creating plugin")] CreatePlugin, + #[error("generic error: {0}")] + Generic(String), +} + +impl PluginError { + pub fn new(message: impl Into) -> Self { + Self::Generic(message.into()) + } } pub type Result> = std::result::Result; diff --git a/crates/stepflow-plugin/src/lib.rs b/crates/stepflow-plugin/src/lib.rs index f85de956..388482d2 100644 --- a/crates/stepflow-plugin/src/lib.rs +++ b/crates/stepflow-plugin/src/lib.rs @@ -1,7 +1,8 @@ mod context; mod error; mod plugin; +pub mod streaming; -pub use context::{Context, ExecutionContext}; +pub use context::{Context, ExecutionContext, Executor}; pub use error::{PluginError, Result}; pub use plugin::{DynPlugin, Plugin, PluginConfig}; diff --git a/crates/stepflow-plugin/src/streaming.rs b/crates/stepflow-plugin/src/streaming.rs new file mode 100644 index 00000000..6deaff14 --- /dev/null +++ b/crates/stepflow-plugin/src/streaming.rs @@ -0,0 +1,32 @@ +use dashmap::DashMap; +use once_cell::sync::Lazy; +use serde_json::Value; +use tokio::sync::mpsc; +use uuid::Uuid; + +/// Maps execution_id → Sender. +/// This global registry allows streaming chunk handlers to route chunks without acquiring locks. +pub static STREAM_CHUNK_SENDERS: Lazy>> = + Lazy::new(DashMap::new); + +/// Register a chunk sender for an execution ID +pub fn register_chunk_sender(execution_id: Uuid, sender: mpsc::Sender) { + tracing::debug!("Registering chunk sender for execution {}", execution_id); + STREAM_CHUNK_SENDERS.insert(execution_id, sender); +} + +/// Unregister a chunk sender for an execution ID (cleanup) +pub fn unregister_chunk_sender(execution_id: Uuid) { + tracing::debug!("Unregistering chunk sender for execution {}", execution_id); + STREAM_CHUNK_SENDERS.remove(&execution_id); +} + +/// Send a chunk to the registered sender for an execution ID +pub async fn send_chunk(execution_id: Uuid, chunk: Value) -> Result<(), String> { + if let Some(sender) = STREAM_CHUNK_SENDERS.get(&execution_id) { + sender.send(chunk).await + .map_err(|e| format!("stream-chunk channel closed: {}", e)) + } else { + Err(format!("no streaming channel for exec {}", execution_id)) + } +} \ No newline at end of file diff --git a/crates/stepflow-protocol/Cargo.toml b/crates/stepflow-protocol/Cargo.toml index 08a76542..59c9e95f 100644 --- a/crates/stepflow-protocol/Cargo.toml +++ b/crates/stepflow-protocol/Cargo.toml @@ -18,6 +18,7 @@ indexmap.workspace = true serde_json.workspace = true serde.workspace = true stepflow-core.workspace = true +stepflow-execution.workspace = true stepflow-plugin.workspace = true thiserror.workspace = true tokio.workspace = true diff --git a/crates/stepflow-protocol/src/blob_handlers.rs b/crates/stepflow-protocol/src/blob_handlers.rs index 61cc2e58..78d5ced0 100644 --- a/crates/stepflow-protocol/src/blob_handlers.rs +++ b/crates/stepflow-protocol/src/blob_handlers.rs @@ -1,8 +1,10 @@ +use std::fmt; use error_stack::ResultExt as _; use futures::future::{BoxFuture, FutureExt as _}; use serde::{Deserialize, Serialize}; use serde_json::value::RawValue; use std::sync::Arc; + use stepflow_plugin::Context; use tokio::sync::mpsc; use uuid::Uuid; @@ -159,3 +161,77 @@ impl IncomingHandler for GetBlobHandler { .boxed() } } + +/// Handler for 
streaming_chunk notifications from component servers. +pub struct StreamingChunkHandler; + +impl IncomingHandler for StreamingChunkHandler { + fn handle_incoming( + &self, + _method: String, + params: Box, + id: Option, + response_tx: mpsc::Sender, + context: Arc, + ) -> BoxFuture<'static, error_stack::Result<(), StdioError>> { + async move { + // This is a notification (no ID), so we don't send a response + // Instead, we need to route the streaming chunk through the global registry + match serde_json::from_str::(params.get()) { + Ok(notification) => { + tracing::info!("Received streaming chunk for request {}: step_id={:?}, chunk_index={}", + notification.request_id, notification.step_id, notification.chunk_index); + + if let Ok(execution_id) = Uuid::parse_str(¬ification.request_id) { + let chunk = serde_json::json!({ + "request_id": notification.request_id, + "stream_id": notification.stream_id, + "chunk_index": notification.chunk_index, + "is_final": notification.is_final, + "step_id": notification.step_id, + "chunk": notification.chunk + }); + + match stepflow_plugin::streaming::send_chunk(execution_id, chunk).await { + Ok(_) => { + tracing::info!("Successfully routed streaming chunk to execution {}", execution_id); + } + Err(e) => { + tracing::error!("Failed to route streaming chunk: {}", e); + } + } + } else { + tracing::warn!("Invalid execution ID in streaming chunk: {}", notification.request_id); + } + + Ok(()) + } + Err(e) => { + tracing::error!("Failed to parse streaming chunk notification: {}", e); + Ok(()) + } + } + } + .boxed() + } +} + +#[derive(Deserialize)] +struct StreamingChunkNotification { + request_id: String, + stream_id: String, + chunk_index: u64, + is_final: bool, + step_id: Option, + chunk: serde_json::Value, +} + +impl fmt::Debug for StreamingChunkNotification { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("StreamingChunkNotification") + .field("request_id", &self.request_id) + // just show that it exists, not its contents + .field("chunk", &"") + .finish() + } +} diff --git a/crates/stepflow-protocol/src/incoming_handler.rs b/crates/stepflow-protocol/src/incoming_handler.rs index 7efc6019..d41b0af5 100644 --- a/crates/stepflow-protocol/src/incoming_handler.rs +++ b/crates/stepflow-protocol/src/incoming_handler.rs @@ -7,7 +7,7 @@ use tokio::sync::mpsc; use uuid::Uuid; use crate::stdio::StdioError; -use crate::{GetBlobHandler, PutBlobHandler}; +use crate::{GetBlobHandler, PutBlobHandler, StreamingChunkHandler}; /// Trait for handling incoming method calls and notifications from component servers. 
/// @@ -45,6 +45,7 @@ static INCOMING_HANDLERS: LazyLock = LazyLock::new(|| { let mut registry = IncomingHandlerRegistry::new(); registry.register("put_blob", Box::new(PutBlobHandler)); registry.register("get_blob", Box::new(GetBlobHandler)); + registry.register("streaming_chunk", Box::new(StreamingChunkHandler)); registry }); @@ -76,7 +77,9 @@ impl IncomingHandlerRegistry { response_tx: mpsc::Sender, context: Arc, ) { + tracing::debug!("Looking for handler for method: {}", method); if let Some(handler) = self.handlers.get(&method) { + tracing::debug!("Found handler for method: {}", method); // Now we can spawn the handler with owned values let future = handler.handle_incoming(method.clone(), params, id, response_tx.clone(), context); @@ -86,6 +89,7 @@ impl IncomingHandlerRegistry { } }); } else { + tracing::debug!("No handler found for method: {}", method); // Send error response for unknown method if it's a method call (has ID) if let Some(id) = id { tracing::error!("Unknown method: {}", method); diff --git a/crates/stepflow-protocol/src/lib.rs b/crates/stepflow-protocol/src/lib.rs index 2043a805..14e3f1de 100644 --- a/crates/stepflow-protocol/src/lib.rs +++ b/crates/stepflow-protocol/src/lib.rs @@ -4,5 +4,5 @@ mod incoming_handler; mod schema; pub mod stdio; -pub use blob_handlers::{GetBlobHandler, PutBlobHandler}; +pub use blob_handlers::{GetBlobHandler, PutBlobHandler, StreamingChunkHandler}; pub use incoming_handler::{IncomingHandler, IncomingHandlerRegistry}; diff --git a/crates/stepflow-protocol/src/schema/component_execute.rs b/crates/stepflow-protocol/src/schema/component_execute.rs index b4c2e609..c90b8690 100644 --- a/crates/stepflow-protocol/src/schema/component_execute.rs +++ b/crates/stepflow-protocol/src/schema/component_execute.rs @@ -1,5 +1,6 @@ use serde::{Deserialize, Serialize}; use stepflow_core::workflow::{Component, ValueRef}; +use uuid::Uuid; use crate::schema::Method; @@ -8,6 +9,8 @@ use crate::schema::Method; pub struct Request { pub component: Component, pub input: ValueRef, + pub execution_id: Uuid, + pub step_id: String, } /// Response to the initialization request. 
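
With `execution_id` and `step_id` added, every execute request identifies the run and step it serves, which is what lets the streaming chunk handler route `streaming_chunk` notifications back through the per-execution channel registry. A runnable sketch of the resulting wire payload; the `component_execute` method name and the exact key casing are assumptions, since neither is visible in this hunk:

    use serde_json::json;
    use uuid::Uuid;

    fn main() {
        // Hypothetical component_execute request as a component server would see it.
        // The execution_id/step_id parameters come from the Request struct above;
        // the method name and key casing are assumed for illustration.
        let execution_id = Uuid::new_v4();
        let request = json!({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "component_execute",
            "params": {
                "component": "python://audio_stream_source",
                "input": { "sample_rate": 44100, "channels": 1, "source": "sine_wave" },
                "execution_id": execution_id.to_string(),
                "step_id": "audio_source"
            }
        });
        println!("{}", serde_json::to_string_pretty(&request).unwrap());
    }

On the return path the component server echoes the execution id as `request_id` in each `streaming_chunk` notification, and `send_chunk` in stepflow-plugin uses it to look up the registered sender.
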
diff --git a/crates/stepflow-protocol/src/stdio/client.rs b/crates/stepflow-protocol/src/stdio/client.rs index 7448a22e..63c7a994 100644 --- a/crates/stepflow-protocol/src/stdio/client.rs +++ b/crates/stepflow-protocol/src/stdio/client.rs @@ -33,6 +33,7 @@ impl Client { let (outgoing_tx, outgoing_rx) = mpsc::channel(100); let (pending_tx, pending_rx) = mpsc::channel(100); + //let recv_span = tracing::info_span!("recv_message_loop", command = ?launcher.command, args = ?launcher.args); let recv_span = tracing::info_span!("recv_message_loop", command = ?launcher.command, args = ?launcher.args); let loop_handle = tokio::spawn( recv_message_loop( diff --git a/crates/stepflow-protocol/src/stdio/plugin.rs b/crates/stepflow-protocol/src/stdio/plugin.rs index 4cfa1e9b..d4fd3f2a 100644 --- a/crates/stepflow-protocol/src/stdio/plugin.rs +++ b/crates/stepflow-protocol/src/stdio/plugin.rs @@ -140,7 +140,7 @@ impl Plugin for StdioPlugin { async fn execute( &self, component: &Component, - _context: ExecutionContext, + context: ExecutionContext, input: ValueRef, ) -> Result { let client_handle = self.client_handle().await?; @@ -148,10 +148,107 @@ impl Plugin for StdioPlugin { .request(&crate::schema::component_execute::Request { component: component.clone(), input, + execution_id: context.execution_id(), + step_id: context.step_id().unwrap_or("unknown").to_string(), }) .await .change_context(PluginError::Execution)?; + tracing::debug!("StdioPlugin: Received response: {:?}", response.output); + + // Check if the response contains a FlowResult by looking for the "outcome" field + if let Some(outcome_obj) = response.output.as_object() { + if let Some(outcome_value) = outcome_obj.get("outcome") { + if let Some(outcome_str) = outcome_value.as_str() { + tracing::debug!("StdioPlugin: Found outcome field with value: {}", outcome_str); + // If the response has an "outcome" field, try to deserialize it as a FlowResult + match outcome_str { + "streaming" => { + // Try to deserialize as FlowResult::Streaming + tracing::debug!("StdioPlugin: Attempting to deserialize streaming result"); + match serde_json::from_value::(response.output.as_ref().clone()) { + Ok(flow_result) => { + tracing::info!("StdioPlugin: Successfully deserialized streaming result"); + return Ok(flow_result); + } + Err(e) => { + tracing::warn!("Failed to deserialize streaming result: {}, attempting flexible construction", e); + // Try to flexibly construct FlowResult::Streaming by adapting the structure + if let Some(obj) = response.output.as_ref().as_object() { + // Try to extract core streaming fields with flexible typing + let stream_id = obj.get("stream_id") + .and_then(|v| v.as_str()) + .unwrap_or("unknown"); + + let chunk = obj.get("chunk") + .and_then(|v| v.as_str()) + .unwrap_or(""); + + let chunk_index = obj.get("chunk_index") + .and_then(|v| v.as_u64().or_else(|| v.as_i64().map(|i| i as u64))) + .unwrap_or(0) as usize; + + let is_final = obj.get("is_final") + .and_then(|v| v.as_bool()) + .unwrap_or(false); + + // If we have a metadata field, use it; otherwise create one from remaining fields + let metadata = if let Some(existing_metadata) = obj.get("metadata") { + stepflow_core::workflow::ValueRef::new(existing_metadata.clone()) + } else { + // Create metadata from all fields except the core streaming ones + let mut metadata_obj = serde_json::Map::new(); + for (key, value) in obj.iter() { + if !matches!(key.as_str(), "outcome" | "stream_id" | "chunk" | "chunk_index" | "is_final") { + metadata_obj.insert(key.clone(), value.clone()); + } + } 
+ stepflow_core::workflow::ValueRef::new(serde_json::Value::Object(metadata_obj)) + }; + + let flow_result = stepflow_core::FlowResult::Streaming { + stream_id: stream_id.to_string(), + metadata, + chunk: chunk.to_string(), + chunk_index, + is_final, + }; + + tracing::info!("StdioPlugin: Successfully constructed streaming result flexibly"); + return Ok(flow_result); + } + tracing::warn!("Failed to flexibly construct streaming result, falling back to Success"); + } + } + } + "success" | "failed" | "skipped" => { + // Try to deserialize as any FlowResult variant + match serde_json::from_value::(response.output.as_ref().clone()) { + Ok(flow_result) => { + tracing::debug!("StdioPlugin: Successfully deserialized FlowResult: {:?}", flow_result); + return Ok(flow_result); + } + Err(e) => { + tracing::warn!("Failed to deserialize FlowResult: {}, falling back to Success", e); + } + } + } + _ => { + // Unknown outcome, treat as regular success + tracing::debug!("StdioPlugin: Unknown outcome '{}', treating as regular success", outcome_str); + } + } + } else { + tracing::debug!("StdioPlugin: outcome field is not a string"); + } + } else { + tracing::debug!("StdioPlugin: No outcome field found in response"); + } + } else { + tracing::debug!("StdioPlugin: Response output is not an object"); + } + + // Default behavior: wrap in Success Ok(FlowResult::Success { result: response.output, }) diff --git a/crates/stepflow-protocol/src/stdio/recv_message_loop.rs b/crates/stepflow-protocol/src/stdio/recv_message_loop.rs index b296535e..18314983 100644 --- a/crates/stepflow-protocol/src/stdio/recv_message_loop.rs +++ b/crates/stepflow-protocol/src/stdio/recv_message_loop.rs @@ -11,6 +11,7 @@ use tokio::{ }, }; use tokio_stream::{StreamExt as _, wrappers::LinesStream}; +use tracing::info; use uuid::Uuid; use crate::stdio::{Result, StdioError}; @@ -33,6 +34,8 @@ impl ReceiveMessageLoop { let to_child = child.stdin.take().expect("stdin requested"); let from_child_stdout = child.stdout.take().expect("stdout requested"); + + // Use BufReader with default capacity - it will automatically handle large messages let from_child_stdout = LinesStream::new(BufReader::new(from_child_stdout).lines()); let from_child_stderr = child.stderr.take().expect("stderr requested"); @@ -88,12 +91,189 @@ impl ReceiveMessageLoop { Some(stderr_line) = self.from_child_stderr.next() => { let stderr_line = stderr_line.change_context(StdioError::Recv)?; tracing::info!("Component stderr: {stderr_line}"); + eprintln!("[component stderr] {stderr_line}"); Ok(true) } Some(line) = self.from_child_stdout.next() => { let line = line.change_context(StdioError::Recv)?; - tracing::info!("Received line from child: {line:?}"); - let msg = OwnedIncoming::try_new(line).change_context(StdioError::Recv)?; + + // Check if the line is suspiciously long (might indicate truncation) + if line.len() > 1024 * 1024 { + tracing::warn!("Received very long message ({} chars), may be truncated", line.len()); + } + + tracing::info!("Received line from child"); + + // Add better error handling for JSON parsing + let msg = match OwnedIncoming::try_new(line.clone()) { + Ok(msg) => msg, + Err(e) => { + tracing::error!("Failed to parse JSON message: {}", e); + tracing::error!("Message length: {} characters", line.len()); + + // Check if the JSON appears to be truncated + if !line.trim().ends_with('}') { + tracing::error!("JSON appears to be truncated - doesn't end with '}}'"); + } + + // Check for common JSON syntax issues + let open_braces = line.chars().filter(|&c| c == 
'{').count(); + let close_braces = line.chars().filter(|&c| c == '}').count(); + if open_braces != close_braces { + tracing::error!("JSON brace mismatch: {} opening, {} closing", open_braces, close_braces); + } + + if line.len() > 1000 { + tracing::error!("Message preview (first 500 chars): {}", &line[..500]); + tracing::error!("Message preview (last 500 chars): {}", &line[line.len()-500..]); + } else { + tracing::error!("Full message: {}", line); + } + + // Try to handle concatenated JSON messages + if open_braces > 1 && close_braces > 1 { + tracing::info!("Attempting to split concatenated JSON messages"); + let mut current_pos = 0; + let mut brace_count = 0; + let mut start_pos = 0; + let mut messages_parsed = 0; + + for (i, ch) in line.chars().enumerate() { + if ch == '{' { + if brace_count == 0 { + start_pos = i; + } + brace_count += 1; + } else if ch == '}' { + brace_count -= 1; + if brace_count == 0 { + // We have a complete JSON object + let json_str = &line[start_pos..=i]; + match OwnedIncoming::try_new(json_str.to_string()) { + Ok(msg) => { + tracing::info!("Successfully parsed concatenated message #{}", messages_parsed + 1); + messages_parsed += 1; + + // Process this message + match (msg.method, msg.params, msg.id) { + (Some(method), Some(params), _) => { + let method_owned = method.to_string(); + let params_owned: Box = params.to_owned(); + + if let Ok(v)=serde_json::from_str::(params_owned.get()){info!(method=%method_owned,request_id=%v["request_id"].as_str().unwrap_or(""),stream_id=%v["stream_id"].as_str().unwrap_or(""),chunk_index=v["chunk_index"].as_u64().unwrap_or_default(),is_final=v["is_final"].as_bool().unwrap_or(false),output_file=%v["output_file"].as_str().unwrap_or(""),"Received incoming method call");} + IncomingHandlerRegistry::instance().spawn_handle_incoming(method_owned, params_owned, msg.id, self.outgoing_tx.clone(), context.clone()); + } + (None, None, Some(id)) => { + // Handle method response + if let Some(pending) = self.pending_requests.remove(&id) { + pending.send(msg).map_err(|_| StdioError::Send)?; + } + } + _ => { + tracing::warn!("Received invalid concatenated message: {:?}", msg); + } + } + } + Err(parse_err) => { + tracing::warn!("Failed to parse concatenated JSON message: {}", parse_err); + } + } + } + } + } + + if messages_parsed > 0 { + tracing::info!("Successfully parsed {} concatenated messages", messages_parsed); + return Ok(true); + } + } + + // Try more aggressive message recovery for edge cases + tracing::info!("Attempting aggressive message recovery"); + let mut messages_parsed = 0; + + // Try to find valid JSON objects by looking for complete message patterns + let mut pos = 0; + while pos < line.len() { + // Find the start of a potential JSON object + if let Some(start) = line[pos..].find("{\"jsonrpc\":\"2.0\"") { + let start_pos = pos + start; + + // Try to find the matching closing brace + let mut brace_count = 0; + let mut end_pos = start_pos; + let mut found_end = false; + + for (i, ch) in line[start_pos..].chars().enumerate() { + if ch == '{' { + brace_count += 1; + } else if ch == '}' { + brace_count -= 1; + if brace_count == 0 { + end_pos = start_pos + i; + found_end = true; + break; + } + } + } + + if found_end { + let json_str = &line[start_pos..=end_pos]; + match OwnedIncoming::try_new(json_str.to_string()) { + Ok(msg) => { + tracing::info!("Successfully recovered message #{}", messages_parsed + 1); + messages_parsed += 1; + + // Process this message + match (msg.method, msg.params, msg.id) { + (Some(method), Some(params), _) 
=> { + let method_owned = method.to_string(); + let params_owned: Box = params.to_owned(); + + if let Ok(v)=serde_json::from_str::(params_owned.get()){info!(method=%method_owned,request_id=%v["request_id"].as_str().unwrap_or(""),stream_id=%v["stream_id"].as_str().unwrap_or(""),chunk_index=v["chunk_index"].as_u64().unwrap_or_default(),is_final=v["is_final"].as_bool().unwrap_or(false),output_file=%v["output_file"].as_str().unwrap_or(""),"Received incoming method call");} + IncomingHandlerRegistry::instance().spawn_handle_incoming(method_owned, params_owned, msg.id, self.outgoing_tx.clone(), context.clone()); + } + (None, None, Some(id)) => { + // Handle method response + if let Some(pending) = self.pending_requests.remove(&id) { + pending.send(msg).map_err(|_| StdioError::Send)?; + } + } + _ => { + tracing::warn!("Received invalid recovered message: {:?}", msg); + } + } + + // Move position to after this message + pos = end_pos + 1; + } + Err(parse_err) => { + tracing::warn!("Failed to parse recovered JSON message: {}", parse_err); + pos = start_pos + 1; + } + } + } else { + // No matching end found, move to next potential start + pos = start_pos + 1; + } + } else { + // No more JSON objects found + break; + } + } + + if messages_parsed > 0 { + tracing::info!("Successfully recovered {} messages", messages_parsed); + return Ok(true); + } + + // Instead of returning an error and terminating the loop, + // log the error and continue processing other messages + tracing::warn!("Skipping malformed message and continuing..."); + return Ok(true); + } + }; + match (msg.method, msg.params, msg.id) { (Some(method), Some(params), _) => { // Incoming method call or notification. @@ -102,7 +282,7 @@ impl ReceiveMessageLoop { let params_owned: Box = params.to_owned(); // Handle the incoming method call - tracing::info!("Received incoming method call: {} with params: {:?}", method_owned, params_owned); + if let Ok(v)=serde_json::from_str::(params_owned.get()){info!(method=%method_owned,request_id=%v["request_id"].as_str().unwrap_or(""),stream_id=%v["stream_id"].as_str().unwrap_or(""),chunk_index=v["chunk_index"].as_u64().unwrap_or_default(),is_final=v["is_final"].as_bool().unwrap_or(false),output_file=%v["output_file"].as_str().unwrap_or(""),"Received incoming method call");} IncomingHandlerRegistry::instance().spawn_handle_incoming(method_owned, params_owned, msg.id, self.outgoing_tx.clone(), context.clone()); Ok(true) } diff --git a/crates/stepflow-server/src/api/debug.rs b/crates/stepflow-server/src/api/debug.rs index 1d62c3c0..0972929a 100644 --- a/crates/stepflow-server/src/api/debug.rs +++ b/crates/stepflow-server/src/api/debug.rs @@ -59,13 +59,16 @@ pub async fn debug_execute_step( Json(req): Json, ) -> Result, ErrorResponse> { // Get the debug session for this run - let mut debug_session = executor + let debug_session = executor .debug_session(run_id) .await .change_context(ServerError::ExecutionNotFound(run_id))?; + // Lock the mutex to access the WorkflowExecutor + let mut workflow_executor = debug_session.lock().await; + // Execute the requested steps - let step_results = debug_session.execute_steps(&req.step_ids).await?; + let step_results = workflow_executor.execute_steps(&req.step_ids).await?; // Convert results to the expected format let mut results = std::collections::HashMap::new(); @@ -96,19 +99,23 @@ pub async fn debug_continue( Path(run_id): Path, ) -> Result, ErrorResponse> { // Get the debug session for this run - let mut debug_session = executor + let debug_session = executor 
.debug_session(run_id) .await .change_context(ServerError::ExecutionNotFound(run_id))?; + // Lock the mutex to access the WorkflowExecutor + let mut workflow_executor = debug_session.lock().await; + // Continue run to completion - let final_result = debug_session.execute_to_completion().await?; + let final_result = workflow_executor.execute_to_completion().await?; // Update run status based on the result let state_store = executor.state_store(); let status = match &final_result { FlowResult::Success { .. } => ExecutionStatus::Completed, FlowResult::Failed { .. } | FlowResult::Skipped => ExecutionStatus::Failed, + FlowResult::Streaming { .. } => ExecutionStatus::Running, }; state_store @@ -148,8 +155,11 @@ pub async fn debug_get_runnable( .await .change_context(ServerError::ExecutionNotFound(run_id))?; + // Lock the mutex to access the WorkflowExecutor + let workflow_executor = debug_session.lock().await; + // Get runnable steps - let runnable_steps = debug_session + let runnable_steps = workflow_executor .get_runnable_steps() .await .into_iter() diff --git a/crates/stepflow-server/src/api/runs.rs b/crates/stepflow-server/src/api/runs.rs index e5c76702..5c0165d2 100644 --- a/crates/stepflow-server/src/api/runs.rs +++ b/crates/stepflow-server/src/api/runs.rs @@ -212,6 +212,19 @@ pub async fn create_run( debug: debug_mode, })) } + FlowResult::Streaming { stream_id, metadata, chunk, chunk_index, is_final } => { + // For streaming workflows, mark as running + state_store + .update_execution_status(run_id, ExecutionStatus::Running, None) + .await?; + + Ok(Json(CreateRunResponse { + run_id, + result: Some(flow_result), + status: ExecutionStatus::Running, + debug: debug_mode, + })) + } FlowResult::Failed { .. } | FlowResult::Skipped => { // Update execution status to failed state_store diff --git a/crates/stepflow-state-sql/src/sqlite_state_store.rs b/crates/stepflow-state-sql/src/sqlite_state_store.rs index 0ae09434..28b5c2a5 100644 --- a/crates/stepflow-state-sql/src/sqlite_state_store.rs +++ b/crates/stepflow-state-sql/src/sqlite_state_store.rs @@ -1005,7 +1005,45 @@ impl StateStore for SqliteStateStore { } Ok(runnable_steps) - } - .boxed() + }.boxed() + } + + fn get_step_status( + &self, + execution_id: Uuid, + step_index: usize, + ) -> BoxFuture<'_, error_stack::Result> { + let pool = self.pool.clone(); + + async move { + let sql = "SELECT status FROM step_info WHERE execution_id = ? 
AND step_index = ?"; + + let row = sqlx::query(sql) + .bind(execution_id.to_string()) + .bind(step_index as i64) + .fetch_optional(&pool) + .await + .change_context(StateError::Internal)?; + + let row = row.ok_or_else(|| { + error_stack::report!(StateError::StepResultNotFoundByIndex { + execution_id: execution_id.to_string(), + step_idx: step_index, + }) + })?; + + let status_str: String = row.get("status"); + let status = match status_str.as_str() { + "blocked" => stepflow_core::status::StepStatus::Blocked, + "runnable" => stepflow_core::status::StepStatus::Runnable, + "running" => stepflow_core::status::StepStatus::Running, + "completed" => stepflow_core::status::StepStatus::Completed, + "failed" => stepflow_core::status::StepStatus::Failed, + "skipped" => stepflow_core::status::StepStatus::Skipped, + _ => stepflow_core::status::StepStatus::Blocked, // Default fallback + }; + + Ok(status) + }.boxed() } } diff --git a/crates/stepflow-state/src/in_memory.rs b/crates/stepflow-state/src/in_memory.rs index 5503c00a..90e663e7 100644 --- a/crates/stepflow-state/src/in_memory.rs +++ b/crates/stepflow-state/src/in_memory.rs @@ -688,30 +688,46 @@ impl StateStore for InMemoryStateStore { &self, execution_id: uuid::Uuid, ) -> BoxFuture<'_, error_stack::Result, crate::StateError>> { - let step_info_map = self.step_info.clone(); + let step_info = self.step_info.clone(); async move { - let step_info_guard = step_info_map.read().await; + let step_info = step_info.read().await; + let execution_steps = step_info.get(&execution_id).ok_or_else(|| { + error_stack::report!(crate::StateError::ExecutionNotFound { execution_id }) + })?; - // Get all step info for this execution - let execution_steps = step_info_guard - .get(&execution_id) + let runnable_steps: Vec = execution_steps + .values() + .filter(|step| matches!(step.status, stepflow_core::status::StepStatus::Runnable)) .cloned() - .unwrap_or_default(); + .collect(); + + Ok(runnable_steps) + } + .boxed() + } - // Find steps that are marked as runnable - let mut runnable_steps = Vec::new(); + fn get_step_status( + &self, + execution_id: uuid::Uuid, + step_index: usize, + ) -> BoxFuture<'_, error_stack::Result> { + let step_info = self.step_info.clone(); - for step_info in execution_steps.values() { - if step_info.status == stepflow_core::status::StepStatus::Runnable { - runnable_steps.push(step_info.clone()); - } - } + async move { + let step_info = step_info.read().await; + let execution_steps = step_info.get(&execution_id).ok_or_else(|| { + error_stack::report!(crate::StateError::ExecutionNotFound { execution_id }) + })?; - // Sort by step_index for consistent ordering - runnable_steps.sort_by_key(|step| step.step_index); + let step = execution_steps.get(&step_index).ok_or_else(|| { + error_stack::report!(crate::StateError::StepResultNotFoundByIndex { + execution_id: execution_id.to_string(), + step_idx: step_index, + }) + })?; - Ok(runnable_steps) + Ok(step.status) } .boxed() } diff --git a/crates/stepflow-state/src/state_store.rs b/crates/stepflow-state/src/state_store.rs index fd2d6174..bd3e0850 100644 --- a/crates/stepflow-state/src/state_store.rs +++ b/crates/stepflow-state/src/state_store.rs @@ -305,6 +305,20 @@ pub trait StateStore: Send + Sync { &self, execution_id: Uuid, ) -> BoxFuture<'_, error_stack::Result, StateError>>; + + /// Get the status of a specific step. 
+ /// + /// # Arguments + /// * `execution_id` - The unique identifier for the workflow execution + /// * `step_index` - The index of the step within the workflow (0-based) + /// + /// # Returns + /// The step status if found, or an error if not found + fn get_step_status( + &self, + execution_id: Uuid, + step_index: usize, + ) -> BoxFuture<'_, error_stack::Result>; } /// The step result. diff --git a/examples/audio-streaming-pipeline.yaml b/examples/audio-streaming-pipeline.yaml new file mode 100644 index 00000000..becab47a --- /dev/null +++ b/examples/audio-streaming-pipeline.yaml @@ -0,0 +1,83 @@ +name: Audio Streaming Pipeline +description: Real-time audio processing pipeline with microphone input and file output + +input: + operation: + type: string + description: Audio processing operation (amplify, passthrough) + default: "amplify" + sample_rate: + type: integer + description: Audio sample rate in Hz + default: 44100 + channels: + type: integer + description: Number of audio channels + default: 1 + chunk_size: + type: integer + description: Size of audio chunks in samples + default: 1024 + frequency: + type: number + description: Frequency for test sine wave (Hz) + default: 440.0 + source: + type: string + description: Audio source (microphone, system_audio, sine_wave) + default: "microphone" + duration: + type: number + description: Recording duration in seconds + default: 5.0 + output_file: + type: string + description: Output WAV file path + default: "output_audio.wav" + device_name: + type: string + description: Name of audio device to use + default: "C922 Pro Stream Webcam" + +steps: + # Streaming audio source that generates PCM chunks + - id: audio_source + component: python://audio_stream_source + streaming: true + input: + sample_rate: { $from: { workflow: input }, path: sample_rate } + channels: { $from: { workflow: input }, path: channels } + chunk_size: { $from: { workflow: input }, path: chunk_size } + frequency: { $from: { workflow: input }, path: frequency } + source: { $from: { workflow: input }, path: source } + duration: { $from: { workflow: input }, path: duration } + output_file: { $from: { workflow: input }, path: output_file } + device_name: { $from: { workflow: input }, path: device_name } + + # Process the audio chunks + - id: process_chunk + component: python://audio_chunk_processor + streaming: true + input: + chunk: { $from: { step: audio_source }, path: chunk } + chunk_index: { $from: { step: audio_source }, path: chunk_index } + stream_id: { $from: { step: audio_source }, path: stream_id } + sample_rate: { $from: { step: audio_source }, path: sample_rate } + channels: { $from: { step: audio_source }, path: channels } + operation: { $from: { workflow: input }, path: operation } + output_file: { $from: { workflow: input }, path: output_file } + + # Output the processed chunks + - id: output_chunk + component: python://audio_sink + streaming: true + input: + chunk: { $from: { step: process_chunk }, path: chunk } + chunk_index: { $from: { step: process_chunk }, path: chunk_index } + stream_id: { $from: { step: process_chunk }, path: stream_id } + output_file: { $from: { workflow: input }, path: output_file } + +output: + # Note: Streaming steps don't have final results in the state store + # The output file is written directly by the audio_sink component + message: "Audio streaming pipeline completed" \ No newline at end of file diff --git a/examples/audio_input.json b/examples/audio_input.json new file mode 100644 index 00000000..6a4a01bd --- /dev/null +++ 
b/examples/audio_input.json @@ -0,0 +1,11 @@ +{ + "source": "microphone", + "operation": "amplify", + "sample_rate": 44100, + "channels": 1, + "chunk_size": 1024, + "frequency": 440.0, + "duration": 5.0, + "output_file": "output.wav", + "device_name": "C922 Pro Stream Webcam" +} diff --git a/examples/stepflow-config.yml b/examples/stepflow-config.yml new file mode 100644 index 00000000..9cb6a04c --- /dev/null +++ b/examples/stepflow-config.yml @@ -0,0 +1,7 @@ +plugins: + - name: builtins + type: builtin + - name: python + type: stdio + command: /home/phact/Desktop/stepflow/sdks/python/.venv/bin/stepflow_sdk + args: [] \ No newline at end of file diff --git a/examples/test_audio_pipeline.sh b/examples/test_audio_pipeline.sh new file mode 100755 index 00000000..3ede2265 --- /dev/null +++ b/examples/test_audio_pipeline.sh @@ -0,0 +1,142 @@ +#!/bin/bash + +# Audio Pipeline Test Script +# Usage: ./test_audio_pipeline.sh [source] [operation] [duration] [output_file] [device_name] +# Can be run from either the examples directory or the repo root directory + +set -e # Exit on any error + +# Always reinstall the Python SDK in editable mode before running the test +cd "$(dirname "$0")/../sdks/python" +uv pip install -e . +cd - > /dev/null + +# Get script directory and current working directory +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +CURRENT_DIR="$(pwd)" + +# Determine if we're running from examples directory or root +if [[ "$CURRENT_DIR" == "$SCRIPT_DIR" ]]; then + # Running from examples directory + INPUT_FILE="audio_input.json" + FLOW_FILE="audio-streaming-pipeline.yaml" + CONFIG_FILE="stepflow-config.yml" + INPUT_DIR="." + echo "📍 Running from examples directory" +else + # Running from root directory + INPUT_FILE="examples/audio_input.json" + FLOW_FILE="examples/audio-streaming-pipeline.yaml" + CONFIG_FILE="examples/stepflow-config.yml" + INPUT_DIR="examples" + echo "📍 Running from repo root directory" +fi + +# Check if required files exist +if [[ ! -f "$INPUT_FILE" ]]; then + echo "❌ Error: Input file not found: $INPUT_FILE" + exit 1 +fi + +if [[ ! -f "$FLOW_FILE" ]]; then + echo "❌ Error: Flow file not found: $FLOW_FILE" + exit 1 +fi + +if [[ ! 
-f "$CONFIG_FILE" ]]; then + echo "❌ Error: Config file not found: $CONFIG_FILE" + exit 1 +fi + +# Parse command line arguments (all optional) +SOURCE=${1:-"microphone"} +OPERATION=${2:-"amplify"} +DURATION=${3:-"3.0"} +OUTPUT_FILE=${4:-"test_workflow_webcam.wav"} +DEVICE_NAME=${5:-"C922 Pro Stream Webcam"} + +# Set defaults for other parameters if not set +SAMPLE_RATE=${SAMPLE_RATE:-44100} +CHANNELS=${CHANNELS:-1} +CHUNK_SIZE=${CHUNK_SIZE:-1024} +FREQUENCY=${FREQUENCY:-440.0} + +# Determine the absolute path for the output file +# The Python SDK runs from the examples directory, so it will create the file there +if [[ "$CURRENT_DIR" == "$SCRIPT_DIR" ]]; then + # Running from examples directory + ABSOLUTE_OUTPUT_FILE="$CURRENT_DIR/$OUTPUT_FILE" +else + # Running from root directory + ABSOLUTE_OUTPUT_FILE="$SCRIPT_DIR/$OUTPUT_FILE" +fi + +echo "🎵 Testing Audio Streaming Pipeline" +echo "Source: $SOURCE" +echo "Operation: $OPERATION" +echo "Duration: ${DURATION}s" +echo "Output: $ABSOLUTE_OUTPUT_FILE" +echo "Device: $DEVICE_NAME" +echo "" + +# Create temporary input file +TEMP_INPUT=$(mktemp --suffix=.json) +cat > "$TEMP_INPUT" << EOF +{ + "source": "$SOURCE", + "duration": $DURATION, + "sample_rate": $SAMPLE_RATE, + "channels": $CHANNELS, + "chunk_size": $CHUNK_SIZE, + "frequency": $FREQUENCY, + "output_file": "$OUTPUT_FILE", + "device_name": "$DEVICE_NAME", + "operation": "$OPERATION" +} +EOF + +echo "📝 Using input configuration:" +cat "$TEMP_INPUT" | jq '.' +echo "" + +# Run the workflow +echo "🚀 Starting workflow execution..." +if [[ "$CURRENT_DIR" == "$SCRIPT_DIR" ]]; then + # Running from examples directory - run from current directory + cargo run -- run --flow "$FLOW_FILE" --input "$TEMP_INPUT" +else + # Running from root directory + cargo run -- run --flow "$FLOW_FILE" --input "$TEMP_INPUT" +fi + +echo "" +echo "✅ Test completed!" +echo "📁 Output file: $ABSOLUTE_OUTPUT_FILE" + +# Check if file was created +if [ -f "$ABSOLUTE_OUTPUT_FILE" ]; then + echo "📊 File info:" + file "$ABSOLUTE_OUTPUT_FILE" + echo "📏 File size: $(ls -lh $ABSOLUTE_OUTPUT_FILE | awk '{print $5}')" + echo "🎵 Duration: $(soxi -D $ABSOLUTE_OUTPUT_FILE 2>/dev/null || echo 'Unknown') seconds" +else + # Check if file was created in examples directory (where Python SDK runs from) + EXAMPLES_OUTPUT_FILE="examples/$OUTPUT_FILE" + if [ -f "$EXAMPLES_OUTPUT_FILE" ]; then + echo "📊 File found in examples directory:" + file "$EXAMPLES_OUTPUT_FILE" + echo "📏 File size: $(ls -lh $EXAMPLES_OUTPUT_FILE | awk '{print $5}')" + echo "🎵 Duration: $(soxi -D $EXAMPLES_OUTPUT_FILE 2>/dev/null || echo 'Unknown') seconds" + echo "💡 Note: File was created in examples/ directory by the Python SDK" + else + echo "❌ Output file not found in expected location: $ABSOLUTE_OUTPUT_FILE" + echo "🔍 Checking for any .wav files in examples/ directory:" + find examples/ -name "*.wav" -type f 2>/dev/null || echo "No .wav files found in examples/" + fi +fi + +# Clean up temporary input file +rm -f "$TEMP_INPUT" + +echo "" +echo "🎉 Audio pipeline test finished!" 
diff --git a/sdks/python/pyproject.toml b/sdks/python/pyproject.toml index 7aaeb343..607bfc54 100644 --- a/sdks/python/pyproject.toml +++ b/sdks/python/pyproject.toml @@ -8,6 +8,9 @@ requires-python = ">=3.13" dependencies = [ "msgspec>=0.19.0", "jsonschema>=4.17.0", + "pyaudio>=0.2.11", + "sounddevice", + "numpy", ] [dependency-groups] diff --git a/sdks/python/src/stepflow_sdk/audio_components.py b/sdks/python/src/stepflow_sdk/audio_components.py new file mode 100644 index 00000000..a712d793 --- /dev/null +++ b/sdks/python/src/stepflow_sdk/audio_components.py @@ -0,0 +1,669 @@ +""" +Audio streaming components for Stepflow. +Supports PCM 16-bit audio streaming with base64 encoding. +""" + +import base64 +import json +import time +import uuid +from typing import Any, Dict, Optional +from dataclasses import dataclass +import pyaudio +import wave +import os +import threading +import queue +import sys +import numpy as np + + +try: + import sounddevice as sd + SOUNDDEVICE_AVAILABLE = True +except ImportError: + SOUNDDEVICE_AVAILABLE = False + + +@dataclass +class AudioChunk: + """Represents a chunk of PCM 16-bit audio data.""" + data: bytes + sample_rate: int + channels: int + chunk_index: int + timestamp: float + stream_id: str + + +class AudioStreamSource: + """Individual audio source component for generating audio chunks.""" + + def __init__(self, sample_rate: int = 16000, channels: int = 1, chunk_size: int = 1024, stream_id: str = None): + self.sample_rate = sample_rate + self.channels = channels + self.chunk_size = chunk_size + self.stream_id = stream_id or str(uuid.uuid4()) + + def start_microphone_stream(self): + """Initialize microphone stream.""" + self.audio = pyaudio.PyAudio() + self.stream = self.audio.open( + format=pyaudio.paInt16, + channels=self.channels, + rate=self.sample_rate, + input=True, + frames_per_buffer=self.chunk_size + ) + + def _record_audio(self): + """Record a single chunk of audio.""" + return self.stream.read(self.chunk_size, exception_on_overflow=False) + + def stop_microphone_stream(self): + """Stop and clean up microphone stream.""" + if hasattr(self, 'stream'): + self.stream.stop_stream() + self.stream.close() + if hasattr(self, 'audio'): + self.audio.terminate() + + def get_microphone_chunk(self) -> AudioChunk: + """Get a single chunk of audio from microphone.""" + data = self._record_audio() + return AudioChunk( + data=data, + sample_rate=self.sample_rate, + channels=self.channels, + chunk_index=0, + timestamp=time.time(), + stream_id=self.stream_id + ) + + def generate_sine_wave_chunk(self, frequency: float = 440.0, duration: float = 0.1) -> AudioChunk: + """Generate a sine wave chunk for testing.""" + import math + + samples = [] + for i in range(self.chunk_size): + t = i / self.sample_rate + sample = int(32767 * 0.3 * math.sin(2 * math.pi * frequency * t)) + samples.append(sample) + + data = b''.join(sample.to_bytes(2, 'little', signed=True) for sample in samples) + + return AudioChunk( + data=data, + sample_rate=self.sample_rate, + channels=self.channels, + chunk_index=0, + timestamp=time.time(), + stream_id=self.stream_id + ) + + def start_system_audio_stream(self): + """Initialize system audio capture (if sounddevice is available).""" + if not SOUNDDEVICE_AVAILABLE: + raise ImportError("sounddevice not available for system audio capture") + + # Find system audio device + self.device_info = self._find_system_audio_device() + if not self.device_info: + raise RuntimeError("No suitable system audio device found") + + + def 
_find_system_audio_device(self): + """Find a suitable system audio device.""" + devices = sd.query_devices() + + # Look for output devices that can be used for loopback + for device in devices: + if device['max_inputs'] > 0 and device['max_outputs'] > 0: + # This device supports both input and output (potential loopback) + return device + + # Fallback to default device + return sd.query_devices(kind='input') + + def get_system_audio_chunk(self): + """Get a single chunk of system audio.""" + if not SOUNDDEVICE_AVAILABLE: + raise ImportError("sounddevice not available") + + # Record a chunk of system audio + recording = sd.rec( + int(self.chunk_size), + samplerate=self.sample_rate, + channels=self.channels, + dtype='int16', + device=self.device_info['index'] + ) + sd.wait() + + # Convert to bytes + data = recording.tobytes() + + return AudioChunk( + data=data, + sample_rate=self.sample_rate, + channels=self.channels, + chunk_index=0, + timestamp=time.time(), + stream_id=self.stream_id + ) + + + + +def audio_stream_source(data: Dict[str, Any], context=None): + """ + Component that generates audio stream chunks. + + Input: + source: str - audio source type ("sine_wave", "microphone", "system_audio") + duration: float - duration in seconds + sample_rate: int - sample rate in Hz (will auto-detect if not supported) + channels: int - number of audio channels + chunk_size: int - size of each chunk in samples + frequency: float - frequency for sine wave (if source is sine_wave) + output_file: str - path to output WAV file + device_name: str - name of audio device to use (e.g., "C922 Pro Stream Webcam") + + Output: + Streaming audio chunks with metadata + """ + + # Extract parameters from input data + source_type = data.get('source', 'sine_wave') + requested_sample_rate = data.get('sample_rate', 44100) + device_name = data.get('device_name', None) + duration = data.get('duration', 5.0) + channels = data.get('channels', 1) + chunk_size = data.get('chunk_size', 1024) + frequency = data.get('frequency', 440.0) + output_file = data.get('output_file', 'output_audio.wav') + + + start_time = time.time() + + + stream_id = str(uuid.uuid4()) + + + # Initialize PyAudio + audio = pyaudio.PyAudio() + + # Find device if specified + device_index = None + if device_name: + for i in range(audio.get_device_count()): + info = audio.get_device_info_by_index(i) + if device_name.lower() in info['name'].lower(): + device_index = i + break + + # Auto-detect sample rate if device is specified and requested rate fails + sample_rate = requested_sample_rate + if device_index is not None: + # First try the requested sample rate + try: + test_stream = audio.open(format=pyaudio.paInt16, + channels=channels, + rate=requested_sample_rate, + input=True, + input_device_index=device_index, + frames_per_buffer=chunk_size) + test_stream.close() + sample_rate = requested_sample_rate + except OSError: + # Try alternative sample rates if requested rate fails + sample_rates = [16000, 22050, 44100, 48000] + for rate in sample_rates: + if rate == requested_sample_rate: + continue # Skip the requested rate since it already failed + try: + test_stream = audio.open(format=pyaudio.paInt16, + channels=channels, + rate=rate, + input=True, + input_device_index=device_index, + frames_per_buffer=chunk_size) + test_stream.close() + sample_rate = rate + break + except OSError: + continue + + # Calculate how many chunks we need for the full duration + chunk_duration = chunk_size / sample_rate # seconds per chunk + total_chunks = int(duration / 
chunk_duration) + + + # Collect all chunks for direct WAV file writing + all_chunks = [] + + if source_type == 'microphone': + # Set up continuous recording with queue + audio_queue = queue.Queue() + recording_stop = threading.Event() + recording_error = None + + def record_audio_continuously(): + """Background thread that continuously records audio.""" + nonlocal recording_error + try: + if device_index is not None: + stream = audio.open( + format=pyaudio.paInt16, + channels=channels, + rate=sample_rate, + input=True, + input_device_index=device_index, + frames_per_buffer=chunk_size + ) + else: + stream = audio.open( + format=pyaudio.paInt16, + channels=channels, + rate=sample_rate, + input=True, + frames_per_buffer=chunk_size + ) + + + chunk_index = 0 + while not recording_stop.is_set() and chunk_index < total_chunks: + try: + data = stream.read(chunk_size, exception_on_overflow=False) + audio_queue.put((chunk_index, data)) + chunk_index += 1 + except Exception as e: + recording_error = e + break + + stream.stop_stream() + stream.close() + + except Exception as e: + recording_error = e + + # Start recording thread + recording_thread = threading.Thread(target=record_audio_continuously, daemon=True) + recording_thread.start() + + + # Process chunks from the recording thread + processed_chunks = 0 + + while processed_chunks < total_chunks: + try: + # Wait indefinitely for chunks - no timeout + chunk_index, chunk_data = audio_queue.get() + all_chunks.append((chunk_index, chunk_data)) + + # Determine if this is the final chunk + is_final = chunk_index >= total_chunks - 1 + + + # Yield the chunk + yield { + "outcome": "streaming", + "stream_id": stream_id, + "sample_rate": sample_rate, + "channels": channels, + "chunk_size": chunk_size, + "format": "pcm_16bit", + "chunk": base64.b64encode(chunk_data).decode('utf-8'), + "chunk_index": chunk_index, + "is_final": is_final, + "output_file": output_file + } + + processed_chunks += 1 + + except Exception as e: + break + + + # Stop recording + recording_stop.set() + recording_thread.join(timeout=2.0) + + + # Check for any recording errors after completion + if recording_error: + raise RuntimeError(f"Microphone recording failed: {recording_error}") + + elif source_type == 'system_audio': + # Similar implementation for system audio + try: + source = AudioStreamSource(sample_rate, channels, chunk_size, stream_id=stream_id) + source.start_system_audio_stream() + + for chunk_index in range(total_chunks): + chunk_start_time = time.time() + chunk = source.get_system_audio_chunk() + + audio_capture_time = time.time() + + all_chunks.append(chunk.data) + chunk_b64 = base64.b64encode(chunk.data).decode('utf-8') + is_final = chunk_index >= total_chunks - 1 + + yield { + "outcome": "streaming", + "stream_id": stream_id, + "sample_rate": chunk.sample_rate, + "channels": chunk.channels, + "chunk_size": len(chunk.data), + "format": "pcm_16bit", + "chunk": chunk_b64, + "chunk_index": chunk_index, + "is_final": is_final, + "output_file": output_file + } + except Exception as e: + raise RuntimeError(f"System audio capture failed: {e}") + + elif source_type == 'sine_wave': + # Generate sine wave chunks (no queue needed) + source = AudioStreamSource(sample_rate, channels, chunk_size) + + for chunk_index in range(total_chunks): + chunk_start_time = time.time() + chunk = source.generate_sine_wave_chunk(frequency, chunk_duration) + + audio_capture_time = time.time() + + all_chunks.append(chunk.data) + chunk_b64 = base64.b64encode(chunk.data).decode('utf-8') + is_final = 
+
+            yield {
+                "outcome": "streaming",
+                "stream_id": stream_id,
+                "sample_rate": chunk.sample_rate,
+                "channels": chunk.channels,
+                "chunk_size": len(chunk.data),
+                "format": "pcm_16bit",
+                "chunk": chunk_b64,
+                "chunk_index": chunk_index,
+                "is_final": is_final,
+                "output_file": output_file
+            }
+    else:
+        raise ValueError(f"Unsupported audio source type: {source_type}. Supported types: microphone, system_audio, sine_wave")
+
+    # Note: WAV file writing is now handled by the audio_sink component
+    # to ensure we write the processed audio, not just the source audio
+
+
+def audio_chunk_processor(data: Dict[str, Any], context=None) -> Dict[str, Any]:
+    """
+    Component that processes PCM 16-bit audio chunks.
+
+    Input:
+        chunk: str - base64 encoded PCM data
+        chunk_index: int
+        sample_rate: int
+        channels: int
+        operation: str - processing operation ("amplify", "analyze", or "passthrough")
+
+    Output:
+        Processed chunk or analysis results
+    """
+
+    # Handle streaming chunk format - extract the actual chunk data
+    if 'outcome' in data and data['outcome'] == 'streaming':
+        # This is a streaming chunk, extract the chunk data
+        chunk_b64 = data.get('chunk', '')
+        chunk_index = data.get('chunk_index', 0)
+        sample_rate = data.get('sample_rate', 16000)
+        channels = data.get('channels', 1)
+        stream_id = data.get('stream_id', f'processed_{chunk_index}')
+        is_final = data.get('is_final', False)
+        operation = data.get('operation', 'passthrough')  # Default operation
+        # Pass through output_file from workflow input
+        output_file = data.get('output_file', 'output_audio.wav')
+    else:
+        # Direct input format
+        chunk_b64 = data.get('chunk', '')
+        chunk_index = data.get('chunk_index', 0)
+        sample_rate = data.get('sample_rate', 16000)
+        channels = data.get('channels', 1)
+        stream_id = data.get('stream_id', f'processed_{chunk_index}')
+        is_final = data.get('is_final', False)
+        operation = data.get('operation', 'passthrough')
+        output_file = data.get('output_file', 'output_audio.wav')
+
+    # Decode base64 chunk
+    chunk_data = base64.b64decode(chunk_b64)
+
+    # Convert bytes to samples
+    samples = []
+    for i in range(0, len(chunk_data), 2):
+        sample = int.from_bytes(chunk_data[i:i+2], 'little', signed=True)
+        samples.append(sample)
+
+    if operation == "amplify":
+        # Amplify the audio (multiply by gain factor), clamping to the signed
+        # 16-bit range so that to_bytes() cannot overflow on loud input
+        gain = data.get('gain', 2.0)
+        amplified_samples = [max(-32768, min(32767, int(sample * gain))) for sample in samples]
+
+        # Convert back to bytes
+        amplified_data = b''.join(sample.to_bytes(2, 'little', signed=True) for sample in amplified_samples)
+        amplified_b64 = base64.b64encode(amplified_data).decode('utf-8')
+
+        result = {
+            "outcome": "streaming",
+            "stream_id": stream_id,
+            "sample_rate": sample_rate,
+            "channels": channels,
+            "operation": "amplify",
+            "gain": gain,
+            "chunk": amplified_b64,
+            "chunk_index": chunk_index,
+            "is_final": is_final,
+            "output_file": output_file
+        }
+
+    elif operation == "analyze":
+        # Analyze the audio chunk
+        if samples:
+            max_amplitude = max(abs(sample) for sample in samples)
+            avg_amplitude = sum(abs(sample) for sample in samples) / len(samples)
+            rms = (sum(sample * sample for sample in samples) / len(samples)) ** 0.5
+        else:
+            max_amplitude = avg_amplitude = rms = 0
+
+        result = {
+            "outcome": "success",
+            "result": {
+                "chunk_index": chunk_index,
+                "sample_count": len(samples),
+                "max_amplitude": max_amplitude,
+                "avg_amplitude": avg_amplitude,
+                "rms": rms,
+                "sample_rate": sample_rate,
+                "channels": channels
+            },
+            "output_file": output_file
+        }
+
+    else:
+        # Pass through unchanged
+        result = {
+            "outcome": "streaming",
+            "stream_id": stream_id,
+            "sample_rate": sample_rate,
+            "channels": channels,
+            "operation": "passthrough",
+            "chunk": chunk_b64,
+            "chunk_index": chunk_index,
+            "is_final": is_final,
+            "output_file": output_file
+        }
+
+    return result
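+
+# Illustrative shape of one streaming payload exchanged between these components
+# (values are examples only). Note that the microphone path reports "chunk_size"
+# in samples while the other sources report len(chunk.data) in bytes:
+#
+#     {
+#         "outcome": "streaming",
+#         "stream_id": "<uuid4 string>",
+#         "sample_rate": 16000,
+#         "channels": 1,
+#         "format": "pcm_16bit",
+#         "chunk": "<base64-encoded little-endian int16 PCM>",
+#         "chunk_index": 3,
+#         "is_final": False,
+#         "output_file": "output_audio.wav"
+#     }
+#
+# A consumer can recover the raw samples the same way audio_chunk_processor does:
+#
+#     pcm = base64.b64decode(payload["chunk"])
+#     samples = [int.from_bytes(pcm[i:i+2], 'little', signed=True)
+#                for i in range(0, len(pcm), 2)]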
+
+
+def audio_sink(data: Dict[str, Any], context=None) -> Dict[str, Any]:
+    """
+    Component that receives and processes audio chunks (sink).
+
+    Input:
+        chunk: str - base64 encoded PCM data
+        chunk_index: int
+        stream_id: str
+        output_file: str (optional) - path to output WAV file
+        play_audio: bool (optional) - whether to play audio in real time
+
+    Output:
+        Confirmation of chunk received and file written
+    """
+
+    # Global storage for accumulating chunks across function calls
+    if not hasattr(audio_sink, '_chunk_storage'):
+        audio_sink._chunk_storage = {}
+
+    # Handle streaming chunk format - extract the actual chunk data
+    if 'outcome' in data and data['outcome'] == 'streaming':
+        # This is a streaming chunk, extract the chunk data
+        chunk_b64 = data.get('chunk', '')
+        chunk_index = data.get('chunk_index', 0)
+        stream_id = data.get('stream_id', 'unknown')
+        sample_rate = data.get('sample_rate', 16000)
+        channels = data.get('channels', 1)
+        is_final = data.get('is_final', False)
+        # For streaming chunks, get output_file from the data (passed from workflow)
+        output_file = data.get('output_file', 'output_audio.wav')
+        play_audio = data.get('play_audio', False)
+    else:
+        # Direct input format
+        chunk_b64 = data.get('chunk', '')
+        chunk_index = data.get('chunk_index', 0)
+        stream_id = data.get('stream_id', 'unknown')
+        sample_rate = data.get('sample_rate', 16000)
+        channels = data.get('channels', 1)
+        is_final = data.get('is_final', False)
+        output_file = data.get('output_file', 'output_audio.wav')
+        play_audio = data.get('play_audio', False)
+
+    # Decode the chunk
+    if chunk_b64:
+        chunk_data = base64.b64decode(chunk_b64)
+
+        # Store the chunk for later writing
+        if stream_id not in audio_sink._chunk_storage:
+            audio_sink._chunk_storage[stream_id] = {
+                'chunks': [],
+                'sample_rate': sample_rate,
+                'channels': channels,
+                'output_file': output_file
+            }
+
+        audio_sink._chunk_storage[stream_id]['chunks'].append(chunk_data)
+
+        # Convert to samples for analysis
+        samples = []
+        for i in range(0, len(chunk_data), 2):
+            sample = int.from_bytes(chunk_data[i:i+2], 'little', signed=True)
+            samples.append(sample)
+
+        # Calculate audio levels
+        if samples:
+            max_amplitude = max(abs(sample) for sample in samples)
+            avg_amplitude = sum(abs(sample) for sample in samples) / len(samples)
+            rms = (sum(sample * sample for sample in samples) / len(samples)) ** 0.5
+        else:
+            max_amplitude = avg_amplitude = rms = 0
+
+        # Play audio if requested
+        if play_audio:
+            try:
+                import sounddevice as sd
+                import numpy as np
+
+                # Convert to numpy array
+                audio_array = np.array(samples, dtype=np.int16)
+
+                # Play the audio
+                sd.play(audio_array, samplerate=sample_rate)
+                sd.wait()
+
+            except ImportError:
+                # sounddevice not available, skip audio playback
+                pass
+            except Exception:
+                # Audio playback failed
+                pass
+
+        # Write WAV file if this is the final chunk
+        if is_final:
+
+            if stream_id in audio_sink._chunk_storage:
+                try:
+                    storage = audio_sink._chunk_storage[stream_id]
+                    all_audio_data = b''.join(storage['chunks'])
+
+                    # Ensure the output directory exists
+                    output_dir = os.path.dirname(output_file)
+                    if output_dir and not os.path.exists(output_dir):
+                        os.makedirs(output_dir, exist_ok=True)
+
+                    with 
wave.open(output_file, 'wb') as wav_file: + wav_file.setnchannels(storage['channels']) + wav_file.setsampwidth(2) # 16-bit + wav_file.setframerate(storage['sample_rate']) + wav_file.writeframes(all_audio_data) + + + # Verify file was created + if os.path.exists(output_file): + file_size = os.path.getsize(output_file) + + # Clean up storage for this stream + del audio_sink._chunk_storage[stream_id] + + except Exception as e: + # Error writing WAV file + import traceback + traceback.print_exc(file=sys.stderr) + else: + # Stream not found in storage when final chunk received + pass + + result = { + "outcome": "success", + "result": { + "chunk_index": chunk_index, + "stream_id": stream_id, + "max_amplitude": max_amplitude, + "avg_amplitude": avg_amplitude, + "rms": rms, + "sample_count": len(samples), + "chunk_size_bytes": len(chunk_data), + "output_file": output_file, + "is_final": is_final, + "total_chunks_stored": len(audio_sink._chunk_storage.get(stream_id, {}).get('chunks', [])) + } + } + else: + result = { + "outcome": "success", + "result": { + "chunk_index": chunk_index, + "stream_id": stream_id, + "message": "No audio data received" + } + } + + return result \ No newline at end of file diff --git a/sdks/python/src/stepflow_sdk/main.py b/sdks/python/src/stepflow_sdk/main.py index 3950ee68..99010e13 100644 --- a/sdks/python/src/stepflow_sdk/main.py +++ b/sdks/python/src/stepflow_sdk/main.py @@ -4,6 +4,7 @@ from typing import Any, List from stepflow_sdk.server import StepflowStdioServer from stepflow_sdk.context import StepflowContext +from stepflow_sdk.audio_components import audio_stream_source, audio_chunk_processor, audio_sink # Create server instance server = StepflowStdioServer() @@ -508,6 +509,11 @@ async def wrapper(data): except Exception as e: raise ValueError(f"Code compilation failed: {e}") +# Register audio components +server.component(name="audio_stream_source")(audio_stream_source) +server.component(name="audio_chunk_processor")(audio_chunk_processor) +server.component(name="audio_sink")(audio_sink) + def main(): # Start the server server.run() diff --git a/sdks/python/src/stepflow_sdk/protocol.py b/sdks/python/src/stepflow_sdk/protocol.py index 043d6ed7..af2b87b6 100644 --- a/sdks/python/src/stepflow_sdk/protocol.py +++ b/sdks/python/src/stepflow_sdk/protocol.py @@ -30,6 +30,8 @@ class ComponentInfoResponse(Struct, kw_only=True): class ComponentExecuteRequest(Struct, kw_only=True): component: str input: msgspec.Raw + execution_id: str + step_id: str class ComponentExecuteResponse(Struct, kw_only=True): output: Any \ No newline at end of file diff --git a/sdks/python/src/stepflow_sdk/server.py b/sdks/python/src/stepflow_sdk/server.py index 0105d8c6..dbfb2cc4 100644 --- a/sdks/python/src/stepflow_sdk/server.py +++ b/sdks/python/src/stepflow_sdk/server.py @@ -67,6 +67,9 @@ def decorator(f: Callable) -> Callable: # Extract description from parameter or docstring component_description = description or (f.__doc__.strip() if f.__doc__ else None) + # Check if this is a generator function + is_generator = inspect.isgeneratorfunction(f) + self._components[component_name] = ComponentEntry( name=component_name, function=f, @@ -75,8 +78,9 @@ def decorator(f: Callable) -> Callable: description=component_description ) - # Store whether function expects context + # Store whether function expects context and is a generator f._expects_context = expects_context + f._is_generator = is_generator @wraps(f) def wrapper(*args, **kwargs): @@ -130,8 +134,10 @@ async def 
_handle_method_request(self, request: Message) -> Message | None: ) case "component_execute": execute_request = msgspec.json.decode(request.params, type=ComponentExecuteRequest) + print(f"DEBUG: Executing component: {execute_request.component}", file=sys.stderr) component = self.get_component(execute_request.component) if not component: + print(f"DEBUG: Component {execute_request.component} not found!", file=sys.stderr) return Message( id=id, error={ @@ -140,29 +146,65 @@ async def _handle_method_request(self, request: Message) -> Message | None: "data": None } ) + print(f"DEBUG: Component found, executing function", file=sys.stderr) try: # Parse input parameters into the expected type input = msgspec.json.decode(execute_request.input, type=component.input_type) + print(f"DEBUG: Input parsed successfully: {input}", file=sys.stderr) - # Execute component with or without context - import asyncio - import inspect + # Execute the component function + output = component.function(input) + print(f"DEBUG: Component function executed, output type: {type(output)}", file=sys.stderr) - if hasattr(component.function, '_expects_context') and component.function._expects_context: - if inspect.iscoroutinefunction(component.function): - output = await component.function(input, self._context) + # Check if this is a generator function + if hasattr(component.function, '_is_generator') and component.function._is_generator: + # For generators, we need to yield each result as streaming + if inspect.isgenerator(output): + results = [] + chunk_index = 0 + for result in output: + results.append(result) + is_final = False # We'll mark the last chunk as final later if needed + + # Send streaming notification immediately via stdout instead of queuing + streaming_notification = { + "jsonrpc": "2.0", + "method": "streaming_chunk", + "params": { + "request_id": str(execute_request.execution_id), + "stream_id": str(execute_request.execution_id), + "chunk_index": chunk_index, + "is_final": is_final, + "step_id": execute_request.step_id, + "chunk": result + } + } + # Send immediately via stdout + notification_bytes = msgspec.json.encode(streaming_notification) + b"\n" + sys.stdout.buffer.write(notification_bytes) + sys.stdout.buffer.flush() + print(f"Sent outgoing message", file=sys.stderr) + + chunk_index += 1 + + # Return the final result (last chunk) + if results: + return Message( + id=id, + result=ComponentExecuteResponse(output=results[-1]), + ) else: - output = component.function(input, self._context) + # Not actually a generator, treat as normal + return Message( + id=id, + result=ComponentExecuteResponse(output=output), + ) else: - if inspect.iscoroutinefunction(component.function): - output = await component.function(input) - else: - output = component.function(input) - - return Message( - id=id, - result=ComponentExecuteResponse(output=output), - ) + # Normal non-generator function + return Message( + id=id, + result=ComponentExecuteResponse(output=output), + ) except Exception as e: return Message( id=id, @@ -332,7 +374,10 @@ async def _send_outgoing_message(self, message_data, writer: asyncio.StreamWrite message_bytes = msgspec.json.encode(message_data) + b"\n" writer.write(message_bytes) await writer.drain() - print(f"Sent outgoing message: {message_data}", file=sys.stderr) + # Ensure the message is fully written and flushed + if hasattr(writer, '_transport') and hasattr(writer._transport, 'flush'): + writer._transport.flush() + print(f"Sent outgoing message", file=sys.stderr) except Exception as e: 
print(f"Error sending outgoing message: {e}", file=sys.stderr) diff --git a/sdks/python/uv.lock b/sdks/python/uv.lock index 9c368820..34640d06 100644 --- a/sdks/python/uv.lock +++ b/sdks/python/uv.lock @@ -10,6 +10,28 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/77/06/bb80f5f86020c4551da315d78b3ab75e8228f89f0162f2c3a819e407941a/attrs-25.3.0-py3-none-any.whl", hash = "sha256:427318ce031701fea540783410126f03899a97ffc6f61596ad581ac2e40e3bc3", size = 63815 }, ] +[[package]] +name = "cffi" +version = "1.17.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pycparser" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/fc/97/c783634659c2920c3fc70419e3af40972dbaf758daa229a7d6ea6135c90d/cffi-1.17.1.tar.gz", hash = "sha256:1c39c6016c32bc48dd54561950ebd6836e1670f2ae46128f67cf49e789c52824", size = 516621 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/8d/f8/dd6c246b148639254dad4d6803eb6a54e8c85c6e11ec9df2cffa87571dbe/cffi-1.17.1-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:f3a2b4222ce6b60e2e8b337bb9596923045681d71e5a082783484d845390938e", size = 182989 }, + { url = "https://files.pythonhosted.org/packages/8b/f1/672d303ddf17c24fc83afd712316fda78dc6fce1cd53011b839483e1ecc8/cffi-1.17.1-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:0984a4925a435b1da406122d4d7968dd861c1385afe3b45ba82b750f229811e2", size = 178802 }, + { url = "https://files.pythonhosted.org/packages/0e/2d/eab2e858a91fdff70533cab61dcff4a1f55ec60425832ddfdc9cd36bc8af/cffi-1.17.1-cp313-cp313-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d01b12eeeb4427d3110de311e1774046ad344f5b1a7403101878976ecd7a10f3", size = 454792 }, + { url = "https://files.pythonhosted.org/packages/75/b2/fbaec7c4455c604e29388d55599b99ebcc250a60050610fadde58932b7ee/cffi-1.17.1-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:706510fe141c86a69c8ddc029c7910003a17353970cff3b904ff0686a5927683", size = 478893 }, + { url = "https://files.pythonhosted.org/packages/4f/b7/6e4a2162178bf1935c336d4da8a9352cccab4d3a5d7914065490f08c0690/cffi-1.17.1-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:de55b766c7aa2e2a3092c51e0483d700341182f08e67c63630d5b6f200bb28e5", size = 485810 }, + { url = "https://files.pythonhosted.org/packages/c7/8a/1d0e4a9c26e54746dc08c2c6c037889124d4f59dffd853a659fa545f1b40/cffi-1.17.1-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:c59d6e989d07460165cc5ad3c61f9fd8f1b4796eacbd81cee78957842b834af4", size = 471200 }, + { url = "https://files.pythonhosted.org/packages/26/9f/1aab65a6c0db35f43c4d1b4f580e8df53914310afc10ae0397d29d697af4/cffi-1.17.1-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dd398dbc6773384a17fe0d3e7eeb8d1a21c2200473ee6806bb5e6a8e62bb73dd", size = 479447 }, + { url = "https://files.pythonhosted.org/packages/5f/e4/fb8b3dd8dc0e98edf1135ff067ae070bb32ef9d509d6cb0f538cd6f7483f/cffi-1.17.1-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:3edc8d958eb099c634dace3c7e16560ae474aa3803a5df240542b305d14e14ed", size = 484358 }, + { url = "https://files.pythonhosted.org/packages/f1/47/d7145bf2dc04684935d57d67dff9d6d795b2ba2796806bb109864be3a151/cffi-1.17.1-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:72e72408cad3d5419375fc87d289076ee319835bdfa2caad331e377589aebba9", size = 488469 }, + { url = 
"https://files.pythonhosted.org/packages/bf/ee/f94057fa6426481d663b88637a9a10e859e492c73d0384514a17d78ee205/cffi-1.17.1-cp313-cp313-win32.whl", hash = "sha256:e03eab0a8677fa80d646b5ddece1cbeaf556c313dcfac435ba11f107ba117b5d", size = 172475 }, + { url = "https://files.pythonhosted.org/packages/7c/fc/6a8cb64e5f0324877d503c854da15d76c1e50eb722e320b15345c4d0c6de/cffi-1.17.1-cp313-cp313-win_amd64.whl", hash = "sha256:f6a16c31041f09ead72d69f583767292f750d24913dadacf5756b966aacb3f1a", size = 182009 }, +] + [[package]] name = "colorama" version = "0.4.6" @@ -99,6 +121,36 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/23/d8/f15b40611c2d5753d1abb0ca0da0c75348daf1252220e5dda2867bd81062/msgspec-0.19.0-cp313-cp313-win_amd64.whl", hash = "sha256:317050bc0f7739cb30d257ff09152ca309bf5a369854bbf1e57dffc310c1f20f", size = 187432 }, ] +[[package]] +name = "numpy" +version = "2.3.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/2e/19/d7c972dfe90a353dbd3efbbe1d14a5951de80c99c9dc1b93cd998d51dc0f/numpy-2.3.1.tar.gz", hash = "sha256:1ec9ae20a4226da374362cca3c62cd753faf2f951440b0e3b98e93c235441d2b", size = 20390372 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d4/bd/35ad97006d8abff8631293f8ea6adf07b0108ce6fec68da3c3fcca1197f2/numpy-2.3.1-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:25a1992b0a3fdcdaec9f552ef10d8103186f5397ab45e2d25f8ac51b1a6b97e8", size = 20889381 }, + { url = "https://files.pythonhosted.org/packages/f1/4f/df5923874d8095b6062495b39729178eef4a922119cee32a12ee1bd4664c/numpy-2.3.1-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:7dea630156d39b02a63c18f508f85010230409db5b2927ba59c8ba4ab3e8272e", size = 14152726 }, + { url = "https://files.pythonhosted.org/packages/8c/0f/a1f269b125806212a876f7efb049b06c6f8772cf0121139f97774cd95626/numpy-2.3.1-cp313-cp313-macosx_14_0_arm64.whl", hash = "sha256:bada6058dd886061f10ea15f230ccf7dfff40572e99fef440a4a857c8728c9c0", size = 5105145 }, + { url = "https://files.pythonhosted.org/packages/6d/63/a7f7fd5f375b0361682f6ffbf686787e82b7bbd561268e4f30afad2bb3c0/numpy-2.3.1-cp313-cp313-macosx_14_0_x86_64.whl", hash = "sha256:a894f3816eb17b29e4783e5873f92faf55b710c2519e5c351767c51f79d8526d", size = 6639409 }, + { url = "https://files.pythonhosted.org/packages/bf/0d/1854a4121af895aab383f4aa233748f1df4671ef331d898e32426756a8a6/numpy-2.3.1-cp313-cp313-manylinux_2_28_aarch64.whl", hash = "sha256:18703df6c4a4fee55fd3d6e5a253d01c5d33a295409b03fda0c86b3ca2ff41a1", size = 14257630 }, + { url = "https://files.pythonhosted.org/packages/50/30/af1b277b443f2fb08acf1c55ce9d68ee540043f158630d62cef012750f9f/numpy-2.3.1-cp313-cp313-manylinux_2_28_x86_64.whl", hash = "sha256:5902660491bd7a48b2ec16c23ccb9124b8abfd9583c5fdfa123fe6b421e03de1", size = 16627546 }, + { url = "https://files.pythonhosted.org/packages/6e/ec/3b68220c277e463095342d254c61be8144c31208db18d3fd8ef02712bcd6/numpy-2.3.1-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:36890eb9e9d2081137bd78d29050ba63b8dab95dff7912eadf1185e80074b2a0", size = 15562538 }, + { url = "https://files.pythonhosted.org/packages/77/2b/4014f2bcc4404484021c74d4c5ee8eb3de7e3f7ac75f06672f8dcf85140a/numpy-2.3.1-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:a780033466159c2270531e2b8ac063704592a0bc62ec4a1b991c7c40705eb0e8", size = 18360327 }, + { url = "https://files.pythonhosted.org/packages/40/8d/2ddd6c9b30fcf920837b8672f6c65590c7d92e43084c25fc65edc22e93ca/numpy-2.3.1-cp313-cp313-win32.whl", hash = 
"sha256:39bff12c076812595c3a306f22bfe49919c5513aa1e0e70fac756a0be7c2a2b8", size = 6312330 }, + { url = "https://files.pythonhosted.org/packages/dd/c8/beaba449925988d415efccb45bf977ff8327a02f655090627318f6398c7b/numpy-2.3.1-cp313-cp313-win_amd64.whl", hash = "sha256:8d5ee6eec45f08ce507a6570e06f2f879b374a552087a4179ea7838edbcbfa42", size = 12731565 }, + { url = "https://files.pythonhosted.org/packages/0b/c3/5c0c575d7ec78c1126998071f58facfc124006635da75b090805e642c62e/numpy-2.3.1-cp313-cp313-win_arm64.whl", hash = "sha256:0c4d9e0a8368db90f93bd192bfa771ace63137c3488d198ee21dfb8e7771916e", size = 10190262 }, + { url = "https://files.pythonhosted.org/packages/ea/19/a029cd335cf72f79d2644dcfc22d90f09caa86265cbbde3b5702ccef6890/numpy-2.3.1-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:b0b5397374f32ec0649dd98c652a1798192042e715df918c20672c62fb52d4b8", size = 20987593 }, + { url = "https://files.pythonhosted.org/packages/25/91/8ea8894406209107d9ce19b66314194675d31761fe2cb3c84fe2eeae2f37/numpy-2.3.1-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:c5bdf2015ccfcee8253fb8be695516ac4457c743473a43290fd36eba6a1777eb", size = 14300523 }, + { url = "https://files.pythonhosted.org/packages/a6/7f/06187b0066eefc9e7ce77d5f2ddb4e314a55220ad62dd0bfc9f2c44bac14/numpy-2.3.1-cp313-cp313t-macosx_14_0_arm64.whl", hash = "sha256:d70f20df7f08b90a2062c1f07737dd340adccf2068d0f1b9b3d56e2038979fee", size = 5227993 }, + { url = "https://files.pythonhosted.org/packages/e8/ec/a926c293c605fa75e9cfb09f1e4840098ed46d2edaa6e2152ee35dc01ed3/numpy-2.3.1-cp313-cp313t-macosx_14_0_x86_64.whl", hash = "sha256:2fb86b7e58f9ac50e1e9dd1290154107e47d1eef23a0ae9145ded06ea606f992", size = 6736652 }, + { url = "https://files.pythonhosted.org/packages/e3/62/d68e52fb6fde5586650d4c0ce0b05ff3a48ad4df4ffd1b8866479d1d671d/numpy-2.3.1-cp313-cp313t-manylinux_2_28_aarch64.whl", hash = "sha256:23ab05b2d241f76cb883ce8b9a93a680752fbfcbd51c50eff0b88b979e471d8c", size = 14331561 }, + { url = "https://files.pythonhosted.org/packages/fc/ec/b74d3f2430960044bdad6900d9f5edc2dc0fb8bf5a0be0f65287bf2cbe27/numpy-2.3.1-cp313-cp313t-manylinux_2_28_x86_64.whl", hash = "sha256:ce2ce9e5de4703a673e705183f64fd5da5bf36e7beddcb63a25ee2286e71ca48", size = 16693349 }, + { url = "https://files.pythonhosted.org/packages/0d/15/def96774b9d7eb198ddadfcbd20281b20ebb510580419197e225f5c55c3e/numpy-2.3.1-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:c4913079974eeb5c16ccfd2b1f09354b8fed7e0d6f2cab933104a09a6419b1ee", size = 15642053 }, + { url = "https://files.pythonhosted.org/packages/2b/57/c3203974762a759540c6ae71d0ea2341c1fa41d84e4971a8e76d7141678a/numpy-2.3.1-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:010ce9b4f00d5c036053ca684c77441f2f2c934fd23bee058b4d6f196efd8280", size = 18434184 }, + { url = "https://files.pythonhosted.org/packages/22/8a/ccdf201457ed8ac6245187850aff4ca56a79edbea4829f4e9f14d46fa9a5/numpy-2.3.1-cp313-cp313t-win32.whl", hash = "sha256:6269b9edfe32912584ec496d91b00b6d34282ca1d07eb10e82dfc780907d6c2e", size = 6440678 }, + { url = "https://files.pythonhosted.org/packages/f1/7e/7f431d8bd8eb7e03d79294aed238b1b0b174b3148570d03a8a8a8f6a0da9/numpy-2.3.1-cp313-cp313t-win_amd64.whl", hash = "sha256:2a809637460e88a113e186e87f228d74ae2852a2e0c44de275263376f17b5bdc", size = 12870697 }, + { url = "https://files.pythonhosted.org/packages/d4/ca/af82bf0fad4c3e573c6930ed743b5308492ff19917c7caaf2f9b6f9e2e98/numpy-2.3.1-cp313-cp313t-win_arm64.whl", hash = "sha256:eccb9a159db9aed60800187bc47a6d3451553f0e1b08b068d8b277ddfbb9b244", size = 10260376 }, +] + 
[[package]] name = "packaging" version = "25.0" @@ -117,6 +169,25 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/88/5f/e351af9a41f866ac3f1fac4ca0613908d9a41741cfcf2228f4ad853b697d/pluggy-1.5.0-py3-none-any.whl", hash = "sha256:44e1ad92c8ca002de6377e165f3e0f1be63266ab4d554740532335b9d75ea669", size = 20556 }, ] +[[package]] +name = "pyaudio" +version = "0.2.14" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/26/1d/8878c7752febb0f6716a7e1a52cb92ac98871c5aa522cba181878091607c/PyAudio-0.2.14.tar.gz", hash = "sha256:78dfff3879b4994d1f4fc6485646a57755c6ee3c19647a491f790a0895bd2f87", size = 47066 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/3a/77/66cd37111a87c1589b63524f3d3c848011d21ca97828422c7fde7665ff0d/PyAudio-0.2.14-cp313-cp313-win32.whl", hash = "sha256:95328285b4dab57ea8c52a4a996cb52be6d629353315be5bfda403d15932a497", size = 150982 }, + { url = "https://files.pythonhosted.org/packages/a5/8b/7f9a061c1cc2b230f9ac02a6003fcd14c85ce1828013aecbaf45aa988d20/PyAudio-0.2.14-cp313-cp313-win_amd64.whl", hash = "sha256:692d8c1446f52ed2662120bcd9ddcb5aa2b71f38bda31e58b19fb4672fffba69", size = 173655 }, +] + +[[package]] +name = "pycparser" +version = "2.22" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/1d/b2/31537cf4b1ca988837256c910a668b553fceb8f069bedc4b1c826024b52c/pycparser-2.22.tar.gz", hash = "sha256:491c8be9c040f5390f5bf44a5b07752bd07f56edf992381b05c701439eec10f6", size = 172736 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/13/a3/a812df4e2dd5696d1f351d58b8fe16a405b234ad2886a0dab9183fb78109/pycparser-2.22-py3-none-any.whl", hash = "sha256:c3702b6d3dd8c7abc1afa565d7e63d53a1d0bd86cdc24edd75470f4de499cfcc", size = 117552 }, +] + [[package]] name = "pytest" version = "8.3.5" @@ -205,6 +276,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b6/97/5a4b59697111c89477d20ba8a44df9ca16b41e737fa569d5ae8bff99e650/rpds_py-0.25.1-cp313-cp313t-win_amd64.whl", hash = "sha256:401ca1c4a20cc0510d3435d89c069fe0a9ae2ee6495135ac46bdd49ec0495763", size = 232218 }, ] +[[package]] +name = "sounddevice" +version = "0.5.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "cffi" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/91/a6/91e9f08ed37c7c9f56b5227c6aea7f2ae63ba2d59520eefb24e82cbdd589/sounddevice-0.5.2.tar.gz", hash = "sha256:c634d51bd4e922d6f0fa5e1a975cc897c947f61d31da9f79ba7ea34dff448b49", size = 53150 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/75/2d/582738fc01352a5bc20acac9221e58538365cecb3bb264838f66419df219/sounddevice-0.5.2-py3-none-any.whl", hash = "sha256:82375859fac2e73295a4ab3fc60bd4782743157adc339561c1f1142af472f505", size = 32450 }, + { url = "https://files.pythonhosted.org/packages/3f/6f/e3dd751face4fcb5be25e8abba22f25d8e6457ebd7e9ed79068b768dc0e5/sounddevice-0.5.2-py3-none-macosx_10_6_x86_64.macosx_10_6_universal2.whl", hash = "sha256:943f27e66037d41435bdd0293454072cdf657b594c9cde63cd01ee3daaac7ab3", size = 108088 }, + { url = "https://files.pythonhosted.org/packages/45/0b/bfad79af0b380aa7c0bfe73e4b03e0af45354a48ad62549489bd7696c5b0/sounddevice-0.5.2-py3-none-win32.whl", hash = "sha256:3a113ce614a2c557f14737cb20123ae6298c91fc9301eb014ada0cba6d248c5f", size = 312665 }, + { url = "https://files.pythonhosted.org/packages/e1/3e/61d88e6b0a7383127cdc779195cb9d83ebcf11d39bc961de5777e457075e/sounddevice-0.5.2-py3-none-win_amd64.whl", hash = 
"sha256:e18944b767d2dac3771a7771bdd7ff7d3acd7d334e72c4bedab17d1aed5dbc22", size = 363808 }, +] + [[package]] name = "stepflow-sdk" version = "0.1.0" @@ -212,6 +298,9 @@ source = { editable = "." } dependencies = [ { name = "jsonschema" }, { name = "msgspec" }, + { name = "numpy" }, + { name = "pyaudio" }, + { name = "sounddevice" }, ] [package.dev-dependencies] @@ -225,6 +314,9 @@ dev = [ requires-dist = [ { name = "jsonschema", specifier = ">=4.17.0" }, { name = "msgspec", specifier = ">=0.19.0" }, + { name = "numpy" }, + { name = "pyaudio", specifier = ">=0.2.11" }, + { name = "sounddevice" }, ] [package.metadata.requires-dev]