Skip to content

Streaming support #96

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open

Streaming support #96

wants to merge 14 commits into from

Conversation

phact
Copy link

@phact phact commented Jun 26, 2025

This pull request introduces support for streaming workflows, adds new functionality for managing debug sessions, and improves the handling of execution flows in the StepFlow system. Key changes include adding a Streaming variant to FlowResult, enhancing debug session management with tokio::sync::Mutex, and updating the ValueResolver to handle streaming results.

Streaming Workflow Support:

  • Added a Streaming variant to the FlowResult enum to represent streaming data with associated metadata, chunk information, and a final chunk indicator (crates/stepflow-core/src/flow_result.rs).
  • Updated the ValueResolver to pass through streaming results without applying skip actions and handle streaming results in objects and arrays (crates/stepflow-execution/src/value_resolver.rs). [1] [2] [3]
  • Modified the EvalComponent to serialize streaming results into JSON format for external use (crates/stepflow-builtins/src/eval.rs).

Debug Session Management:

  • Replaced WorkflowExecutor in debug_sessions with an Arc<tokio::sync::Mutex<WorkflowExecutor>> to allow safe concurrent access and reuse of debug sessions (crates/stepflow-execution/src/executor.rs). [1] [2]
  • Added methods to retrieve existing debug sessions, clean up sessions after execution, and manage workflow executors for streaming (crates/stepflow-execution/src/executor.rs). [1] [2] [3]

Execution Flow Enhancements:

  • Introduced cleanup mechanisms for pending executions and debug sessions to prevent memory leaks (crates/stepflow-execution/src/executor.rs).
  • Created a wrapper to implement the Executor trait for StepFlowExecutor, enabling integration with the plugin system (crates/stepflow-execution/src/executor.rs).

Additional Improvements:

  • Added a Clone implementation for DependencyTracker and other structs to support cloning where needed (crates/stepflow-analysis/src/tracker.rs, crates/stepflow-core/src/workflow/step.rs, crates/stepflow-execution/src/value_resolver.rs). [1] [2] [3]
  • Introduced a streaming flag in the Step struct to indicate whether a step is a streaming step (crates/stepflow-core/src/workflow/step.rs). [1] [2] [3]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant