2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 4.3.0
current_version = 4.4.0rc1
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(rc(?P<build>\d+))?
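The `parse` pattern can be sanity-checked against the new version string with a quick sketch (plain Python `re`, illustrative only):

```python
import re

# The bumpversion `parse` pattern from the config above.
PARSE = r"(?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(rc(?P<build>\d+))?"

match = re.fullmatch(PARSE, "4.4.0rc1")
assert match is not None
# The optional `rc` group captures the release-candidate build number.
print(match.group("major"), match.group("minor"), match.group("patch"), match.group("build"))  # 4 4 0 1
```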
62 changes: 40 additions & 22 deletions docs/architecture/application/tasks.md
@@ -50,6 +50,7 @@ params = dict(
    name="task_sync_from",
    target="SYSTEM",
    description="Nightly validate and NSO sync",
    is_task=True,
)


@@ -58,8 +59,8 @@ def upgrade() -> None:
    conn.execute(
        sa.text(
            """
            INSERT INTO workflows(name, target, description, is_task)
            VALUES (:name, :target, :description, true)
            """
        ),
        params,
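As a sanity check, the insert can be exercised against a throwaway SQLite table (a sketch; the real migration runs against the orchestrator's `workflows` table, which has more columns):

```python
import sqlite3

# Throwaway table mimicking the relevant workflows columns (illustrative only).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE workflows (name TEXT, target TEXT, description TEXT, is_task BOOLEAN)")

params = dict(name="task_sync_from", target="SYSTEM", description="Nightly validate and NSO sync")
conn.execute(
    "INSERT INTO workflows(name, target, description, is_task) VALUES (:name, :target, :description, true)",
    params,
)
row = conn.execute("SELECT name, is_task FROM workflows").fetchone()
print(row)  # ('task_sync_from', 1)
```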
@@ -71,7 +72,8 @@ This just needs to add an entry in the workflows table. No relations with other

### Running the task in the UI

After the migration is applied, the new task will surface in the UI under the tasks tab, where it can be manually executed.
Even if the task does not have any form input, an entry will still need to be made in `orchestrator-client/src/locale/en.ts` or an error will occur.

```ts
// ESnet
task_sync_from: "Verify and NSO sync",
```

## The schedule file

> Since `4.3.0` we switched from the [schedule] package to [apscheduler], which allows schedules to be stored in the DB and tasks to be scheduled from the API.

The schedule file is essentially the crontab associated with the task.
Schedule files are located in `orchestrator/server/schedules/`. A sample schedule file:

```python
from orchestrator.schedules.scheduler import scheduler
from orchestrator.services.processes import start_process


# Previously the `@scheduler(name=..., time_unit=..., period=...)` decorator was used; it is now deprecated.
@scheduler.scheduled_job(id="nightly-sync", name="Nightly sync", trigger="cron", hour=1)
def run_nightly_sync() -> None:
    start_process("task_sync_from")
```

This schedule will start the `task_sync_from` task every day at 01:00.
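The `hour=1` cron trigger fires at the next occurrence of 01:00. The "next run" computation can be sketched with plain `datetime` (illustrative, not APScheduler's actual implementation):

```python
from datetime import datetime, timedelta


def next_daily_run(now: datetime, hour: int = 1) -> datetime:
    """Next occurrence of `hour`:00, i.e. what trigger="cron", hour=1 schedules."""
    candidate = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)  # already past today's slot, use tomorrow's
    return candidate


print(next_daily_run(datetime(2024, 5, 1, 12, 30)))  # 2024-05-02 01:00:00
```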
There are multiple triggers that can be used ([data from docs]):

- [IntervalTrigger]: use when you want to run the task at fixed intervals of time.
- [CronTrigger]: use when you want to run the task periodically at certain time(s) of day.
- [DateTrigger]: use when you want to run the task just once at a certain point in time.
- [CalendarIntervalTrigger]: use when you want to run the task on calendar-based intervals, at a specific time of day.
- [AndTrigger]: use when you want to combine multiple triggers so the task only runs when **all** of them would fire at the same time.
- [OrTrigger]: use when you want to combine multiple triggers so the task runs when **any one** of them would fire.

For detailed configuration options, see the [APScheduler scheduling docs].

The scheduler automatically loads any schedules that are imported before the scheduler starts.
To keep things organized and consistent (similar to how workflows are handled), it is recommended to place your schedules in a `/schedules/__init__.py`.

> `ALL_SCHEDULERS` (backwards compatibility)
> In previous versions, schedules needed to be explicitly listed in an `ALL_SCHEDULERS` variable.
> This is no longer required, but `ALL_SCHEDULERS` is still supported for backwards compatibility.
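The "schedules are loaded when their module is imported" behaviour comes from the decorator registering the job at import time. A self-contained stand-in (not the real APScheduler API) illustrates the pattern:

```python
from typing import Callable

# Stand-in job registry; APScheduler keeps jobs in its configured jobstores instead.
REGISTRY: dict[str, Callable[[], None]] = {}


def scheduled_job(id: str, **trigger_kwargs):
    def register(func: Callable[[], None]) -> Callable[[], None]:
        REGISTRY[id] = func  # registration happens as a side effect of import
        return func

    return register


@scheduled_job(id="nightly-sync", trigger="cron", hour=1)
def run_nightly_sync() -> None:
    print("starting task_sync_from")


# Merely defining (importing) the decorated function was enough to register the job.
print(sorted(REGISTRY))  # ['nightly-sync']
```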

## Executing the task

@@ -167,3 +174,14 @@

```python
start_process("task_sync_from")
```

[schedule]: https://pypi.org/project/schedule/
[apscheduler]: https://pypi.org/project/APScheduler/
[IntervalTrigger]: https://apscheduler.readthedocs.io/en/master/api.html#apscheduler.triggers.interval.IntervalTrigger
[CronTrigger]: https://apscheduler.readthedocs.io/en/master/api.html#apscheduler.triggers.cron.CronTrigger
[DateTrigger]: https://apscheduler.readthedocs.io/en/master/api.html#apscheduler.triggers.date.DateTrigger
[CalendarIntervalTrigger]: https://apscheduler.readthedocs.io/en/master/api.html#apscheduler.triggers.calendarinterval.CalendarIntervalTrigger
[AndTrigger]: https://apscheduler.readthedocs.io/en/master/api.html#apscheduler.triggers.combining.AndTrigger
[OrTrigger]: https://apscheduler.readthedocs.io/en/master/api.html#apscheduler.triggers.combining.OrTrigger
[APScheduler scheduling docs]: https://apscheduler.readthedocs.io/en/master/userguide.html#scheduling-tasks
[data from docs]: https://apscheduler.readthedocs.io/en/master/api.html#triggers
21 changes: 9 additions & 12 deletions docs/reference-docs/cli.md
@@ -744,18 +744,15 @@ None]

Access all the scheduler functions.

::: orchestrator.cli.scheduler
    options:
      heading_level: 3
      members:
        - run
        - force

::: orchestrator.cli.scheduler.show_schedule
    options:
      heading_level: 4
2 changes: 1 addition & 1 deletion orchestrator/__init__.py
@@ -13,7 +13,7 @@

"""This is the orchestrator workflow engine."""

__version__ = "4.3.0"
__version__ = "4.4.0rc1"

from orchestrator.app import OrchestratorCore
from orchestrator.settings import app_settings
68 changes: 39 additions & 29 deletions orchestrator/cli/scheduler.py
@@ -13,12 +13,11 @@


import logging

import typer
from apscheduler.schedulers.blocking import BlockingScheduler

from orchestrator.schedules.scheduler import jobstores, scheduler, scheduler_dispose_db_connections

log = logging.getLogger(__name__)

@@ -27,36 +26,47 @@

@app.command()
def run() -> None:
    """Start the scheduler and loop eternally to keep the thread alive."""
    blocking_scheduler = BlockingScheduler(jobstores=jobstores)

    try:
        blocking_scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()
        scheduler_dispose_db_connections()


@app.command()
def show_schedule() -> None:
    """Show the currently configured schedule.

    In the CLI, the underscore is replaced by a dash: `show-schedule`.
    """
    scheduler.start(paused=True)  # paused: avoid triggering jobs during CLI use
    jobs = scheduler.get_jobs()
    scheduler.shutdown(wait=False)
    scheduler_dispose_db_connections()

    for job in jobs:
        typer.echo(f"[{job.id}] Next run: {job.next_run_time} | Trigger: {job.trigger}")


@app.command()
def force(job_id: str) -> None:
    """Force the execution of a scheduled job based on its job_id."""
    scheduler.start(paused=True)  # paused: avoid triggering jobs during CLI use
    job = scheduler.get_job(job_id)
    scheduler.shutdown(wait=False)
    scheduler_dispose_db_connections()

    if not job:
        typer.echo(f"Job '{job_id}' not found.")
        raise typer.Exit(code=1)

    typer.echo(f"Running job [{job.id}] now...")
    try:
        job.func(*job.args or (), **job.kwargs or {})
        typer.echo("Job executed successfully.")
    except Exception as e:
        typer.echo(f"Job execution failed: {e}")
        raise typer.Exit(code=1)
36 changes: 36 additions & 0 deletions orchestrator/graphql/resolvers/scheduled_tasks.py
@@ -0,0 +1,36 @@
import structlog

from orchestrator.db.filters import Filter
from orchestrator.db.sorting import Sort
from orchestrator.graphql.pagination import Connection
from orchestrator.graphql.schemas.scheduled_task import ScheduledTaskGraphql
from orchestrator.graphql.types import GraphqlFilter, GraphqlSort, OrchestratorInfo
from orchestrator.graphql.utils import create_resolver_error_handler, to_graphql_result_page
from orchestrator.graphql.utils.is_query_detailed import is_querying_page_data
from orchestrator.schedules.scheduler import get_scheduler_tasks, scheduled_task_filter_keys, scheduled_task_sort_keys

logger = structlog.get_logger(__name__)


async def resolve_scheduled_tasks(
    info: OrchestratorInfo,
    filter_by: list[GraphqlFilter] | None = None,
    sort_by: list[GraphqlSort] | None = None,
    first: int = 10,
    after: int = 0,
) -> Connection[ScheduledTaskGraphql]:
    _error_handler = create_resolver_error_handler(info)

    pydantic_filter_by: list[Filter] = [item.to_pydantic() for item in filter_by] if filter_by else []
    pydantic_sort_by: list[Sort] = [item.to_pydantic() for item in sort_by] if sort_by else []
    scheduled_tasks, total = get_scheduler_tasks(
        first=first, after=after, filter_by=pydantic_filter_by, sort_by=pydantic_sort_by, error_handler=_error_handler
    )

    graphql_scheduled_tasks = []
    if is_querying_page_data(info):
        graphql_scheduled_tasks = [ScheduledTaskGraphql.from_pydantic(p) for p in scheduled_tasks]

    return to_graphql_result_page(
        graphql_scheduled_tasks, first, after, total, scheduled_task_filter_keys, scheduled_task_sort_keys
    )
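The `first`/`after` offset pagination used by the resolver can be sketched in isolation (a stand-in helper, not the real `to_graphql_result_page`):

```python
# Stand-in for offset-based pagination with `first` (page size) and `after` (offset).
def paginate(items: list[str], first: int = 10, after: int = 0) -> tuple[list[str], bool]:
    """Return one page of items plus a has-next-page flag (illustrative)."""
    page = items[after : after + first]
    has_next_page = after + first < len(items)
    return page, has_next_page


tasks = [f"task-{i}" for i in range(25)]
page, has_next_page = paginate(tasks, first=10, after=20)
print(page)           # ['task-20', 'task-21', 'task-22', 'task-23', 'task-24']
print(has_next_page)  # False
```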
5 changes: 5 additions & 0 deletions orchestrator/graphql/schema.py
@@ -51,12 +51,14 @@
    resolve_version,
    resolve_workflows,
)
from orchestrator.graphql.resolvers.scheduled_tasks import resolve_scheduled_tasks
from orchestrator.graphql.schemas import DEFAULT_GRAPHQL_MODELS
from orchestrator.graphql.schemas.customer import CustomerType
from orchestrator.graphql.schemas.process import ProcessType
from orchestrator.graphql.schemas.product import ProductType
from orchestrator.graphql.schemas.product_block import ProductBlock
from orchestrator.graphql.schemas.resource_type import ResourceType
from orchestrator.graphql.schemas.scheduled_task import ScheduledTaskGraphql
from orchestrator.graphql.schemas.settings import StatusType
from orchestrator.graphql.schemas.subscription import SubscriptionInterface
from orchestrator.graphql.schemas.version import VersionType
@@ -99,6 +101,9 @@ class OrchestratorQuery:
        description="Returns information about cache, workers, and global engine settings",
    )
    version: VersionType = authenticated_field(resolver=resolve_version, description="Returns version information")
    scheduled_tasks: Connection[ScheduledTaskGraphql] = authenticated_field(
        resolver=resolve_scheduled_tasks, description="Returns scheduled job information"
    )


@strawberry.federation.type(description="Orchestrator customer Query")
8 changes: 8 additions & 0 deletions orchestrator/graphql/schemas/scheduled_task.py
@@ -0,0 +1,8 @@
import strawberry

from orchestrator.schedules.scheduler import ScheduledTask


@strawberry.experimental.pydantic.type(model=ScheduledTask, all_fields=True)
class ScheduledTaskGraphql:
    pass
@@ -25,6 +25,6 @@ def _format_context(context: dict) -> str:

def create_resolver_error_handler(info: OrchestratorInfo) -> CallableErrorHandler:
    def handle_error(message: str, **context) -> None:  # type: ignore
        return register_error(f"{message} {_format_context(context)}", info, error_type=ErrorType.BAD_REQUEST)

    return handle_error
3 changes: 1 addition & 2 deletions orchestrator/schedules/__init__.py
@@ -13,12 +13,11 @@

from orchestrator.schedules.resume_workflows import run_resume_workflows
from orchestrator.schedules.task_vacuum import vacuum_tasks
from orchestrator.schedules.validate_products import validate_products
from orchestrator.schedules.validate_subscriptions import validate_subscriptions

ALL_SCHEDULERS: list = [
    run_resume_workflows,
    vacuum_tasks,
    validate_subscriptions,
4 changes: 2 additions & 2 deletions orchestrator/schedules/resume_workflows.py
@@ -12,10 +12,10 @@
# limitations under the License.


from orchestrator.schedules.scheduler import scheduler
from orchestrator.services.processes import start_process


@scheduler.scheduled_job(id="resume-workflows", name="Resume workflows", trigger="interval", hours=1)  # type: ignore[misc]
def run_resume_workflows() -> None:
    start_process("task_resume_workflows")