From edac3dcbb7dace08e3f6ddfda61ef1c2488d6cbe Mon Sep 17 00:00:00 2001 From: mingxing Date: Thu, 12 Aug 2021 19:10:04 +0800 Subject: [PATCH 1/3] add stop control to the workflow_host --- core/liteflow/core/abstractions/workflow_host.py | 3 +++ core/liteflow/core/services/workflow_host.py | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/core/liteflow/core/abstractions/workflow_host.py b/core/liteflow/core/abstractions/workflow_host.py index 783c04f..83e742e 100644 --- a/core/liteflow/core/abstractions/workflow_host.py +++ b/core/liteflow/core/abstractions/workflow_host.py @@ -10,6 +10,9 @@ def register_workflow(self, workflow: Workflow): def start_workflow(self, workflow_id, version, data): pass + def stop_workflow(self, workflow_wid): + pass + def publish_event(self, event_name, event_key, event_data=object(), effective_date=None): pass diff --git a/core/liteflow/core/services/workflow_host.py b/core/liteflow/core/services/workflow_host.py index 38b250e..6071dc1 100644 --- a/core/liteflow/core/services/workflow_host.py +++ b/core/liteflow/core/services/workflow_host.py @@ -39,6 +39,10 @@ def start_workflow(self, workflow_id, version, data): return workflow_id + def stop_workflow(self, workflow_wid): + workflow = self._persistence_service.get_workflow_instance(workflow_wid) + workflow.status = WorkflowInstance.COMPLETE + def publish_event(self, event_name, event_key, event_data=object(), effective_date=None): self._logger.debug(f"Publishing event {event_name} {event_key}") From f99e796b221bcbac3e4caaeb451bfa36ba188900 Mon Sep 17 00:00:00 2001 From: mingxing Date: Fri, 13 Aug 2021 19:31:16 +0800 Subject: [PATCH 2/3] add max retry and retry interval when setup on_error --- core/liteflow/core/builders/step_builder.py | 4 +++- core/liteflow/core/models/workflow_step.py | 2 ++ core/liteflow/core/services/execution_result_processor.py | 4 +++- samples/on_error.py | 2 +- 4 files changed, 9 insertions(+), 3 deletions(-) diff --git a/core/liteflow/core/builders/step_builder.py b/core/liteflow/core/builders/step_builder.py index c6817a5..e03518c 100644 --- a/core/liteflow/core/builders/step_builder.py +++ b/core/liteflow/core/builders/step_builder.py @@ -89,8 +89,10 @@ def for_each(self, collection: Callable[[Any, StepExecutionContext], List]): return step_builder - def on_error(self, error_behavior): + def on_error(self, error_behavior, error_max_retry=0, error_retry_interval=10): self.step.error_behavior = error_behavior + self.step.error_max_retry = error_max_retry + self.step.error_retry_interval = error_retry_interval return self def do(self, builder: Callable): diff --git a/core/liteflow/core/models/workflow_step.py b/core/liteflow/core/models/workflow_step.py index 0a759ea..200b459 100644 --- a/core/liteflow/core/models/workflow_step.py +++ b/core/liteflow/core/models/workflow_step.py @@ -17,6 +17,8 @@ def __init__(self, body: T): self.body = body self.name = None self.error_behavior = WorkflowStep.RETRY + self.error_max_retry = 0 + self.error_retry_interval = 10 self.outcomes: List[StepOutcome] = [] self.children = [] self.inputs: List[IOMapping] = [] diff --git a/core/liteflow/core/services/execution_result_processor.py b/core/liteflow/core/services/execution_result_processor.py index 7abe53d..886cb34 100644 --- a/core/liteflow/core/services/execution_result_processor.py +++ b/core/liteflow/core/services/execution_result_processor.py @@ -54,7 +54,9 @@ def handle_step_exception(self, workflow: WorkflowInstance, definition: Workflow if step.error_behavior == WorkflowStep.RETRY: pointer.retry_count += 1 - pointer.sleep_until = datetime.utcnow() + timedelta(seconds=10) #TODO: make confiurable + if step.error_max_retry != 0 and pointer.retry_count >= step.error_max_retry: + workflow.status = WorkflowInstance.TERMINATED + pointer.sleep_until = datetime.utcnow() + timedelta(seconds=step.error_retry_interval) step.prime_for_retry(pointer) elif step.error_behavior == WorkflowStep.SUSPEND: workflow.status = WorkflowInstance.SUSPENDED diff --git a/samples/on_error.py b/samples/on_error.py index 1f95226..36fd7fc 100644 --- a/samples/on_error.py +++ b/samples/on_error.py @@ -34,7 +34,7 @@ def build(self, builder: WorkflowBuilder): builder\ .start_with(Hello)\ .then(Explode)\ - .on_error(WorkflowStep.RETRY)\ + .on_error(WorkflowStep.RETRY, error_max_retry=1, error_retry_interval=3)\ .then(Goodbye) From 021dc0423be2a4f7999fe1830fb655ce5db095e4 Mon Sep 17 00:00:00 2001 From: mingxing Date: Fri, 13 Aug 2021 20:20:25 +0800 Subject: [PATCH 3/3] fix retry times error --- core/liteflow/core/services/execution_result_processor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/liteflow/core/services/execution_result_processor.py b/core/liteflow/core/services/execution_result_processor.py index 886cb34..9798ba9 100644 --- a/core/liteflow/core/services/execution_result_processor.py +++ b/core/liteflow/core/services/execution_result_processor.py @@ -53,9 +53,10 @@ def handle_step_exception(self, workflow: WorkflowInstance, definition: Workflow pointer.status = ExecutionPointer.FAILED if step.error_behavior == WorkflowStep.RETRY: - pointer.retry_count += 1 + # print("retry ::: {}".format(pointer.retry_count)) if step.error_max_retry != 0 and pointer.retry_count >= step.error_max_retry: workflow.status = WorkflowInstance.TERMINATED + pointer.retry_count += 1 pointer.sleep_until = datetime.utcnow() + timedelta(seconds=step.error_retry_interval) step.prime_for_retry(pointer) elif step.error_behavior == WorkflowStep.SUSPEND: