diff --git a/core/liteflow/core/abstractions/workflow_host.py b/core/liteflow/core/abstractions/workflow_host.py index 783c04f..83e742e 100644 --- a/core/liteflow/core/abstractions/workflow_host.py +++ b/core/liteflow/core/abstractions/workflow_host.py @@ -10,6 +10,9 @@ def register_workflow(self, workflow: Workflow): def start_workflow(self, workflow_id, version, data): pass + def stop_workflow(self, workflow_wid): + pass + def publish_event(self, event_name, event_key, event_data=object(), effective_date=None): pass diff --git a/core/liteflow/core/builders/step_builder.py b/core/liteflow/core/builders/step_builder.py index c6817a5..e03518c 100644 --- a/core/liteflow/core/builders/step_builder.py +++ b/core/liteflow/core/builders/step_builder.py @@ -89,8 +89,10 @@ def for_each(self, collection: Callable[[Any, StepExecutionContext], List]): return step_builder - def on_error(self, error_behavior): + def on_error(self, error_behavior, error_max_retry=0, error_retry_interval=10): self.step.error_behavior = error_behavior + self.step.error_max_retry = error_max_retry + self.step.error_retry_interval = error_retry_interval return self def do(self, builder: Callable): diff --git a/core/liteflow/core/models/workflow_step.py b/core/liteflow/core/models/workflow_step.py index 0a759ea..200b459 100644 --- a/core/liteflow/core/models/workflow_step.py +++ b/core/liteflow/core/models/workflow_step.py @@ -17,6 +17,8 @@ def __init__(self, body: T): self.body = body self.name = None self.error_behavior = WorkflowStep.RETRY + self.error_max_retry = 0 + self.error_retry_interval = 10 self.outcomes: List[StepOutcome] = [] self.children = [] self.inputs: List[IOMapping] = [] diff --git a/core/liteflow/core/services/execution_result_processor.py b/core/liteflow/core/services/execution_result_processor.py index 7abe53d..9798ba9 100644 --- a/core/liteflow/core/services/execution_result_processor.py +++ b/core/liteflow/core/services/execution_result_processor.py @@ -53,8 +53,11 @@ def handle_step_exception(self, workflow: WorkflowInstance, definition: Workflow pointer.status = ExecutionPointer.FAILED if step.error_behavior == WorkflowStep.RETRY: + # print("retry ::: {}".format(pointer.retry_count)) + if step.error_max_retry != 0 and pointer.retry_count >= step.error_max_retry: + workflow.status = WorkflowInstance.TERMINATED pointer.retry_count += 1 - pointer.sleep_until = datetime.utcnow() + timedelta(seconds=10) #TODO: make confiurable + pointer.sleep_until = datetime.utcnow() + timedelta(seconds=step.error_retry_interval) step.prime_for_retry(pointer) elif step.error_behavior == WorkflowStep.SUSPEND: workflow.status = WorkflowInstance.SUSPENDED diff --git a/core/liteflow/core/services/workflow_host.py b/core/liteflow/core/services/workflow_host.py index 38b250e..6071dc1 100644 --- a/core/liteflow/core/services/workflow_host.py +++ b/core/liteflow/core/services/workflow_host.py @@ -39,6 +39,10 @@ def start_workflow(self, workflow_id, version, data): return workflow_id + def stop_workflow(self, workflow_wid): + workflow = self._persistence_service.get_workflow_instance(workflow_wid) + workflow.status = WorkflowInstance.COMPLETE + def publish_event(self, event_name, event_key, event_data=object(), effective_date=None): self._logger.debug(f"Publishing event {event_name} {event_key}") diff --git a/samples/on_error.py b/samples/on_error.py index 1f95226..36fd7fc 100644 --- a/samples/on_error.py +++ b/samples/on_error.py @@ -34,7 +34,7 @@ def build(self, builder: WorkflowBuilder): builder\ .start_with(Hello)\ .then(Explode)\ - .on_error(WorkflowStep.RETRY)\ + .on_error(WorkflowStep.RETRY, error_max_retry=1, error_retry_interval=3)\ .then(Goodbye)