Source code for redis_tasks.conf

import importlib
import os
from types import SimpleNamespace

import redis

from redis_tasks import defaults

from .utils import LazyObject, import_attribute

ENVIRONMENT_VARIABLE = "RT_SETTINGS_MODULE"


[docs]class Settings: def __init__(self): self._initialized = False def _configure_from_env(self): settings_module = os.environ.get(ENVIRONMENT_VARIABLE) if not settings_module: raise Exception( f"redis_tasks settings are not configured. " "You must either define the environment variable " f"{ENVIRONMENT_VARIABLE} or call settings.configure() before " "accessing redis_tasks.") mod = importlib.import_module(settings_module) self._setup(mod) def _setup(self, settings_module): for setting in dir(defaults): if setting.isupper(): setattr(self, setting, getattr(defaults, setting)) for setting in dir(settings_module): if setting.isupper(): setattr(self, setting, getattr(settings_module, setting)) self._initialized = True def __getattr__(self, name): if not self._initialized: self._configure_from_env() return self.__dict__[name]
[docs] def configure(self, settings): if self._initialized: raise RuntimeError('Settings already configured.') self._setup(settings)
[docs] def configure_from_dict(self, dct): self.configure(SimpleNamespace(**dct))
#: default :py:class:`redis_tasks.conf.Settings` instance settings = Settings()
[docs]class RTRedis(redis.StrictRedis): RESPONSE_CALLBACKS = redis.StrictRedis.RESPONSE_CALLBACKS def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.set_response_callback('EXISTS', int)
[docs] def pipeline(self, transaction=True, shard_hint=None): return RTPipeline(self.connection_pool, self.response_callbacks, transaction, shard_hint)
[docs] def exists(self, *keys): return self.execute_command('EXISTS', *keys)
[docs] def ftime(self): seconds, microseconds = self.time() return seconds + microseconds * 10**-6
[docs] def zadd(self, name, items, nx=False, xx=False, ch=False, incr=False): if nx and xx: raise redis.RedisError("ZADD can't use both NX and XX modes") pieces = [] if nx: pieces.append('NX') if xx: pieces.append('XX') if ch: pieces.append('CH') if incr: pieces.append('INCR') for k, v in items.items(): pieces.extend([v, k]) return self.execute_command('ZADD', name, *pieces)
[docs]class RTPipeline(redis.client.Pipeline, RTRedis): pass
@LazyObject def connection(): return RTRedis.from_url(settings.REDIS_URL) @LazyObject def task_middleware(): # TODO: test def middleware_constructor(class_path): return import_attribute(class_path) return [middleware_constructor(x) for x in settings.MIDDLEWARE]
[docs]def construct_redis_key(name): return settings.REDIS_PREFIX + ':' + name