Hey Dagster team! I have a job that should always trigger itself to run again after it completes, with a delay of x. This question was originally asked in Dagster Slack.
Answered by
benpankow
Nov 30, 2023
Replies: 1 comment
You can accomplish this with a sensor that checks when the last run of the job finished and kicks off another run once enough time has passed:
from typing import Any
from dagster import (
    Definitions,
    RunRequest,
    RunsFilter,
    SensorEvaluationContext,
    SkipReason,
    job,
    sensor,
)
from datetime import datetime, timedelta


@job
def my_monitored_job():
    # Replace with the ops this job should run
    ...


# Set this to longer than the time it takes to run the job, since we
# query runs by creation time
RUN_START_LOOKBACK_MINS = 10
# Set this to the delay you want to wait before running the job again
DELAY_TO_RUN_JOB_MINS = 2


@sensor(job=my_monitored_job)
def run_job_after_delay_sensor(context: SensorEvaluationContext) -> Any:
    monitored_job_name = my_monitored_job.name
    # Look back far enough to cover the delay plus the job's own run time
    lookback_time = datetime.utcnow() - timedelta(
        minutes=DELAY_TO_RUN_JOB_MINS + RUN_START_LOOKBACK_MINS
    )
    # Fetch runs of this job created within the lookback window, ordered so
    # that the most recently updated run comes last
    run_records = context.instance.get_run_records(
        filters=RunsFilter(
            job_name=monitored_job_name,
            created_after=lookback_time,
        ),
        order_by="update_timestamp",
        ascending=True,
    )
    # Local time, to match datetime.fromtimestamp() below
    created_before_time = datetime.now() - timedelta(minutes=DELAY_TO_RUN_JOB_MINS)
    # If the most recent run is still in progress, or ended less than
    # DELAY_TO_RUN_JOB_MINS ago, wait; otherwise run the job again
    if run_records:
        last_end_time = run_records[-1].end_time
        if last_end_time is None:
            return SkipReason(
                f"Most recent run of job {monitored_job_name} has not finished yet"
            )
        ended_at = datetime.fromtimestamp(float(last_end_time))
        if ended_at > created_before_time:
            return SkipReason(
                f"Most recent run of job {monitored_job_name} ended at {ended_at}, "
                f"which is not before {created_before_time}"
            )
    return RunRequest()


defs = Definitions(
    jobs=[my_monitored_job],
    sensors=[run_job_after_delay_sensor],
)
Answer selected by
benpankow