redis_tasks package

Submodules

redis_tasks.cli module

redis_tasks.cli.configure_logging(verbose, quiet, **options)[source]
redis_tasks.cli.print_separator()[source]
redis_tasks.cli.show_queues(queues)[source]
redis_tasks.cli.show_workers(queues, by_queue)[source]

redis_tasks.conf module

class redis_tasks.conf.RTPipeline(connection_pool, response_callbacks, transaction, shard_hint)[source]

Bases: redis.client.Pipeline, redis_tasks.conf.RTRedis

class redis_tasks.conf.RTRedis(*args, **kwargs)[source]

Bases: redis.client.Redis

RESPONSE_CALLBACKS = {'ACL CAT': <function Redis.<lambda>>, ..., 'ZUNION': <function zset_score_pairs>}

Dictionary mapping Redis command names (from 'ACL CAT' to 'ZUNION') to the callable that parses each command's response, such as int, bool, bool_ok, or a command-specific parser like parse_zadd.

exists(*keys)[source]

Returns the number of the given keys that exist.

For more information check https://redis.io/commands/exists

ftime()[source]
pipeline(transaction=True, shard_hint=None)[source]

Return a new pipeline object that can queue multiple commands for later execution. transaction indicates whether all commands should be executed atomically. Apart from making a group of operations atomic, pipelines are useful for reducing the back-and-forth overhead between the client and server.
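
The batching behaviour described above can be illustrated without a Redis server. The sketch below is a toy stand-in, not the redis or redis_tasks implementation: ToyStore and ToyPipeline are hypothetical names, and atomicity is only imitated by applying all buffered commands in a single call.

```python
class ToyStore:
    """Hypothetical in-memory stand-in for a Redis server."""

    def __init__(self):
        self.data = {}
        self.round_trips = 0

    def run_batch(self, commands):
        # One call models one client/server round trip.
        self.round_trips += 1
        return [cmd(self.data) for cmd in commands]


class ToyPipeline:
    """Buffers commands and sends them together on execute()."""

    def __init__(self, store):
        self.store = store
        self.commands = []

    def set(self, key, value):
        def cmd(data):
            data[key] = value
            return True
        self.commands.append(cmd)
        return self  # returning self allows call chaining

    def incr(self, key):
        def cmd(data):
            data[key] = data.get(key, 0) + 1
            return data[key]
        self.commands.append(cmd)
        return self

    def execute(self):
        # Hand over the buffered commands and reset the buffer.
        commands, self.commands = self.commands, []
        return self.store.run_batch(commands)


store = ToyStore()
pipe = ToyPipeline(store)
pipe.set("a", 1).incr("a").incr("a")   # nothing is sent yet
results = pipe.execute()               # all three commands, one round trip
```

Three commands are queued, but the store sees a single batch, which is the overhead reduction the description refers to.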

zadd(name, items, nx=False, xx=False, ch=False, incr=False)[source]

Set any number of element-name, score pairs on the key name. Pairs are specified as a dict that maps element names to scores.

nx forces ZADD to only create new elements and not to update scores for elements that already exist.

xx forces ZADD to only update scores of elements that already exist. New elements will not be added.

ch modifies the return value to be the number of elements changed. Changed elements include new elements that were added and elements whose scores changed.

incr modifies ZADD to behave like ZINCRBY. In this mode only a single element/score pair can be specified and the score is the amount the existing score will be incremented by. When using this mode the return value of ZADD will be the new score of the element.

LT Only update existing elements if the new score is less than the current score. This flag doesn’t prevent adding new elements.

GT Only update existing elements if the new score is greater than the current score. This flag doesn’t prevent adding new elements.

The return value of ZADD varies based on the mode specified. With no options, ZADD returns the number of new elements added to the sorted set.

NX, LT, and GT are mutually exclusive options.

See: https://redis.io/commands/ZADD
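
How the nx, xx, ch and incr flags interact can be sketched on a plain dict. This is an illustrative model of the semantics described above, not the code that redis_tasks or redis-py runs; toy_zadd is a hypothetical name, and the lt/gt options are left out because they do not appear in this method's signature.

```python
def toy_zadd(zset, items, nx=False, xx=False, ch=False, incr=False):
    """Apply ZADD-like semantics to a plain dict of member -> score."""
    if nx and xx:
        raise ValueError("nx and xx are mutually exclusive")
    if incr and len(items) != 1:
        raise ValueError("incr accepts exactly one element/score pair")

    added = changed = 0
    new_score = None
    for member, score in items.items():
        exists = member in zset
        if (nx and exists) or (xx and not exists):
            continue  # this pair is skipped by the flag
        if incr:
            # In incr mode the given score is an increment.
            score = zset.get(member, 0.0) + score
            new_score = float(score)
        if not exists:
            added += 1
        elif zset[member] != score:
            changed += 1
        zset[member] = float(score)

    if incr:
        return new_score  # None if nx/xx caused the update to be skipped
    return added + changed if ch else added


scores = {"a": 1.0}
only_new = toy_zadd(scores, {"a": 5, "b": 2}, nx=True)               # adds b, leaves a
only_existing = toy_zadd(scores, {"a": 3, "c": 9}, xx=True, ch=True)  # updates a, ignores c
incremented = toy_zadd(scores, {"a": 2}, incr=True)                   # a: 3.0 -> 5.0
```

Without ch the return value counts only newly added members; with ch it also counts members whose score changed.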

class redis_tasks.conf.Settings[source]

Bases: object

configure(settings)[source]
configure_from_dict(dct)[source]
redis_tasks.conf.construct_redis_key(name)[source]
redis_tasks.conf.settings = <redis_tasks.conf.Settings object>

default redis_tasks.conf.Settings instance

redis_tasks.defaults module

redis_tasks.exceptions module

exception redis_tasks.exceptions.DeserializationError(message, raw_data)[source]

Bases: Exception

exception redis_tasks.exceptions.InvalidOperation[source]

Bases: Exception

exception redis_tasks.exceptions.TaskAborted(message)[source]

Bases: Exception

exception redis_tasks.exceptions.TaskDoesNotExist[source]

Bases: Exception

exception redis_tasks.exceptions.WorkerDoesNotExist[source]

Bases: Exception

exception redis_tasks.exceptions.WorkerShutdown[source]

Bases: BaseException
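
Unlike the other exceptions in this module, WorkerShutdown derives from BaseException. In Python this means a generic "except Exception" handler does not catch it. The sketch below shows that language behaviour with a local stand-in class (ToyShutdown is a hypothetical name), so it runs without redis_tasks installed.

```python
class ToyShutdown(BaseException):
    """Stand-in for an exception derived from BaseException."""


def run_task(task):
    try:
        task()
    except Exception:
        # Typical catch-all error handling inside task code.
        return "handled"
    return "finished"


def failing():
    raise ValueError("ordinary failure")


def interrupted():
    raise ToyShutdown()


outcomes = [run_task(failing)]
try:
    run_task(interrupted)
except ToyShutdown:
    # The exception passed straight through the `except Exception` block.
    outcomes.append("propagated")
```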

redis_tasks.queue module

class redis_tasks.queue.Queue(name='default')[source]

Bases: object

classmethod all()[source]

Returns an iterable of all Queues.

classmethod await_multi(queues, timeout)[source]

Blocks until one of the passed queues contains a task.

Returns the queue that contained a task, or None if the timeout was reached.
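
The contract of await_multi (wait on several queues, return the first one that has work, or None on timeout) can be modelled in memory. The sketch below polls plain deques; it is an assumption-laden illustration, since the library works against Redis and its waiting mechanism is not described here. toy_await_multi and poll_interval are hypothetical names.

```python
import time
from collections import deque


def toy_await_multi(queues, timeout, poll_interval=0.01):
    """Return the name of the first non-empty queue, or None on timeout.

    `queues` maps queue names to deques and is checked in insertion order.
    """
    deadline = time.monotonic() + timeout
    while True:
        for name, items in queues.items():
            if items:
                return name
        if time.monotonic() >= deadline:
            return None
        time.sleep(poll_interval)


ready = toy_await_multi({"high": deque(), "low": deque(["t1"])}, timeout=0.05)
nothing = toy_await_multi({"high": deque(), "low": deque()}, timeout=0.05)
```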

count()[source]
delete()[source]
dequeue(worker)[source]

Dequeues a task and sets it as the current task for worker.

empty()[source]

Removes all messages on the queue.

enqueue_call(*args, pipeline, **kwargs)[source]

Creates a task to represent the delayed function call and enqueues it.

get_task_ids(offset=0, length=-1)[source]
get_tasks(offset=0, length=-1)[source]
push(task, *, pipeline, at_front=False)[source]

Pushes a task onto the queue.

at_front inserts the task at the front instead of the back of the queue.
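
The effect of at_front on ordering can be shown with a deque. This is a minimal sketch that assumes tasks are dequeued from the front of the queue; toy_push is a hypothetical helper, not part of redis_tasks.

```python
from collections import deque


def toy_push(queue, task_id, at_front=False):
    """Add a task id at the back of the queue, or at the front on request."""
    if at_front:
        queue.appendleft(task_id)
    else:
        queue.append(task_id)


pending = deque()
toy_push(pending, "t1")
toy_push(pending, "t2")
toy_push(pending, "urgent", at_front=True)

# Consume from the front, as a worker would under the stated assumption.
order = [pending.popleft() for _ in range(len(pending))]
```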

remove_and_delete(task)[source]

redis_tasks.registries module

class redis_tasks.registries.ExpiringRegistry(name)[source]

Bases: object

add(task, *, pipeline)[source]
count()[source]
empty()[source]
expire()[source]

Remove expired tasks from registry.

get_task_ids(offset=0, length=-1)[source]
get_tasks(offset=0, length=-1)[source]
class redis_tasks.registries.QueueRegistry[source]

Bases: object

add(queue, *, pipeline)[source]
get_names()[source]
remove(queue, *, pipeline)[source]
class redis_tasks.registries.WorkerRegistry[source]

Bases: object

add(worker, *, pipeline)[source]
get_dead_ids()[source]
get_running_tasks()[source]

Returns a worker_id -> task_id dict

get_worker_ids()[source]
handle_died_workers()[source]
heartbeat(worker)[source]
remove(worker, *, pipeline)[source]
redis_tasks.registries.registry_maintenance()[source]

redis_tasks.scheduler module

class redis_tasks.scheduler.CrontabSchedule(crontab)[source]

Bases: object

get_next(after)[source]
class redis_tasks.scheduler.Mutex(*, timeout)[source]

Bases: object

acquire(wait=None)[source]
expire_script = None
extend()[source]
class redis_tasks.scheduler.PeriodicSchedule(*, hours=0, minutes=0, seconds=0, start_at=None)[source]

Bases: object

get_next(after)[source]
class redis_tasks.scheduler.Scheduler[source]

Bases: object

run()[source]
setup_signal_handler()[source]
class redis_tasks.scheduler.SchedulerEntry(id, config)[source]

Bases: object

enqueue(*, pipeline)[source]
is_enqueued()[source]
process(now, *, pipeline)[source]
save(*, pipeline)[source]
redis_tasks.scheduler.crontab

alias of redis_tasks.scheduler.CrontabSchedule

redis_tasks.scheduler.once_per_day(time_str)[source]
redis_tasks.scheduler.run_every

alias of redis_tasks.scheduler.PeriodicSchedule

redis_tasks.scheduler.scheduler_main()[source]

redis_tasks.smear_dst module

class redis_tasks.smear_dst.DstSmearingTz(name)[source]

Bases: object

from_utc(utc)[source]
to_utc(dt)[source]
class redis_tasks.smear_dst.Transition(start, end, utc_start, utc_end, old_utcoffset, new_utcoffset)

Bases: tuple

property end

Alias for field number 1

property new_utcoffset

Alias for field number 5

property old_utcoffset

Alias for field number 4

property start

Alias for field number 0

property utc_end

Alias for field number 3

property utc_start

Alias for field number 2
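
The "Alias for field number N" entries are the standard documentation of a namedtuple: each property reads one position of the underlying tuple. The sketch below rebuilds an equivalent type locally to show the mapping. The field values are arbitrary placeholders, and the use of datetime and timedelta for them is an assumption, not something this reference states.

```python
from collections import namedtuple
from datetime import datetime, timedelta

# Same field order as in the class signature above.
Transition = namedtuple(
    "Transition",
    ["start", "end", "utc_start", "utc_end", "old_utcoffset", "new_utcoffset"],
)

# Placeholder values for illustration only, not real transition data.
t = Transition(
    start=datetime(2021, 1, 1, 0, 0),
    end=datetime(2021, 1, 2, 0, 0),
    utc_start=datetime(2021, 1, 1, 0, 0),
    utc_end=datetime(2021, 1, 2, 0, 0),
    old_utcoffset=timedelta(hours=0),
    new_utcoffset=timedelta(hours=1),
)

# Access by name and by position is interchangeable.
same_start = t.start is t[0]
same_offset = t.new_utcoffset is t[5]
```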

redis_tasks.task module

class redis_tasks.task.Task(func=None, args=None, kwargs=None, *, fetch_id=None, fetch_data=None)[source]

Bases: object

cancel()[source]
classmethod delete_many(task_ids, *, pipeline)[source]
enqueue(queue, *, pipeline)[source]
execute(*, shutdown_cm=<contextlib.ExitStack object>)[source]

Run the task using middleware.

The shutdown_cm parameter is a context manager that will wrap the part of the execution in which WorkerShutdown is allowed to be raised.

Returns a TaskOutcome.
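
The role of shutdown_cm can be sketched with contextlib. An empty ExitStack, the default shown in the signature, is a context manager that does nothing, so by default the wrapped section runs unchanged. Everything else below is hypothetical: toy_execute, allow_shutdown and the event names are stand-ins, and the toy returns the function's result rather than a TaskOutcome.

```python
import contextlib

events = []


@contextlib.contextmanager
def allow_shutdown():
    """Hypothetical context manager marking where shutdown may interrupt."""
    events.append("shutdown allowed")
    try:
        yield
    finally:
        events.append("shutdown blocked")


def toy_execute(func, *, shutdown_cm=contextlib.ExitStack()):
    events.append("setup")          # outside the interruptible section
    with shutdown_cm:
        result = func()             # only this part is wrapped
    events.append("teardown")       # outside the interruptible section
    return result


value = toy_execute(lambda: 42, shutdown_cm=allow_shutdown())
default_value = toy_execute(lambda: "ok")   # empty ExitStack: no-op wrapper
```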

classmethod fetch(id)[source]
classmethod fetch_many(task_ids)[source]
get_abort_outcome(message, *, may_requeue=True)[source]
handle_outcome(outcome, *, pipeline)[source]
handle_worker_death(*, pipeline)[source]
property is_reentrant
property key
classmethod key_for(task_id)[source]
property queue
refresh(data=None)[source]
requeue(*, pipeline)[source]
save_meta(*, pipeline=None)[source]
set_failed(error_message, *, pipeline)[source]
set_finished(*, pipeline)[source]
set_running(worker, *, pipeline)[source]
property timeout
class redis_tasks.task.TaskOutcome(outcome, message=None)[source]

Bases: object

class redis_tasks.task.TaskProperties(reentrant=False, timeout=None)[source]

Bases: object

class redis_tasks.task.TaskStack[source]

Bases: object

peek()[source]
pop()[source]
push(task)[source]
redis_tasks.task.get_current_task()[source]
redis_tasks.task.redis_task(*args, **kwargs)[source]

redis_tasks.utils module

class redis_tasks.utils.LazyObject(func)[source]

Bases: object

redis_tasks.utils.atomic_pipeline(f)[source]
redis_tasks.utils.decode_dict(dct)[source]
redis_tasks.utils.decode_list(lst)[source]
redis_tasks.utils.deserialize(bytes_obj)[source]
redis_tasks.utils.enum(name, *sequential, **named)[source]
redis_tasks.utils.generate_callstring(func_name, args, kwargs)[source]
redis_tasks.utils.import_attribute(name)[source]
redis_tasks.utils.is_serializable(obj)[source]
redis_tasks.utils.new_method_proxy(func)[source]
redis_tasks.utils.one(iterable)[source]
redis_tasks.utils.serialize(obj)[source]
redis_tasks.utils.utcformat(dt)[source]
redis_tasks.utils.utcnow()[source]
redis_tasks.utils.utcparse(string)[source]

redis_tasks.worker module

class redis_tasks.worker.Worker(id=None, *, description=None, queues=None, fetch_id=None)[source]

Bases: object

classmethod all()[source]
died(*, pipeline)[source]
end_task(task, outcome, *, pipeline)[source]
classmethod fetch(id)[source]
fetch_current_task()[source]

Returns the currently executing task.

heartbeat()[source]

Send a heartbeat.

Raises WorkerDoesNotExist if the registry considers this worker dead.
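
One way to picture this heartbeat contract is a registry that records when each worker was last seen. The sketch below assumes a worker counts as dead once it has been silent for longer than a timeout; that rule, the explicit now argument, and the names ToyWorkerRegistry and ToyWorkerDoesNotExist are all illustrative assumptions rather than the library's implementation.

```python
class ToyWorkerDoesNotExist(Exception):
    """Stand-in for redis_tasks.exceptions.WorkerDoesNotExist."""


class ToyWorkerRegistry:
    def __init__(self, timeout):
        self.timeout = timeout
        self.last_seen = {}  # worker_id -> time of last heartbeat

    def add(self, worker_id, now):
        self.last_seen[worker_id] = now

    def get_dead_ids(self, now):
        # Assumed rule: silent for longer than `timeout` means dead.
        return [w for w, seen in self.last_seen.items()
                if now - seen > self.timeout]

    def heartbeat(self, worker_id, now):
        if worker_id not in self.last_seen or worker_id in self.get_dead_ids(now):
            raise ToyWorkerDoesNotExist(worker_id)
        self.last_seen[worker_id] = now


registry = ToyWorkerRegistry(timeout=10)
registry.add("w1", now=0)
registry.heartbeat("w1", now=5)       # within the timeout: accepted
try:
    registry.heartbeat("w1", now=20)  # 15 time units of silence: rejected
    status = "alive"
except ToyWorkerDoesNotExist:
    status = "dead"
```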

refresh()[source]
shutdown(*, pipeline)[source]
start_task(task, *, pipeline)[source]
startup(*, pipeline)[source]

redis_tasks.worker_process module

class redis_tasks.worker_process.Maintenance[source]

Bases: object

run()[source]
run_if_neccessary()[source]
class redis_tasks.worker_process.PostponeShutdown[source]

Bases: object

activate()[source]
static assert_main_thread()[source]
deactivate()[source]
classmethod trigger_shutdown()[source]
exception redis_tasks.worker_process.ShutdownRequested[source]

Bases: BaseException

class redis_tasks.worker_process.TWorker(queues=['default'])[source]

Bases: object

run(raise_on_failure=True)[source]
class redis_tasks.worker_process.WorkHorse(task, worker_connection)[source]

Bases: multiprocessing.context.Process

ignore_shutdown_signal()[source]
request_stop(signum, frame)[source]
run()[source]

Method to be run in sub-process; can be overridden in sub-class

send_signal(sig)[source]
setup_signal_handler()[source]
class redis_tasks.worker_process.WorkerProcess(queues, *, description=None)[source]

Bases: object

execute_task(task)[source]
handle_stop_signal(signum, frame)[source]
install_signal_handlers()[source]
interruptible()[source]
maybe_shutdown()[source]
process_task(task)[source]
queue_iter(burst)[source]
run(burst=False)[source]

Starts the work loop.

Returns the number of tasks that were processed in burst mode
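
A burst-style work loop can be sketched as follows, assuming that burst mode means "process what is queued, then stop instead of waiting for more". That reading, and the names toy_run_burst and handler, are assumptions made for illustration and are not confirmed by this reference.

```python
from collections import deque


def toy_run_burst(queue, handler):
    """Process queued tasks until the queue is empty; return the count."""
    processed = 0
    while queue:
        task = queue.popleft()
        handler(task)
        processed += 1
    return processed


seen = []
count = toy_run_burst(deque(["t1", "t2", "t3"]), seen.append)
```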

redis_tasks.worker_process.generate_worker_description()[source]
redis_tasks.worker_process.worker_main(queue_names=['default'], *, burst=False, description=None)[source]

Module contents