Pipeline state dump and load #352

Open: wants to merge 14 commits into main (base branch)

NathalieCharbel (Contributor) commented Jun 4, 2025

Description

This PR introduces state management capabilities to the Pipeline class, enabling:

  • Checkpointing pipeline execution at specific components using run_until
  • Resuming pipeline execution from saved states using resume_from
  • Saving and loading pipeline state to and from a JSON file, or passing it directly between pipeline runs.
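
As a rough illustration of the checkpoint/resume idea described above, here is a minimal sketch. `ToyPipeline`, its linear component list, and the shape of the state dictionary are assumptions made for illustration only; they are not the `Pipeline` class or state format from this PR. Only the method names `run_until` / `resume_from` and the `stop_after` / `start_from` arguments mirror the description.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Dict, List, Tuple

Component = Callable[[Any], Awaitable[Any]]


class ToyPipeline:
    """Hypothetical linear pipeline, NOT the library's Pipeline class."""

    def __init__(self, components: List[Tuple[str, Component]]) -> None:
        self.components = components

    def _index(self, name: str) -> int:
        return [n for n, _ in self.components].index(name)

    async def run_until(self, data: Any, stop_after: str) -> Dict[str, Any]:
        """Run components in order, stopping after `stop_after`."""
        results: Dict[str, Any] = {}
        value = data
        for name, component in self.components[: self._index(stop_after) + 1]:
            value = await component(value)
            results[name] = value
        # Assumed state layout: just the per-component results.
        return {"results": results}

    async def resume_from(
        self, state: Dict[str, Any], data: Any, start_from: str
    ) -> Any:
        """Continue from `start_from`, feeding it the saved upstream result."""
        start = self._index(start_from)
        if start == 0:
            value = data  # nothing upstream, fall back to the raw input
        else:
            value = state["results"][self.components[start - 1][0]]
        for _, component in self.components[start:]:
            value = await component(value)
        return value


async def split(text: str) -> List[str]:
    return text.split()


async def upper(words: List[str]) -> List[str]:
    return [w.upper() for w in words]


async def join(words: List[str]) -> str:
    return "-".join(words)


async def demo() -> str:
    pipeline = ToyPipeline([("split", split), ("upper", upper), ("join", join)])
    state = await pipeline.run_until("a b c", stop_after="upper")
    # The state is JSON-serializable here, so it survives a round trip.
    restored = json.loads(json.dumps(state))
    return await pipeline.resume_from(restored, "a b c", start_from="join")


final = asyncio.run(demo())
```

In this toy version the checkpoint only works because every intermediate result is JSON-serializable; a real pipeline would need a serialization strategy for richer component outputs.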

The state includes pipeline configuration, execution results, and final results from previous runs. This feature is particularly useful for:

  • Debugging long-running pipelines
  • Recovering from failures
  • Comparing component implementations with deterministic inputs

Type of Change

  • New feature
  • Bug fix
  • Breaking change
  • Documentation update
  • Project configuration change

Complexity

Complexity: low

How Has This Been Tested?

  • Unit tests
  • E2E tests
  • Manual tests

Checklist

The following requirements should have been met (depending on the changes in the branch):

  • Documentation has been updated
  • Unit tests have been updated
  • E2E tests have been updated
  • Examples have been updated
  • New files have copyright header
  • CLA (https://neo4j.com/developer/cla/) has been signed
  • CHANGELOG.md updated if appropriate

NathalieCharbel requested a review from a team as a code owner June 4, 2025 10:18
.. code:: python

# Run pipeline until a specific component
state = await pipeline.run_until(data, stop_after="component_name", state_file="state.json")

Review comment (Contributor):

If we are returning the state, I think it would be cleaner to let the user decide whether to save it to a file. I'm not sure adding the state_file option is helpful.
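
A sketch of what this suggestion could look like on the caller's side, assuming the returned state is a plain JSON-serializable dictionary. The helper names `save_state` / `load_state` and the example state contents are hypothetical and not part of the PR.

```python
import json
import os
import tempfile
from typing import Any, Dict


def save_state(state: Dict[str, Any], path: str) -> None:
    """Write a pipeline state dictionary to a JSON file (hypothetical helper)."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2)


def load_state(path: str) -> Dict[str, Any]:
    """Read a pipeline state dictionary back from a JSON file."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


# Made-up state, standing in for whatever run_until would return.
example_state = {"run_id": "run-1", "results": {"splitter": {"chunks": ["a", "b"]}}}

with tempfile.TemporaryDirectory() as tmp:
    state_path = os.path.join(tmp, "state.json")
    save_state(example_state, state_path)
    loaded_state = load_state(state_path)
```

With persistence kept outside the pipeline, `run_until` would only need to return the state and `resume_from` would only need to accept it, which also avoids passing `None` as a placeholder argument.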

result = await pipeline.resume_from(state, data, start_from="component_name")

# Alternatively, load state from file
result = await pipeline.resume_from(None, data, start_from="component_name", state_file="state.json")

Review comment (Contributor):

Also because of that, None as first argument is not super nice IMHO :)

keys_to_remove = [
    key for key in self._data.keys() if key.startswith(run_id_prefix)
]
for key in keys_to_remove:

Review comment (Contributor):

So here we are removing all results from a previous run with this run_id, right?
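
For readers following the thread, the prefix-based cleanup under discussion can be sketched as below. `ToyResultStore` and the `"<run_id>:<component>"` key format are assumptions for illustration; the actual store and key layout in the PR are not shown in this excerpt.

```python
from typing import Any, Dict


class ToyResultStore:
    """Hypothetical in-memory store keyed by '<run_id>:<component>'."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}

    def add(self, run_id: str, component: str, result: Any) -> None:
        self._data[f"{run_id}:{component}"] = result

    def clear_run(self, run_id: str) -> int:
        """Delete every result stored for `run_id`; return how many were removed."""
        # Including the separator keeps 'run-1' from also matching 'run-10'.
        run_id_prefix = f"{run_id}:"
        # Collect keys first: deleting while iterating over a dict raises an error.
        keys_to_remove = [key for key in self._data if key.startswith(run_id_prefix)]
        for key in keys_to_remove:
            del self._data[key]
        return len(keys_to_remove)


store = ToyResultStore()
store.add("run-1", "splitter", ["a", "b"])
store.add("run-10", "splitter", ["c"])
removed = store.clear_run("run-1")
remaining = sorted(store._data)
```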

@@ -140,6 +140,7 @@ def __init__(
}
"""
self.missing_inputs: dict[str, list[str]] = defaultdict()
+ self._current_run_id: Optional[str] = None

Review comment (Contributor):

This cannot be saved on the Pipeline instance, since concurrent runs will overwrite it.
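
The concern can be reproduced with a small asyncio sketch. Both classes below are hypothetical stand-ins, not code from the PR: one stores the run id on the instance, the other keeps it local to the call.

```python
import asyncio
from typing import List, Optional, Tuple


class SharedRunId:
    """Stores the run id on the instance, as in the diff under review."""

    def __init__(self) -> None:
        self._current_run_id: Optional[str] = None

    async def run(self, run_id: str) -> Tuple[str, Optional[str]]:
        self._current_run_id = run_id
        await asyncio.sleep(0)  # yield control so the other run can start
        # By now another run may have replaced the attribute.
        return run_id, self._current_run_id


class ExplicitRunId:
    """Keeps the run id local to each call, so runs cannot interfere."""

    async def run(self, run_id: str) -> Tuple[str, str]:
        await asyncio.sleep(0)
        return run_id, run_id


async def demo() -> Tuple[List[tuple], List[tuple]]:
    shared = SharedRunId()
    explicit = ExplicitRunId()
    shared_out = await asyncio.gather(shared.run("run-a"), shared.run("run-b"))
    explicit_out = await asyncio.gather(explicit.run("run-a"), explicit.run("run-b"))
    return list(shared_out), list(explicit_out)


shared_results, explicit_results = asyncio.run(demo())
# Each tuple is (run id passed in, run id observed after yielding).
```

In the shared version the first run observes the second run's id after the yield point, which is why methods that depend on the run id are safer taking it as an explicit parameter.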

@@ -600,20 +650,23 @@ async def run(
result=await self.get_final_results(orchestrator.run_id),
)

- def dump_state(self, run_id: str) -> Dict[str, Any]:
+ def dump_state(self) -> Dict[str, Any]:

Review comment (Contributor):

I guess we will need run_id as a parameter here, as a consequence of my comment above about not storing the run ID on the Pipeline instance.

2 participants