diff --git a/apps/workers/broker.py b/apps/workers/broker.py index d91cf2d..5c4befa 100644 --- a/apps/workers/broker.py +++ b/apps/workers/broker.py @@ -1,27 +1,51 @@ +import importlib import json +import logging from functools import wraps from django.utils import timezone from . import registry +log = logging.getLogger(__name__) -def task(schedule=None): + +def run_now(handler, *args, **kwargs): + try: + module_path, function_name = handler.rsplit('.', 1) + module = importlib.import_module(module_path) + fn = getattr(module, function_name) + results = fn.__wrapped__(*args, **kwargs) + return results + except Exception as e: + log.exception(e) + + +def task(schedule=None, run_at=timezone.now()): def handler(f): path = '{0}.{1}'.format(f.__module__, f.__name__) if schedule: - registry.add(path, schedule) + registry.add(path, schedule, run_at) @wraps(f) def wrapper(*args, **kwargs): - run_at = kwargs.pop('_schedule', timezone.now()) - from .models import Task - Task.objects.create( - handler=path, - args=json.dumps(args), - kwargs=json.dumps(kwargs), - run_at=run_at, - ) + kwargs_run_at = kwargs.pop('_schedule', None) + if kwargs_run_at: + run_at_time = kwargs_run_at + else: + run_at_time = run_at + + if kwargs.pop('now', False): + return run_now(path, *args, **kwargs) + else: + from .models import Task + Task.objects.get_or_create( + handler=path, + args=json.dumps(args), + kwargs=json.dumps(kwargs), + run_at=run_at_time, + status=Task.WAITING + ) return wrapper return handler diff --git a/apps/workers/consumer.py b/apps/workers/consumer.py index fef3d82..e7ac8ac 100644 --- a/apps/workers/consumer.py +++ b/apps/workers/consumer.py @@ -4,6 +4,7 @@ import signal import time import importlib +from datetime import timedelta from multiprocessing import get_context from django.db import transaction @@ -69,7 +70,7 @@ def handle_callback(result): task.save() if task.schedule: - Task.create_scheduled_task(task.handler, task.schedule) + Task.create_scheduled_task(task.handler, task.schedule, run_at=task.run_at + timedelta(seconds=task.schedule)) def run_forever(): diff --git a/apps/workers/management/commands/runworkers.py b/apps/workers/management/commands/runworkers.py index 63ca4b4..d1cd55a 100644 --- a/apps/workers/management/commands/runworkers.py +++ b/apps/workers/management/commands/runworkers.py @@ -14,8 +14,8 @@ def handle(self, *args, **options): scheduled = registry.get() if scheduled: from ...models import Task - for handler, schedule in scheduled: - Task.create_scheduled_task(handler, schedule) + for handler, schedule, run_at in scheduled: + Task.create_scheduled_task(handler, schedule, run_at) # Close active db connection so workers create their own # This is REQUIRED for multiprocessing to work with Django diff --git a/apps/workers/models.py b/apps/workers/models.py index 9213586..bf92ad1 100644 --- a/apps/workers/models.py +++ b/apps/workers/models.py @@ -1,8 +1,6 @@ -from datetime import timedelta import logging from django.db import models -from django.utils import timezone log = logging.getLogger(__name__) @@ -40,16 +38,16 @@ def __str__(self): return self.handler @staticmethod - def create_scheduled_task(handler, schedule): + def create_scheduled_task(handler, schedule, run_at): if Task.objects.filter(handler=handler, schedule=schedule, status=Task.WAITING).exists(): return - scheduled_time = timezone.now() + timedelta(seconds=schedule) - log.debug('scheduling task: {0} for {1}'.format(handler, scheduled_time)) - Task.objects.create( + log.debug('scheduling task: {0} for {1}'.format(handler, run_at)) + Task.objects.get_or_create( handler=handler, args={}, kwargs={}, schedule=schedule, - run_at=scheduled_time, + run_at=run_at, + status=Task.WAITING ) diff --git a/apps/workers/registry.py b/apps/workers/registry.py index 65c69d4..b03a62b 100644 --- a/apps/workers/registry.py +++ b/apps/workers/registry.py @@ -1,8 +1,8 @@ _registry = [] -def add(handler, schedule): - token = (handler, schedule) +def add(handler, schedule, run_at): + token = (handler, schedule, run_at) if token not in _registry: _registry.append(token) diff --git a/apps/workers/tasks.py b/apps/workers/tasks.py new file mode 100644 index 0000000..5bb7e15 --- /dev/null +++ b/apps/workers/tasks.py @@ -0,0 +1,6 @@ +from apps.workers import task + + +@task() +def test(stuff_to_print): + return stuff_to_print diff --git a/apps/workers/tests.py b/apps/workers/tests.py new file mode 100644 index 0000000..babe881 --- /dev/null +++ b/apps/workers/tests.py @@ -0,0 +1,49 @@ +from datetime import timedelta +from unittest.mock import patch +from django.db.models import Q +from django.utils import timezone +from rest_framework.test import APITestCase + +from apps.workers.tasks import test +from apps.workers.models import Task + +string = "something to print" +future_time = timezone.now() + timedelta(minutes=10) + + +class WorkerTests(APITestCase): + @patch('apps.user.mail.Mail.send') + def test_sample_task_creation(self, mock_send): + """ + Ensure we can queue a sample task. + """ + old_task_ids = Task.objects.all().values_list('id', flat=True) + old_task_count = Task.objects.all().count() + test(string) + new_task = Task.objects.filter(Q(id__in=old_task_ids)).first() + new_task_count = Task.objects.all().count() + self.assertIsNotNone(new_task) + self.assertEqual(new_task_count - old_task_count, 1) + self.assertEqual(new_task.args, f'["{string}"]') + + def test_task_direct_run(self): + """ + Ensure we can run a sample task directly. + """ + string = "something to print" + testing_direct_run = test(string, now=True) + self.assertEqual(testing_direct_run, string) + + def test_task_schedule_time(self): + """ + Ensure we can run a sample task at specific time. + """ + old_task_ids = Task.objects.all().values_list('id', flat=True) + old_task_count = Task.objects.all().count() + test(string, _schedule=future_time) + new_task = Task.objects.filter(Q(id__in=old_task_ids)).first() + new_task_count = Task.objects.all().count() + self.assertIsNotNone(new_task) + self.assertEqual(new_task_count - old_task_count, 1) + self.assertEqual(new_task.args, f'["{string}"]') + self.assertEqual(new_task.run_at, future_time)