📄 collection.py
字号:
import testenv; testenv.configure_for_tests()
import sys
from operator import and_
from sqlalchemy import *
import sqlalchemy.exceptions as exceptions
from sqlalchemy.orm import create_session, mapper, relation, \
     interfaces, attributes
import sqlalchemy.orm.collections as collections
from sqlalchemy.orm.collections import collection
from sqlalchemy import util
from testlib import *

# Keep a handle on the Python set type, falling back to sets.Set when the
# attribute lookup fails.
# NOTE(review): in CPython ``__builtins__`` can be a dict rather than the
# module when this file is imported instead of run as a script, in which case
# the fallback branch is taken even though a builtin set exists -- confirm
# this is acceptable for the callers of ``py_set``.
try:
    py_set = __builtins__.set
except AttributeError:
    import sets
    py_set = sets.Set


class Canary(interfaces.AttributeExtension):
    """Attribute extension that records the values passed to its hooks.

    ``data`` holds the values currently considered present, ``added`` every
    value ever passed to ``append`` and ``removed`` every value ever passed
    to ``remove``.
    """

    def __init__(self):
        self.data = set()
        self.added = set()
        self.removed = set()

    def append(self, obj, value, initiator):
        """Record ``value`` as added; a value may only be added once."""
        assert value not in self.added
        self.data.add(value)
        self.added.add(value)

    def remove(self, obj, value, initiator):
        """Record ``value`` as removed; a value may only be removed once."""
        assert value not in self.removed
        self.data.remove(value)
        self.removed.add(value)

    def set(self, obj, value, oldvalue, initiator):
        """Record a replacement: remove ``oldvalue`` (if any), add ``value``."""
        if oldvalue is not None:
            self.remove(obj, oldvalue, None)
        self.append(obj, value, None)


class Entity(object):
    """Plain value holder with three optional attributes ``a``, ``b``, ``c``."""

    def __init__(self, a=None, b=None, c=None):
        self.a = a
        self.b = b
        self.c = c

    def __repr__(self):
        return str((id(self), self.a, self.b, self.c))


attributes.register_class(Entity)

# Module-level counter used to give each generated Entity a distinct value.
_id = 1


def entity_maker():
    """Return a new Entity whose ``a`` is the next counter value."""
    global _id
    _id += 1
    return Entity(_id)


def dictable_entity(a=None, b=None, c=None):
    """Return a new Entity, filling falsy ``a``/``b`` from the counter.

    ``a`` defaults to ``str(_id)`` and ``b`` to ``'value %s' % _id``; ``c`` is
    passed through unchanged.
    """
    global _id
    _id += 1
    return Entity(a or str(_id), b or 'value %s' % _id, c)


class CollectionsTest(TestBase):
    def _test_adapter(self, typecallable, creator=entity_maker,
                      to_set=None):
        """Check the collection adapter's with/without-event operations.

        ``typecallable`` is the collection factory registered for the
        attribute, ``creator`` produces members, and ``to_set`` converts the
        collection into a set for comparison (defaults to ``set(col)``).
        """
        class Foo(object):
            pass

        canary = Canary()
        attributes.register_class(Foo)
        attributes.register_attribute(Foo, 'attr', True, extension=canary,
                                      typecallable=typecallable,
                                      useobject=True)

        obj = Foo()
        adapter = collections.collection_adapter(obj.attr)
        direct = obj.attr
        if to_set is None:
            to_set = lambda col: set(col)

        def assert_eq():
            self.assert_(to_set(direct) == canary.data)
            self.assert_(set(adapter) == canary.data)
        assert_ne = lambda: self.assert_(to_set(direct) != canary.data)

        e1, e2 = creator(), creator()

        adapter.append_with_event(e1)
        assert_eq()

        # Without an event the canary is expected to fall out of sync until
        # it is updated by hand.
        adapter.append_without_event(e2)
        assert_ne()
        canary.data.add(e2)
        assert_eq()

        adapter.remove_without_event(e2)
        assert_ne()
        canary.data.remove(e2)
        assert_eq()

        adapter.remove_with_event(e1)
        assert_eq()

    def _test_list(self, typecallable, creator=entity_maker):
        """Exercise list-style mutators and compare against a plain list.

        Every operation is applied both to the attribute's collection and to
        ``control`` (a plain list); after each step the collection, the
        adapter and the canary must agree.  Apart from ``append``, an
        operation is only exercised when the collection has the matching
        attribute.
        """
        class Foo(object):
            pass

        canary = Canary()
        attributes.register_class(Foo)
        attributes.register_attribute(Foo, 'attr', True, extension=canary,
                                      typecallable=typecallable,
                                      useobject=True)

        obj = Foo()
        adapter = collections.collection_adapter(obj.attr)
        direct = obj.attr
        control = list()

        def assert_eq():
            self.assert_(set(direct) == canary.data)
            self.assert_(set(adapter) == canary.data)
            self.assert_(direct == control)

        # assume append() is available for list tests
        e = creator()
        direct.append(e)
        control.append(e)
        assert_eq()

        if hasattr(direct, 'pop'):
            direct.pop()
            control.pop()
            assert_eq()

        if hasattr(direct, '__setitem__'):
            e = creator()
            direct.append(e)
            control.append(e)

            e = creator()
            direct[0] = e
            control[0] = e
            assert_eq()

            # Slice assignment through __setitem__ is only exercised when
            # all three of these are present as well.
            if reduce(and_, [hasattr(direct, a) for a in
                             ('__delitem__', 'insert', '__len__')], True):
                values = [creator(), creator(), creator(), creator()]
                direct[slice(0,1)] = values
                control[slice(0,1)] = values
                assert_eq()

                values = [creator(), creator()]
                direct[slice(0,-1,2)] = values
                control[slice(0,-1,2)] = values
                assert_eq()

                values = [creator()]
                direct[slice(0,-1)] = values
                control[slice(0,-1)] = values
                assert_eq()

        if hasattr(direct, '__delitem__'):
            e = creator()
            direct.append(e)
            control.append(e)
            del direct[-1]
            del control[-1]
            assert_eq()

            if hasattr(direct, '__getslice__'):
                for e in [creator(), creator(), creator(), creator()]:
                    direct.append(e)
                    control.append(e)

                del direct[:-3]
                del control[:-3]
                assert_eq()

                del direct[0:1]
                del control[0:1]
                assert_eq()

                del direct[::2]
                del control[::2]
                assert_eq()

        if hasattr(direct, 'remove'):
            e = creator()
            direct.append(e)
            control.append(e)

            direct.remove(e)
            control.remove(e)
            assert_eq()

        if hasattr(direct, '__setslice__'):
            values = [creator(), creator()]
            direct[0:1] = values
            control[0:1] = values
            assert_eq()

            values = [creator()]
            direct[0:] = values
            control[0:] = values
            assert_eq()

            values = [creator()]
            direct[:1] = values
            control[:1] = values
            assert_eq()

            values = [creator()]
            direct[-1::2] = values
            control[-1::2] = values
            assert_eq()

            # Extended-slice assignment needs exactly as many values as
            # there are slots in the slice.
            values = [creator()] * len(direct[1::2])
            direct[1::2] = values
            control[1::2] = values
            assert_eq()

        if hasattr(direct, '__delslice__'):
            for i in range(1, 4):
                e = creator()
                direct.append(e)
                control.append(e)

            del direct[-1:]
            del control[-1:]
            assert_eq()

            del direct[1:2]
            del control[1:2]
            assert_eq()

            del direct[:]
            del control[:]
            assert_eq()

        if hasattr(direct, 'extend'):
            values = [creator(), creator(), creator()]

            direct.extend(values)
            control.extend(values)
            assert_eq()

        if hasattr(direct, '__iadd__'):
            values = [creator(), creator(), creator()]

            direct += values
            control += values
            assert_eq()

            direct += []
            control += []
            assert_eq()

            # Same operation, but going through the attribute on the owner.
            values = [creator(), creator()]
            obj.attr += values
            control += values
            assert_eq()

        if hasattr(direct, '__imul__'):
            direct *= 2
            control *= 2
            assert_eq()

            obj.attr *= 2
            control *= 2
            assert_eq()

    def _test_list_bulk(self, typecallable, creator=entity_maker):
        """Check whole-collection assignment to the attribute.

        Covers assigning another ``typecallable`` instance, a plain list, a
        set (expected to raise ``TypeError`` and leave the canary untouched),
        and a reassignment that keeps some members, which must not be
        reported as removed.
        """
        class Foo(object):
            pass

        canary = Canary()
        attributes.register_class(Foo)
        attributes.register_attribute(Foo, 'attr', True, extension=canary,
                                      typecallable=typecallable,
                                      useobject=True)

        obj = Foo()
        direct = obj.attr

        e1 = creator()
        obj.attr.append(e1)

        like_me = typecallable()
        e2 = creator()
        like_me.append(e2)

        self.assert_(obj.attr is direct)
        obj.attr = like_me
        # The assigned object is expected to be neither the old collection
        # nor the very object that was assigned.
        self.assert_(obj.attr is not direct)
        self.assert_(obj.attr is not like_me)
        self.assert_(set(obj.attr) == set([e2]))
        self.assert_(e1 in canary.removed)
        self.assert_(e2 in canary.added)

        e3 = creator()
        real_list = [e3]
        obj.attr = real_list
        self.assert_(obj.attr is not real_list)
        self.assert_(set(obj.attr) == set([e3]))
        self.assert_(e2 in canary.removed)
        self.assert_(e3 in canary.added)

        e4 = creator()
        try:
            obj.attr = set([e4])
            self.assert_(False)
        except TypeError:
            self.assert_(e4 not in canary.data)
            self.assert_(e3 in canary.data)

        e5 = creator()
        e6 = creator()
        e7 = creator()
        obj.attr = [e5, e6, e7]
        self.assert_(e5 in canary.added)
        self.assert_(e6 in canary.added)
        self.assert_(e7 in canary.added)

        obj.attr = [e6, e7]
        self.assert_(e5 in canary.removed)
        self.assert_(e6 in canary.added)
        self.assert_(e7 in canary.added)
        self.assert_(e6 not in canary.removed)
        self.assert_(e7 not in canary.removed)

    # Run the adapter, list and bulk checks against the builtin list.
    def test_list(self):
        self._test_adapter(list)
        self._test_list(list)
        self._test_list_bulk(list)

    # Same checks against a trivial list subclass; afterwards the subclass
    # is expected to carry an ``_sa_instrumented`` marker equal to its id().
    def test_list_subclass(self):
        class MyList(list):
            pass
        self._test_adapter(MyList)
        self._test_list(MyList)
        self._test_list_bulk(MyList)
        self.assert_(getattr(MyList, '_sa_instrumented') == id(MyList))

    # Same checks against an object that only looks like a list (no list
    # base class, no __emulates__ hint).
    def test_list_duck(self):
        class ListLike(object):
            def __init__(self):
                self.data = list()
            def append(self, item):
                self.data.append(item)
            def remove(self, item):
                self.data.remove(item)
            def insert(self, index, item):
                self.data.insert(index, item)
            def pop(self, index=-1):
                return self.data.pop(index)
            def extend(self):
                # Fails loudly if this implementation is ever called.
                assert False
            def __iter__(self):
                return iter(self.data)
            def __eq__(self, other):
                return self.data == other
            def __repr__(self):
                return 'ListLike(%s)' % repr(self.data)

        self._test_adapter(ListLike)
        self._test_list(ListLike)
        self._test_list_bulk(ListLike)
        self.assert_(getattr(ListLike, '_sa_instrumented') == id(ListLike))

    # Same checks against a list-like object that declares
    # ``__emulates__ = list``.
    def test_list_emulates(self):
        class ListIsh(object):
            __emulates__ = list
            def __init__(self):
                self.data = list()
            def append(self, item):
                self.data.append(item)
            def remove(self, item):
                self.data.remove(item)
            def insert(self, index, item):
                self.data.insert(index, item)
            def pop(self, index=-1):
                return self.data.pop(index)
            def extend(self):
                # Fails loudly if this implementation is ever called.
                assert False
            def __iter__(self):
                return iter(self.data)
            def __eq__(self, other):
                return self.data == other
            def __repr__(self):
                return 'ListIsh(%s)' % repr(self.data)

        self._test_adapter(ListIsh)
        self._test_list(ListIsh)
        self._test_list_bulk(ListIsh)
        self.assert_(getattr(ListIsh, '_sa_instrumented') == id(ListIsh))

    def _test_set(self, typecallable, creator=entity_maker):
        """Exercise set-style mutators and compare against a plain set.

        Mirrors ``_test_list``: each operation is applied to the attribute's
        collection and to ``control`` (a plain set), then the collection, the
        adapter and the canary are compared.
        """
        class Foo(object):
            pass

        canary = Canary()
        attributes.register_class(Foo)
        attributes.register_attribute(Foo, 'attr', True, extension=canary,
                                      typecallable=typecallable,
                                      useobject=True)

        obj = Foo()
        adapter = collections.collection_adapter(obj.attr)
        direct = obj.attr
        control = set()

        def assert_eq():
            self.assert_(set(direct) == canary.data)
            self.assert_(set(adapter) == canary.data)
            self.assert_(direct == control)

        def addall(*values):
            # Add every value to both collections, then verify agreement.
            for item in values:
                direct.add(item)
                control.add(item)
            assert_eq()

        def zap():
            # Empty both collections; iterate over a copy because the
            # collection is mutated while looping.
            for item in list(direct):
                direct.remove(item)
            control.clear()

        # assume add() is available for set tests
        addall(creator())

        if hasattr(direct, 'pop'):
            direct.pop()
            control.pop()
            assert_eq()

        if hasattr(direct, 'remove'):
            e = creator()
            addall(e)

            direct.remove(e)
            control.remove(e)
            assert_eq()

            # Removing a non-member must raise KeyError and must not be
            # reported to the canary.
            e = creator()
            try:
                direct.remove(e)
            except KeyError:
                assert_eq()
                self.assert_(e not in canary.removed)
            else:
                self.assert_(False)

        if hasattr(direct, 'discard'):
            e = creator()
            addall(e)

            direct.discard(e)
            control.discard(e)
            assert_eq()

            # Discarding a non-member is silent and not reported.
            e = creator()
            direct.discard(e)
            self.assert_(e not in canary.removed)
            assert_eq()

        if hasattr(direct, 'update'):
            zap()
            e = creator()
            addall(e)

            values = set([e, creator(), creator()])

            direct.update(values)
            control.update(values)
            assert_eq()

        if hasattr(direct, '__ior__'):
            zap()
            e = creator()
            addall(e)

            values = set([e, creator(), creator()])

            direct |= values
            control |= values
            assert_eq()

            # cover self-assignment short-circuit
            values = set([e, creator(), creator()])
            obj.attr |= values
            control |= values
            # NOTE(review): the visible source ends here; the remainder of
            # _test_set (and of CollectionsTest) is not shown in this view.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -