📄 collection.py
字号:
collections.MappedCollection.__init__(self, lambda e: e.a) util.OrderedDict.__init__(self) self._test_adapter(MyOrdered, dictable_entity, to_set=lambda c: set(c.values())) self._test_dict(MyOrdered) self._test_dict_bulk(MyOrdered) self.assert_(getattr(MyOrdered, '_sa_instrumented') == id(MyOrdered)) def test_dict_duck(self): class DictLike(object): def __init__(self): self.data = dict() @collection.appender @collection.replaces(1) def set(self, item): current = self.data.get(item.a, None) self.data[item.a] = item return current @collection.remover def _remove(self, item): del self.data[item.a] def __setitem__(self, key, value): self.data[key] = value def __getitem__(self, key): return self.data[key] def __delitem__(self, key): del self.data[key] def values(self): return self.data.values() def __contains__(self, key): return key in self.data @collection.iterator def itervalues(self): return self.data.itervalues() def __eq__(self, other): return self.data == other def __repr__(self): return 'DictLike(%s)' % repr(self.data) self._test_adapter(DictLike, dictable_entity, to_set=lambda c: set(c.itervalues())) self._test_dict(DictLike) self._test_dict_bulk(DictLike) self.assert_(getattr(DictLike, '_sa_instrumented') == id(DictLike)) def test_dict_emulates(self): class DictIsh(object): __emulates__ = dict def __init__(self): self.data = dict() @collection.appender @collection.replaces(1) def set(self, item): current = self.data.get(item.a, None) self.data[item.a] = item return current @collection.remover def _remove(self, item): del self.data[item.a] def __setitem__(self, key, value): self.data[key] = value def __getitem__(self, key): return self.data[key] def __delitem__(self, key): del self.data[key] def values(self): return self.data.values() def __contains__(self, key): return key in self.data @collection.iterator def itervalues(self): return self.data.itervalues() def __eq__(self, other): return self.data == other def __repr__(self): return 'DictIsh(%s)' % repr(self.data) self._test_adapter(DictIsh, dictable_entity, to_set=lambda c: set(c.itervalues())) self._test_dict(DictIsh) self._test_dict_bulk(DictIsh) self.assert_(getattr(DictIsh, '_sa_instrumented') == id(DictIsh)) def _test_object(self, typecallable, creator=entity_maker): class Foo(object): pass canary = Canary() attributes.register_class(Foo) attributes.register_attribute(Foo, 'attr', True, extension=canary, typecallable=typecallable, useobject=True) obj = Foo() adapter = collections.collection_adapter(obj.attr) direct = obj.attr control = set() def assert_eq(): self.assert_(set(direct) == canary.data) self.assert_(set(adapter) == canary.data) self.assert_(direct == control) # There is no API for object collections. We'll make one up # for the purposes of the test. e = creator() direct.push(e) control.add(e) assert_eq() direct.zark(e) control.remove(e) assert_eq() e = creator() direct.maybe_zark(e) control.discard(e) assert_eq() e = creator() direct.push(e) control.add(e) assert_eq() e = creator() direct.maybe_zark(e) control.discard(e) assert_eq() def test_object_duck(self): class MyCollection(object): def __init__(self): self.data = set() @collection.appender def push(self, item): self.data.add(item) @collection.remover def zark(self, item): self.data.remove(item) @collection.removes_return() def maybe_zark(self, item): if item in self.data: self.data.remove(item) return item @collection.iterator def __iter__(self): return iter(self.data) def __eq__(self, other): return self.data == other self._test_adapter(MyCollection) self._test_object(MyCollection) self.assert_(getattr(MyCollection, '_sa_instrumented') == id(MyCollection)) def test_object_emulates(self): class MyCollection2(object): __emulates__ = None def __init__(self): self.data = set() # looks like a list def append(self, item): assert False @collection.appender def push(self, item): self.data.add(item) @collection.remover def zark(self, item): self.data.remove(item) @collection.removes_return() def maybe_zark(self, item): if item in self.data: self.data.remove(item) return item @collection.iterator def __iter__(self): return iter(self.data) def __eq__(self, other): return self.data == other self._test_adapter(MyCollection2) self._test_object(MyCollection2) self.assert_(getattr(MyCollection2, '_sa_instrumented') == id(MyCollection2)) def test_recipes(self): class Custom(object): def __init__(self): self.data = [] @collection.appender @collection.adds('entity') def put(self, entity): self.data.append(entity) @collection.remover @collection.removes(1) def remove(self, entity): self.data.remove(entity) @collection.adds(1) def push(self, *args): self.data.append(args[0]) @collection.removes('entity') def yank(self, entity, arg): self.data.remove(entity) @collection.replaces(2) def replace(self, arg, entity, **kw): self.data.insert(0, entity) return self.data.pop() @collection.removes_return() def pop(self, key): return self.data.pop() @collection.iterator def __iter__(self): return iter(self.data) class Foo(object): pass canary = Canary() attributes.register_class(Foo) attributes.register_attribute(Foo, 'attr', True, extension=canary, typecallable=Custom, useobject=True) obj = Foo() adapter = collections.collection_adapter(obj.attr) direct = obj.attr control = list() def assert_eq(): self.assert_(set(direct) == canary.data) self.assert_(set(adapter) == canary.data) self.assert_(list(direct) == control) creator = entity_maker e1 = creator() direct.put(e1) control.append(e1) assert_eq() e2 = creator() direct.put(entity=e2) control.append(e2) assert_eq() direct.remove(e2) control.remove(e2) assert_eq() direct.remove(entity=e1) control.remove(e1) assert_eq() e3 = creator() direct.push(e3) control.append(e3) assert_eq() direct.yank(e3, 'blah') control.remove(e3) assert_eq() e4, e5, e6, e7 = creator(), creator(), creator(), creator() direct.put(e4) direct.put(e5) control.append(e4) control.append(e5) dr1 = direct.replace('foo', e6, bar='baz') control.insert(0, e6) cr1 = control.pop() assert_eq() self.assert_(dr1 is cr1) dr2 = direct.replace(arg=1, entity=e7) control.insert(0, e7) cr2 = control.pop() assert_eq() self.assert_(dr2 is cr2) dr3 = direct.pop('blah') cr3 = control.pop() assert_eq() self.assert_(dr3 is cr3) def test_lifecycle(self): class Foo(object): pass canary = Canary() creator = entity_maker attributes.register_class(Foo) attributes.register_attribute(Foo, 'attr', True, extension=canary, useobject=True) obj = Foo() col1 = obj.attr e1 = creator() obj.attr.append(e1) e2 = creator() bulk1 = [e2] # empty & sever col1 from obj obj.attr = bulk1 self.assert_(len(col1) == 0) self.assert_(len(canary.data) == 1) self.assert_(obj.attr is not col1) self.assert_(obj.attr is not bulk1) self.assert_(obj.attr == bulk1) e3 = creator() col1.append(e3) self.assert_(e3 not in canary.data) self.assert_(collections.collection_adapter(col1) is None) obj.attr[0] = e3 self.assert_(e3 in canary.data)class DictHelpersTest(ORMTest): def define_tables(self, metadata): global parents, children, Parent, Child parents = Table('parents', metadata, Column('id', Integer, primary_key=True), Column('label', String(128))) children = Table('children', metadata, Column('id', Integer, primary_key=True), Column('parent_id', Integer, ForeignKey('parents.id'), nullable=False), Column('a', String(128)), Column('b', String(128)), Column('c', String(128))) class Parent(object): def __init__(self, label=None): self.label = label class Child(object): def __init__(self, a=None, b=None, c=None): self.a = a self.b = b self.c = c def _test_scalar_mapped(self, collection_class): mapper(Child, children) mapper(Parent, parents, properties={ 'children': relation(Child, collection_class=collection_class, cascade="all, delete-orphan") }) p = Parent() p.children['foo'] = Child('foo', 'value') p.children['bar'] = Child('bar', 'value') session = create_session() session.save(p) session.flush() pid = p.id session.clear() p = session.query(Parent).get(pid) self.assert_(set(p.children.keys()) == set(['foo', 'bar'])) cid = p.children['foo'].id collections.collection_adapter(p.children).append_with_event( Child('foo', 'newvalue')) session.flush() session.clear() p = session.query(Parent).get(pid) self.assert_(set(p.children.keys()) == set(['foo', 'bar'])) self.assert_(p.children['foo'].id != cid) self.assert_(len(list(collections.collection_adapter(p.children))) == 2) session.flush() session.clear() p = session.query(Parent).get(pid) self.assert_(len(list(collections.collection_adapter(p.children))) == 2) collections.collection_adapter(p.children).remove_with_event( p.children['foo']) self.assert_(len(list(collections.collection_adapter(p.children))) == 1) session.flush() session.clear() p = session.query(Parent).get(pid) self.assert_(len(list(collections.collection_adapter(p.children))) == 1) del p.children['bar'] self.assert_(len(list(collections.collection_adapter(p.children))) == 0) session.flush() session.clear() p = session.query(Parent).get(pid) self.assert_(len(list(collections.collection_adapter(p.children))) == 0) def _test_composite_mapped(self, collection_class): mapper(Child, children) mapper(Parent, parents, properties={ 'children': relation(Child, collection_class=collection_class, cascade="all, delete-orphan") }) p = Parent() p.children[('foo', '1')] = Child('foo', '1', 'value 1') p.children[('foo', '2')] = Child('foo', '2', 'value 2') session = create_session() session.save(p) session.flush() pid = p.id session.clear() p = session.query(Parent).get(pid) self.assert_(set(p.children.keys()) == set([('foo', '1'), ('foo', '2')])) cid = p.children[('foo', '1')].id collections.collection_adapter(p.children).append_with_event( Child('foo', '1', 'newvalue')) session.flush() session.clear() p = session.query(Parent).get(pid) self.assert_(set(p.children.keys()) == set([('foo', '1'), ('foo', '2')])) self.assert_(p.children[('foo', '1')].id != cid) self.assert_(len(list(collections.collection_adapter(p.children))) == 2) def test_mapped_collection(self): collection_class = collections.mapped_collection(lambda c: c.a) self._test_scalar_mapped(collection_class) def test_mapped_collection2(self): collection_class = collections.mapped_collection(lambda c: (c.a, c.b)) self._test_composite_mapped(collection_class) def test_attr_mapped_collection(self): collection_class = collections.attribute_mapped_collection('a') self._test_scalar_mapped(collection_class) def test_column_mapped_collection(self): collection_class = collections.column_mapped_collection(children.c.a) self._test_scalar_mapped(collection_class) def test_column_mapped_collection2(self): collection_class = collections.column_mapped_collection((children.c.a, children.c.b)) self._test_composite_mapped(collection_class) def test_mixin(self): class Ordered(util.OrderedDict, collections.MappedCollection): def __init__(self): collections.MappedCollection.__init__(self, lambda v: v.a) util.OrderedDict.__init__(self) collection_class = Ordered self._test_scalar_mapped(collection_class) def test_mixin2(self): class Ordered2(util.OrderedDict, collections.MappedCollection): def __init__(self, keyfunc): collections.MappedCollection.__init__(self, keyfunc) util.OrderedDict.__init__(self) collection_class = lambda: Ordered2(lambda v: (v.a, v.b)) self._test_composite_mapped(collection_class)if __name__ == "__main__": testenv.main()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -