Skip to content

Commit e4cefa5

Browse files
authored
Move worker and worker_pool CLI commands to a separate file (rq#2240)
* Move worker and worker_pool CLI commands to a separate file * Formatting fixes * 1 more formatting fix
1 parent a7f0e45 commit e4cefa5

File tree

9 files changed

+277
-269
lines changed

9 files changed

+277
-269
lines changed

rq/cli/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@
33

44
# TODO: the following imports can be removed when we drop the `rqinfo` and
55
# `rqworkers` commands in favor of just shipping the `rq` command.
6-
from .cli import info, worker
6+
from .cli import info
7+
from .workers import worker

rq/cli/cli.py

Lines changed: 2 additions & 227 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,7 @@
22
RQ command line tool
33
"""
44

5-
import logging
6-
import logging.config
7-
import os
85
import sys
9-
import warnings
10-
from typing import TYPE_CHECKING, List, Type, cast
116

127
import click
138
from redis.exceptions import ConnectionError
@@ -18,39 +13,22 @@
1813
parse_function_args,
1914
parse_schedule,
2015
pass_cli_config,
21-
read_config_file,
2216
refresh,
23-
setup_loghandlers_from_args,
2417
show_both,
2518
show_queues,
2619
show_workers,
2720
)
2821

2922
# from rq.cli.pool import pool
30-
from rq.defaults import (
31-
DEFAULT_JOB_MONITORING_INTERVAL,
32-
DEFAULT_LOGGING_DATE_FORMAT,
33-
DEFAULT_LOGGING_FORMAT,
34-
DEFAULT_MAINTENANCE_TASK_INTERVAL,
35-
DEFAULT_RESULT_TTL,
36-
DEFAULT_WORKER_TTL,
37-
)
3823
from rq.exceptions import InvalidJobOperationError
39-
from rq.job import Job, JobStatus
24+
from rq.job import JobStatus
4025
from rq.logutils import blue
4126
from rq.registry import FailedJobRegistry, clean_registries
42-
from rq.serializers import DefaultSerializer
43-
from rq.suspension import is_suspended
4427
from rq.suspension import resume as connection_resume
4528
from rq.suspension import suspend as connection_suspend
46-
from rq.utils import get_call_string, import_attribute
47-
from rq.worker import Worker
48-
from rq.worker_pool import WorkerPool
29+
from rq.utils import get_call_string
4930
from rq.worker_registration import clean_worker_registry
5031

51-
if TYPE_CHECKING:
52-
from rq.serializers import Serializer
53-
5432

5533
@click.group()
5634
@click.version_option(version)
@@ -162,146 +140,6 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
162140
sys.exit(0)
163141

164142

165-
@main.command()
166-
@click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)')
167-
@click.option('--logging_level', type=str, default=None, help='Set logging level')
168-
@click.option('--log-format', type=str, default=DEFAULT_LOGGING_FORMAT, help='Set the format of the logs')
169-
@click.option('--date-format', type=str, default=DEFAULT_LOGGING_DATE_FORMAT, help='Set the date format of the logs')
170-
@click.option('--name', '-n', help='Specify a different name')
171-
@click.option('--results-ttl', type=int, default=DEFAULT_RESULT_TTL, help='Default results timeout to be used')
172-
@click.option('--worker-ttl', type=int, default=DEFAULT_WORKER_TTL, help='Worker timeout to be used')
173-
@click.option(
174-
'--maintenance-interval',
175-
type=int,
176-
default=DEFAULT_MAINTENANCE_TASK_INTERVAL,
177-
help='Maintenance task interval (in seconds) to be used',
178-
)
179-
@click.option(
180-
'--job-monitoring-interval',
181-
type=int,
182-
default=DEFAULT_JOB_MONITORING_INTERVAL,
183-
help='Default job monitoring interval to be used',
184-
)
185-
@click.option('--disable-job-desc-logging', is_flag=True, help='Turn off description logging.')
186-
@click.option('--verbose', '-v', is_flag=True, help='Show more output')
187-
@click.option('--quiet', '-q', is_flag=True, help='Show less output')
188-
@click.option('--exception-handler', help='Exception handler(s) to use', multiple=True)
189-
@click.option('--pid', help='Write the process ID number to a file at the specified path')
190-
@click.option('--disable-default-exception-handler', '-d', is_flag=True, help="Disable RQ's default exception handler")
191-
@click.option('--max-jobs', type=int, default=None, help='Maximum number of jobs to execute')
192-
@click.option('--max-idle-time', type=int, default=None, help='Maximum seconds to stay alive without jobs to execute')
193-
@click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler')
194-
@click.option('--serializer', '-S', default=None, help='Run worker with custom serializer')
195-
@click.option(
196-
'--dequeue-strategy', '-ds', default='default', help='Sets a custom stratey to dequeue from multiple queues'
197-
)
198-
@click.argument('queues', nargs=-1)
199-
@pass_cli_config
200-
def worker(
201-
cli_config,
202-
burst,
203-
logging_level,
204-
name,
205-
results_ttl,
206-
worker_ttl,
207-
maintenance_interval,
208-
job_monitoring_interval,
209-
disable_job_desc_logging,
210-
verbose,
211-
quiet,
212-
exception_handler,
213-
pid,
214-
disable_default_exception_handler,
215-
max_jobs,
216-
max_idle_time,
217-
with_scheduler,
218-
queues,
219-
log_format,
220-
date_format,
221-
serializer,
222-
dequeue_strategy,
223-
**options,
224-
):
225-
"""Starts an RQ worker."""
226-
settings = read_config_file(cli_config.config) if cli_config.config else {}
227-
# Worker specific default arguments
228-
queues = queues or settings.get('QUEUES', ['default'])
229-
name = name or settings.get('NAME')
230-
dict_config = settings.get('DICT_CONFIG')
231-
232-
if dict_config:
233-
logging.config.dictConfig(dict_config)
234-
235-
if pid:
236-
with open(os.path.expanduser(pid), 'w') as fp:
237-
fp.write(str(os.getpid()))
238-
239-
worker_name = cli_config.worker_class.__qualname__
240-
if worker_name in ['RoundRobinWorker', 'RandomWorker']:
241-
strategy_alternative = 'random' if worker_name == 'RandomWorker' else 'round_robin'
242-
msg = f'WARNING: {worker_name} is deprecated. Use `--dequeue-strategy {strategy_alternative}` instead.'
243-
warnings.warn(msg, DeprecationWarning)
244-
click.secho(msg, fg='yellow')
245-
246-
if dequeue_strategy not in ('default', 'random', 'round_robin'):
247-
click.secho(
248-
'ERROR: Dequeue Strategy can only be one of `default`, `random` or `round_robin`.', err=True, fg='red'
249-
)
250-
sys.exit(1)
251-
252-
setup_loghandlers_from_args(verbose, quiet, date_format, log_format)
253-
254-
try:
255-
exception_handlers = []
256-
for h in exception_handler:
257-
exception_handlers.append(import_attribute(h))
258-
259-
if is_suspended(cli_config.connection):
260-
click.secho('RQ is currently suspended, to resume job execution run "rq resume"', fg='red')
261-
sys.exit(1)
262-
263-
queues = [
264-
cli_config.queue_class(
265-
queue, connection=cli_config.connection, job_class=cli_config.job_class, serializer=serializer
266-
)
267-
for queue in queues
268-
]
269-
worker = cli_config.worker_class(
270-
queues,
271-
name=name,
272-
connection=cli_config.connection,
273-
default_worker_ttl=worker_ttl, # TODO remove this arg in 2.0
274-
worker_ttl=worker_ttl,
275-
default_result_ttl=results_ttl,
276-
maintenance_interval=maintenance_interval,
277-
job_monitoring_interval=job_monitoring_interval,
278-
job_class=cli_config.job_class,
279-
queue_class=cli_config.queue_class,
280-
exception_handlers=exception_handlers or None,
281-
disable_default_exception_handler=disable_default_exception_handler,
282-
log_job_description=not disable_job_desc_logging,
283-
serializer=serializer,
284-
)
285-
286-
# if --verbose or --quiet, override --logging_level
287-
if verbose or quiet:
288-
logging_level = None
289-
290-
worker.work(
291-
burst=burst,
292-
logging_level=logging_level,
293-
date_format=date_format,
294-
log_format=log_format,
295-
max_jobs=max_jobs,
296-
max_idle_time=max_idle_time,
297-
with_scheduler=with_scheduler,
298-
dequeue_strategy=dequeue_strategy,
299-
)
300-
except ConnectionError as e:
301-
logging.error(e)
302-
sys.exit(1)
303-
304-
305143
@main.command()
306144
@click.option('--duration', help='Seconds you want the workers to be suspended. Default is forever.', type=int)
307145
@pass_cli_config
@@ -428,68 +266,5 @@ def enqueue(
428266
click.echo("Enqueued %s with job-id '%s'." % (blue(function_string), job.id))
429267

430268

431-
@main.command()
432-
@click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)')
433-
@click.option('--logging-level', '-l', type=str, default='INFO', help='Set logging level')
434-
@click.option('--verbose', '-v', is_flag=True, help='Show more output')
435-
@click.option('--quiet', '-q', is_flag=True, help='Show less output')
436-
@click.option('--log-format', type=str, default=DEFAULT_LOGGING_FORMAT, help='Set the format of the logs')
437-
@click.option('--date-format', type=str, default=DEFAULT_LOGGING_DATE_FORMAT, help='Set the date format of the logs')
438-
@click.option('--job-class', type=str, default=None, help='Dotted path to a Job class')
439-
@click.argument('queues', nargs=-1)
440-
@click.option('--num-workers', '-n', type=int, default=1, help='Number of workers to start')
441-
@pass_cli_config
442-
def worker_pool(
443-
cli_config,
444-
burst: bool,
445-
logging_level,
446-
queues,
447-
serializer,
448-
verbose,
449-
quiet,
450-
log_format,
451-
date_format,
452-
worker_class,
453-
job_class,
454-
num_workers,
455-
**options,
456-
):
457-
"""Starts a RQ worker pool"""
458-
settings = read_config_file(cli_config.config) if cli_config.config else {}
459-
# Worker specific default arguments
460-
queue_names: List[str] = queues or settings.get('QUEUES', ['default'])
461-
462-
setup_loghandlers_from_args(verbose, quiet, date_format, log_format)
463-
464-
if serializer:
465-
serializer_class = cast(Type['Serializer'], import_attribute(serializer))
466-
else:
467-
serializer_class = DefaultSerializer
468-
469-
if worker_class:
470-
worker_class = import_attribute(worker_class)
471-
else:
472-
worker_class = Worker
473-
474-
if job_class:
475-
job_class = import_attribute(job_class)
476-
else:
477-
job_class = Job
478-
479-
# if --verbose or --quiet, override --logging_level
480-
if verbose or quiet:
481-
logging_level = None
482-
483-
pool = WorkerPool(
484-
queue_names,
485-
connection=cli_config.connection,
486-
num_workers=num_workers,
487-
serializer=serializer_class,
488-
worker_class=worker_class,
489-
job_class=job_class,
490-
)
491-
pool.start(burst=burst, logging_level=logging_level)
492-
493-
494269
if __name__ == '__main__':
495270
main()

0 commit comments

Comments
 (0)