📄 relationships.py
字号:
Column('scol1', Integer, ForeignKey(sometable.c.col1)), Column('data', String(20)) ) def testbasic(self): class MyList(list): pass class Foo(object): pass class Bar(object): pass mapper(Foo, sometable, properties={ 'bars':relation(Bar, collection_class=MyList) }) mapper(Bar, someothertable) f = Foo() assert isinstance(f.bars, MyList) def testlazyload(self): """test that a 'set' can be used as a collection and can lazyload.""" class Foo(object): pass class Bar(object): pass mapper(Foo, sometable, properties={ 'bars':relation(Bar, collection_class=set) }) mapper(Bar, someothertable) f = Foo() f.bars.add(Bar()) f.bars.add(Bar()) sess = create_session() sess.save(f) sess.flush() sess.clear() f = sess.query(Foo).get(f.col1) assert len(list(f.bars)) == 2 f.bars.clear() def testdict(self): """test that a 'dict' can be used as a collection and can lazyload.""" class Foo(object): pass class Bar(object): pass class AppenderDict(dict): @collection.appender def set(self, item): self[id(item)] = item @collection.remover def remove(self, item): if id(item) in self: del self[id(item)] mapper(Foo, sometable, properties={ 'bars':relation(Bar, collection_class=AppenderDict) }) mapper(Bar, someothertable) f = Foo() f.bars.set(Bar()) f.bars.set(Bar()) sess = create_session() sess.save(f) sess.flush() sess.clear() f = sess.query(Foo).get(f.col1) assert len(list(f.bars)) == 2 f.bars.clear() def testdictwrapper(self): """test that the supplied 'dict' wrapper can be used as a collection and can lazyload.""" class Foo(object): pass class Bar(object): def __init__(self, data): self.data = data mapper(Foo, sometable, properties={ 'bars':relation(Bar, collection_class=collections.column_mapped_collection(someothertable.c.data)) }) mapper(Bar, someothertable) f = Foo() col = collections.collection_adapter(f.bars) col.append_with_event(Bar('a')) col.append_with_event(Bar('b')) sess = create_session() sess.save(f) sess.flush() sess.clear() f = sess.query(Foo).get(f.col1) assert len(list(f.bars)) == 2 existing = set([id(b) for b in f.bars.values()]) col = collections.collection_adapter(f.bars) col.append_with_event(Bar('b')) f.bars['a'] = Bar('a') sess.flush() sess.clear() f = sess.query(Foo).get(f.col1) assert len(list(f.bars)) == 2 replaced = set([id(b) for b in f.bars.values()]) self.assert_(existing != replaced) def testlist(self): class Parent(object): pass class Child(object): pass mapper(Parent, sometable, properties={ 'children':relation(Child, collection_class=list) }) mapper(Child, someothertable) control = list() p = Parent() o = Child() control.append(o) p.children.append(o) assert control == p.children assert control == list(p.children) o = [Child(), Child(), Child(), Child()] control.extend(o) p.children.extend(o) assert control == p.children assert control == list(p.children) assert control[0] == p.children[0] assert control[-1] == p.children[-1] assert control[1:3] == p.children[1:3] del control[1] del p.children[1] assert control == p.children assert control == list(p.children) o = [Child()] control[1:3] = o p.children[1:3] = o assert control == p.children assert control == list(p.children) o = [Child(), Child(), Child(), Child()] control[1:3] = o p.children[1:3] = o assert control == p.children assert control == list(p.children) o = [Child(), Child(), Child(), Child()] control[-1:-2] = o p.children[-1:-2] = o assert control == p.children assert control == list(p.children) o = [Child(), Child(), Child(), Child()] control[4:] = o p.children[4:] = o assert control == p.children assert control == list(p.children) o = Child() control.insert(0, o) p.children.insert(0, o) assert control == p.children assert control == list(p.children) o = Child() control.insert(3, o) p.children.insert(3, o) assert control == p.children assert control == list(p.children) o = Child() control.insert(999, o) p.children.insert(999, o) assert control == p.children assert control == list(p.children) del control[0:1] del p.children[0:1] assert control == p.children assert control == list(p.children) del control[1:1] del p.children[1:1] assert control == p.children assert control == list(p.children) del control[1:3] del p.children[1:3] assert control == p.children assert control == list(p.children) del control[7:] del p.children[7:] assert control == p.children assert control == list(p.children) assert control.pop() == p.children.pop() assert control == p.children assert control == list(p.children) assert control.pop(0) == p.children.pop(0) assert control == p.children assert control == list(p.children) assert control.pop(2) == p.children.pop(2) assert control == p.children assert control == list(p.children) o = Child() control.insert(2, o) p.children.insert(2, o) assert control == p.children assert control == list(p.children) control.remove(o) p.children.remove(o) assert control == p.children assert control == list(p.children) def testobj(self): class Parent(object): pass class Child(object): pass class MyCollection(object): def __init__(self): self.data = [] @collection.appender def append(self, value): self.data.append(value) @collection.remover def remove(self, value): self.data.remove(value) @collection.iterator def __iter__(self): return iter(self.data) mapper(Parent, sometable, properties={ 'children':relation(Child, collection_class=MyCollection) }) mapper(Child, someothertable) control = list() p1 = Parent() o = Child() control.append(o) p1.children.append(o) assert control == list(p1.children) o = Child() control.append(o) p1.children.append(o) assert control == list(p1.children) o = Child() control.append(o) p1.children.append(o) assert control == list(p1.children) sess = create_session() sess.save(p1) sess.flush() sess.clear() p2 = sess.query(Parent).get(p1.col1) o = list(p2.children) assert len(o) == 3class ViewOnlyTest(ORMTest): """test a view_only mapping where a third table is pulled into the primary join condition, using overlapping PK column names (should not produce "conflicting column" error)""" def define_tables(self, metadata): global t1, t2, t3 t1 = Table("t1", metadata, Column('id', Integer, primary_key=True), Column('data', String(40))) t2 = Table("t2", metadata, Column('id', Integer, primary_key=True), Column('data', String(40)), Column('t1id', Integer, ForeignKey('t1.id'))) t3 = Table("t3", metadata, Column('id', Integer, primary_key=True), Column('data', String(40)), Column('t2id', Integer, ForeignKey('t2.id')) ) def test_basic(self): class C1(object):pass class C2(object):pass class C3(object):pass mapper(C1, t1, properties={ 't2s':relation(C2), 't2_view':relation(C2, viewonly=True, primaryjoin=and_(t1.c.id==t2.c.t1id, t3.c.t2id==t2.c.id, t3.c.data==t1.c.data)) }) mapper(C2, t2) mapper(C3, t3, properties={ 't2':relation(C2) }) c1 = C1() c1.data = 'c1data' c2a = C2() c1.t2s.append(c2a) c2b = C2() c1.t2s.append(c2b) c3 = C3() c3.data='c1data' c3.t2 = c2b sess = create_session() sess.save(c1) sess.save(c3) sess.flush() sess.clear() c1 = sess.query(C1).get(c1.id) assert set([x.id for x in c1.t2s]) == set([c2a.id, c2b.id]) assert set([x.id for x in c1.t2_view]) == set([c2b.id])class ViewOnlyTest2(ORMTest): """test a view_only mapping where a third table is pulled into the primary join condition, using non-overlapping PK column names (should not produce "mapper has no column X" error)""" def define_tables(self, metadata): global t1, t2, t3 t1 = Table("t1", metadata, Column('t1id', Integer, primary_key=True), Column('data', String(40))) t2 = Table("t2", metadata, Column('t2id', Integer, primary_key=True), Column('data', String(40)), Column('t1id_ref', Integer, ForeignKey('t1.t1id'))) t3 = Table("t3", metadata, Column('t3id', Integer, primary_key=True), Column('data', String(40)), Column('t2id_ref', Integer, ForeignKey('t2.t2id')) ) def test_basic(self): class C1(object):pass class C2(object):pass class C3(object):pass mapper(C1, t1, properties={ 't2s':relation(C2), 't2_view':relation(C2, viewonly=True, primaryjoin=and_(t1.c.t1id==t2.c.t1id_ref, t3.c.t2id_ref==t2.c.t2id, t3.c.data==t1.c.data)) }) mapper(C2, t2) mapper(C3, t3, properties={ 't2':relation(C2) }) c1 = C1() c1.data = 'c1data' c2a = C2() c1.t2s.append(c2a) c2b = C2() c1.t2s.append(c2b) c3 = C3() c3.data='c1data' c3.t2 = c2b sess = create_session() sess.save(c1) sess.save(c3) sess.flush() sess.clear() c1 = sess.query(C1).get(c1.t1id) assert set([x.t2id for x in c1.t2s]) == set([c2a.t2id, c2b.t2id]) assert set([x.t2id for x in c1.t2_view]) == set([c2b.t2id])if __name__ == "__main__": testenv.main()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -