From aabd29daf7682b7b95d0b8e840be15d0050bac16 Mon Sep 17 00:00:00 2001 From: Martin <15851033+malvex@users.noreply.github.com> Date: Mon, 1 Dec 2025 02:09:05 +0100 Subject: [PATCH 1/3] refactor: rename Spec and Config to TaskSpec and TaskConfig --- docs/reference/task-config.md | 2 +- docs/reference/task-spec.md | 2 +- mkdocs.yml | 4 ++-- src/sheppy/models.py | 24 ++++++++++++------------ src/sheppy/queue.py | 2 +- src/sheppy/task_factory.py | 6 +++--- src/sheppy/testqueue.py | 2 +- tests/contract/test_task.py | 4 ++-- tests/contract/test_taskcron.py | 6 +++--- 9 files changed, 26 insertions(+), 26 deletions(-) diff --git a/docs/reference/task-config.md b/docs/reference/task-config.md index a063760..162d4e0 100644 --- a/docs/reference/task-config.md +++ b/docs/reference/task-config.md @@ -1,3 +1,3 @@ # `TaskConfig` model reference -::: sheppy.models.Config +::: sheppy.models.TaskConfig diff --git a/docs/reference/task-spec.md b/docs/reference/task-spec.md index dc4510e..6d24946 100644 --- a/docs/reference/task-spec.md +++ b/docs/reference/task-spec.md @@ -1,3 +1,3 @@ # `TaskSpec` model reference -::: sheppy.models.Spec +::: sheppy.models.TaskSpec diff --git a/mkdocs.yml b/mkdocs.yml index d59c156..5587949 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -41,8 +41,8 @@ nav: - CLI reference: reference/cli.md - Models: - Task: reference/task.md - - Spec: reference/task-spec.md - - Config: reference/task-config.md + - TaskSpec: reference/task-spec.md + - TaskConfig: reference/task-config.md - TaskCron: reference/task-cron.md - Backends: - Backend class: reference/backends/backend.md diff --git a/src/sheppy/models.py b/src/sheppy/models.py index 19f0dea..c08ed35 100644 --- a/src/sheppy/models.py +++ b/src/sheppy/models.py @@ -36,7 +36,7 @@ def cron_expression_validator(value: str) -> str: CronExpression = Annotated[str, AfterValidator(cron_expression_validator)] -class Spec(BaseModel): +class TaskSpec(BaseModel): """Task specification. Attributes: @@ -47,7 +47,7 @@ class Spec(BaseModel): middleware (list[str]|None): List of fully qualified middleware function names to be applied to the task, e.g. `['my_module.submodule:my_middleware']`. Middleware will be applied in the order they are listed. Note: - - You should not create Spec instances directly. Instead, use the `@task` decorator to define a task function, and then call that function to create a Task instance. + - You should not create TaskSpec instances directly. Instead, use the `@task` decorator to define a task function, and then call that function to create a Task instance. - `args` and `kwargs` must be JSON serializable. Example: @@ -79,7 +79,7 @@ def my_task(x: int, y: str) -> str: """list[str]|None: List of fully qualified middleware function names to be applied to the task, e.g. `['my_module.submodule:my_middleware']`. Middleware will be applied in the order they are listed.""" -class Config(BaseModel): +class TaskConfig(BaseModel): """Task configuration Attributes: @@ -87,7 +87,7 @@ class Config(BaseModel): retry_delay (float|list[float]): Delay between retries in seconds. If a single float is provided, it will be used for all retries. If a list is provided, it will be used for each retry attempt respectively (exponential backoff). Default is 1.0 seconds. Note: - - You should not create Config instances directly. Instead, use the `@task` decorator to define a task function, and then call that function to create a Task instance. + - You should not create TaskConfig instances directly. Instead, use the `@task` decorator to define a task function, and then call that function to create a Task instance. - `retry` must be a non-negative integer. - `retry_delay` must be a positive float or a list of positive floats. @@ -130,8 +130,8 @@ class Task(BaseModel): completed (bool): A completion flag that is set to True only if task finished successfully. error (str|None): Error message if the task failed. None if the task succeeded or is not yet executed. result (Any): The result of the task execution. If the task failed, this will be None. - spec (sheppy.models.Spec): Task specification - config (sheppy.models.Config): Task configuration + spec (sheppy.models.TaskSpec): Task specification + config (sheppy.models.TaskConfig): Task configuration created_at (datetime): Timestamp when the task was created. finished_at (datetime|None): Timestamp when the task was finished. None if the task is not yet finished. scheduled_at (datetime|None): Timestamp when the task is scheduled to run. None if the task is not scheduled. @@ -171,9 +171,9 @@ def add(x: int, y: int) -> int: result: Any = None """Any: The result of the task execution. This will be None if the task failed or is not yet executed.""" - spec: Spec + spec: TaskSpec """Task specification""" - config: Config = Field(default_factory=Config) + config: TaskConfig = Field(default_factory=TaskConfig) """Task configuration""" created_at: AwareDatetime = Field(default_factory=lambda: datetime.now(timezone.utc)) @@ -244,8 +244,8 @@ class TaskCron(BaseModel): Attributes: id (UUID): Unique identifier for the cron definition. expression (str): Cron expression defining the schedule, e.g. "*/5 * * * *" for every 5 minutes. - spec (sheppy.models.Spec): Task specification - config (sheppy.models.Config): Task configuration + spec (sheppy.models.TaskSpec): Task specification + config (sheppy.models.TaskConfig): Task configuration Note: - You should not create TaskCron instances directly. Instead, use the `add_cron` method of the Queue class to create a cron definition. @@ -283,9 +283,9 @@ def say_hello(to: str) -> str: expression: CronExpression """str: Cron expression defining the schedule, e.g. "*/5 * * * *" for every 5 minutes.""" - spec: Spec + spec: TaskSpec """Task specification""" - config: Config + config: TaskConfig """Task configuration""" # enabled: bool = True diff --git a/src/sheppy/queue.py b/src/sheppy/queue.py index c19d0ba..2d401ff 100644 --- a/src/sheppy/queue.py +++ b/src/sheppy/queue.py @@ -372,7 +372,7 @@ async def get_crons(self) -> list[TaskCron]: crons = await q.get_crons() for cron in crons: - print(f"Cron ID: {cron.id}, Expression: {cron.expression}, Task Spec: {cron.spec}") + print(f"Cron ID: {cron.id}, Expression: {cron.expression}, TaskSpec: {cron.spec}") ``` """ await self.__ensure_backend_is_connected() diff --git a/src/sheppy/task_factory.py b/src/sheppy/task_factory.py index a8bbea0..23fde33 100644 --- a/src/sheppy/task_factory.py +++ b/src/sheppy/task_factory.py @@ -10,7 +10,7 @@ overload, ) -from .models import Config, Spec, Task, TaskCron +from .models import Task, TaskConfig, TaskCron, TaskSpec from .utils.validation import validate_input P = ParamSpec('P') @@ -86,14 +86,14 @@ def create_task(func: Callable[..., Any], stringified_middlewares.append(TaskFactory._stringify_function(m)) _task = Task( - spec=Spec( + spec=TaskSpec( func=func_string, args=args, kwargs=kwargs, return_type=return_type, middleware=stringified_middlewares ), - config=Config(**task_config) + config=TaskConfig(**task_config) ) return _task diff --git a/src/sheppy/testqueue.py b/src/sheppy/testqueue.py index 5d8468e..7fe707a 100644 --- a/src/sheppy/testqueue.py +++ b/src/sheppy/testqueue.py @@ -331,7 +331,7 @@ def get_crons(self) -> list[TaskCron]: crons = q.get_crons() for cron in crons: - print(f"Cron ID: {cron.id}, Expression: {cron.expression}, Task Spec: {cron.spec}") + print(f"Cron ID: {cron.id}, Expression: {cron.expression}, TaskSpec: {cron.spec}") ``` """ return asyncio.run(self._queue.get_crons()) diff --git a/tests/contract/test_task.py b/tests/contract/test_task.py index 30c3806..2d4bbac 100644 --- a/tests/contract/test_task.py +++ b/tests/contract/test_task.py @@ -4,7 +4,7 @@ from pydantic import ValidationError from sheppy import Task, task -from sheppy.models import Config +from sheppy.models import TaskConfig from tests.dependencies import ( Status, failing_task, @@ -110,7 +110,7 @@ def test_task_is_frozen(task_fn): task.config.retry = 5.5 with pytest.raises(ValidationError, match="Instance is frozen"): - task.config = Config() + task.config = TaskConfig() with pytest.raises(TypeError, match="does not support item assignment"): task.spec.args[0] = 5 diff --git a/tests/contract/test_taskcron.py b/tests/contract/test_taskcron.py index ce0cfd7..3bedf13 100644 --- a/tests/contract/test_taskcron.py +++ b/tests/contract/test_taskcron.py @@ -1,7 +1,7 @@ import pytest from pydantic import ValidationError -from sheppy.models import Config, Spec, TaskCron +from sheppy.models import TaskConfig, TaskCron, TaskSpec @pytest.mark.parametrize("cron_expression", [ @@ -26,7 +26,7 @@ ]) def test_valid_cron_expressions(cron_expression): - TaskCron(expression=cron_expression, spec=Spec(func=""), config=Config()) + TaskCron(expression=cron_expression, spec=TaskSpec(func=""), config=TaskConfig()) @pytest.mark.parametrize("cron_expression", [ @@ -50,7 +50,7 @@ def test_valid_cron_expressions(cron_expression): def test_invalid_cron_expressions(cron_expression): with pytest.raises(ValidationError): - TaskCron(expression=cron_expression, spec=Spec(func=""), config=Config()) + TaskCron(expression=cron_expression, spec=TaskSpec(func=""), config=TaskConfig()) @pytest.mark.parametrize("expression,spec,config", [ From 197dc11c034d5704f98e2cfff17450c7e57ad7e2 Mon Sep 17 00:00:00 2001 From: Martin <15851033+malvex@users.noreply.github.com> Date: Mon, 1 Dec 2025 01:58:54 +0100 Subject: [PATCH 2/3] chore: enable pydantic.mypy plugin --- pyproject.toml | 1 + src/sheppy/models.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 316e70f..991452a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -84,6 +84,7 @@ sheppy = "sheppy.cli.cli:app" [tool.mypy] strict = true +plugins = ['pydantic.mypy'] [tool.ruff.lint] select = [ diff --git a/src/sheppy/models.py b/src/sheppy/models.py index c08ed35..9db8ec8 100644 --- a/src/sheppy/models.py +++ b/src/sheppy/models.py @@ -65,7 +65,7 @@ def my_task(x: int, y: str) -> str: print(t.spec.return_type) # "builtins.str" ``` """ - model_config = ConfigDict(frozen=True) + model_config = ConfigDict(frozen=True, extra="forbid") func: str """str: Fully qualified function name, e.g. `my_module.my_submodule:my_function`""" @@ -104,7 +104,7 @@ def my_task(): print(t.config.retry_delay) # [1.0, 2.0, 3.0] ``` """ - model_config = ConfigDict(frozen=True) + model_config = ConfigDict(frozen=True, extra="forbid") retry: int = Field(default=0, ge=0) """int: Number of times to retry the task if it fails. Default is 0 (no retries).""" From e57f1a0aa3664d91ba418124a73b6bf84d09c0d2 Mon Sep 17 00:00:00 2001 From: Martin <15851033+malvex@users.noreply.github.com> Date: Mon, 17 Nov 2025 06:14:01 +0100 Subject: [PATCH 3/3] refactor: replace 'self: Task' magic with 'CURRENT_TASK' to fix issues with IDE typing --- src/sheppy/__init__.py | 4 ++-- src/sheppy/models.py | 3 +++ src/sheppy/utils/task_execution.py | 18 ++++++++---------- src/sheppy/utils/validation.py | 12 +++--------- tests/conftest.py | 10 +++++----- tests/dependencies.py | 20 ++++++++++---------- 6 files changed, 31 insertions(+), 36 deletions(-) diff --git a/src/sheppy/__init__.py b/src/sheppy/__init__.py index c0c7fe0..98d42f4 100644 --- a/src/sheppy/__init__.py +++ b/src/sheppy/__init__.py @@ -1,5 +1,5 @@ from .backend import Backend, BackendError, MemoryBackend, RedisBackend -from .models import Task +from .models import CURRENT_TASK, Task from .queue import Queue from .task_factory import task from .testqueue import TestQueue @@ -12,7 +12,7 @@ # fastapi "Depends", # task - "task", "Task", + "task", "Task", "CURRENT_TASK", # queue "Queue", # testqueue diff --git a/src/sheppy/models.py b/src/sheppy/models.py index 9db8ec8..08af11e 100644 --- a/src/sheppy/models.py +++ b/src/sheppy/models.py @@ -26,6 +26,9 @@ TASK_CRON_NS = UUID('7005b432-c135-4131-b19e-d3dc89703a9a') +# sentinel object for current task injection (def my_task(x: int, task: Task = CURRENT_TASK): ...) +CURRENT_TASK = object() + def cron_expression_validator(value: str) -> str: if not croniter.is_valid(value): diff --git a/src/sheppy/utils/task_execution.py b/src/sheppy/utils/task_execution.py index 31566c5..289c290 100644 --- a/src/sheppy/utils/task_execution.py +++ b/src/sheppy/utils/task_execution.py @@ -15,7 +15,7 @@ import anyio from pydantic import ConfigDict, PydanticSchemaGenerationError, TypeAdapter -from ..models import Task +from ..models import CURRENT_TASK, Task from .fastapi import Depends cache_main_module: str | None = None @@ -214,9 +214,13 @@ async def process_function_parameters( remaining_args = list(args) for param_name, param in list(signature.parameters.items()): - # Task injection (self: Task) + # current Task injection (current: Task = CURRENT_TASK) if task and TaskProcessor._is_task_injection(param): - final_args.append(task) + # inject positionally for positional params to maintain correct order + if param.kind in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD): + final_args.append(task) + else: + final_kwargs[param_name] = task continue # validate positional args @@ -237,13 +241,7 @@ async def process_function_parameters( @staticmethod def _is_task_injection(param: inspect.Parameter) -> bool: - if param.name != 'self': - return False - - if param.annotation == inspect.Parameter.empty: - return False - - return param.annotation is Task or param.annotation == 'Task' + return param.default is CURRENT_TASK @staticmethod diff --git a/src/sheppy/utils/validation.py b/src/sheppy/utils/validation.py index c7c95a1..0f31ca6 100644 --- a/src/sheppy/utils/validation.py +++ b/src/sheppy/utils/validation.py @@ -8,7 +8,7 @@ from pydantic import TypeAdapter, ValidationError -from sheppy.models import Task +from sheppy.models import CURRENT_TASK from .fastapi import Depends @@ -39,11 +39,8 @@ def validate_input( for param_name, param in signature.parameters.items(): + # task self injection if _is_task_injection(param): - if param.default != inspect.Parameter.empty: - raise ValidationError.from_exception_data( - f"Task injection parameter '{param_name}' cannot have a default value", line_errors=[] - ) if param_name in remaining_kwargs: raise ValidationError.from_exception_data( f"Cannot provide value for Task injection parameter '{param_name}'", line_errors=[] @@ -109,10 +106,7 @@ def validate_input( def _is_task_injection(param: inspect.Parameter) -> bool: - if param.name != 'self': - return False - - return param.annotation is Task or param.annotation == 'Task' + return param.default is CURRENT_TASK def _is_depends_parameter(param: inspect.Parameter) -> bool: diff --git a/tests/conftest.py b/tests/conftest.py index 4d8fbc1..cdc9c34 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,7 +4,7 @@ import pytest import pytest_asyncio -from sheppy import Queue, Task, Worker, task +from sheppy import CURRENT_TASK, Queue, Task, Worker, task from sheppy.backend import Backend, MemoryBackend, RedisBackend TEST_QUEUE_NAME = "pytest" @@ -67,15 +67,15 @@ async def queue(backend: Backend) -> Queue: @task(retry=2, retry_delay=0.1) -async def async_fail_once(self: Task) -> str: - if self.retry_count == 0: +async def async_fail_once(current: Task = CURRENT_TASK) -> str: + if current.retry_count == 0: raise Exception("transient error") return "ok" @task(retry=2, retry_delay=0) -def sync_fail_once(self: Task) -> str: - if self.retry_count == 0: +def sync_fail_once(current: Task = CURRENT_TASK) -> str: + if current.retry_count == 0: raise Exception("transient error") return "ok" diff --git a/tests/dependencies.py b/tests/dependencies.py index 17e2cef..19c8a72 100644 --- a/tests/dependencies.py +++ b/tests/dependencies.py @@ -5,7 +5,7 @@ from pydantic import BaseModel -from sheppy import Depends, Task, task +from sheppy import CURRENT_TASK, Depends, Task, task class User(BaseModel): @@ -139,18 +139,18 @@ async def async_task_with_pydantic_model(user: User) -> Status: @task -def task_with_self(self: Task, x: int, y: int) -> dict[str, Any]: - return {"task_id": self.id, "result": x + y} +def task_with_current_task(current: Task = CURRENT_TASK, x: int = 5, y: int = 6) -> dict[str, Any]: + return {"task_id": current.id, "result": x + y} @task -def task_with_self_middle(x: int, self: Task, y: int) -> dict[str, Any]: - return {"task_id": self.id, "result": x + y} +def task_with_current_task_middle(x: int, current: Task = CURRENT_TASK, y: int = 7) -> dict[str, Any]: + return {"task_id": current.id, "result": x + y} @task -def task_with_self_end(x: int, y: int, self: Task) -> dict[str, Any]: - return {"task_id": self.id, "result": x + y} +def task_with_current_task_end(x: int, y: int, current: Task = CURRENT_TASK) -> dict[str, Any]: + return {"task_id": current.id, "result": x + y} @task @@ -482,9 +482,9 @@ def deep_recursion_tasks() -> list[TaskTestCase]: def self_referencing_tasks() -> list[TaskTestCase]: """Tasks that should fail.""" return [ - TaskTestCase("task_with_self", task_with_self, (22, 33), expected_result=55), - TaskTestCase("task_with_self_middle", task_with_self_middle, (22, 33), expected_result=55), - TaskTestCase("task_with_self_end", task_with_self_end, (22, 33), expected_result=55), + TaskTestCase("task_with_current_task", task_with_current_task, (), {"x": 22, "y": 33}, expected_result=55), + TaskTestCase("task_with_current_task_middle", task_with_current_task_middle, (22, ), {"y": 33}, expected_result=55), + TaskTestCase("task_with_current_task_end", task_with_current_task_end, (22, 33), expected_result=55), ] @staticmethod