diff --git a/docs/source/class.rst b/docs/source/class.rst index 6121909..e69de29 100644 --- a/docs/source/class.rst +++ b/docs/source/class.rst @@ -1,28 +0,0 @@ -.. module:: pyres - -ResQ Classes -========================================== - -.. autoclass:: pyres.ResQ - :members: - -Job Classes -================= - -.. autoclass:: pyres.job.Job - :members: - -Worker Classes -================= - -.. autoclass:: pyres.worker.Worker - :members: - -Failure Classes -================= - -.. autoclass:: pyres.failure.base.BaseBackend - :members: - -.. autoclass:: pyres.failure.RedisBackend - :members: diff --git a/docs/source/conf.py b/docs/source/conf.py index 545ed07..71414ec 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -32,7 +32,7 @@ # The encoding of source files. #source_encoding = 'utf-8' - +autoclass_content = 'both' # The master toctree document. master_doc = 'index' diff --git a/docs/source/failure_example.rst b/docs/source/failure_example.rst new file mode 100644 index 0000000..5ad6cc2 --- /dev/null +++ b/docs/source/failure_example.rst @@ -0,0 +1,18 @@ +Failure Example +=============== + +Pyres provides a ``BaseBackend`` for handling failed jobs. You can subclass +this backend to store failed jobs in any system you like. + +Currently, the only provided backend is a ``RedisBackend`` which will store +your failed jobs into a special *failed* queue for later processing or +reenqueueing. + +Here's a simple example:: + + >>> from pyres import failure + >>> from pyres.job import Job + >>> from pyres import ResQ + >>> r = ResQ() + >>> job = Job.reserve('basic', r) + >>> job.fail('problem') diff --git a/docs/source/failures.rst b/docs/source/failures.rst index ea1db83..1fc8066 100644 --- a/docs/source/failures.rst +++ b/docs/source/failures.rst @@ -1,18 +1,16 @@ -Failures -=============== +.. module:: pyres -Pyres provides a ``BaseBackend`` for handling failed jobs. You can subclass -this backend to store failed jobs in any system you like. +Failure Classes +================= -Currently, the only provided backend is a ``RedisBackend`` which will store -your failed jobs into a special *failed* queue for later processing or -reenqueueing. +.. autoclass:: pyres.failure.base.BaseBackend + :members: + +.. autoclass:: pyres.failure.redis.RedisBackend + :members: -Here's a simple example:: +.. autoclass:: pyres.failure.multiple.MultipleBackend + :members: - >>> from pyres import failure - >>> from pyres.job import Job - >>> from pyres import ResQ - >>> r = ResQ() - >>> job = Job.reserve('basic', r) - >>> job.fail('problem') +.. autoclass:: pyres.failure.mail.MailBackend + :members: diff --git a/docs/source/index.rst b/docs/source/index.rst index 20d8c58..f2cdd19 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -9,15 +9,18 @@ Welcome to pyres's documentation! Contents: .. toctree:: - :maxdepth: 2 - - intro - install - example - class - tests - failures - horde + :maxdepth: 2 + + intro + install + example + resq + job + workers + failures + failure_example + tests + horde Indices and tables diff --git a/docs/source/job.rst b/docs/source/job.rst new file mode 100644 index 0000000..c55fece --- /dev/null +++ b/docs/source/job.rst @@ -0,0 +1,7 @@ +.. module:: pyres + +Job Classes +================= + +.. autoclass:: pyres.job.Job + :members: diff --git a/docs/source/resq.rst b/docs/source/resq.rst new file mode 100644 index 0000000..552e085 --- /dev/null +++ b/docs/source/resq.rst @@ -0,0 +1,7 @@ +.. module:: pyres + +The ResQ Class +========================================== + +.. autoclass:: pyres.ResQ + :members: diff --git a/docs/source/workers.rst b/docs/source/workers.rst new file mode 100644 index 0000000..f52d046 --- /dev/null +++ b/docs/source/workers.rst @@ -0,0 +1,7 @@ +.. module:: pyres + +Worker Classes +================= + +.. autoclass:: pyres.worker.Worker + :members: diff --git a/pyres/__init__.py b/pyres/__init__.py index 011cd88..413d97f 100644 --- a/pyres/__init__.py +++ b/pyres/__init__.py @@ -124,11 +124,6 @@ class ResQ(object): """The ResQ class defines the Redis server object to which we will enqueue jobs into various queues. - The ``__init__`` takes these keyword arguments: - - ``server`` -- IP address and port of the Redis server to which you want to connect, and optional Redis DB number. Default is `localhost:6379`. - - ``password`` -- The password, if required, of your Redis server. Default is "None". Example usage:: @@ -145,15 +140,44 @@ class ResQ(object): """ def __init__(self, server="localhost:6379", password=None): + """ + :param server: The IP address and port of the Redis server to which + you want to connect, and optional Redis DB number. + In the format `server:port` e.g. `localhost:6379` + :type server: str + :param password: The password, if required, of the Redis server + :type password: str + """ self.password = password self.redis = server self._watched_queues = set() def push(self, queue, item): + """Push a work item on to the specified queue. + + See the Redis Docs: `RPUSH `_ + + :param queue: the name of the queue to use + :type queue: str + :param item: the work item to put on the queue + :type item: dict + """ self.watch_queue(queue) self.redis.rpush("resque:queue:%s" % queue, ResQ.encode(item)) def pop(self, queues, timeout=10): + """Fetch an item off of any of the specified queues + + See the Redis Docs: `BLPOP `_ + + :param queues: Names of queues to fetch from + :type queues: `list` of `str` + :param timeout: How long to block before giving up + :type timeout: int + + :returns: The name of the queue and the job + :rtype: (str, dict) + """ if isinstance(queues, string_types): queues = [queues] ret = self.redis.blpop(["resque:queue:%s" % q for q in queues], @@ -165,9 +189,25 @@ def pop(self, queues, timeout=10): return None, None def size(self, queue): + """Get the number of items on the specified queue + + See the Redis Docs: `LLEN `_ + + :param queue: The name of the queue to check + :type queue: str + + :returns: The number of items on the specified queue + :rtype: int + """ return int(self.redis.llen("resque:queue:%s" % queue)) def watch_queue(self, queue): + """Add a queue to be watched + + See the Redis Docs: `SADD `_ + :param queue: the name of the queue to be watched + :type queue: str + """ if queue in self._watched_queues: return else: @@ -175,9 +215,36 @@ def watch_queue(self, queue): self._watched_queues.add(queue) def peek(self, queue, start=0, count=1): + """Fetch `count` number of items in the specified `queue`, starting at + `start`. + + :param queue: The name of the queue to peek at + :type queue: str + :param start: Where in the queue to start fetching data from + :type start: int + :param count: The number of items to peek at + :type count: int + + :returns: A list of items in the queue + :rtype: `list` of `dict` + """ return self.list_range('resque:queue:%s' % queue, start, count) def list_range(self, key, start, count): + """Fetch `count` number of items in the specified `queue`, starting at + `start`. + + :param queue: The name of the queue to peek at. + It must start with 'resque:queue:'. + :type queue: str + :param start: Where in the queue to start fetching data from + :type start: int + :param count: The number of items to peek at + :type count: int + + :returns: A list of items in the queue + :rtype: `list` of `dict` + """ items = self.redis.lrange(key, start,start+count-1) or [] ret_list = [] for i in items: @@ -210,8 +277,16 @@ def _set_redis(self, server): redis = property(_get_redis, _set_redis) def enqueue(self, klass, *args): - """Enqueue a job into a specific queue. Make sure the class you are - passing has **queue** attribute and a **perform** method on it. + """Enqueue a job. The job will be enqueued in to the queue name + specified by ``klass.queue``, and it will eventually be run with + ``klass.perform(*args)``. + + :param klass: A class instance that represents a job + :type klass: Any class that has a ``queue`` attribute and a + class level ``perform`` method. + :param args: The arguments to be passed to the ``perform`` method of + the ``klass`` when it is called. + :type args: list """ queue = getattr(klass,'queue', None) @@ -233,14 +308,36 @@ def enqueue_from_string(self, klass_as_string, queue, *args, **kwargs): logger.debug("no arguments passed in.") def queues(self): + """Get the queues that are being watched for jobs. + + See the Redis Docs: `SMEMBERS `_ + + :returns: A list of queue names + :rtype: `list` of `str` + """ return [sm.decode() for sm in self.redis.smembers("resque:queues")] or [] def workers(self): + """Get the workers that are being used. + + See the Redis Docs: `SMEMBERS `_ + + :returns: A list of worker names + :rtype: `list` of `str` + """ return [w.decode() for w in self.redis.smembers("resque:workers")] or [] def info(self): - """Returns a dictionary of the current status of the pending jobs, - processed, no. of queues, no. of workers, no. of failed jobs. + """Get some information about the current status of pyres + + :returns: status information + :rtype: { + 'pending': int, + 'processed': int, + 'queues': `list` of `str`, + 'workers': `list` of `str`, + 'failed': int, + 'servers': `list` of `str`} """ pending = 0 @@ -257,10 +354,24 @@ def info(self): } def keys(self): + """Get a list of keys being used by pyres + + :returns: a list of keys + :rtype: `list` of `str` + """ return [key.decode().replace('resque:','') for key in self.redis.keys('resque:*')] def reserve(self, queues): + """Fetch a job from one of the queues in ``queues`` and mark it + as reserved. If no jobs are available, do nothing. + + :param queues: a list of queues to try to fetch from. + :type queues: `list` of `str` or just `str` + + :returns: the reserved :class:`Job` or `None` + :rtype: :class:`Job` or `None` + """ from pyres.job import Job return Job.reserve(queues, self) @@ -272,18 +383,40 @@ def working(self): return Worker.working(self) def remove_queue(self, queue): + """Remove a queue from the watched queues list + + See the Redis Docs: `SREM `_ + + :param queue: The name of the queue to remove + :type queue: str + """ if queue in self._watched_queues: self._watched_queues.remove(queue) self.redis.srem('resque:queues',queue) del self.redis['resque:queue:%s' % queue] def close(self): - """Close the underlying redis connection. - - """ + """Close the underlying redis connection.""" self.redis.connection_pool.get_connection('_').disconnect() def enqueue_at(self, datetime, klass, *args, **kwargs): + """Enqueue a job at a specific time. The job will be enqueued in to the + queue name specified by ``klass.queue``, and it will be run at or after + ``datetime`` with ``klass.perform(*args)``. Similar to :func:`enqueue`. + + .. note:: A :class:`Scheduler` instance must be + running for these jobs to run. + + :param datetime: When to run the the job + :type datetime: :py:mod:`datetime.datetime` + :param klass: A class instance that represents a job + :type klass: Any class that has a ``queue`` attribute and a + class level ``perform`` method. + :param args: The arguments to be passed to the ``perform`` method of + the ``klass`` when it is called. + :type args: list + + """ class_name = '%s.%s' % (klass.__module__, klass.__name__) self.enqueue_at_from_string(datetime, class_name, klass.queue, *args, **kwargs) @@ -298,18 +431,63 @@ def enqueue_at_from_string(self, datetime, klass_as_string, queue, *args, **kwar self.delayed_push(datetime, payload) def delayed_push(self, datetime, item): + """Schedule a work `item` to be run on a specified queue at or after + the specified `datetime`. Similar to :func:`enqueue`. + + See the Redis Docs: `RPUSH `_, \ + `ZADD `_ + + :param datetime: When to run the job + :type datetime: :py:mod:`datetime.datetime` + :param item: the work item to put on the queue + :type item: dict + """ key = int(time.mktime(datetime.timetuple())) self.redis.rpush('resque:delayed:%s' % key, ResQ.encode(item)) self.redis.zadd('resque:delayed_queue_schedule', key, key) def delayed_queue_peek(self, start, count): + """Fetch `count` number of item numbers, starting at `start`. + + See the Redis Docs: `RANGE `_ + + :param start: Where in the queue to start fetching data from + :type start: int + :param count: The number of items to peek at + :type count: int + + :returns: A list of items in the queue + :rtype: `list` of `int` + """ return [int(item) for item in self.redis.zrange( 'resque:delayed_queue_schedule', start, start+count) or []] def delayed_timestamp_peek(self, timestamp, start, count): + """Fetch `count` number of items scheduled for a given `timestamp`, + starting at `start`. + + + :param timestamp: The timestamp to check + :type timestamp: str + :param start: Where in the queue to start fetching data from + :type start: int + :param count: The number of items to peek at + :type count: int + + :returns: A list of items in the queue + :rtype: `list` of `dict` + """ return self.list_range('resque:delayed:%s' % timestamp, start, count) def delayed_queue_schedule_size(self): + """Fetch the number of scheduled jobs + + See the Redis Docs: `ZCARD `_, \ + `ZRANGE `_ + + :returns: the number of schedule jobs + :rtype: int + """ size = 0 length = self.redis.zcard('resque:delayed_queue_schedule') for i in self.redis.zrange('resque:delayed_queue_schedule',0,length): @@ -317,10 +495,28 @@ def delayed_queue_schedule_size(self): return size def delayed_timestamp_size(self, timestamp): + """Fetch the number of jobs scheduled for a specific time + + See the Redis Docs: `LLEN `_ + + :param timestamp: the timestamp to check + :type timestamp: str + + :returns: the number of jobs scheduled for the specified time + :rtype: int + """ #key = int(time.mktime(timestamp.timetuple())) return self.redis.llen("resque:delayed:%s" % timestamp) def next_delayed_timestamp(self): + """Get the timestamp of the next job that should be run + + See the Redis Docs: \ + `ZRANGEBYSCORE `_ + + :returns: The timestamp of the next job to run + :rtype: str + """ key = int(time.mktime(ResQ._current_time().timetuple())) array = self.redis.zrangebyscore('resque:delayed_queue_schedule', '-inf', key, start=0, num=1) @@ -332,6 +528,23 @@ def next_delayed_timestamp(self): return timestamp.decode() def next_item_for_timestamp(self, timestamp): + """Get the next job to be run at a given timestamp. If an item is + found, it will be removed from the queue so that it will not be run + again. + + See the Redis Docs: \ + `LPOP `_, \ + `LLEN `_, \ + `DEL `_, \ + `ZREM `_ + + + :param timestamp: The timestamp to get a job for + :type timestamp: str + + :returns: The job to run next + :rtype: dict + """ #key = int(time.mktime(timestamp.timetuple())) key = "resque:delayed:%s" % timestamp ret = self.redis.lpop(key) @@ -345,10 +558,24 @@ def next_item_for_timestamp(self, timestamp): @classmethod def encode(cls, item): + """Convert an `item` to a string (Currently JSON) + + :param item: the item to encode + :type item: dict + """ return json.dumps(item) @classmethod def decode(cls, item): + """Decode an `item`. If the `item` is not a string, call + its `decode` method (`item.decode()`) before running the regular + decoding. + + :param item: The item to decode + + :returns: The decoded item + :rtype: `dict` or `str` + """ if not isinstance(item, string_types): item = item.decode() ret = json.loads(item) diff --git a/pyres/failure/base.py b/pyres/failure/base.py index 330fbe4..88ccf7b 100644 --- a/pyres/failure/base.py +++ b/pyres/failure/base.py @@ -4,19 +4,21 @@ class BaseBackend(object): """Provides a base class that custom backends can subclass. Also provides basic traceback and message parsing. - - The ``__init__`` takes these keyword arguments: - - ``exp`` -- The exception generated by your failure. - - ``queue`` -- The queue in which the ``Job`` was enqueued when it failed. - - ``payload`` -- The payload that was passed to the ``Job``. - - ``worker`` -- The worker that was processing the ``Job`` when it failed. - """ def __init__(self, exp, queue, payload, worker=None): + """ + :param exp: The exception generated by your failure + :type exp: Exception + :param queue: The name of the queue in which the :class:`Job` + was enqueued when it failed. + :type queue: string + :param payload: The payload that was passed to the :class:`Job` + :type payload: dict + :param worker: The worker that was processing the :class:`Job` + when it failed. + :type worker: :class:`Worker` subclass + + """ excc = sys.exc_info()[0] self._exception = excc @@ -29,15 +31,20 @@ def __init__(self, exp, queue, payload, worker=None): self._queue = queue self._payload = payload - def _parse_traceback(self, trace): - """Return the given traceback string formatted for a notification.""" + """Return the given traceback string formatted for a notification. + :param trace: The traceback of an exception + :type trace: str + """ if not trace: return [] return trace.split('\n') def _parse_message(self, exc): - """Return a message for a notification from the given exception.""" + """Return a message for a notification from the given exception. + :param exc: An exception to create a message for + :type exc: Exception + """ return '%s: %s' % (exc.__class__.__name__, str(exc)) diff --git a/pyres/failure/mail.py b/pyres/failure/mail.py index 3816b29..a746753 100644 --- a/pyres/failure/mail.py +++ b/pyres/failure/mail.py @@ -7,35 +7,34 @@ class MailBackend(BaseBackend): """Extends ``BaseBackend`` to provide support for emailing failures. - Intended to be used with the MultipleBackend: + Intended to be used with the MultipleBackend:: - from pyres import failure + from pyres import failure - from pyres.failure.mail import MailBackend - from pyres.failure.multiple import MultipleBackend - from pyres.failure.redis import RedisBackend + from pyres.failure.mail import MailBackend + from pyres.failure.multiple import MultipleBackend + from pyres.failure.redis import RedisBackend - class EmailFailure(MailBackend): - subject = 'Pyres Failure on {queue}' - from_user = 'My Email User ' - recipients = ['Me '] + class EmailFailure(MailBackend): + subject = 'Pyres Failure on {queue}' + from_user = 'My Email User ' + recipients = ['Me '] - smtp_host = 'mail.mydomain.tld' - smtp_port = 25 - smtp_tls = True + smtp_host = 'mail.mydomain.tld' + smtp_port = 25 + smtp_tls = True - smtp_user = 'mailuser' - smtp_password = 'm41lp455w0rd' + smtp_user = 'mailuser' + smtp_password = 'm41lp455w0rd' - failure.backend = MultipleBackend - failure.backend.classes = [RedisBackend, EmailFailure] + failure.backend = MultipleBackend + failure.backend.classes = [RedisBackend, EmailFailure] - Additional notes: - - The following tokens are available in subject: queue, worker, exception + .. note:: The following tokens are available in subject: `queue`, `worker`, `exception` + + .. note:: Override :func:`create_message` to provide an alternate body. - - Override the create_message method to provide an alternate body. It - should return one of the message types from email.mime.* """ subject = 'Pyres Failure on {queue}' @@ -50,6 +49,10 @@ class EmailFailure(MailBackend): smtp_password = None def save(self, resq=None): + """Sends an email to the indicated recipients + + :param resq: Provided for compatibility. Not used. + """ if not self.recipients or not self.smtp_host or not self.from_user: return @@ -68,7 +71,12 @@ def format_subject(self): exception=self._exception) def create_message(self): - """Returns a message body to send in this email. Should be from email.mime.*""" + """Creates a message body for the email. + + :returns: A message body + :rtype: email.mime.* + """ + body = dedent("""\ Received exception {exception} on {queue} from worker {worker}: @@ -87,6 +95,11 @@ def create_message(self): return MIMEText(body) def send_message(self, message): + """Sends the message to the specified recipients + + :param message: The message to send. + :type message: email.mime.* + """ smtp = smtplib.SMTP(self.smtp_host, self.smtp_port) try: diff --git a/pyres/failure/multiple.py b/pyres/failure/multiple.py index 6362363..7f649d4 100644 --- a/pyres/failure/multiple.py +++ b/pyres/failure/multiple.py @@ -2,28 +2,34 @@ from pyres.failure.redis import RedisBackend class MultipleBackend(BaseBackend): - """Extends ``BaseBackend`` to provide support for delegating calls to multiple - backends. Queries are delegated to the first backend in the list. Defaults to - only the RedisBackend. + """Extends :class:`BaseBackend` to provide support for delegating calls + to multiple backends. - To use: + .. note:: Queries are delegated to the first backend in the list - from pyres import failure + .. note:: Defaults to only the RedisBackend - from pyres.failure.base import BaseBackend - from pyres.failure.multiple import MultipleBackend - from pyres.failure.redis import RedisBackend + To use:: - class CustomBackend(BaseBackend): - def save(self, resq): - print('Custom backend') + from pyres import failure - failure.backend = MultipleBackend - failure.backend.classes = [RedisBackend, CustomBackend] + from pyres.failure.base import BaseBackend + from pyres.failure.multiple import MultipleBackend + from pyres.failure.redis import RedisBackend + + class CustomBackend(BaseBackend): + def save(self, resq): + print('Custom backend') + + failure.backend = MultipleBackend + failure.backend.classes = [RedisBackend, CustomBackend] """ classes = [] def __init__(self, *args): + """Sets up the class to use a :class:`RedisBackend` by default + """ + if not self.classes: self.classes = [RedisBackend] @@ -32,18 +38,52 @@ def __init__(self, *args): @classmethod def count(cls, resq): + """ Returns the result of a `count` call to the first backend + + :param resq: The redis queue to count on + :type resq: :class:`ResQ` + + :returns: The number of items in the backend + :rtype: int + """ first = MultipleBackend.classes[0] return first.count(resq) @classmethod def all(cls, resq, start=0, count=1): + """ Returns the result of an `all` call to the first backend + + :param resq: The redis queue to count on + :type resq: :class:`ResQ` + :param start: The location to start fetching items + :type start: int + :param count: The number of items to fetch + :type count: + + :returns: A list of items from the backend + :rtype: `list` of `dict` + """ first = MultipleBackend.classes[0] return first.all(resq, start, count) @classmethod def clear(cls, resq): + """ Returns the result of a `clear` call to the first backend + + :param resq: The redis queue to clear on + :type resq: :class:`ResQ` + + :returns: The number of items cleared from the backend + :rtype: int + """ first = MultipleBackend.classes[0] return first.clear(resq) def save(self, resq=None): + """ Calls save on all of the backends + + :param resq: The redis queue to save to + :type resq: :class:`ResQ` + + """ map(lambda x: x.save(resq), self.backends) diff --git a/pyres/failure/redis.py b/pyres/failure/redis.py index 5fe71ee..380fa2a 100644 --- a/pyres/failure/redis.py +++ b/pyres/failure/redis.py @@ -5,10 +5,16 @@ from pyres import ResQ class RedisBackend(BaseBackend): - """Extends the ``BaseBackend`` to provide a Redis backend for failed jobs.""" + """Extends the :class:`BaseBackend` to provide a Redis backend for failed jobs.""" def save(self, resq=None): - """Saves the failed Job into a "failed" Redis queue preserving all its original enqueud info.""" + """Saves the failed :class:`Job` in to a "failed" Redis queue, + preserving all of its original enqueued information. + + :param resq: The redis queue instance to save to + :type resq: :class:`ResQ` + """ + if not resq: resq = ResQ() data = { @@ -26,10 +32,32 @@ def save(self, resq=None): @classmethod def count(cls, resq): + """Gets the number of failed items in the queue + + :param resq: The redis queue instance to check + :type resq: :class:`ResQ` + + :returns: The number of failed items in the queue + :rtype: int + """ return int(resq.redis.llen('resque:failed')) @classmethod def all(cls, resq, start=0, count=1): + """Get a list of the items in the failure queue. + + Redis' documentation: `LLEN `_ + + :param resq: The redis queue instance to check + :type resq: :class:`ResQ` + :param start: The location in the queue to start checking at. + :type start: int + :param count: The number of items to retrieve + :type count: int + + :returns: A list of items in the queue + :rtype: `list` of `dict` + """ items = resq.redis.lrange('resque:failed', start, count) or [] ret_list = [] @@ -41,5 +69,14 @@ def all(cls, resq, start=0, count=1): @classmethod def clear(cls, resq): - return resq.redis.delete('resque:failed') + """Clears the failure queue. + + Redis' documentation: `DEL `_ + :param resq: The redis queue instance to clear on + :type resq: :class:`ResQ` + + :returns: The number of items deleted + :rtype: int + """ + return resq.redis.delete('resque:failed') diff --git a/pyres/job.py b/pyres/job.py index 4f4b547..83d17b8 100644 --- a/pyres/job.py +++ b/pyres/job.py @@ -7,27 +7,25 @@ from pyres.compat import string_types class Job(object): - """Every job on the ResQ is an instance of the *Job* class. - - The ``__init__`` takes these keyword arguments: - - ``queue`` -- A string defining the queue to which this Job will be - added. - - ``payload`` -- A dictionary which contains the string name of a class - which extends this Job and a list of args which will be - passed to that class. - - ``resq`` -- An instance of the ResQ class. - - ``worker`` -- The name of a specific worker if you'd like this Job to be - done by that worker. Default is "None". - - """ + """Every job on the ResQ is an instance of the :class:`Job` class.""" safe_str_to_class = staticmethod(safe_str_to_class) def __init__(self, queue, payload, resq, worker=None): + """ + :param queue: A string defining the queue to which this `Job` will + be added + :type queue: str + :param payload: A dictionary containing the name of a class that + extends this `Job` and a list of args which will + be passed to it's `perform` method. + :type payload: dict + :param resq: the :class:`ResQ` that this will be run on + :type resq: :class:`ResQ` + :param worker: The name of a specific worker for this Job to be + run with. + :type worker: str + """ self._queue = queue self._payload = payload self.resq = resq @@ -100,8 +98,15 @@ def perform(self): def fail(self, exception): """This method provides a way to fail a job and will use whatever - failure backend you've provided. The default is the ``RedisBackend``. + failure backend you've provided. The default is the + :class:`RedisBackend`. + :param exception: The exception that caused the :class:`Job` to fail. + :type exception: Exception + + :returns: The failure backend instance + :rtype: An instance of a Failure Backend. + Defaults to :class:`RedisBackend` """ fail = failure.create(exception, self._queue, self._payload, self._worker) @@ -110,9 +115,17 @@ def fail(self, exception): def retry(self, payload_class, args): """This method provides a way to retry a job after a failure. - If the jobclass defined by the payload containes a ``retry_every`` attribute then pyres - will attempt to retry the job until successful or until timeout defined by ``retry_timeout`` on the payload class. + If the jobclass defined by the payload containes a ``retry_every`` + attribute then pyres will attempt to retry the job until successful + or until timeout defined by ``retry_timeout`` on the payload class. + + :param payload_class: the :class:`Job`-like class that needs + to be retried + :type payload_class: :class:`Job`-like + :param args: The args to be passed to the `payload_class.perform` + method when it is retried. + :type args: list """ retry_every = getattr(payload_class, 'retry_every', None) retry_timeout = getattr(payload_class, 'retry_timeout', 0) @@ -133,6 +146,14 @@ def reserve(cls, queues, res, worker=None, timeout=10): """Reserve a job on one of the queues. This marks this job so that other workers will not pick it up. + :param queues: The names of the queues to try and reserve from + :type queues: str + :param res: the redis instance to reserve from + :type res: :class:`ResQ` + :param worker: The name of worker to perform the job with + :type worker: str + :param timeout: How long to block while fetching a job before giving up + :type timeout: int """ if isinstance(queues, string_types): queues = [queues]