Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 34 additions & 10 deletions apps/workers/broker.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,51 @@
import importlib
import json
import logging
from functools import wraps

from django.utils import timezone

from . import registry

log = logging.getLogger(__name__)

def task(schedule=None):

def run_now(handler, *args, **kwargs):
try:
module_path, function_name = handler.rsplit('.', 1)
module = importlib.import_module(module_path)
fn = getattr(module, function_name)
results = fn.__wrapped__(*args, **kwargs)
return results
except Exception as e:
log.exception(e)


def task(schedule=None, run_at=timezone.now()):
def handler(f):
path = '{0}.{1}'.format(f.__module__, f.__name__)

if schedule:
registry.add(path, schedule)
registry.add(path, schedule, run_at)

@wraps(f)
def wrapper(*args, **kwargs):
run_at = kwargs.pop('_schedule', timezone.now())
from .models import Task
Task.objects.create(
handler=path,
args=json.dumps(args),
kwargs=json.dumps(kwargs),
run_at=run_at,
)
kwargs_run_at = kwargs.pop('_schedule', None)
if kwargs_run_at:
run_at_time = kwargs_run_at
else:
run_at_time = run_at

if kwargs.pop('now', False):
return run_now(path, *args, **kwargs)
else:
from .models import Task
Task.objects.get_or_create(
handler=path,
args=json.dumps(args),
kwargs=json.dumps(kwargs),
run_at=run_at_time,
status=Task.WAITING
)
return wrapper
return handler
3 changes: 2 additions & 1 deletion apps/workers/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import signal
import time
import importlib
from datetime import timedelta
from multiprocessing import get_context

from django.db import transaction
Expand Down Expand Up @@ -69,7 +70,7 @@ def handle_callback(result):
task.save()

if task.schedule:
Task.create_scheduled_task(task.handler, task.schedule)
Task.create_scheduled_task(task.handler, task.schedule, run_at=task.run_at + timedelta(seconds=task.schedule))


def run_forever():
Expand Down
4 changes: 2 additions & 2 deletions apps/workers/management/commands/runworkers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ def handle(self, *args, **options):
scheduled = registry.get()
if scheduled:
from ...models import Task
for handler, schedule in scheduled:
Task.create_scheduled_task(handler, schedule)
for handler, schedule, run_at in scheduled:
Task.create_scheduled_task(handler, schedule, run_at)

# Close active db connection so workers create their own
# This is REQUIRED for multiprocessing to work with Django
Expand Down
12 changes: 5 additions & 7 deletions apps/workers/models.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from datetime import timedelta
import logging

from django.db import models
from django.utils import timezone


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -40,16 +38,16 @@ def __str__(self):
return self.handler

@staticmethod
def create_scheduled_task(handler, schedule):
def create_scheduled_task(handler, schedule, run_at):
if Task.objects.filter(handler=handler, schedule=schedule, status=Task.WAITING).exists():
return

scheduled_time = timezone.now() + timedelta(seconds=schedule)
log.debug('scheduling task: {0} for {1}'.format(handler, scheduled_time))
Task.objects.create(
log.debug('scheduling task: {0} for {1}'.format(handler, run_at))
Task.objects.get_or_create(
handler=handler,
args={},
kwargs={},
schedule=schedule,
run_at=scheduled_time,
run_at=run_at,
status=Task.WAITING
)
4 changes: 2 additions & 2 deletions apps/workers/registry.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
_registry = []


def add(handler, schedule):
token = (handler, schedule)
def add(handler, schedule, run_at):
token = (handler, schedule, run_at)
if token not in _registry:
_registry.append(token)

Expand Down
6 changes: 6 additions & 0 deletions apps/workers/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from apps.workers import task


@task()
def test(stuff_to_print):
return stuff_to_print
49 changes: 49 additions & 0 deletions apps/workers/tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from datetime import timedelta
from unittest.mock import patch
from django.db.models import Q
from django.utils import timezone
from rest_framework.test import APITestCase

from apps.workers.tasks import test
from apps.workers.models import Task

string = "something to print"
future_time = timezone.now() + timedelta(minutes=10)


class WorkerTests(APITestCase):
@patch('apps.user.mail.Mail.send')
def test_sample_task_creation(self, mock_send):
"""
Ensure we can queue a sample task.
"""
old_task_ids = Task.objects.all().values_list('id', flat=True)
old_task_count = Task.objects.all().count()
test(string)
new_task = Task.objects.filter(Q(id__in=old_task_ids)).first()
new_task_count = Task.objects.all().count()
self.assertIsNotNone(new_task)
self.assertEqual(new_task_count - old_task_count, 1)
self.assertEqual(new_task.args, f'["{string}"]')

def test_task_direct_run(self):
"""
Ensure we can run a sample task directly.
"""
string = "something to print"
testing_direct_run = test(string, now=True)
self.assertEqual(testing_direct_run, string)

def test_task_schedule_time(self):
"""
Ensure we can run a sample task at specific time.
"""
old_task_ids = Task.objects.all().values_list('id', flat=True)
old_task_count = Task.objects.all().count()
test(string, _schedule=future_time)
new_task = Task.objects.filter(Q(id__in=old_task_ids)).first()
new_task_count = Task.objects.all().count()
self.assertIsNotNone(new_task)
self.assertEqual(new_task_count - old_task_count, 1)
self.assertEqual(new_task.args, f'["{string}"]')
self.assertEqual(new_task.run_at, future_time)