Concurrent runs overwrite intermediate asset data #31870
Unanswered
maurakeith
asked this question in
Q&A
Replies: 0 comments
Hi there!
Summary
Concurrent runs of the same job are overwriting each other's intermediate asset data. When my sensor triggers multiple runs simultaneously, a downstream asset in one run loads the output of an upstream asset from another concurrent run, and the pipelines fail. I reduced the issue to the test pipeline below, which reproduces the error. Its logs confirm the problem: the second run's asset_b_test logs the value from the first run's asset_a_test instead of its own.
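For illustration only, the collision can be sketched in plain Python without Dagster. This assumes, as a simplification, that each asset's output is pickled to a file whose path depends only on the asset name. The helpers store, load, and interleaved_runs, and the values id-from-run-1 and id-from-run-2, are hypothetical names for this sketch and are not part of the actual pipeline.

```python
import pickle
import tempfile
from pathlib import Path


def store(base_dir: Path, asset_name: str, value: str) -> None:
    """Persist an asset's output at a path keyed only by the asset name."""
    (base_dir / asset_name).write_bytes(pickle.dumps(value))


def load(base_dir: Path, asset_name: str) -> str:
    """Load whatever was last written for this asset name, by any run."""
    return pickle.loads((base_dir / asset_name).read_bytes())


def interleaved_runs(base_dir: Path) -> dict:
    """Simulate two runs whose steps interleave: both write A, then both read A."""
    store(base_dir, "asset_a_test", "id-from-run-2")  # run 2 materializes asset A
    store(base_dir, "asset_a_test", "id-from-run-1")  # run 1 overwrites the same file
    return {
        # Both downstream steps read the same file, so both see run 1's value.
        "run-1": f"Asset B received: {load(base_dir, 'asset_a_test')}",
        "run-2": f"Asset B received: {load(base_dir, 'asset_a_test')}",
    }


with tempfile.TemporaryDirectory() as tmp:
    results = interleaved_runs(Path(tmp))
    print(results["run-2"])  # run 2's asset B reports run 1's ID
```

Because the path carries no run identifier, whichever run writes last determines what every downstream step reads.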
Example Code
Here is a simplified version of my code that reproduces the issue:
Sensor:
test_sensor.py
Whenever this sensor runs, it triggers 2 runs of my test_job. The run_key is a UUID, and run_config is another UUID, so they are guaranteed to be unique.
Assets:
test_assets.py
I have 2 assets: asset a simply takes in the run config with id_val and returns it. asset b receives the output from asset a and returns f"Asset B received: {id_val}".
Job:
job.py
Definition:
definitions.py
I see that the asset materializations from my runs are stored in a common location; there is no run-specific location for them. asset_a_test and asset_b_test are simply overwritten with every run:
Troubleshooting Steps Taken
- FilesystemIOManager to attempt to save asset materializations to a unique location
- Output class with metadata defined to differentiate asset materializations
It appears I need a way to make Dagster's IO manager store intermediate outputs in a run-specific path to avoid these conflicts. Any guidance on how to properly implement a custom IO manager or another solution would be greatly appreciated.
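As a rough sketch of the run-specific path idea, the plain-Python store below namespaces every output by run ID. It is not a Dagster IO manager: the class RunScopedStore and the run IDs are hypothetical, and the method names only mirror the handle_output and load_input methods of Dagster's IOManager interface. In a real custom IO manager the run ID would presumably come from the context objects (OutputContext exposes run_id; reading the upstream run's ID through the input context's upstream_output is an assumption that should be checked against the Dagster docs).

```python
import pickle
import tempfile
from pathlib import Path


class RunScopedStore:
    """Pickle-based store that namespaces every asset output by run ID."""

    def __init__(self, base_dir: Path) -> None:
        self.base_dir = base_dir

    def _path(self, run_id: str, asset_name: str) -> Path:
        # <base_dir>/<run_id>/<asset_name>: concurrent runs never share a file.
        return self.base_dir / run_id / asset_name

    def handle_output(self, run_id: str, asset_name: str, value: object) -> None:
        path = self._path(run_id, asset_name)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(pickle.dumps(value))

    def load_input(self, run_id: str, asset_name: str) -> object:
        return pickle.loads(self._path(run_id, asset_name).read_bytes())


with tempfile.TemporaryDirectory() as tmp:
    io = RunScopedStore(Path(tmp))
    # Both runs materialize asset A before either runs asset B.
    io.handle_output("run-1", "asset_a_test", "id-from-run-1")
    io.handle_output("run-2", "asset_a_test", "id-from-run-2")
    run1_b = f"Asset B received: {io.load_input('run-1', 'asset_a_test')}"
    run2_b = f"Asset B received: {io.load_input('run-2', 'asset_a_test')}"
    print(run1_b)
    print(run2_b)
```

One trade-off of this layout: outputs are only visible within the run that produced them, so a later run cannot load an asset materialized by an earlier run unless the upstream run ID is tracked somewhere.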