diff --git a/docs/architecture/application/workflow.md b/docs/architecture/application/workflow.md index ef018a19e..a4b6c1ebe 100644 --- a/docs/architecture/application/workflow.md +++ b/docs/architecture/application/workflow.md @@ -1,4 +1,5 @@ # What is a workflow and how does it work? + The workflow engine is the core of the software, it has been created to execute a number of functions. - Safely and reliable manipulate customer `Subscriptions` from one state to the next and maintain auditability. @@ -6,130 +7,128 @@ The workflow engine is the core of the software, it has been created to execute - Execute step functions in order and allow the retry of previously failed process-steps in an idempotent way. - Atomically execute workflow functions. -## Create +## Best Practices -The "base" workflow out of a set is the `CREATE` workflow. That will create a subscription and all of the associated workflows "nest" under that. +The orchestrator will always attempt to be as robust as possible when executing workflow steps. +However, it is always up to the developer to implement the best practices as well as they can. -### Create migration +### Safeguards in the orchestrator; -The migration needs to define a specific set of parameters: +- **Atomic Step Execution**: Each step is treated as an atomic unit. + If a step fails, no partial changes are committed to the database. + Because of this, calling .commit() on the ORM within a step function is not allowed. +- **`insync` Subscription Requirement**: By default, workflows can only run on subscriptions that are marked as `insync`, unless explicitly configured otherwise. + This prevents multiple workflows from manipulating the same subscription concurrently. + One of the first actions a workflow should perform is to mark the subscription as `out of sync` to avoid conflicts. +- **Step Retry Behavior**: Failed steps can be retried indefinitely. Each retry starts from the state of the **last successfully completed** step. 
-```python -params_create = dict( - name="create_node_enrollment", - target="CREATE", - description="Create Node Enrollment Service", - tag="NodeEnrollment", - search_phrase="Node Enrollment%", -) -``` -The `name` is the actual name of the workflow as defined in the workflow code itself: +### Coding gotchas -```python -@create_workflow( - "Create Node Enrollment", - initial_input_form=initial_input_form_generator, - status=SubscriptionLifecycle.PROVISIONING -) -def create_node_enrollment() -> StepList: - return ( - begin - >> construct_node_enrollment_model - >> store_process_subscription() - ... - ... - ... -``` +- The orchestrator is best suited to be used as a data manipulator, not as a data transporter. + - Use the State log as a log of work, not a log of data. + - If the data you enter in the state is corrupt or wrong, you might need a difficult database query to update the state to resolve the conflict. +- Always retrieve external data at the moment it's needed during a step, not earlier. + This increases the robustness of the step. +- Each step function should perform a single, clearly defined unit of work. + Theoretically you can execute the whole workflow in a single step, However this does not help with traceability and reliability. -The `target` is `CREATE`, `description` is a human readable label and the `tag` is a specific string that will be used in all of the associated workflows. -### Create flow +## Workflows -Generally the initial step will be the form generator function to display information and gather user input. The first actual step (`construct_node_enrollment_model` here) is generally one that takes data gathered in the form input step (and any data gathered from external systems, etc) and constructs the populated domain model. +> [explanation to create a workflow in code](../../getting-started/workflows.md) -Note that at this point the subscription is created with a lifecycle state of `INITIAL`. 
+Workflows are composed of one or more **steps**, each representing a discrete unit of work in the subscription management process. +Steps are executed sequentially by the workflow engine and are the fundamental building blocks of workflows. -The domain model is then returned as part of the `subscription` object along with any other data downstream steps might want: +There are two high-level kinds of workflows: -```python -@step("Construct Node Enrollment model") -def construct_node_enrollment_model( - product: UUIDstr, customer_id: UUIDstr, esdb_node_id: int, select_node: str, url: str, uuid: str, role_id: str -) -> State: - subscription = NodeEnrollmentInactive.from_product_id( - product_id=product, customer_id=customer_id, status=SubscriptionLifecycle.INITIAL - ) - - subscription.ne.esdb_node_id = esdb_node_id - subscription.description = f"Node {select_node} Initial Subscription" - subscription.ne.esdb_node_uuid = uuid - subscription.ne.nso_service_id = uuid4() - subscription.ne.routing_domain = "esnet-293" - - role = map_role(role_id, select_node) - site_id = select_node.split("-")[0] # location short name - - return { - "subscription": subscription, - "subscription_id": subscription.subscription_id, - "subscription_description": subscription.description, - "role": role, - "site_id": site_id - } -``` +- workflows + - Defined for specific products. + - Perform operations like creating, modifying, or terminating subscriptions. +- tasks + - Not tied to a specific product and may not involve a subscription at all. + - Can be scheduled to run periodically or triggered manually. + - Useful for actions like cleanup jobs or triggering validations across multiple subscriptions. + - Examples can be found in `orchestrator.workflows.tasks`. 
-After that the the subscription is created and registered with the orchestrator: +Workflows and tasks need to be registered in the database and initialized as a `LazyWorkflowInstance` to work; see [registering workflows] for more info. -```python - >> store_process_subscription() -``` +### Subscription Workflow Types -The subsequent steps are the actual logic being executed by the workflow. It's a best practice to have each step execute one discrete operation so in case a step fails it can be restarted. To wit if a step contained: +Workflows are categorized based on the operations they perform on a subscription: -```python -@step("Do things rather than thing") -def do_things(subscription: NodeEnrollmentProvisioning): +- Create ([create_workflow]) + - The "base" workflow that initializes a new subscription for the product. + - Only one create workflow should exist per product. +- Modify ([modify_workflow]) + - Modifies an existing subscription (e.g., updating parameters, migrating to another product). + - Multiple modify workflows can exist, each handling a specific type of modification. +- Terminate ([terminate_workflow]) + - Terminates the subscription and removes its data and references from external systems. + - External references should only be retained if they also hold historical records. + - Only one terminate workflow should exist per product. +- Validate ([validate_workflow]) + - Verifies that external systems are consistent with the orchestrator's subscription state. + - Only one validate workflow should exist per product. - do_x() - do_y() +### Default Workflows - do_z() +Registering a _Default Workflow_ attaches a given workflow to all Products. +To ensure this, modify the `DEFAULT_PRODUCT_WORKFLOWS` environment variable and add the workflow to the database with a migration. - return {"subscription": subscription} -``` +By default, `DEFAULT_PRODUCT_WORKFLOWS` is set to `['modify_note']`. 
-And `do_z()` fails, restarting the workflow will execute the first two steps again and that might cause problems. +More about registering workflows can be found [here][registering-workflows] -The final step will make any final changes to the subscription information and change the state of the subscription to (usually) `PROVISIONING` or `ACTIVE`: -```python -@step("Update subscription name with node info") -def update_subscription_name_and_description(subscription: NodeEnrollmentProvisioning, select_node: str) -> State: - subscription = change_lifecycle(subscription, SubscriptionLifecycle.PROVISIONING) - subscription.description = f"Node {select_node} Provisioned (without system service)" +## Workflow Steps - return {"subscription": subscription} -``` +Workflows are composed of one or more **steps**, where each step is executed sequentially by the workflow engine and are the fundamental building blocks of workflows. + +### Step Characteristics -No other magic really, when this step completes successfully the workflow is done and the active subscription will show up in the orchestrator UI. +- **Atomicity**: Each step is atomic, either it fully completes or has no effect. This ensures data consistency and reliable state transitions. +- **Idempotency**: Steps should be designed to be safely repeatable without causing unintended side effects. +- **Traceability**: By breaking workflows into fine-grained steps, the orchestrator maintains clear audit trails and simplifies error handling and retries. -## Associated workflows +### Types of Steps -Now with an active subscription, the associated workflows (modify, validate, terminate, etc) "nest" under the active subscription in the UI. When they are executed they are run "on" the subscription they are associated with. +The orchestrator supports several kinds of steps to cover different use cases: -Like the `CREATE` workflow they *can* have an initial form generator step but they don't necessarily need one. 
For example a validate workflow probably would not need any additional input since it's just running checks on an existing subscription. +- **`step`** [functional docs for step] + Executes specific business logic or external API calls as part of the subscription process. + +- **`retrystep`** [functional docs for retrystep] + Similar to `step`, but designed for operations that may fail intermittently. These steps will automatically be retried periodically on failure. + +- **`inputstep`** [functional docs for inputstep] + Pauses the workflow to request and receive user input during execution. + +- **`conditional`** [functional docs for conditional] + Conditionally executes the step based on environment variables or process state. + If the condition evaluates to false, the step is skipped entirely. + +- **`callback_step`** [functional docs for callback_step] + Pauses workflow execution while waiting for an external event to complete. + +For a practical example of how to define reusable workflow steps, and how to leverage `singledispatch` for type-specific logic, see: +👉 [Reusable step functions and singledispatch usage] -These workflows have more in common with each other than not, it's mostly a matter of how they are registered with the system. ### Execution parameters -There are a few parameters to finetune workflow execution constraints. The recommended place to alter them is from the workflows module, i.e. in `workflows/__init__.py`. Refer to the examples below. +You can fine-tune workflow execution behavior using a set of configuration parameters. +The recommended location to define or override these is in `workflows/__init__.py`. +Below are examples of key configuration options: -1. `WF_USABLE_MAP`: configure subscription lifecycles on which a workflow is usable +1. `WF_USABLE_MAP`: Define usable subscription lifecycles for workflows. -By default, the associated workflow can only be run on a subscription with a lifecycle state set to `ACTIVE`. 
This behavior can be changed in the `WF_USABLE_MAP` data structure: +By default, the associated workflow can only be run on a subscription with a lifecycle state set to `ACTIVE`. +This behavior can be changed in the `WF_USABLE_MAP` data structure: + +> note: Terminate workflows are by default, allowed to run on subscriptions in any lifecycle state unless explicitly restricted in this map. ```python from orchestrator.services.subscriptions import WF_USABLE_MAP @@ -143,11 +142,12 @@ WF_USABLE_MAP.update( ) ``` -Now validate and provision can be run on subscriptions in either `ACTIVE` or `PROVISIONING` states and modify can *only* be run on subscriptions in the `PROVISIONING` state. The exception is terminate, those workflows can be run on subscriptions in any state unless constrained here. +Now validate and provision can be run on subscriptions in either `ACTIVE` or `PROVISIONING` states and modify can *only* be run on subscriptions in the `PROVISIONING` state. -2. `WF_BLOCKED_BY_IN_USE_BY_SUBSCRIPTIONS`: block modify workflows on subscriptions with unterminated `in_use_by` subscriptions +2. `WF_BLOCKED_BY_IN_USE_BY_SUBSCRIPTIONS`: Block modify workflows on subscriptions with unterminated `in_use_by` subscriptions -By default, only terminate workflows are prohibited from running on subscriptions with unterminated `in_use_by` subscriptions. This behavior can be changed in the `WF_BLOCKED_BY_IN_USE_BY_SUBSCRIPTIONS` data structure: +By default, only terminate workflows are prohibited from running on subscriptions with unterminated `in_use_by` subscriptions. +This behavior can be changed in the `WF_BLOCKED_BY_IN_USE_BY_SUBSCRIPTIONS` data structure: ```python from orchestrator.services.subscriptions import WF_BLOCKED_BY_IN_USE_BY_SUBSCRIPTIONS @@ -161,9 +161,10 @@ WF_BLOCKED_BY_IN_USE_BY_SUBSCRIPTIONS.update( With this configuration, both terminate and modify will not run on subscriptions with unterminated `in_use_by` subscriptions. -3. 
`WF_USABLE_WHILE_OUT_OF_SYNC`: allow specific workflows on out of sync subscriptions +3. `WF_USABLE_WHILE_OUT_OF_SYNC`: Allow specific workflows on out of sync subscriptions -By default, only system workflows (tasks) are allowed to run on subscriptions that are not in sync. This behavior can be changed with the `WF_USABLE_WHILE_OUT_OF_SYNC` data structure: +By default, only system workflows (tasks) are allowed to run on subscriptions that are not in sync. +This behavior can be changed with the `WF_USABLE_WHILE_OUT_OF_SYNC` data structure: ```python from orchestrator.services.subscriptions import WF_USABLE_WHILE_OUT_OF_SYNC @@ -181,115 +182,15 @@ Now this particular modify workflow can be run on subscriptions that are not in It is potentially dangerous to run workflows on subscriptions that are not in sync. Only use this for small and specific usecases, such as editing a description that is only used within orchestrator. -#### Initial state - -The first step of any of these associated workflows will be to fetch the subscription from the orchestrator: - -```python -@step("Load initial state") -def load_initial_state(subscription_id: UUIDstr) -> State: - subscription = NodeEnrollment.from_subscription(subscription_id) - - return { - "subscription": subscription, - } -``` - -The `subscription_id` is automatically passed in. - -### Validate - -Validate workflows run integrity checks on an existing subscription. Checking the state of associated data in an external system for example. The validate migration parameters look something like this: - -```python - params = dict( - name="validate_node_enrollment", - target="VALIDATE", - is_task=True, - description="Validate Node Enrollment before production", - tag="NodeEnrollment", - search_phrase="Node Enrollment%", - ) -``` - -It uses a `target` of `VALIDATE`. Unlike system tasks, which use the `target` of `SYSTEM` designation, validate -workflows explicitly use `target="VALIDATE"` to distinguish themselves. 
This distinction reflects their different -purposes. -The `is_task` parameter is set to `True` to indicate that this workflow is a task. Tasks are workflows that are not -directly associated with a subscription and are typically used for background processing or system maintenance. -Both `SYSTEM` and `VALIDATE` workflows are considered tasks, but they serve different purposes. - -Generally the steps raise assertions if a check fails, otherwise return OK to the state: - -```python -@step("Check NSO") -def check_nso(subscription: NodeEnrollment, node_name: str) -> State: - device = get_device(device_name=node_name) - - if device is None: - raise AssertionError(f"Device not found in NSO") - return {"check_nso": "OK"} -``` - -## Modify - -Very similar to validate but the migration params vary as one would expect with a different `target`: - -```python - params_modify = dict( - name="modify_node_enrollment", - target="MODIFY", - description="Modify Node Enrollment", - tag="NodeEnrollment", - search_phrase="Node Enrollment%" -) -``` - -It would make any desired changes to the existing subscription and if need by, change the lifecycle state at the end. For example, for our `CREATE` that put the initial sub into the state `PROVISIONING`, a secondary modify workflow will put it into production and then set the state to `ACTIVE` at the end: - -```python -@step("Activate Subscription") -def update_subscription_and_description(subscription: NodeEnrollmentProvisioning, node_name: str) -> State: - subscription = change_lifecycle(subscription, SubscriptionLifecycle.ACTIVE) - subscription.description = f"Node {node_name} Production" - - return {"subscription": subscription} -``` - -These also have the subscription id passed in in the initial step as outlined above. - -## Terminate - -Terminates a workflow and undoes changes that were made. 
- -The migration params are as one would suspect: - -```python - params = dict( - name="terminate_node_enrollment", - target="TERMINATE", - description="Terminate Node Enrollment subscription", - tag="NodeEnrollment", - search_phrase="Node Enrollment%", - ) -``` - -`target` is `TERMINATE`, `name` and `tag` are as you would expect. - -The first step of these workflow are slightly different as it pulls in the `State` object rather than just the subscription id: - -```python -@step("Load relevant subscription information") -def load_subscription_info(state: State) -> FormGenerator: - subscription = state["subscription"] - node = get_detailed_node(subscription["ne"]["esdb_node_id"]) - return {"subscription": subscription, "node_name": node.get("name")} -``` - -## Default Workflows - -A Default Workflows mechanism is provided to provide a way for a given workflow to be automatically attached to all Products. To ensure this, modify the `DEFAULT_PRODUCT_WORKFLOWS` environment variable, and be sure to use `helpers.create()` in your migration. - -Alternatively, be sure to execute `ensure_default_workflows()` within the migration if using `helpers.create()` is not desirable. - -By default, `DEFAULT_PRODUCT_WORKFLOWS` is set to `['modify_note']`. 
+[registering workflows]: ../../getting-started/workflows.md#register-workflows +[create_workflow]: ../../reference-docs/workflows/workflows.md#orchestrator.workflows.utils.create_workflow +[modify_workflow]: ../../reference-docs/workflows/workflows.md#orchestrator.workflows.utils.modify_workflow +[terminate_workflow]: ../../reference-docs/workflows/workflows.md#orchestrator.workflows.utils.terminate_workflow +[validate_workflow]: ../../reference-docs/workflows/workflows.md#orchestrator.workflows.utils.validate_workflow +[functional docs for step]: ../../reference-docs/workflows/workflow-steps.md#orchestrator.workflow.step +[functional docs for retrystep]: ../../reference-docs/workflows/workflow-steps.md#orchestrator.workflow.retrystep +[functional docs for inputstep]: ../../reference-docs/workflows/workflow-steps.md#orchestrator.workflow.inputstep +[functional docs for conditional]: ../../reference-docs/workflows/workflow-steps.md#orchestrator.workflow.conditional +[functional docs for callback_step]: ../../reference-docs/workflows/callbacks.md +[Reusable step functions and singledispatch usage]: ../../reference-docs/workflows/workflow-steps.md#reusable-workflow-steps +[registering-workflows]: ../../getting-started/workflows.md#register-workflows diff --git a/docs/getting-started/base.md b/docs/getting-started/base.md index bd9c90682..c8b2189d8 100644 --- a/docs/getting-started/base.md +++ b/docs/getting-started/base.md @@ -97,3 +97,10 @@ uvicorn --reload --host 127.0.0.1 --port 8080 main:app ### Step 6 - Profit :boom: :grin: Visit the [ReDoc](http://127.0.0.1:8080/api/redoc) or [OpenAPI](http://127.0.0.1:8080/api/docs) to view and interact with the API. 
+ + +### Next: + +- [Create a product.](../workshops/advanced/domain-models.md) +- [Create a workflow for a product.](./workflows.md) +- [Generate products and workflows](../reference-docs/cli.md#generate) diff --git a/docs/getting-started/prepare-source-folder.md b/docs/getting-started/prepare-source-folder.md index e9890e69d..e05d6cce0 100644 --- a/docs/getting-started/prepare-source-folder.md +++ b/docs/getting-started/prepare-source-folder.md @@ -17,6 +17,8 @@ follows: └── workflows ``` +- [skip to creating a workflow](./workflows.md) + Some of the orchestrator-core functionality relies on the source folder being a valid Git repository. diff --git a/docs/getting-started/workflows.md b/docs/getting-started/workflows.md new file mode 100644 index 000000000..ac24688f2 --- /dev/null +++ b/docs/getting-started/workflows.md @@ -0,0 +1,340 @@ +# Workflows + +## Creating a workflow + +A **workflow** is the combination of: + +- An **initial input form** — used to collect input from the user. +- A sequence of **workflow steps** — defining the logic to be executed. + +For a more detailed explanation, see +👉 [Detailed explanation of workflows](../architecture/application/workflow.md) + +--- + +There are specialized decorators for each [workflow type] that execute "default" steps before and after the steps from your workflow. +It is recommended to use these decorators because they ensure correct functioning of the Orchestrator. + +- [create_workflow] +- [modify_workflow] +- [terminate_workflow] +- [validate_workflow] + +Under the hood they all use a [workflow] decorator which can be used for tasks that don't fit any of the types above. + +The decorated function must return a chain of steps using the `>>` operator to define their execution order. 
+ +### Minimal create workflow example + +```python +from orchestrator.workflows.utils import create_workflow +from orchestrator.workflow import StepList, begin + + +@create_workflow( + "Create product subscription", + initial_input_form=initial_input_form_generator +) +def create_product_subscription() -> StepList: + return begin >> create_subscription +``` + +In this example: + +- The workflow is named **"Create product subscription"**. +- The input form is defined by `initial_input_form_generator`. +- The workflow engine will execute the steps inside `create_workflow` before returned steps, + `create_subscription`, and steps inside `create_workflow` after returned steps. + +Each step should be defined using the `@step` decorator and can access and update the shared subscription model. + +--- + +### How workflow steps work + +Information between workflow steps is passed using `State`, which is nothing more than a collection of key/value pairs. +In Python the state is represented by a `Dict`, with string keys and arbitrary values. +Between steps the `State` is serialized to JSON and stored in the database. + +The `@step` decorator converts a function into a workflow step. +Arguments to the step function are automatically filled using matching keys from the `State`. +The function must return a dictionary of new or updated key-value pairs, which are merged into the `State` and passed to the next step. +The serialization and deserialization between JSON and the indicated Python types are done automatically. +A minimal workflow step looks as follows: + +```python + +@step("Create subscription") +def create_subscription( + product: UUID, + user_input: str, +) -> State: + subscription = build_subscription(product, user_input) + return {"subscription": subscription} +``` + +In this step: + +- `product` and `user_input` are populated from the `State`. +- The return value includes a new key `subscription`, which will be available to the next step in the workflow. 
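
As a rough mental model of the state handling described above, the following plain-Python sketch fills a function's arguments from matching `State` keys and merges the returned dictionary back into the state. It is not the orchestrator's implementation: `run_step` and the sample values are hypothetical, and the real `@step` decorator additionally handles JSON serialization and domain models.

```python
import inspect
from typing import Any, Callable

State = dict[str, Any]


def run_step(func: Callable[..., State], state: State) -> State:
    """Hypothetical helper: call `func` with arguments taken from `state`."""
    # Select the state entries whose keys match the function's parameter names.
    wanted = inspect.signature(func).parameters
    kwargs = {name: state[name] for name in wanted}
    # Merge the returned key/value pairs into a copy of the state.
    return {**state, **func(**kwargs)}


def create_subscription(product: str, user_input: str) -> State:
    # Stand-in for real business logic; returns only the new key.
    return {"subscription": f"{product}:{user_input}"}


state = {"product": "abc", "user_input": "hello", "unrelated": 1}
new_state = run_step(create_subscription, state)
```

Keys the step does not ask for (`unrelated` here) are passed through untouched, which is why a step only needs to return the keys it adds or changes.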
+ +Every workflow starts with the builtin step `init` and ends with the builtin step `done`, + with an arbitrary list of other builtin steps or custom steps in between. +The [workflow type] decorators include these steps, so your workflow only needs to return `begin >> your_step`. + +Domain models as parameters are subject to special processing. +After the previous step, the `subscription` is available in the state, so the next step can use it directly by annotating the parameter with the subscription model type, for example: + +```python +@step("Add subscription to external system") +def add_subscription_to_external_system( + subscription: MySubscriptionModel, +) -> State: + payload = subscription.my_block + response = add_to_external_system(payload) + return {"response": response} +``` + +For `@modify_workflow`, `@validate_workflow` and `@terminate_workflow` the `subscription` is directly usable from the first step. + +Information about all usable step decorators can be found on [the architecture page on workflows](../architecture/application/workflow.md#workflow-steps). + +## Register workflows + +To make workflows available in the orchestrator, they must be registered in two stages: + +1. In code: by defining them as workflow functions and registering them via `LazyWorkflowInstance`. +2. In the database: by mapping them to the corresponding `product_type` using a migration. + - Workflows do not necessarily need to be mapped to a `product_type`; without that mapping they are only available as tasks, which are not meant to be run on a subscription. + +We’ll start with the code registration, followed by options for generating the database migration. + +### Step 1: Register workflow functions in code + +Workflow functions must be registered by creating a `LazyWorkflowInstance`, which maps a workflow function to the Python module where it's defined. 
+ +Example — registering the `create_user_group` workflow: + +```python +from orchestrator.workflows import LazyWorkflowInstance + +LazyWorkflowInstance("workflows.user_group.create_user_group", "create_user_group") +``` + +To ensure the workflows are discovered at runtime: + +- Add all `LazyWorkflowInstance(...)` calls to `workflows/__init__.py`. +- Add `import workflows` to `main.py` so they are registered during app startup. + +!!! example + + For inspiration look at an example implementation of the [lazy workflow instances] + +### Step 2: Register workflows in the database + +After registering workflows in code, you need to add them to the database by mapping them to their `product_type`. +There are three ways to do this: + +- [Migrate workflows generator script](#migrate-workflows-generator-script) +- [Copy the example workflows migration](#copy-the-example-workflows-migration) +- [Manual](#manual) + +#### Migrate workflows generator script + +Similar to `db migrate-domain-models`, the orchestrator command line interface offers the `db migrate-workflows` command +that walks you through a menu to create a database migration file based on the difference between the registered workflows in the code and the database. + +Start with the following command: + +```shell +python main.py db migrate-workflows "add User and UserGroup workflows" +``` + +Navigate through the menu to add the six workflows to the corresponding `User` or `UserGroup` product type. +After confirming a migration file will be added to `migrations/versions/schema`. 
+ +The migration can be run with: + +```shell +python main.py db upgrade heads +``` + +#### Copy the example workflows migration + +You can copy a predefined migration file from the example repository: + +```shell +( + cd migrations/versions/schema + curl --remote-name https://raw.githubusercontent.com/workfloworchestrator/example-orchestrator-beginner/main/examples/2022-11-12_8040c515d356_add_user_and_usergroup_workflows.py +) +``` + +Update it to your own workflow and update the database with: + +```shell +python main.py db upgrade heads +``` + +#### Manual + +Create a new empty database migration with the following command: + +```shell +python main.py db revision --head data --message "add User and UserGroup workflows" +``` + +This will create an empty database migration in the folder `migrations/versions/schema`. +For the migration we will make use of the migration helper functions `create_workflow` and `delete_workflow` that both expect a `Dict` that describes the workflow registration to be added or deleted from the database. + +To add all User and UserGroup workflows in bulk a list of `Dict` is created, for only the UserGroup create workflow the list looks like this: + +```python +from orchestrator.targets import Target + +new_workflows = [ + { + "name": "create_user_group", + "target": Target.CREATE, + "description": "Create user group", + "product_type": "UserGroup", + }, +] +``` + +This registers the workflow function `create_user_group` as a create workflow for the `UserGroup` product. + +Add a list of `Dict`s describing the create, modify and terminate workflows for both the `UserGroup` and `User` products to the migration that was created above. 
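
One way to build that list without repeating six near-identical dictionaries is sketched below. It is only an illustration: the `Target` enum here is a local stand-in for `orchestrator.targets.Target` so the snippet runs on its own, and every workflow name other than `create_user_group` is an assumed name that must match your actual workflow functions.

```python
from enum import Enum


class Target(str, Enum):  # stand-in for orchestrator.targets.Target
    CREATE = "CREATE"
    MODIFY = "MODIFY"
    TERMINATE = "TERMINATE"


# product_type -> workflow name suffix (assumed naming convention)
PRODUCTS = {"UserGroup": "user_group", "User": "user"}

# One registration dict per (product, target) combination.
new_workflows = [
    {
        "name": f"{target.value.lower()}_{suffix}",
        "target": target,
        "description": f"{target.value.capitalize()} {suffix.replace('_', ' ')}",
        "product_type": product_type,
    }
    for product_type, suffix in PRODUCTS.items()
    for target in Target
]
```

In a real migration you would import `Target` from `orchestrator.targets` instead of defining it, and writing the dictionaries out by hand is equally valid if the names do not follow a regular pattern.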
+ +The migration `upgrade` and `downgrade` functions will just loop through the list: + +```python +from orchestrator.migrations.helpers import create_workflow, delete_workflow + + +def upgrade() -> None: + conn = op.get_bind() + for workflow in new_workflows: + create_workflow(conn, workflow) + + +def downgrade() -> None: + conn = op.get_bind() + for workflow in new_workflows: + delete_workflow(conn, workflow["name"]) +``` + +Run the migration with the following command: + +```shell +python main.py db upgrade heads +``` + + +## More workflow examples + +### Validate + +Validate workflows run integrity checks on an existing subscription. +Checking the state of associated data in an external system for example. +The validate migration parameters look something like this: + +```python +new_workflows = [ + { + "name": "validate_node_enrollment", + "target": Target.VALIDATE, + "description": "Validate Node Enrollment before production", + "product_type": "Node", + "is_task": True, + }, +] +``` + +This workflow uses `Target.VALIDATE`, which explicitly distinguishes it from system tasks that use `Target.SYSTEM`. +While both are marked with `is_task=True` and treated as tasks, they serve different purposes: + +- `SYSTEM` workflows are typically used for background processing and internal orchestration. +- `VALIDATE` workflows are used to confirm that a subscription is still correct and consistent, verifying that external systems are still in sync with it. + +Validate workflow steps generally raise an `AssertionError` when a condition fails. +If all checks pass, they return a simple success marker (e.g., "OK") to the workflow state. 
+ + +```python +@step("Check NSO") +def check_nso(subscription: NodeEnrollment, node_name: str) -> State: + device = get_device(device_name=node_name) + + if device is None: + raise AssertionError("Device not found in NSO") + return {"check_nso": "OK"} +``` + +### Modify + +The `Modify` workflow is similar to a `Validate` workflow, but uses different migration parameters appropriate to its `Target.MODIFY` context. + +```python +new_workflows = [ + { + "name": "modify_node_enrollment", + "target": Target.MODIFY, + "description": "Modify Node Enrollment", + "product_type": "Node", + }, +] +``` + +This type of workflow applies changes to an existing subscription. +If necessary, it can also update the subscription’s lifecycle state at the end of the process. +For example, suppose a `CREATE` workflow initially sets the subscription to the `PROVISIONING` state. +A follow-up `Modify` workflow might transition it to production and set the lifecycle state to `ACTIVE`: + +```python +@step("Activate Subscription") +def update_subscription_and_description(subscription: NodeEnrollmentProvisioning, node_name: str) -> State: + subscription = NodeEnrollment.from_other_lifecycle(subscription, SubscriptionLifecycle.ACTIVE) + subscription.description = f"Node {node_name} Production" + + return {"subscription": subscription} +``` + +These also have the `subscription` passed in to the initial step as outlined above. + +### Terminate + +A Terminate workflow is used to cleanly remove a subscription and undo any changes made during its lifecycle. + +The migration params are as one would expect: + +```python +new_workflows = [ + { + "name": "terminate_node_enrollment", + "target": Target.TERMINATE, + "description": "Terminate Node Enrollment subscription", + "product_type": "Node", + }, +] +``` +Here, the `target`, `name`, and `description` follow standard naming conventions for `terminate` workflows. 
+ +The first step of a terminate workflow can be used to store identifiers in the state, for example: + +```python +@step("Load relevant subscription information") +def load_subscription_info(subscription: NodeEnrollment) -> State: + node = get_detailed_node(subscription.ne.esdb_node_id) + return {"subscription": subscription, "node_name": node.get("name")} +``` + +This approach ensures that the workflow has all the necessary context to safely tear down the subscription and associated resources. + +[create_workflow]: ../reference-docs/workflows/workflows.md#orchestrator.workflows.utils.create_workflow +[modify_workflow]: ../reference-docs/workflows/workflows.md#orchestrator.workflows.utils.modify_workflow +[terminate_workflow]: ../reference-docs/workflows/workflows.md#orchestrator.workflows.utils.terminate_workflow +[validate_workflow]: ../reference-docs/workflows/workflows.md#orchestrator.workflows.utils.validate_workflow +[workflow]: ../reference-docs/workflows/workflows.md#orchestrator.workflow.workflow +[workflow type]: ../architecture/application/workflow#subscription-workflow-types +[lazy workflow instances]: https://github.com/workfloworchestrator/example-orchestrator-beginner/blob/main/workflows/__init__.py diff --git a/docs/reference-docs/workflows/workflow-lifecycles.md b/docs/reference-docs/workflows/workflow-lifecycles.md deleted file mode 100644 index 9f8c36327..000000000 --- a/docs/reference-docs/workflows/workflow-lifecycles.md +++ /dev/null @@ -1,9 +0,0 @@ -# Overview - -Initial -Started -Resumed -Failed -Completed -Aborted -ProcessStat diff --git a/docs/reference-docs/workflows/workflow-steps.md b/docs/reference-docs/workflows/workflow-steps.md index 8e3c8362e..6e6c83a57 100644 --- a/docs/reference-docs/workflows/workflow-steps.md +++ b/docs/reference-docs/workflows/workflow-steps.md @@ -26,3 +26,103 @@ One important detail that is very helpful to understand is how your Step Functio ::: orchestrator.utils.state.inject_args options: 
heading_level: 3 + + +## Reusable Workflow Steps + +When designing workflows it is good practice to make steps reusable. +With a single product definition, you can build reusable steps for both `CREATE` and `MODIFY` workflows. +For example, you may want to: + +- Update a product's description using subscription properties. +- Push data to an external system via an upsert operation. + +To take reusability further, Python's `@singledispatch` decorator can help abstract product-specific logic behind a common interface. +This makes your steps cleaner, more maintainable, and easier to reuse across multiple workflows. + + +### Generic Workflow Steps + +You can define a generic step when the logic should be shared across multiple workflows or products. +Here's an example of a reusable workflow step that updates an external system based on the product type: + +```python +@step("Update external system") +def update_external_system(subscription: SubscriptionModel) -> State: + # Class patterns need parentheses; a bare name would be a capture pattern. + match subscription: + case ProductTypeOne(): + payload = create_product_type_one_payload(subscription.some_block) + case ProductTypeTwo(): + payload = create_product_type_two_payload(subscription.some_other_block) + case _: + raise TypeError(f"Unsupported subscription type: {type(subscription)}") + + response = external_system_request(payload) + return {"response": response} +``` + +While this approach works, the switch logic (via `match` or `if isinstance()`) can become unwieldy as more product types are introduced. +This is where `@singledispatch` can help. + +### Using `@singledispatch` for Cleaner Reusability + +In the example above, each product requires slightly different logic for building the payload. +Rather than branching on type manually, you can delegate this responsibility to Python's `@singledispatch`. + +With `@singledispatch`, you define a generic function and register specific implementations based on the type of the input model. 
+ +Benefits: + +- Simplifies logic: No need for `match` or `if isinstance()` checks. +- Improves maintainability: Logic is cleanly separated per product. +- Enhances extensibility: Easily add new product support by registering a new implementation with `.register`. + +> Note: When using `@singledispatch` with Orchestrator models like `SubscriptionModel`, be sure to register specific lifecycle types (e.g., `ProductTypeOneProvisioning`). + +Example: Single Dispatch for External System Updates (default function) + +```python +from functools import singledispatch +from surf.utils.singledispatch import single_dispatch_base + +@singledispatch +def update_external_system(model: SubscriptionModel) -> str: + """ + Generic function to update an external system based on a subscription model. + Specific implementations must be registered for each product type. + + Args: + model: The subscription lifecycle model. + + Returns: + The JSON response from the external system. + + Raises: + TypeError: If no registered implementation is found for the model. + """ + return single_dispatch_base(update_external_system, model) +``` + +Registering Implementations: + +```python +@update_external_system.register +def product_one_update_external_system(model: ProductTypeOneProvisioning | ProductTypeOne) -> str: + payload = {} # add payload logic... + return external_system_request(payload) + + +@update_external_system.register +def product_two_active_update_external_system(model: ProductTypeTwo) -> str: + payload = {} # add payload logic... + return external_system_request(payload) + + +@update_external_system.register +def product_two_provisioning_update_external_system(model: ProductTypeTwoProvisioning) -> str: + payload = {} # add payload logic... + return external_system_request(payload) +``` + +Now you can call `update_external_system(model)` without worrying about branching logic. +The correct function will be called based on the model's type. 
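The dispatch behaviour described above can be tried in isolation with the sketch below. It is only an illustration under stated assumptions: `ProductTypeOne`, `ProductTypeTwo`, `UnknownProduct` and `build_payload` are hypothetical stand-ins (plain dataclasses, not orchestrator `SubscriptionModel` classes), and the fallback raises `TypeError` directly instead of going through a helper.

```python
from dataclasses import dataclass
from functools import singledispatch


@dataclass
class ProductTypeOne:
    """Hypothetical stand-in for a subscription model (assumption)."""

    hostname: str


@dataclass
class ProductTypeTwo:
    """Hypothetical stand-in for a second product type (assumption)."""

    port: int


class UnknownProduct:
    """A type with no registered implementation."""


@singledispatch
def build_payload(model: object) -> dict:
    # Fallback: reached only when no implementation is registered for the type.
    raise TypeError(f"No payload builder registered for {type(model).__name__}")


@build_payload.register
def _(model: ProductTypeOne) -> dict:
    # Selected via the type annotation of the first argument.
    return {"kind": "one", "hostname": model.hostname}


@build_payload.register
def _(model: ProductTypeTwo) -> dict:
    return {"kind": "two", "port": model.port}
```

Calling `build_payload(ProductTypeOne(hostname="node-1"))` selects the first registered implementation, while `build_payload(UnknownProduct())` falls through to the generic function and raises `TypeError`. Supporting a new product then only requires another `.register` implementation.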
diff --git a/docs/reference-docs/workflows/workflows.md b/docs/reference-docs/workflows/workflows.md new file mode 100644 index 000000000..34578c731 --- /dev/null +++ b/docs/reference-docs/workflows/workflows.md @@ -0,0 +1,34 @@ +# Workflows + +Workflows are what actually take a product definition and populate your domain models. +To read more about the architectural design of workflows, check out [the architecture page on workflows](../../architecture/application/workflow.md). +To see more details about the workflow lifecycle states and functions, read on to the next section. + + +::: orchestrator.workflow.ProcessStatus + options: + heading_level: 3 + + +::: orchestrator.workflows.utils + options: + heading_level: 3 + members: + - create_workflow + - modify_workflow + - terminate_workflow + - validate_workflow + - workflow + +::: orchestrator.workflow.workflow + options: + heading_level: 3 + + +## Helpers to register workflows in the database +::: orchestrator.migrations.helpers + options: + heading_level: 3 + members: + - create + - ensure_default_workflows diff --git a/docs/workshops/advanced/workflow-introduction.md b/docs/workshops/advanced/workflow-introduction.md index a885bd32d..fa1cc7475 100644 --- a/docs/workshops/advanced/workflow-introduction.md +++ b/docs/workshops/advanced/workflow-introduction.md @@ -1,36 +1,11 @@ -The workflow engine is the core of the orchestrator, it is responsible for the following functions: +> This document assumes you are already familiar with the key safeguards and potential pitfalls outlined in the [Architecture of workflows](../../architecture/application/workflow.md) -* Safely and reliable manipulate customer Subscriptions from one state to the next and maintain auditability. -* Create an API through which Subscriptions can be manipulated programmatically. -* Execute step functions in order and allow the retry of previously failed process-steps in an idempotent way. -* Atomically execute workflow functions. 
+### Continuing the Workshop -### Best Practices -The orchestrator will always attempt to be a robust a possible when executing workflow steps. However it is always -up to the developer to implement the best practices as well as he/she can. +The next sections introduce the workflow concept and its relationship to the product model. +Code examples referenced throughout can be found in the example orchestrator under the `.workflows.node` directory. -#### Safeguards in the orchestrator; -* Steps will be treated as atomic units: All code must execute otherwise the state will not be commited to the - database. For this reason it is not possible to call `.commit()` on the ORM within a step function -* Workflows are only allowed to be run on `insync` subscriptions, unless explicitly configured otherwise. This is to - safeguard against resource contention. One of the first things a workflow should do is set the subscription it it - manipulating `out of sync`. No other workflow can then manipulate it. -* Failed steps can be retried again and again, they use the state from the **last successful** step as their - starting point. - -#### Coding gotchas -* The orchestrator is best suited to be used as a data manipulator, not as a data transporter. Use the State log as - a log of work, not a log of data. If the data you enter in the state is corrupt or wrong, you might need to - attempt a very difficult database query to update the state to solve your conflict -* Always fetch data needed from an external system, **Just in time**. This will increase the robustness of the step -* Always create a step function that executes one piece of work at a time. Theoretically you can execute the whole - workflow in a single step. However this does not help with traceability and reliability. - - -### Workshop continued -The following pages try to introduce the workflow concept and how it relates to the product model. 
All code examples -can be found in the example orchestrator `.workflows.node` directory. Please read at least the following pages to grasp -the functionality of how workflows work and how the user/frontend will interact with the Orchestrator API: +Please read at least the following pages to grasp the functionality of how workflows work and how the user/frontend will interact with the Orchestrator API: * [Workflow Basics](workflow-basics.md) * [Create Workflow](node-create.md) diff --git a/docs/workshops/beginner/workflow-introduction.md b/docs/workshops/beginner/workflow-introduction.md index a445404cf..4c8ce303d 100644 --- a/docs/workshops/beginner/workflow-introduction.md +++ b/docs/workshops/beginner/workflow-introduction.md @@ -1,37 +1,7 @@ # Introduction -The workflow engine is the core of the orchestrator, it is responsible for the -following functions: - -* Safely and reliable manipulate customer Subscriptions from one state to the - next and maintain auditability. - -* Create an API through which Subscriptions can be manipulated programmatically. - -* Execute step functions in order and allow the retry of previously failed - process-steps in an idempotent way. - -* Atomically execute workflow functions. - -A workflow is the combination of an initial input form, used to acquire input -from the user, and a list of workflow steps. Four types of workflows are -distinguished: `CREATE` workflows that will produce a subscription on a product -for a specific customer, `MODIFY` workflows to manipulate existing -subscriptions, `TERMINATE` workflows to end the subscription on a product for a -customer, and `SYSTEM` workflows that run scheduled and do not have an input -form. The latter type of workflows is also referred to as tasks, and can for -example be used to validate subscriptions against external operations -support systems (OSS) and business support systems (BSS). 
The -same workflow step can be used in multiple workflows, and a set of workflow -steps can be combined in a step list and can be reused as well. - -Ideally workflow steps are idempotent. In case a workflow step fails, this -allows for save retry functionality without possible unwanted side effects or -new failures. This is especially important when a step is used to communicate -with external OSS and BSS. But in practice it will not always be possible to -make a step one hundred percent idempotent, thus requiring manual intervention -before a step can be retried. Note that the workflow steps created in this -beginner workshop are not written with idempotency in mind. +A workflow is the combination of an initial input form, used to acquire input from the user, and a list of workflow steps. +For more details, see [workflow architecture](../../architecture/application/workflow.md). The `workflow` decorator takes a description, initial input form, and a target as input and turns a function into a workflow that returns a step list to be diff --git a/mkdocs.yml b/mkdocs.yml index 58552712b..574f95d0d 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -151,6 +151,9 @@ nav: - Base Application: - Preparing source folder: getting-started/prepare-source-folder.md - Base application: getting-started/base.md + - Workflows: + - Creating a workflow: getting-started/workflows.md + - Registering a workflow: getting-started/workflows#register-workflows - Docker: getting-started/docker.md - Orchestrator UI: getting-started/orchestration-ui.md - Reference Documentation: @@ -186,9 +189,9 @@ nav: # - Tasks: reference-docs/tasks.md # - Tests: reference-docs/tests.md - Workflows: + - Workflows: reference-docs/workflows/workflows.md - Workflow Steps: reference-docs/workflows/workflow-steps.md - # - Workflow Lifecycles: reference-docs/workflows/workflow-lifecycles.md - - Callbacks: reference-docs/workflows/callbacks.md + - Callback step: reference-docs/workflows/callbacks.md - Websockets: 
reference-docs/websockets.md - Search: reference-docs/search.md - Monitoring: @@ -199,29 +202,30 @@ nav: - 4.x: migration-guide/4.0.md - Workshops: - # - Beginner: - # - Overview: workshops/beginner/overview.md - # - Installation: - # - Manual: - # - Debian: workshops/beginner/debian.md - # - MacOS: workshops/beginner/macos.md - # - Docker compose: workshops/beginner/docker.md - # - Start applications: workshops/beginner/start-applications.md - # - Products: - # - Scenario: workshops/beginner/scenario.md - # - Domain models: workshops/beginner/domain-models.md - # - Database migration: workshops/beginner/database-migration.md - # - Workflows: - # - Introduction: workshops/beginner/workflow-introduction.md - # - Register workflows: workshops/beginner/register-workflows.md - # - Input forms: workshops/beginner/input-forms.md - # - Create UserGroup: workshops/beginner/create-user-group.md - # - Modify UserGroup: workshops/beginner/modify-user-group.md - # - Terminate UserGroup: workshops/beginner/terminate-user-group.md - # - Create User: workshops/beginner/create-user.md - # - Modify User: workshops/beginner/modify-user.md - # - Terminate User: workshops/beginner/terminate-user.md - # - Explore: workshops/beginner/explore.md + # - Beginner: + # - Overview: workshops/beginner/workshop-overview.md + # - introduction: workshops/beginner/workshop-introduction.md + # - Installation: + # - Manual: + # - Debian: workshops/beginner/debian.md + # - MacOS: workshops/beginner/macos.md + # - Docker compose: workshops/beginner/docker.md + # - Start applications: workshops/beginner/start-applications.md + # - Products: + # - Scenario: workshops/beginner/scenario.md + # - Domain models: workshops/beginner/domain-models.md + # - Database migration: workshops/beginner/database-migration.md + # - Workflows: + # - Introduction: workshops/beginner/workflow-introduction.md + # - Register workflows: workshops/beginner/register-workflows.md + # - Input forms: 
workshops/beginner/input-forms.md + # - Create UserGroup: workshops/beginner/create-user-group.md + # - Modify UserGroup: workshops/beginner/modify-user-group.md + # - Terminate UserGroup: workshops/beginner/terminate-user-group.md + # - Create User: workshops/beginner/create-user.md + # - Modify User: workshops/beginner/modify-user.md + # - Terminate User: workshops/beginner/terminate-user.md + # - Explore: workshops/beginner/explore.md - Example Orchestrator Workshop: - Overview: workshops/advanced/overview.md # - Installation: workshops/advanced/docker-installation.md