Skip to content

Commit 979adff

Browse files
authored
feat(worker): add support for --max-prefetch (#12)
1 parent 4e8bbd0 commit 979adff

File tree

2 files changed

+10
-3
lines changed

2 files changed

+10
-3
lines changed

src/sheppy/cli/commands/work.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ def work(
1717
backend: BackendType = typer.Option(BackendType.redis, "--backend", "-b", help="Queue backend type"),
1818
redis_url: str = typer.Option("redis://127.0.0.1:6379", "--redis-url", "-r", help="Redis server URL"),
1919
max_concurrent: int = typer.Option(10, "--max-concurrent", "-c", help="Max concurrent tasks", min=1),
20+
max_prefetch: int | None = typer.Option(None, "--max-prefetch", help="Max prefetch tasks", min=1),
2021
autoreload: bool = typer.Option(False, "--reload", help="Reload worker on file changes"),
2122
oneshot: bool = typer.Option(False, "--oneshot", help="Process pending tasks and then exit"),
2223
max_tasks: int | None = typer.Option(None, "--max-tasks", help="Maximum amount of tasks to process", min=1),
@@ -70,16 +71,17 @@ def work(
7071
console.print("[yellow]Warning: --oneshot is not compatible with --reload, ignoring[/yellow]")
7172

7273
run_process('.', target=_start_worker,
73-
args=(queues, backend_instance, max_concurrent, log_level,
74+
args=(queues, backend_instance, max_concurrent, max_prefetch, log_level,
7475
disable_job_processing, disable_scheduler, disable_cron_manager),
7576
callback=lambda _: console.print("Detected file changes, reloading worker..."))
7677
else:
77-
_start_worker(queues, backend_instance, max_concurrent, log_level,
78+
_start_worker(queues, backend_instance, max_concurrent, max_prefetch, log_level,
7879
disable_job_processing, disable_scheduler, disable_cron_manager,
7980
oneshot, max_tasks)
8081

8182

82-
def _start_worker(queues: list[str], backend: Backend, max_concurrent: int, log_level: LogLevel,
83+
def _start_worker(queues: list[str], backend: Backend, max_concurrent: int, max_prefetch_tasks: int | None,
84+
log_level: LogLevel,
8385
disable_job_processing: bool, disable_scheduler: bool, disable_cron_manager: bool,
8486
oneshot: bool = False, max_tasks: int | None = None,
8587
) -> None:
@@ -96,6 +98,7 @@ def _start_worker(queues: list[str], backend: Backend, max_concurrent: int, log_
9698
))
9799

98100
worker = Worker(queues, backend=backend, max_concurrent_tasks=max_concurrent,
101+
max_prefetch_tasks=max_prefetch_tasks,
99102
enable_job_processing=not disable_job_processing,
100103
enable_scheduler=not disable_scheduler,
101104
enable_cron_manager=not disable_cron_manager)

src/sheppy/worker.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ def __init__(
7474
backend: Backend,
7575
shutdown_timeout: float = 30.0,
7676
max_concurrent_tasks: int = 10,
77+
max_prefetch_tasks: int | None = None,
7778
enable_job_processing: bool = True,
7879
enable_scheduler: bool = True,
7980
enable_cron_manager: bool = True,
@@ -93,6 +94,7 @@ def __init__(
9394

9495
self._task_processor = TaskProcessor()
9596
self._task_semaphore = asyncio.Semaphore(max_concurrent_tasks)
97+
self._max_prefetch_tasks = max_prefetch_tasks
9698
self._shutdown_event = asyncio.Event()
9799
self._ctrl_c_counter = 0
98100

@@ -277,6 +279,8 @@ async def _run_worker_loop(self, queue: Queue, oneshot: bool = False) -> None:
277279

278280
# how many tasks to get
279281
capacity = self._task_semaphore._value
282+
if self._max_prefetch_tasks:
283+
capacity = min(capacity, self._max_prefetch_tasks)
280284
if self._tasks_to_process is not None:
281285
capacity = min(capacity, self._tasks_to_process)
282286

0 commit comments

Comments
 (0)