📄 util.py
字号:
def __setitem__(self, key, object): if key not in self: self._list.append(key) dict.__setitem__(self, key, object) def __delitem__(self, key): dict.__delitem__(self, key) self._list.remove(key) def pop(self, key): value = dict.pop(self, key) self._list.remove(key) return value def popitem(self): item = dict.popitem(self) self._list.remove(item[0]) return itemtry: from threading import local as ThreadLocalexcept ImportError: try: from dummy_threading import local as ThreadLocal except ImportError: class ThreadLocal(object): """An object in which attribute access occurs only within the context of the current thread.""" def __init__(self): self.__dict__['_tdict'] = {} def __delattr__(self, key): try: del self._tdict[(thread.get_ident(), key)] except KeyError: raise AttributeError(key) def __getattr__(self, key): try: return self._tdict[(thread.get_ident(), key)] except KeyError: raise AttributeError(key) def __setattr__(self, key, value): self._tdict[(thread.get_ident(), key)] = valueclass OrderedSet(Set): def __init__(self, d=None): Set.__init__(self) self._list = [] if d is not None: self.update(d) def add(self, key): if key not in self: self._list.append(key) Set.add(self, key) def remove(self, element): Set.remove(self, element) self._list.remove(element) def discard(self, element): try: Set.remove(self, element) except KeyError: pass else: self._list.remove(element) def clear(self): Set.clear(self) self._list = [] def __getitem__(self, key): return self._list[key] def __iter__(self): return iter(self._list) def __repr__(self): return '%s(%r)' % (self.__class__.__name__, self._list) __str__ = __repr__ def update(self, iterable): add = self.add for i in iterable: add(i) return self __ior__ = update def union(self, other): result = self.__class__(self) result.update(other) return result __or__ = union def intersection(self, other): other = Set(other) return self.__class__([a for a in self if a in other]) __and__ = intersection def symmetric_difference(self, other): other = Set(other) result = self.__class__([a for a in self if a not in other]) result.update([a for a in other if a not in self]) return result __xor__ = symmetric_difference def difference(self, other): other = Set(other) return self.__class__([a for a in self if a not in other]) __sub__ = difference def intersection_update(self, other): other = Set(other) Set.intersection_update(self, other) self._list = [ a for a in self._list if a in other] return self __iand__ = intersection_update def symmetric_difference_update(self, other): Set.symmetric_difference_update(self, other) self._list = [ a for a in self._list if a in self] self._list += [ a for a in other._list if a in self] return self __ixor__ = symmetric_difference_update def difference_update(self, other): Set.difference_update(self, other) self._list = [ a for a in self._list if a in self] return self __isub__ = difference_update if hasattr(Set, '__getstate__'): def __getstate__(self): base = Set.__getstate__(self) return base, self._list def __setstate__(self, state): Set.__setstate__(self, state[0]) self._list = state[1]class IdentitySet(object): """A set that considers only object id() for uniqueness. This strategy has edge cases for builtin types- it's possible to have two 'foo' strings in one of these sets, for example. Use sparingly. """ _working_set = Set def __init__(self, iterable=None): self._members = _IterableUpdatableDict() if iterable: for o in iterable: self.add(o) def add(self, value): self._members[id(value)] = value def __contains__(self, value): return id(value) in self._members def remove(self, value): del self._members[id(value)] def discard(self, value): try: self.remove(value) except KeyError: pass def pop(self): try: pair = self._members.popitem() return pair[1] except KeyError: raise KeyError('pop from an empty set') def clear(self): self._members.clear() def __cmp__(self, other): raise TypeError('cannot compare sets using cmp()') def __eq__(self, other): if isinstance(other, IdentitySet): return self._members == other._members else: return False def __ne__(self, other): if isinstance(other, IdentitySet): return self._members != other._members else: return True def issubset(self, iterable): other = type(self)(iterable) if len(self) > len(other): return False for m in itertools.ifilterfalse(other._members.has_key, self._members.iterkeys()): return False return True def __le__(self, other): if not isinstance(other, IdentitySet): return NotImplemented return self.issubset(other) def __lt__(self, other): if not isinstance(other, IdentitySet): return NotImplemented return len(self) < len(other) and self.issubset(other) def issuperset(self, iterable): other = type(self)(iterable) if len(self) < len(other): return False for m in itertools.ifilterfalse(self._members.has_key, other._members.iterkeys()): return False return True def __ge__(self, other): if not isinstance(other, IdentitySet): return NotImplemented return self.issuperset(other) def __gt__(self, other): if not isinstance(other, IdentitySet): return NotImplemented return len(self) > len(other) and self.issuperset(other) def union(self, iterable): result = type(self)() # testlib.pragma exempt:__hash__ result._members.update( self._working_set(self._members.iteritems()).union(_iter_id(iterable))) return result def __or__(self, other): if not isinstance(other, IdentitySet): return NotImplemented return self.union(other) def update(self, iterable): self._members = self.union(iterable)._members def __ior__(self, other): if not isinstance(other, IdentitySet): return NotImplemented self.update(other) return self def difference(self, iterable): result = type(self)() # testlib.pragma exempt:__hash__ result._members.update( self._working_set(self._members.iteritems()).difference(_iter_id(iterable))) return result def __sub__(self, other): if not isinstance(other, IdentitySet): return NotImplemented return self.difference(other) def difference_update(self, iterable): self._members = self.difference(iterable)._members def __isub__(self, other): if not isinstance(other, IdentitySet): return NotImplemented self.difference_update(other) return self def intersection(self, iterable): result = type(self)() # testlib.pragma exempt:__hash__ result._members.update( self._working_set(self._members.iteritems()).intersection(_iter_id(iterable))) return result def __and__(self, other): if not isinstance(other, IdentitySet): return NotImplemented return self.intersection(other) def intersection_update(self, iterable): self._members = self.intersection(iterable)._members def __iand__(self, other): if not isinstance(other, IdentitySet): return NotImplemented self.intersection_update(other) return self def symmetric_difference(self, iterable): result = type(self)() # testlib.pragma exempt:__hash__ result._members.update( self._working_set(self._members.iteritems()).symmetric_difference(_iter_id(iterable))) return result def __xor__(self, other): if not isinstance(other, IdentitySet): return NotImplemented return self.symmetric_difference(other) def symmetric_difference_update(self, iterable): self._members = self.symmetric_difference(iterable)._members def __ixor__(self, other): if not isinstance(other, IdentitySet): return NotImplemented self.symmetric_difference(other) return self def copy(self): return type(self)(self._members.itervalues()) __copy__ = copy def __len__(self): return len(self._members) def __iter__(self): return self._members.itervalues() def __hash__(self): raise TypeError('set objects are unhashable') def __repr__(self): return '%s(%r)' % (type(self).__name__, self._members.values())if sys.version_info >= (2, 4): _IterableUpdatableDict = dictelse: class _IterableUpdatableDict(dict): """A dict that can update(iterable) like Python 2.4+'s dict.""" def update(self, __iterable=None, **kw): if __iterable is not None: if not isinstance(__iterable, dict): __iterable = dict(__iterable) dict.update(self, __iterable) if kw: dict.update(self, **kw)def _iter_id(iterable): """Generator: ((id(o), o) for o in iterable).""" for item in iterable: yield id(item), itemclass OrderedIdentitySet(IdentitySet): class _working_set(OrderedSet): # a testing pragma: exempt the OIDS working set from the test suite's # "never call the user's __hash__" assertions. this is a big hammer, # but it's safe here: IDS operates on (id, instance) tuples in the # working set. __sa_hash_exempt__ = True def __init__(self, iterable=None): IdentitySet.__init__(self) self._members = OrderedDict() if iterable: for o in iterable: self.add(o)class UniqueAppender(object): """Only adds items to a collection once. Additional appends() of the same object are ignored. Membership is determined by identity (``is a``) not equality (``==``). """ def __init__(self, data, via=None): self.data = data self._unique = IdentitySet() if via: self._data_appender = getattr(data, via) elif hasattr(data, 'append'): self._data_appender = data.append elif hasattr(data, 'add'): # TODO: we think its a set here. bypass unneeded uniquing logic ? self._data_appender = data.add def append(self, item): if item not in self._unique: self._data_appender(item) self._unique.add(item) def __iter__(self): return iter(self.data)class ScopedRegistry(object): """A Registry that can store one or multiple instances of a single class on a per-thread scoped basis, or on a customized scope. createfunc a callable that returns a new object to be placed in the registry scopefunc a callable that will return a key to store/retrieve an object, defaults to ``thread.get_ident`` for thread-local objects. Use a value like ``lambda: True`` for application scope. """ def __init__(self, createfunc, scopefunc=None): self.createfunc = createfunc if scopefunc is None: self.scopefunc = thread.get_ident else: self.scopefunc = scopefunc self.registry = {} def __call__(self): key = self._get_key() try: return self.registry[key] except KeyError: return self.registry.setdefault(key, self.createfunc()) def has(self): return self._get_key() in self.registry def set(self, obj): self.registry[self._get_key()] = obj def clear(self): try: del self.registry[self._get_key()] except KeyError: pass def _get_key(self): return self.scopefunc()def warn(msg): if isinstance(msg, basestring): warnings.warn(msg, exceptions.SAWarning, stacklevel=3) else: warnings.warn(msg, stacklevel=3)def warn_deprecated(msg): warnings.warn(msg, exceptions.SADeprecationWarning, stacklevel=3)def deprecated(func, message=None, add_deprecation_to_docstring=True): """Decorates a function and issues a deprecation warning on use. message If provided, issue message in the warning. A sensible default is used if not provided. add_deprecation_to_docstring Default True. If False, the wrapped function's __doc__ is left as-is. If True, the 'message' is prepended to the docs if provided, or sensible default if message is omitted. """ if message is not None: warning = message % dict(func=func.__name__) else: warning = "Call to deprecated function %s" % func.__name__ def func_with_warning(*args, **kwargs): warnings.warn(exceptions.SADeprecationWarning(warning), stacklevel=2) return func(*args, **kwargs) doc = func.__doc__ is not None and func.__doc__ or '' if add_deprecation_to_docstring: header = message is not None and warning or 'Deprecated.' doc = '\n'.join((header.rstrip(), doc)) func_with_warning.__doc__ = doc func_with_warning.__dict__.update(func.__dict__) try: func_with_warning.__name__ = func.__name__ except TypeError: pass return func_with_warning
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -