Skip to content

Commit fe2fa84

Browse files
Adding Target.VALIDATION to Workflows, including isTask bool (#930)
* initial cut for adding target validate * Add documentation and example migration * fix linting * bump to 4.0.0rc1 * bump to 4.0.0rc2 * refactor target.system * fix linting * Add VALIDATE to Workflow and is_task * fix linting * Update helper for backwards compatibility * Add is_task to proces from workflow table * revert and get db workflow from make_workflow * revert and get db workflow from make_workflow * revert and get db workflow from make_workflow * update readme' * Update after review * fix unit tests * update linting and docs * remove print * remove hidden navigation * fix linting --------- Co-authored-by: Igor van Spengen <[email protected]> Co-authored-by: Igor van Spengen <[email protected]>
1 parent dedb348 commit fe2fa84

27 files changed

+179
-60
lines changed

.bumpversion.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[bumpversion]
2-
current_version = 4.0.0rc1
2+
current_version = 4.0.0rc2
33
commit = False
44
tag = False
55
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(rc(?P<build>\d+))?

docs/architecture/application/workflow.md

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,14 +204,20 @@ Validate workflows run integrity checks on an existing subscription. Checking th
204204
```python
205205
params = dict(
206206
name="validate_node_enrollment",
207-
target="SYSTEM",
207+
target="VALIDATE",
208+
is_task=True,
208209
description="Validate Node Enrollment before production",
209210
tag="NodeEnrollment",
210211
search_phrase="Node Enrollment%",
211212
)
212213
```
213214

214-
It uses a `target` of `SYSTEM` - similar to how tasks are defined. That target is more of a free form sort of thing. Same thing with the `name` - that's the name of the actual workflow, and the `tag` is shared by of this set of workflows.
215+
It uses a `target` of `VALIDATE`. Unlike system tasks, which use the `target` of `SYSTEM` designation, validate
216+
workflows explicitly use `target="VALIDATE"` to distinguish themselves. This distinction reflects their different
217+
purposes.
218+
The `is_task` parameter is set to `True` to indicate that this workflow is a task. Tasks are workflows that are not
219+
directly associated with a subscription and are typically used for background processing or system maintenance.
220+
Both `SYSTEM` and `VALIDATE` workflows are considered tasks, but they serve different purposes.
215221

216222
Generally the steps raise assertions if a check fails, otherwise return OK to the state:
217223

docs/migration-guide/2.0.md

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,3 @@
1-
---
2-
hide:
3-
- navigation
4-
---
51
# 2.0 Migration Guide
62

73
In this document we'll help you migrate your orchestrator application from orchestrator-core 1.3 to 2.0.

docs/migration-guide/3.0.md

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,3 @@
1-
---
2-
hide:
3-
- navigation
4-
---
51
# 3.0 Migration Guide
62

73
In this document we describe the steps that should be taken to migrate from `orchestrator-core` v2 to v3.

docs/migration-guide/4.0.md

Lines changed: 59 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,69 @@
1-
---
2-
hide:
3-
- navigation
4-
---
51
# 4.0 Migration Guide
62

73
In this document we describe the steps that should be taken to migrate from `orchestrator-core` v3 to v4.
84

95
## About 4.0
106

7+
### Removed caching of domain models
8+
119
In this release we have removed the caching of domain models. Domain models will always be loaded from the database.
1210

11+
### Added Target.VALIDATE
12+
13+
In this release, a new workflow target, `VALIDATE`, has been added
14+
for [validation workflows](../architecture/application/workflow.md#validate).
15+
Previously, the `SYSTEM` target was used for validation workflows, which implied that they were expected to run in a
16+
system context. However, this was not appropriate for all validation workflows.
17+
To address this, the new `VALIDATE` target has been introduced specifically for validation workflows. The `SYSTEM`
18+
target is now reserved exclusively for system workflows.
19+
20+
The change of the `SYSTEM` target to `VALIDATE` is a breaking change, as it will break any workflows that are using the
21+
`SYSTEM` target for validation workflows. You will need to update your workflows to use the `VALIDATE` target instead.
22+
23+
In the Steps section below we describe how to update your workflows to use the new `VALIDATE` target.
24+
1325
## Steps
1426

15-
To use 4.0 all workflows must have run to completion. The `cache_domain_models` step no longer is part of the codebase
16-
therfore `in flight` workflows will fail.
27+
To use 4.0.0, all workflows must have run to completion. The `cache_domain_models` step no longer is part of the codebase
28+
therefore `in flight` workflows will fail.
29+
30+
After running the migration `(2025-05-08_cdf8758831d4_add_is_task_to_workflow.py)`, the workflow table should look like this:
31+
32+
```sql
33+
| workflow_id | name | target | is_task | description | created_at | deleted_at |
34+
|--------------------------------------|----------------------------|--------|---------|-------------------------------------------------------------------------|-----------------------------------|------------|
35+
| ded79954-f16e-422b-a204-7770a59757e8 | modify_note | MODIFY | FALSE | Modify Note | 2025-05-01 09:57:28.033504 +00:00 | <null> |
36+
| ca6a76ff-dd4e-4f66-9fb0-cee1878f0d20 | task_clean_up_tasks | SYSTEM | FALSE | Clean up old tasks | 2025-05-01 09:57:28.033504 +00:00 | <null> |
37+
| 40058c3d-0c95-47f4-a75f-93719299c5be | task_resume_workflows | SYSTEM | FALSE | Resume all workflows that are stuck on tasks with the status 'waiting' | 2025-05-01 09:57:28.033504 +00:00 | <null> |
38+
| 33b5520e-85d4-4ca1-8713-d26f7de5b7a5 | task_validate_products | SYSTEM | FALSE | Validate products | 2025-05-01 09:57:28.033504 +00:00 | <null> |
39+
| 94d4889e-6bb6-4724-a9d2-f21696fe6f43 | task_validate_product_type | SYSTEM | FALSE | Validate all subscriptions of Product Type | 2025-05-01 09:57:28.033504 +00:00 | <null> |
40+
| 0c4f3b8d-2a1e-4b5f-9a7c-6d8e0f1b2c3d | validate_some_thing | SYSTEM | FALSE | Validate The thing | 2025-05-01 09:57:28.033504 +00:00 | <null> |
41+
| f4b0a2c1-5d3e-4c8f-9b6d-7a2e5f3b8c4e | validate_another_thing | SYSTEM | FALSE | Validate Another thing | 2025-05-01 09:57:28.033504 +00:00 | <null> |
42+
```
43+
44+
The `target` in this table is no longer valid for the `validate_some_thing` and `validate_another_thing`
45+
workflows. You will need to update the `target` to `VALIDATE` for these workflows. You will also need to update the
46+
is_task column to `TRUE` for all targets that are `SYSTEM` or `VALIDATE`. This is because the `is_task` column is used to
47+
determine if a workflow is a task or not. If the `is_task` column is set to `FALSE`, the workflow will not be run as a task.
48+
Tasks are `SYSTEM` or `VALIDATE` workflows that are run in the context of a system.
49+
50+
Example on how to update the `target` and `is_task` for all workflows that start with `validate_`:
51+
52+
```sql
53+
UPDATE workflow
54+
SET target = 'VALIDATE', is_task = TRUE
55+
WHERE name LIKE 'validate_%';
56+
```
57+
58+
Example on how to update the `target` and `is_task` for all workflows that are `SYSTEM` or `VALIDATE`:
59+
60+
```sql
61+
UPDATE workflow
62+
SET is_task = TRUE
63+
WHERE target IN ('SYSTEM', 'VALIDATE');
64+
```
65+
66+
This will update the `target` for all workflows that are `SYSTEM` or `VALIDATE` and set the `is_task` column to `TRUE`.
67+
68+
This is a breaking change, so you will need to test your workflows after making this change to ensure that they are
69+
working as expected.

orchestrator/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
"""This is the orchestrator workflow engine."""
1515

16-
__version__ = "4.0.0rc1"
16+
__version__ = "4.0.0rc2"
1717

1818
from orchestrator.app import OrchestratorCore
1919
from orchestrator.settings import app_settings

orchestrator/cli/generator/templates/new_product_migration.j2

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,24 +63,28 @@ new_workflows = [
6363
{
6464
"name": "create_{{ product.variable }}",
6565
"target": Target.CREATE,
66+
"is_task": False,
6667
"description": "Create {{ product.name }}",
6768
"product_type": "{{ product.type }}",
6869
},
6970
{
7071
"name": "modify_{{ product.variable }}",
7172
"target": Target.MODIFY,
73+
"is_task": False,
7274
"description": "Modify {{ product.name }}",
7375
"product_type": "{{ product.type }}",
7476
},
7577
{
7678
"name": "terminate_{{ product.variable }}",
7779
"target": Target.TERMINATE,
80+
"is_task": False,
7881
"description": "Terminate {{ product.name }}",
7982
"product_type": "{{ product.type }}",
8083
},
8184
{
8285
"name": "validate_{{ product.variable }}",
83-
"target": Target.SYSTEM,
86+
"target": Target.VALIDATE,
87+
"is_task": True,
8488
"description": "Validate {{ product.name }}",
8589
"product_type": "{{ product.type }}",
8690
},

orchestrator/cli/migrate_tasks.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -150,22 +150,22 @@ def create_tasks_migration_wizard() -> tuple[list[dict], list[dict]]:
150150
- list of task items to add in the migration
151151
- list of task items to delete in the migration
152152
"""
153-
database_tasks = {
154-
task.name: task for task in list(db.session.scalars(select(WorkflowTable))) if task.target == Target.SYSTEM
155-
}
153+
database_tasks = {task.name: task for task in list(db.session.scalars(select(WorkflowTable))) if task.is_task}
156154
registered_wf_instances = {
157155
task: cast(Workflow, get_workflow(task)) for task in orchestrator.workflows.ALL_WORKFLOWS.keys()
158156
}
159157

158+
is_task = [Target.SYSTEM, Target.VALIDATE]
159+
160160
registered_tasks = dict(
161161
filter(
162-
lambda task: task[1].target == Target.SYSTEM and task[0] in database_tasks.keys(),
162+
lambda task: task[1].target in is_task and task[0] in database_tasks.keys(),
163163
registered_wf_instances.items(),
164164
)
165165
)
166166
available_tasks = dict(
167167
filter(
168-
lambda task: task[1].target == Target.SYSTEM and task[0] not in database_tasks.keys(),
168+
lambda task: task[1].target in is_task and task[0] not in database_tasks.keys(),
169169
registered_wf_instances.items(),
170170
)
171171
)

orchestrator/cli/migrate_workflows.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
from orchestrator.cli.helpers.input_helpers import _enumerate_menu_keys, _prompt_user_menu, get_user_input
1212
from orchestrator.cli.helpers.print_helpers import COLOR, noqa_print, print_fmt, str_fmt
1313
from orchestrator.db import ProductTable, WorkflowTable, db
14-
from orchestrator.targets import Target
1514
from orchestrator.workflows import LazyWorkflowInstance, get_workflow
1615

1716
# Workflows are registered via migrations with product type. For every product with the given product_type, there will be an entry in products_workflows.
@@ -183,7 +182,7 @@ def create_workflows_migration_wizard() -> tuple[list[dict], list[dict]]:
183182
"""
184183
database_workflows = list(db.session.scalars(select(WorkflowTable)))
185184
registered_workflows = orchestrator.workflows.ALL_WORKFLOWS
186-
system_workflow_names = {wf.name for wf in database_workflows if wf.target == Target.SYSTEM}
185+
system_workflow_names = {wf.name for wf in database_workflows if wf.is_task}
187186
registered_non_system_workflows = {k: v for k, v in registered_workflows.items() if k not in system_workflow_names}
188187

189188
unregistered_workflows = [wf for wf in database_workflows if wf.name not in registered_workflows.keys()]

orchestrator/db/models.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,8 @@ class WorkflowTable(BaseModel):
412412
)
413413
processes = relationship("ProcessTable", cascade="all, delete-orphan", back_populates="workflow")
414414

415+
is_task = mapped_column(Boolean, nullable=False, server_default=text("false"))
416+
415417
@staticmethod
416418
def select() -> Select:
417419
return (

orchestrator/migrations/helpers.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -135,18 +135,21 @@ def create_workflow(conn: sa.engine.Connection, workflow: dict) -> None:
135135
>>> workflow = {
136136
"name": "workflow_name",
137137
"target": "SYSTEM",
138+
"is_task": False,
138139
"description": "workflow description",
139140
"product_type": "product_type",
140141
}
141142
>>> create_workflow(conn, workflow)
142143
"""
144+
if not workflow.get("is_task", False):
145+
workflow["is_task"] = False
143146

144147
conn.execute(
145148
sa.text(
146149
"""
147150
WITH new_workflow AS (
148-
INSERT INTO workflows(name, target, description)
149-
VALUES (:name, :target, :description)
151+
INSERT INTO workflows(name, target, is_task, description)
152+
VALUES (:name, :target, :is_task, :description)
150153
ON CONFLICT DO NOTHING
151154
RETURNING workflow_id)
152155
INSERT
@@ -184,8 +187,8 @@ def create_task(conn: sa.engine.Connection, task: dict) -> None:
184187
conn.execute(
185188
sa.text(
186189
"""
187-
INSERT INTO workflows(name, target, description)
188-
VALUES (:name, 'SYSTEM', :description)
190+
INSERT INTO workflows(name, target, is_task, description)
191+
VALUES (:name, 'SYSTEM', TRUE, :description)
189192
ON CONFLICT DO NOTHING
190193
RETURNING workflow_id
191194
"""
@@ -206,6 +209,7 @@ def create_workflows(conn: sa.engine.Connection, new: dict) -> None:
206209
"workflow_name": {
207210
"workflow_id": "f2702074-3203-454c-b298-6dfa7675423d",
208211
"target": "CREATE",
212+
"is_task": False,
209213
"description": "Workflow description",
210214
"tag": "ProductBlockName1",
211215
"search_phrase": "Search Phrase%",
@@ -214,12 +218,16 @@ def create_workflows(conn: sa.engine.Connection, new: dict) -> None:
214218
"""
215219
for name, workflow in new.items():
216220
workflow["name"] = name
221+
222+
if not workflow.get("is_task", False):
223+
workflow["is_task"] = False
224+
217225
conn.execute(
218226
sa.text(
219227
"""
220228
WITH new_workflow AS (
221-
INSERT INTO workflows(workflow_id, name, target, description)
222-
VALUES (:workflow_id, :name, :target, :description)
229+
INSERT INTO workflows(workflow_id, name, target, is_task, description)
230+
VALUES (:workflow_id, :name, :target, :is_task, :description)
223231
RETURNING workflow_id)
224232
INSERT
225233
INTO products_workflows (product_id, workflow_id)

orchestrator/migrations/versions/schema/2025-02-20_68d14db1b8da_make_workflow_description_mandatory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
# revision identifiers, used by Alembic.
1616
revision = "68d14db1b8da"
17-
down_revision = "bac6be6f2b4f"
17+
down_revision = "fc5c993a4b4a"
1818
branch_labels = None
1919
depends_on = None
2020

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
"""Add is_task to workflow.
2+
3+
Revision ID: 161918133bec
4+
Revises: 68d14db1b8da
5+
Create Date: 2025-05-08 11:25:51.966410
6+
7+
"""
8+
9+
import sqlalchemy as sa
10+
from alembic import op
11+
12+
# revision identifiers, used by Alembic.
13+
revision = "161918133bec"
14+
down_revision = "68d14db1b8da"
15+
branch_labels = None
16+
depends_on = None
17+
18+
19+
def upgrade() -> None:
20+
# ### commands auto generated by Alembic - please adjust! ###
21+
op.add_column("workflows", sa.Column("is_task", sa.Boolean(), server_default=sa.text("false"), nullable=False))
22+
# ### end Alembic commands ###
23+
24+
25+
def downgrade() -> None:
26+
# ### commands auto generated by Alembic - please adjust! ###
27+
op.drop_column("workflows", "is_task")
28+
# ### end Alembic commands ###

orchestrator/schedules/validate_subscriptions.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
get_subscriptions_on_product_table_in_sync,
2323
)
2424
from orchestrator.services.workflows import (
25-
get_system_product_workflows_for_subscription,
25+
get_validation_product_workflows_for_subscription,
2626
start_validation_workflow_for_workflows,
2727
)
2828
from orchestrator.settings import app_settings
@@ -42,14 +42,14 @@ def validate_subscriptions() -> None:
4242
subscriptions = get_subscriptions_on_product_table_in_sync()
4343

4444
for subscription in subscriptions:
45-
system_product_workflows = get_system_product_workflows_for_subscription(subscription)
45+
validation_product_workflows = get_validation_product_workflows_for_subscription(subscription)
4646

47-
if not system_product_workflows:
47+
if not validation_product_workflows:
4848
logger.warning(
4949
"SubscriptionTable has no validation workflow",
5050
subscription=subscription,
5151
product=subscription.product.name,
5252
)
5353
break
5454

55-
start_validation_workflow_for_workflows(subscription=subscription, workflows=system_product_workflows)
55+
start_validation_workflow_for_workflows(subscription=subscription, workflows=validation_product_workflows)

orchestrator/schemas/workflow.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
class WorkflowBaseSchema(OrchestratorBaseModel):
2525
name: str
2626
target: Target
27+
is_task: bool = False
2728
description: str | None = None
2829
created_at: datetime | None = None
2930

orchestrator/services/celery.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from orchestrator.db import ProcessTable, db
2525
from orchestrator.services.input_state import store_input_state
2626
from orchestrator.services.processes import create_process, delete_process
27-
from orchestrator.targets import Target
27+
from orchestrator.services.workflows import get_workflow_by_name
2828
from orchestrator.workflows import get_workflow
2929
from pydantic_forms.types import State
3030

@@ -51,7 +51,11 @@ def _celery_start_process(
5151
if not workflow:
5252
raise_status(HTTPStatus.NOT_FOUND, "Workflow does not exist")
5353

54-
task_name = NEW_TASK if workflow.target == Target.SYSTEM else NEW_WORKFLOW
54+
wf_table = get_workflow_by_name(workflow.name)
55+
if not wf_table:
56+
raise_status(HTTPStatus.NOT_FOUND, "Workflow in Database does not exist")
57+
58+
task_name = NEW_TASK if wf_table.is_task else NEW_WORKFLOW
5559
trigger_task = get_celery_task(task_name)
5660
pstat = create_process(workflow_key, user_inputs, user)
5761
try:
@@ -80,7 +84,11 @@ def _celery_resume_process(
8084
last_process_status = process.last_status
8185
workflow = pstat.workflow
8286

83-
task_name = RESUME_TASK if workflow.target == Target.SYSTEM else RESUME_WORKFLOW
87+
wf_table = get_workflow_by_name(workflow.name)
88+
if not workflow or not wf_table:
89+
raise_status(HTTPStatus.NOT_FOUND, "Workflow does not exist")
90+
91+
task_name = RESUME_TASK if wf_table.is_task else RESUME_WORKFLOW
8492
trigger_task = get_celery_task(task_name)
8593

8694
user_inputs = user_inputs or [{}]

0 commit comments

Comments
 (0)