Source code for redis_tasks.utils
import datetime
import importlib
import operator
import pickle
from functools import wraps
from .exceptions import DeserializationError
[docs]def import_attribute(name):
module_name, attribute = name.rsplit('.', 1)
module = importlib.import_module(module_name)
return getattr(module, attribute)
[docs]def generate_callstring(func_name, args, kwargs): # TODO: test
arg_list = [repr(arg) for arg in args]
arg_list += [f'{k}={v!r}' for k, v in kwargs.items()]
args = ', '.join(arg_list)
return f'{func_name}({args})'
[docs]def utcnow():
return datetime.datetime.now(datetime.timezone.utc)
[docs]def utcparse(string):
parsed = datetime.datetime.strptime(string, '%Y-%m-%dT%H:%M:%SZ')
return parsed.replace(tzinfo=datetime.timezone.utc)
[docs]def enum(name, *sequential, **named):
values = dict(zip(sequential, range(len(sequential))), **named)
return type(str(name), (), values)
[docs]def one(iterable):
it = iter(iterable)
try:
value = next(it)
except StopIteration:
raise ValueError('too few items in iterable (expected 1)')
try:
next(it)
except StopIteration:
return value
else:
raise ValueError('too many items in iterable (expected 1)')
[docs]def decode_list(lst):
return [x.decode() for x in lst]
[docs]def decode_dict(dct):
return {k.decode(): v.decode() for k, v in dct.items()}
[docs]def is_serializable(obj):
if obj is None:
return True
if isinstance(obj, (int, str, float, bool, datetime.date, datetime.datetime)):
return True
if isinstance(obj, (tuple, list, set)):
return all(is_serializable(x) for x in obj)
if isinstance(obj, dict):
return all(is_serializable(k) and is_serializable(v) for k, v in obj.items())
return False
[docs]def serialize(obj):
if not is_serializable(obj):
raise ValueError("Passed object contains non-serializable values")
return pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
[docs]def deserialize(bytes_obj):
try:
obj = pickle.loads(bytes_obj)
except Exception as e:
raise DeserializationError('Could not unpickle', bytes_obj) from e
return obj
[docs]def atomic_pipeline(f):
from .conf import connection
@wraps(f)
def pipeline_wrapper(*args, pipeline=None, **kwargs):
if pipeline is None:
with connection.pipeline() as pipe:
ret = f(*args, pipeline=pipe, **kwargs)
pipe.execute()
return ret
else:
return f(*args, pipeline=pipeline, **kwargs)
return pipeline_wrapper
empty = object()
[docs]def new_method_proxy(func):
def inner(self, *args, **kwargs):
__tracebackhide__ = True
if self._wrapped is empty:
self._setup()
return func(self._wrapped, *args, **kwargs)
return inner
[docs]class LazyObject:
def __init__(self, func):
self.__dict__['_wrapped'] = empty
self.__dict__['_setupfunc'] = func
def _setup(self):
self.__dict__['_wrapped'] = self._setupfunc()
del self.__dict__['_setupfunc']
__getattr__ = new_method_proxy(getattr)
__setattr__ = new_method_proxy(setattr)
__delattr__ = new_method_proxy(delattr)
__bytes__ = new_method_proxy(bytes)
__str__ = new_method_proxy(str)
__bool__ = new_method_proxy(bool)
__dir__ = new_method_proxy(dir)
__getitem__ = new_method_proxy(operator.getitem)
__setitem__ = new_method_proxy(operator.setitem)
__delitem__ = new_method_proxy(operator.delitem)
__iter__ = new_method_proxy(iter)
__reversed__ = new_method_proxy(reversed)
__len__ = new_method_proxy(len)
__contains__ = new_method_proxy(operator.contains)
def __repr__(self):
if self._wrapped is empty:
repr_attr = self._setupfunc
else:
repr_attr = self._wrapped
return f'<{type(self).__name__}: {repr_attr!r}>'