import redis_tasks
from .conf import connection, construct_redis_key, settings
from .exceptions import WorkerDoesNotExist
from .utils import LazyObject, atomic_pipeline, decode_list
[docs]class ExpiringRegistry:
def __init__(self, name):
self.key = construct_redis_key(name + '_tasks')
[docs] @atomic_pipeline
def add(self, task, *, pipeline):
timestamp = connection.ftime()
pipeline.zadd(self.key, {task.id: timestamp})
[docs] def get_task_ids(self, offset=0, length=-1):
end = offset + length if length >= 0 else length
return decode_list(connection.zrange(self.key, offset, end))
[docs] def get_tasks(self, offset=0, length=-1):
return redis_tasks.task.Task.fetch_many(
self.get_task_ids(offset, length))
[docs] def empty(self): # TODO: test
def transaction(pipeline):
task_ids = decode_list(pipeline.zrange(self.key, 0, -1))
pipeline.multi()
pipeline.delete(self.key)
redis_tasks.task.Task.delete_many(task_ids, pipeline=pipeline)
connection.transaction(transaction, self.key)
[docs] def count(self):
return connection.zcard(self.key)
[docs] def expire(self):
"""Remove expired tasks from registry."""
timestamp = connection.ftime()
cutoff_time = timestamp - settings.EXPIRING_REGISTRIES_TTL
def transaction(pipeline):
expired_task_ids = decode_list(pipeline.zrangebyscore(
self.key, 0, cutoff_time))
if expired_task_ids:
pipeline.multi()
pipeline.zremrangebyscore(self.key, 0, cutoff_time)
redis_tasks.task.Task.delete_many(expired_task_ids, pipeline=pipeline)
connection.transaction(transaction, self.key)
finished_task_registry = LazyObject(lambda: ExpiringRegistry('finished'))
failed_task_registry = LazyObject(lambda: ExpiringRegistry('failed'))
[docs]def registry_maintenance():
finished_task_registry.expire()
failed_task_registry.expire()
worker_registry.handle_died_workers()
[docs]class WorkerRegistry:
def __init__(self):
self.key = construct_redis_key('workers')
[docs] @atomic_pipeline
def add(self, worker, *, pipeline):
timestamp = connection.ftime()
pipeline.zadd(self.key, {worker.id: timestamp})
[docs] def heartbeat(self, worker):
timestamp = connection.ftime()
updated = connection.zadd(self.key, {worker.id: timestamp}, xx=True, ch=True)
if not updated:
raise WorkerDoesNotExist()
[docs] @atomic_pipeline
def remove(self, worker, *, pipeline):
pipeline.zrem(self.key, worker.id)
[docs] def get_worker_ids(self):
return decode_list(connection.zrangebyscore(
self.key, '-inf', '+inf'))
[docs] def get_running_tasks(self):
"""Returns a worker_id -> task_id dict"""
task_key_prefix = construct_redis_key('worker_task:')
lua = connection.register_script("""
local workers_key, task_key_prefix = unpack(KEYS)
local worker_ids = redis.call("ZRANGE", workers_key, 0, -1)
local out = {}
for _, worker_id in ipairs(worker_ids) do
local task_key = task_key_prefix .. worker_id
local task_id = redis.call("LINDEX", task_key, 0)
if task_id ~= false then
table.insert(out, worker_id)
table.insert(out, task_id)
end
end
return out
""")
it = iter(decode_list(lua(keys=[self.key, task_key_prefix])))
return dict(zip(it, it))
[docs] def handle_died_workers(self): # TODO: Test
from redis_tasks.worker import Worker
died_worker_ids = self.get_dead_ids()
for worker_id in died_worker_ids:
worker = Worker.fetch(worker_id)
worker.died()
[docs] def get_dead_ids(self):
oldest_valid = connection.ftime() - settings.WORKER_HEARTBEAT_TIMEOUT
return decode_list(connection.zrangebyscore(
self.key, '-inf', oldest_valid))
worker_registry = LazyObject(WorkerRegistry)
[docs]class QueueRegistry:
def __init__(self):
self.key = construct_redis_key('queues')
[docs] def get_names(self):
return decode_list(connection.smembers(self.key))
[docs] @atomic_pipeline
def add(self, queue, *, pipeline):
pipeline.sadd(self.key, queue.name)
[docs] @atomic_pipeline
def remove(self, queue, *, pipeline):
pipeline.srem(self.key, queue.name)
queue_registry = LazyObject(QueueRegistry)