📄 collections.py
字号:
# 'append': ('fire_append_event', 1, None), # '__setitem__': ('fire_append_event', 1, 'fire_remove_event'), # 'pop': (None, None, 'fire_remove_event'), # } # } # In the normal call flow, a request for any of the 3 basic collection # types is transformed into one of our trivial subclasses # (e.g. InstrumentedList). Catch anything else that sneaks in here... if cls.__module__ == '__builtin__': raise exceptions.ArgumentError( "Can not instrument a built-in type. Use a " "subclass, even a trivial one.") collection_type = sautil.duck_type_collection(cls) if collection_type in __interfaces: roles = __interfaces[collection_type].copy() decorators = roles.pop('_decorators', {}) else: roles, decorators = {}, {} if hasattr(cls, '__instrumentation__'): roles.update(copy.deepcopy(getattr(cls, '__instrumentation__'))) methods = roles.pop('methods', {}) for name in dir(cls): method = getattr(cls, name) if not callable(method): continue # note role declarations if hasattr(method, '_sa_instrument_role'): role = method._sa_instrument_role assert role in ('appender', 'remover', 'iterator', 'on_link', 'converter') roles[role] = name # transfer instrumentation requests from decorated function # to the combined queue before, after = None, None if hasattr(method, '_sa_instrument_before'): op, argument = method._sa_instrument_before assert op in ('fire_append_event', 'fire_remove_event') before = op, argument if hasattr(method, '_sa_instrument_after'): op = method._sa_instrument_after assert op in ('fire_append_event', 'fire_remove_event') after = op if before: methods[name] = before[0], before[1], after elif after: methods[name] = None, None, after # apply ABC auto-decoration to methods that need it for method, decorator in decorators.items(): fn = getattr(cls, method, None) if fn and method not in methods and not hasattr(fn, '_sa_instrumented'): setattr(cls, method, decorator(fn)) # ensure all roles are present, and apply implicit instrumentation if # needed if 'appender' not in roles or not hasattr(cls, roles['appender']): raise exceptions.ArgumentError( "Type %s must elect an appender method to be " "a collection class" % cls.__name__) elif (roles['appender'] not in methods and not hasattr(getattr(cls, roles['appender']), '_sa_instrumented')): methods[roles['appender']] = ('fire_append_event', 1, None) if 'remover' not in roles or not hasattr(cls, roles['remover']): raise exceptions.ArgumentError( "Type %s must elect a remover method to be " "a collection class" % cls.__name__) elif (roles['remover'] not in methods and not hasattr(getattr(cls, roles['remover']), '_sa_instrumented')): methods[roles['remover']] = ('fire_remove_event', 1, None) if 'iterator' not in roles or not hasattr(cls, roles['iterator']): raise exceptions.ArgumentError( "Type %s must elect an iterator method to be " "a collection class" % cls.__name__) # apply ad-hoc instrumentation from decorators, class-level defaults # and implicit role declarations for method, (before, argument, after) in methods.items(): setattr(cls, method, _instrument_membership_mutator(getattr(cls, method), before, argument, after)) # intern the role map for role, method in roles.items(): setattr(cls, '_sa_%s' % role, getattr(cls, method)) setattr(cls, '_sa_instrumented', id(cls))def _instrument_membership_mutator(method, before, argument, after): """Route method args and/or return value through the collection adapter.""" # This isn't smart enough to handle @adds(1) for 'def fn(self, (a, b))' if before: fn_args = list(sautil.flatten_iterator(inspect.getargspec(method)[0])) if type(argument) is int: pos_arg = argument named_arg = len(fn_args) > argument and fn_args[argument] or None else: if argument in fn_args: pos_arg = fn_args.index(argument) else: pos_arg = None named_arg = argument del fn_args def wrapper(*args, **kw): if before: if pos_arg is None: if named_arg not in kw: raise exceptions.ArgumentError( "Missing argument %s" % argument) value = kw[named_arg] else: if len(args) > pos_arg: value = args[pos_arg] elif named_arg in kw: value = kw[named_arg] else: raise exceptions.ArgumentError( "Missing argument %s" % argument) initiator = kw.pop('_sa_initiator', None) if initiator is False: executor = None else: executor = getattr(args[0], '_sa_adapter', None) if before and executor: getattr(executor, before)(value, initiator) if not after or not executor: return method(*args, **kw) else: res = method(*args, **kw) if res is not None: getattr(executor, after)(res, initiator) return res try: wrapper._sa_instrumented = True wrapper.__name__ = method.__name__ wrapper.__doc__ = method.__doc__ except: pass return wrapperdef __set(collection, item, _sa_initiator=None): """Run set events, may eventually be inlined into decorators.""" if _sa_initiator is not False and item is not None: executor = getattr(collection, '_sa_adapter', None) if executor: getattr(executor, 'fire_append_event')(item, _sa_initiator)def __del(collection, item, _sa_initiator=None): """Run del events, may eventually be inlined into decorators.""" if _sa_initiator is not False and item is not None: executor = getattr(collection, '_sa_adapter', None) if executor: getattr(executor, 'fire_remove_event')(item, _sa_initiator)def __before_delete(collection, _sa_initiator=None): """Special method to run 'commit existing value' methods""" executor = getattr(collection, '_sa_adapter', None) if executor: getattr(executor, 'fire_pre_remove_event')(_sa_initiator)def _list_decorators(): """Hand-turned instrumentation wrappers that can decorate any list-like class.""" def _tidy(fn): setattr(fn, '_sa_instrumented', True) fn.__doc__ = getattr(getattr(list, fn.__name__), '__doc__') def append(fn): def append(self, item, _sa_initiator=None): # FIXME: example of fully inlining __set and adapter.fire # for critical path if _sa_initiator is not False and item is not None: executor = getattr(self, '_sa_adapter', None) if executor: executor.attr.fire_append_event(executor.owner_state, item, _sa_initiator) fn(self, item) _tidy(append) return append def remove(fn): def remove(self, value, _sa_initiator=None): __before_delete(self, _sa_initiator) # testlib.pragma exempt:__eq__ fn(self, value) __del(self, value, _sa_initiator) _tidy(remove) return remove def insert(fn): def insert(self, index, value): __set(self, value) fn(self, index, value) _tidy(insert) return insert def __setitem__(fn): def __setitem__(self, index, value): if not isinstance(index, slice): existing = self[index] if existing is not None: __del(self, existing) __set(self, value) fn(self, index, value) else: # slice assignment requires __delitem__, insert, __len__ if index.stop is None: stop = 0 elif index.stop < 0: stop = len(self) + index.stop else: stop = index.stop step = index.step or 1 rng = range(index.start or 0, stop, step) if step == 1: for i in rng: del self[index.start] i = index.start for item in value: self.insert(i, item) i += 1 else: if len(value) != len(rng): raise ValueError( "attempt to assign sequence of size %s to " "extended slice of size %s" % (len(value), len(rng))) for i, item in zip(rng, value): self.__setitem__(i, item) _tidy(__setitem__) return __setitem__ def __delitem__(fn): def __delitem__(self, index): if not isinstance(index, slice): item = self[index] __del(self, item) fn(self, index) else: # slice deletion requires __getslice__ and a slice-groking # __getitem__ for stepped deletion # note: not breaking this into atomic dels for item in self[index]: __del(self, item) fn(self, index) _tidy(__delitem__) return __delitem__ def __setslice__(fn): def __setslice__(self, start, end, values): for value in self[start:end]: __del(self, value) for value in values: __set(self, value) fn(self, start, end, values) _tidy(__setslice__) return __setslice__ def __delslice__(fn): def __delslice__(self, start, end): for value in self[start:end]: __del(self, value) fn(self, start, end) _tidy(__delslice__) return __delslice__ def extend(fn): def extend(self, iterable): for value in iterable: self.append(value) _tidy(extend) return extend def __iadd__(fn): def __iadd__(self, iterable): # list.__iadd__ takes any iterable and seems to let TypeError raise # as-is instead of returning NotImplemented for value in iterable: self.append(value) return self _tidy(__iadd__) return __iadd__ def pop(fn): def pop(self, index=-1): __before_delete(self) item = fn(self, index) __del(self, item) return item _tidy(pop) return pop # __imul__ : not wrapping this. all members of the collection are already # present, so no need to fire appends... wrapping it with an explicit # decorator is still possible, so events on *= can be had if they're # desired. hard to imagine a use case for __imul__, though. l = locals().copy() l.pop('_tidy') return ldef _dict_decorators(): """Hand-turned instrumentation wrappers that can decorate any dict-like mapping class.""" def _tidy(fn): setattr(fn, '_sa_instrumented', True) fn.__doc__ = getattr(getattr(dict, fn.__name__), '__doc__') Unspecified=object() def __setitem__(fn): def __setitem__(self, key, value, _sa_initiator=None): if key in self: __del(self, self[key], _sa_initiator) __set(self, value, _sa_initiator) fn(self, key, value) _tidy(__setitem__) return __setitem__ def __delitem__(fn): def __delitem__(self, key, _sa_initiator=None): if key in self: __del(self, self[key], _sa_initiator) fn(self, key) _tidy(__delitem__) return __delitem__
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -