⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 util.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 2 页
字号:
    def __setitem__(self, key, object):        if key not in self:            self._list.append(key)        dict.__setitem__(self, key, object)    def __delitem__(self, key):        dict.__delitem__(self, key)        self._list.remove(key)    def pop(self, key):        value = dict.pop(self, key)        self._list.remove(key)        return value    def popitem(self):        item = dict.popitem(self)        self._list.remove(item[0])        return itemtry:    from threading import local as ThreadLocalexcept ImportError:    try:        from dummy_threading import local as ThreadLocal    except ImportError:        class ThreadLocal(object):            """An object in which attribute access occurs only within the context of the current thread."""            def __init__(self):                self.__dict__['_tdict'] = {}            def __delattr__(self, key):                try:                    del self._tdict[(thread.get_ident(), key)]                except KeyError:                    raise AttributeError(key)            def __getattr__(self, key):                try:                    return self._tdict[(thread.get_ident(), key)]                except KeyError:                    raise AttributeError(key)            def __setattr__(self, key, value):                self._tdict[(thread.get_ident(), key)] = valueclass OrderedSet(Set):    def __init__(self, d=None):        Set.__init__(self)        self._list = []        if d is not None:            self.update(d)    def add(self, key):        if key not in self:            self._list.append(key)        Set.add(self, key)    def remove(self, element):        Set.remove(self, element)        self._list.remove(element)    def discard(self, element):        try:            Set.remove(self, element)        except KeyError:            pass        else:            self._list.remove(element)    def clear(self):        Set.clear(self)        self._list = []    def __getitem__(self, key):        return self._list[key]    def __iter__(self):        return iter(self._list)    def __repr__(self):      return '%s(%r)' % (self.__class__.__name__, self._list)    __str__ = __repr__    def update(self, iterable):      add = self.add      for i in iterable:          add(i)      return self    __ior__ = update    def union(self, other):      result = self.__class__(self)      result.update(other)      return result    __or__ = union    def intersection(self, other):        other = Set(other)        return self.__class__([a for a in self if a in other])    __and__ = intersection    def symmetric_difference(self, other):        other = Set(other)        result = self.__class__([a for a in self if a not in other])        result.update([a for a in other if a not in self])        return result    __xor__ = symmetric_difference    def difference(self, other):        other = Set(other)        return self.__class__([a for a in self if a not in other])    __sub__ = difference    def intersection_update(self, other):        other = Set(other)        Set.intersection_update(self, other)        self._list = [ a for a in self._list if a in other]        return self    __iand__ = intersection_update    def symmetric_difference_update(self, other):      Set.symmetric_difference_update(self, other)      self._list =  [ a for a in self._list if a in self]      self._list += [ a for a in other._list if a in self]      return self    __ixor__ = symmetric_difference_update    def difference_update(self, other):        Set.difference_update(self, other)        self._list = [ a for a in self._list if a in self]        return self    __isub__ = difference_update    if hasattr(Set, '__getstate__'):        def __getstate__(self):            base = Set.__getstate__(self)            return base, self._list        def __setstate__(self, state):            Set.__setstate__(self, state[0])            self._list = state[1]class IdentitySet(object):    """A set that considers only object id() for uniqueness.    This strategy has edge cases for builtin types- it's possible to have    two 'foo' strings in one of these sets, for example.  Use sparingly.    """    _working_set = Set    def __init__(self, iterable=None):        self._members = _IterableUpdatableDict()        if iterable:            for o in iterable:                self.add(o)    def add(self, value):        self._members[id(value)] = value    def __contains__(self, value):        return id(value) in self._members    def remove(self, value):        del self._members[id(value)]    def discard(self, value):        try:            self.remove(value)        except KeyError:            pass    def pop(self):        try:            pair = self._members.popitem()            return pair[1]        except KeyError:            raise KeyError('pop from an empty set')    def clear(self):        self._members.clear()    def __cmp__(self, other):        raise TypeError('cannot compare sets using cmp()')    def __eq__(self, other):        if isinstance(other, IdentitySet):            return self._members == other._members        else:            return False    def __ne__(self, other):        if isinstance(other, IdentitySet):            return self._members != other._members        else:            return True    def issubset(self, iterable):        other = type(self)(iterable)        if len(self) > len(other):            return False        for m in itertools.ifilterfalse(other._members.has_key,                                        self._members.iterkeys()):            return False        return True    def __le__(self, other):        if not isinstance(other, IdentitySet):            return NotImplemented        return self.issubset(other)    def __lt__(self, other):        if not isinstance(other, IdentitySet):            return NotImplemented        return len(self) < len(other) and self.issubset(other)    def issuperset(self, iterable):        other = type(self)(iterable)        if len(self) < len(other):            return False        for m in itertools.ifilterfalse(self._members.has_key,                                        other._members.iterkeys()):            return False        return True    def __ge__(self, other):        if not isinstance(other, IdentitySet):            return NotImplemented        return self.issuperset(other)    def __gt__(self, other):        if not isinstance(other, IdentitySet):            return NotImplemented        return len(self) > len(other) and self.issuperset(other)    def union(self, iterable):        result = type(self)()        # testlib.pragma exempt:__hash__        result._members.update(            self._working_set(self._members.iteritems()).union(_iter_id(iterable)))        return result    def __or__(self, other):        if not isinstance(other, IdentitySet):            return NotImplemented        return self.union(other)    def update(self, iterable):        self._members = self.union(iterable)._members    def __ior__(self, other):        if not isinstance(other, IdentitySet):            return NotImplemented        self.update(other)        return self    def difference(self, iterable):        result = type(self)()        # testlib.pragma exempt:__hash__        result._members.update(            self._working_set(self._members.iteritems()).difference(_iter_id(iterable)))        return result    def __sub__(self, other):        if not isinstance(other, IdentitySet):            return NotImplemented        return self.difference(other)    def difference_update(self, iterable):        self._members = self.difference(iterable)._members    def __isub__(self, other):        if not isinstance(other, IdentitySet):            return NotImplemented        self.difference_update(other)        return self    def intersection(self, iterable):        result = type(self)()        # testlib.pragma exempt:__hash__        result._members.update(            self._working_set(self._members.iteritems()).intersection(_iter_id(iterable)))        return result    def __and__(self, other):        if not isinstance(other, IdentitySet):            return NotImplemented        return self.intersection(other)    def intersection_update(self, iterable):        self._members = self.intersection(iterable)._members    def __iand__(self, other):        if not isinstance(other, IdentitySet):            return NotImplemented        self.intersection_update(other)        return self    def symmetric_difference(self, iterable):        result = type(self)()        # testlib.pragma exempt:__hash__        result._members.update(            self._working_set(self._members.iteritems()).symmetric_difference(_iter_id(iterable)))        return result    def __xor__(self, other):        if not isinstance(other, IdentitySet):            return NotImplemented        return self.symmetric_difference(other)    def symmetric_difference_update(self, iterable):        self._members = self.symmetric_difference(iterable)._members    def __ixor__(self, other):        if not isinstance(other, IdentitySet):            return NotImplemented        self.symmetric_difference(other)        return self    def copy(self):        return type(self)(self._members.itervalues())    __copy__ = copy    def __len__(self):        return len(self._members)    def __iter__(self):        return self._members.itervalues()    def __hash__(self):        raise TypeError('set objects are unhashable')    def __repr__(self):        return '%s(%r)' % (type(self).__name__, self._members.values())if sys.version_info >= (2, 4):    _IterableUpdatableDict = dictelse:    class _IterableUpdatableDict(dict):        """A dict that can update(iterable) like Python 2.4+'s dict."""        def update(self, __iterable=None, **kw):            if __iterable is not None:                if not isinstance(__iterable, dict):                    __iterable = dict(__iterable)                dict.update(self, __iterable)            if kw:                dict.update(self, **kw)def _iter_id(iterable):    """Generator: ((id(o), o) for o in iterable)."""    for item in iterable:        yield id(item), itemclass OrderedIdentitySet(IdentitySet):    class _working_set(OrderedSet):        # a testing pragma: exempt the OIDS working set from the test suite's        # "never call the user's __hash__" assertions.  this is a big hammer,        # but it's safe here: IDS operates on (id, instance) tuples in the        # working set.        __sa_hash_exempt__ = True    def __init__(self, iterable=None):        IdentitySet.__init__(self)        self._members = OrderedDict()        if iterable:            for o in iterable:                self.add(o)class UniqueAppender(object):    """Only adds items to a collection once.    Additional appends() of the same object are ignored.  Membership is    determined by identity (``is a``) not equality (``==``).    """    def __init__(self, data, via=None):        self.data = data        self._unique = IdentitySet()        if via:            self._data_appender = getattr(data, via)        elif hasattr(data, 'append'):            self._data_appender = data.append        elif hasattr(data, 'add'):            # TODO: we think its a set here.  bypass unneeded uniquing logic ?            self._data_appender = data.add    def append(self, item):        if item not in self._unique:            self._data_appender(item)            self._unique.add(item)    def __iter__(self):        return iter(self.data)class ScopedRegistry(object):    """A Registry that can store one or multiple instances of a single    class on a per-thread scoped basis, or on a customized scope.    createfunc      a callable that returns a new object to be placed in the registry    scopefunc      a callable that will return a key to store/retrieve an object,      defaults to ``thread.get_ident`` for thread-local objects.  Use      a value like ``lambda: True`` for application scope.    """    def __init__(self, createfunc, scopefunc=None):        self.createfunc = createfunc        if scopefunc is None:            self.scopefunc = thread.get_ident        else:            self.scopefunc = scopefunc        self.registry = {}    def __call__(self):        key = self._get_key()        try:            return self.registry[key]        except KeyError:            return self.registry.setdefault(key, self.createfunc())    def has(self):        return self._get_key() in self.registry    def set(self, obj):        self.registry[self._get_key()] = obj    def clear(self):        try:            del self.registry[self._get_key()]        except KeyError:            pass    def _get_key(self):        return self.scopefunc()def warn(msg):    if isinstance(msg, basestring):        warnings.warn(msg, exceptions.SAWarning, stacklevel=3)    else:        warnings.warn(msg, stacklevel=3)def warn_deprecated(msg):    warnings.warn(msg, exceptions.SADeprecationWarning, stacklevel=3)def deprecated(func, message=None, add_deprecation_to_docstring=True):    """Decorates a function and issues a deprecation warning on use.    message      If provided, issue message in the warning.  A sensible default      is used if not provided.    add_deprecation_to_docstring      Default True.  If False, the wrapped function's __doc__ is left      as-is.  If True, the 'message' is prepended to the docs if      provided, or sensible default if message is omitted.    """    if message is not None:        warning = message % dict(func=func.__name__)    else:        warning = "Call to deprecated function %s" % func.__name__    def func_with_warning(*args, **kwargs):        warnings.warn(exceptions.SADeprecationWarning(warning),                      stacklevel=2)        return func(*args, **kwargs)    doc = func.__doc__ is not None and func.__doc__ or ''    if add_deprecation_to_docstring:        header = message is not None and warning or 'Deprecated.'        doc = '\n'.join((header.rstrip(), doc))    func_with_warning.__doc__ = doc    func_with_warning.__dict__.update(func.__dict__)    try:        func_with_warning.__name__ = func.__name__    except TypeError:        pass    return func_with_warning

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -