Source code for redis_tasks.conf
import importlib
import os
from types import SimpleNamespace
import redis
from redis_tasks import defaults
from .utils import LazyObject, import_attribute
ENVIRONMENT_VARIABLE = "RT_SETTINGS_MODULE"
[docs]class Settings:
def __init__(self):
self._initialized = False
def _configure_from_env(self):
settings_module = os.environ.get(ENVIRONMENT_VARIABLE)
if not settings_module:
raise Exception(
f"redis_tasks settings are not configured. "
"You must either define the environment variable "
f"{ENVIRONMENT_VARIABLE} or call settings.configure() before "
"accessing redis_tasks.")
mod = importlib.import_module(settings_module)
self._setup(mod)
def _setup(self, settings_module):
for setting in dir(defaults):
if setting.isupper():
setattr(self, setting, getattr(defaults, setting))
for setting in dir(settings_module):
if setting.isupper():
setattr(self, setting, getattr(settings_module, setting))
self._initialized = True
def __getattr__(self, name):
if not self._initialized:
self._configure_from_env()
return self.__dict__[name]
#: default :py:class:`redis_tasks.conf.Settings` instance
settings = Settings()
[docs]class RTRedis(redis.StrictRedis):
RESPONSE_CALLBACKS = redis.StrictRedis.RESPONSE_CALLBACKS
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.set_response_callback('EXISTS', int)
[docs] def pipeline(self, transaction=True, shard_hint=None):
return RTPipeline(self.connection_pool, self.response_callbacks, transaction, shard_hint)
[docs] def exists(self, *keys):
return self.execute_command('EXISTS', *keys)
[docs] def ftime(self):
seconds, microseconds = self.time()
return seconds + microseconds * 10**-6
[docs] def zadd(self, name, items, nx=False, xx=False, ch=False, incr=False):
if nx and xx:
raise redis.RedisError("ZADD can't use both NX and XX modes")
pieces = []
if nx:
pieces.append('NX')
if xx:
pieces.append('XX')
if ch:
pieces.append('CH')
if incr:
pieces.append('INCR')
for k, v in items.items():
pieces.extend([v, k])
return self.execute_command('ZADD', name, *pieces)
[docs]class RTPipeline(redis.client.Pipeline, RTRedis):
pass
@LazyObject
def connection():
return RTRedis.from_url(settings.REDIS_URL)
@LazyObject
def task_middleware(): # TODO: test
def middleware_constructor(class_path):
return import_attribute(class_path)
return [middleware_constructor(x) for x in settings.MIDDLEWARE]
[docs]def construct_redis_key(name):
return settings.REDIS_PREFIX + ':' + name