Source code for redis_tasks.queue

from .conf import connection, construct_redis_key
from .exceptions import TaskDoesNotExist
from .registries import queue_registry
from .task import Task
from .utils import atomic_pipeline, decode_list


[docs]class Queue(object): def __init__(self, name='default'): self.name = name self.key = construct_redis_key('queue:' + name) # We use a separate key for the workers to wait on, as we need to do a # multi-key blocking rpop on it, and redis does not have a variant of # that operation that is not at risk of losing tasks. self.unblock_key = construct_redis_key('unblock_queue:' + name)
[docs] @classmethod def all(cls): """Returns an iterable of all Queues.""" return [cls(name) for name in sorted(queue_registry.get_names())]
[docs] def count(self): return connection.llen(self.key)
def _empty_transaction(self, pipeline): task_ids = decode_list(pipeline.lrange(self.key, 0, -1)) pipeline.multi() Task.delete_many(task_ids, pipeline=pipeline) pipeline.delete(self.key) pipeline.delete(self.unblock_key)
[docs] def empty(self): """Removes all messages on the queue.""" connection.transaction(self._empty_transaction, self.key)
[docs] def delete(self): def transaction(pipeline): self._empty_transaction(pipeline) queue_registry.remove(self, pipeline=pipeline) connection.transaction(transaction, self.key)
[docs] def get_task_ids(self, offset=0, length=-1): end = -1 - offset start = -(length + 1) if length < 0 else end - length return [task_id.decode() for task_id in reversed(connection.lrange(self.key, start, end))]
[docs] def get_tasks(self, offset=0, length=-1): return Task.fetch_many(self.get_task_ids(offset, length))
[docs] @atomic_pipeline def enqueue_call(self, *args, pipeline, **kwargs): """Creates a task to represent the delayed function call and enqueues it.""" task = Task(*args, **kwargs) task.enqueue(self, pipeline=pipeline) return task
[docs] @atomic_pipeline def push(self, task, *, pipeline, at_front=False): """Pushes a task on the queue `at_front` inserts the task at the front instead of the back of the queue""" queue_registry.add(self, pipeline=pipeline) if at_front: pipeline.rpush(self.key, task.id) else: pipeline.lpush(self.key, task.id) pipeline.lpush(self.unblock_key, task.id)
[docs] def remove_and_delete(self, task): def transaction(pipeline): task_ids = decode_list(pipeline.lrange(self.key, 0, -1)) if task.id not in task_ids: raise TaskDoesNotExist() pipeline.multi() pipeline.lrem(self.key, 0, task.id) Task.delete_many([task.id], pipeline=pipeline) connection.transaction(transaction, self.key)
[docs] def dequeue(self, worker): """Dequeue a task and set it as the current task for `worker`""" # Use lua script to atomically clear unblock_key if queue is empty lua = connection.register_script(""" local queue, unblocker, worker_task_list = unpack(KEYS) local result = redis.call("RPOPLPUSH", queue, worker_task_list) if result == false then redis.call("DEL", unblocker) end return result """) result = lua(keys=[self.key, self.unblock_key, worker.task_key]) if result is None: return None else: task_id = result.decode() worker.current_task_id = task_id return Task.fetch(task_id)
[docs] @classmethod def await_multi(cls, queues, timeout): """Blocks until one of the passed queues contains a tasks. Return the queue that contained a task or None if `timeout` was reached.""" queue_map = {q.unblock_key: q for q in queues} result = connection.brpop(queue_map.keys(), timeout) if result is None: return None return queue_map[result[0].decode()]
def __eq__(self, other): if type(other) != type(self): return NotImplemented else: return self.name == other.name def __hash__(self): return hash((self.__class__.__name__, self.name)) def __repr__(self): return '{0}({1!r})'.format(self.__class__.__name__, self.name) def __str__(self): return '<{0} {1}>'.format(self.__class__.__name__, self.name)