📄 unitofwork.py
字号:
self.assert_sql_count(testing.db, go, 1) def go(): self.assert_(h3.counter == h2.counter == 12) self.assert_(h2.foober == h3.foober == h4.foober == 'im foober') self.assert_(h5.foober=='im the new foober') self.assert_sql_count(testing.db, go, 0) Session.close() l = Hoho.query.all() (h1, h2, h3, h4, h5) = l self.assert_(h1.hoho==althohoval) self.assert_(h3.hoho==althohoval) self.assert_(h2.hoho==h4.hoho==h5.hoho==hohoval) self.assert_(h3.counter == h2.counter == 12) self.assert_(h1.counter == h4.counter==h5.counter==7) self.assert_(h2.foober == h3.foober == h4.foober == 'im foober') self.assert_(h5.foober=='im the new foober') def test_eager_defaults(self): class Hoho(object):pass mapper(Hoho, default_table, eager_defaults=True) h1 = Hoho() Session.commit() def go(): self.assert_(h1.hoho==hohoval) self.assert_sql_count(testing.db, go, 0) def test_insert_nopostfetch(self): # populates the PassiveDefaults explicitly so there is no "post-update" class Hoho(object):pass mapper(Hoho, default_table) h1 = Hoho(hoho="15", counter="15") Session.commit() def go(): self.assert_(h1.hoho=="15") self.assert_(h1.counter=="15") self.assert_(h1.foober=="im foober") self.assert_sql_count(testing.db, go, 0) def test_update(self): class Hoho(object):pass mapper(Hoho, default_table) h1 = Hoho() Session.commit() self.assertEquals(h1.foober, 'im foober') h1.counter = 19 Session.commit() self.assertEquals(h1.foober, 'im the update') def test_used_in_relation(self): """test that a server-side generated default can be used as the target of a foreign key""" class Hoho(fixtures.Base): pass class Secondary(fixtures.Base): pass mapper(Hoho, default_table, properties={ 'secondaries':relation(Secondary) }, save_on_init=False) mapper(Secondary, secondary_table, save_on_init=False) h1 = Hoho() s1 = Secondary(data='s1') h1.secondaries.append(s1) Session.save(h1) Session.commit() Session.clear() self.assertEquals(Session.query(Hoho).get(h1.id), Hoho(hoho=hohoval, secondaries=[Secondary(data='s1')])) h1 = Session.query(Hoho).get(h1.id) h1.secondaries.append(Secondary(data='s2')) Session.commit() Session.clear() self.assertEquals(Session.query(Hoho).get(h1.id), Hoho(hoho=hohoval, secondaries=[Secondary(data='s1'), Secondary(data='s2')]) ) class OneToManyTest(ORMTest): metadata = tables.metadata def define_tables(self, metadata): pass def test_onetomany_1(self): """test basic save of one to many.""" m = mapper(User, users, properties = dict( addresses = relation(mapper(Address, addresses), lazy = True) )) u = User() u.user_name = 'one2manytester' u.addresses = [] a = Address() a.email_address = 'one2many@test.org' u.addresses.append(a) a2 = Address() a2.email_address = 'lala@test.org' u.addresses.append(a2) print repr(u.addresses) Session.commit() usertable = users.select(users.c.user_id.in_([u.user_id])).execute().fetchall() self.assertEqual(usertable[0].values(), [u.user_id, 'one2manytester']) addresstable = addresses.select(addresses.c.address_id.in_([a.address_id, a2.address_id]), order_by=[addresses.c.email_address]).execute().fetchall() self.assertEqual(addresstable[0].values(), [a2.address_id, u.user_id, 'lala@test.org']) self.assertEqual(addresstable[1].values(), [a.address_id, u.user_id, 'one2many@test.org']) userid = u.user_id addressid = a2.address_id a2.email_address = 'somethingnew@foo.com' Session.commit() addresstable = addresses.select(addresses.c.address_id == addressid).execute().fetchall() self.assertEqual(addresstable[0].values(), [addressid, userid, 'somethingnew@foo.com']) self.assert_(u.user_id == userid and a2.address_id == addressid) def test_onetomany_2(self): """digs deeper into modifying the child items of an object to insure the correct updates take place""" m = mapper(User, users, properties = dict( addresses = relation(mapper(Address, addresses), lazy = True) )) u1 = User() u1.user_name = 'user1' u1.addresses = [] a1 = Address() a1.email_address = 'emailaddress1' u1.addresses.append(a1) u2 = User() u2.user_name = 'user2' u2.addresses = [] a2 = Address() a2.email_address = 'emailaddress2' u2.addresses.append(a2) a3 = Address() a3.email_address = 'emailaddress3' Session.commit() # modify user2 directly, append an address to user1. # upon commit, user2 should be updated, user1 should not # both address1 and address3 should be updated u2.user_name = 'user2modified' u1.addresses.append(a3) del u1.addresses[0] self.assert_sql(testing.db, lambda: Session.commit(), [ ( "UPDATE users SET user_name=:user_name WHERE users.user_id = :users_user_id", {'users_user_id': u2.user_id, 'user_name': 'user2modified'} ), ("UPDATE email_addresses SET user_id=:user_id WHERE email_addresses.address_id = :email_addresses_address_id", {'user_id': None, 'email_addresses_address_id': a1.address_id} ), ( "UPDATE email_addresses SET user_id=:user_id WHERE email_addresses.address_id = :email_addresses_address_id", {'user_id': u1.user_id, 'email_addresses_address_id': a3.address_id} ), ]) def test_childmove(self): """tests moving a child from one parent to the other, then deleting the first parent, properly updates the child with the new parent. this tests the 'trackparent' option in the attributes module.""" m = mapper(User, users, properties = dict( addresses = relation(mapper(Address, addresses), lazy = True) )) u1 = User() u1.user_name = 'user1' u2 = User() u2.user_name = 'user2' a = Address() a.email_address = 'address1' u1.addresses.append(a) Session.commit() del u1.addresses[0] u2.addresses.append(a) Session.delete(u1) Session.commit() Session.close() u2 = Session.get(User, u2.user_id) assert len(u2.addresses) == 1 def test_childmove_2(self): m = mapper(User, users, properties = dict( addresses = relation(mapper(Address, addresses), lazy = True) )) u1 = User() u1.user_name = 'user1' u2 = User() u2.user_name = 'user2' a = Address() a.email_address = 'address1' u1.addresses.append(a) Session.commit() del u1.addresses[0] u2.addresses.append(a) Session.commit() Session.close() u2 = Session.get(User, u2.user_id) assert len(u2.addresses) == 1 def test_o2m_delete_parent(self): m = mapper(User, users, properties = dict( address = relation(mapper(Address, addresses), lazy = True, uselist = False, private = False) )) u = User() a = Address() u.user_name = 'one2onetester' u.address = a u.address.email_address = 'myonlyaddress@foo.com' Session.commit() Session.delete(u) Session.commit() self.assert_(a.address_id is not None and a.user_id is None and u._instance_key not in Session.identity_map and a._instance_key in Session.identity_map) def test_onetoone(self): m = mapper(User, users, properties = dict( address = relation(mapper(Address, addresses), lazy = True, uselist = False) )) u = User() u.user_name = 'one2onetester' u.address = Address() u.address.email_address = 'myonlyaddress@foo.com' Session.commit() u.user_name = 'imnew' Session.commit() u.address.email_address = 'imnew@foo.com' Session.commit() def test_bidirectional(self): m1 = mapper(User, users) m2 = mapper(Address, addresses, properties = dict( user = relation(m1, lazy = False, backref='addresses') )) u = User() print repr(u.addresses) u.user_name = 'test' a = Address() a.email_address = 'testaddress' a.user = u Session.commit() print repr(u.addresses) x = False try: u.addresses.append('hi') x = True except: pass if x: self.assert_(False, "User addresses element should be scalar based") Session.delete(u) Session.commit() def test_doublerelation(self): m2 = mapper(Address, addresses) m = mapper(User, users, properties={ 'boston_addresses' : relation(m2, primaryjoin= and_(users.c.user_id==addresses.c.user_id, addresses.c.email_address.like('%boston%'))), 'newyork_addresses' : relation(m2, primaryjoin= and_(users.c.user_id==addresses.c.user_id, addresses.c.email_address.like('%newyork%'))), }) u = User() a = Address() a.email_address = 'foo@boston.com' b = Address() b.email_address = 'bar@newyork.com' u.boston_addresses.append(a) u.newyork_addresses.append(b) Session.commit()class SaveTest(ORMTest): metadata = tables.metadata def define_tables(self, metadata): pass def setUp(self): super(SaveTest, self).setUp() keywords.insert().execute( dict(name='blue'), dict(name='red'), dict(name='green'), dict(name='big'), dict(name='small'), dict(name='round'), dict(name='square') ) def test_basic(self): # save two users u = User() u.user_name = 'savetester' m = mapper(User, users) u2 = User() u2.user_name = 'savetester2' Session.save(u) Session.flush([u]) Session.commit() # assert the first one retreives the same from the identity map nu = Session.get(m, u.user_id) print "U: " + repr(u) + "NU: " + repr(nu) self.assert_(u is nu) # clear out the identity map, so next get forces a SELECT Session.close() # check it again, identity should be different but ids the same nu = Session.get(m, u.user_id) self.assert_(u is not nu and u.user_id == nu.user_id and nu.user_name == 'savetester') Session.close() # change first users name and save Session.update(u) u.user_name = 'modifiedname' assert u in Session.dirty Session.commit() # select both #Session.close() userlist = User.query.filter(users.c.user_id.in_([u.user_id, u2.user_id])).order_by([users.c.user_name]).all() print repr(u.user_id), repr(userlist[0].user_id), repr(userlist[0].user_name) self.assert_(u.user_id == userlist[0].user_id and userlist[0].user_name == 'modifiedname') self.assert_(u2.user_id == userlist[1].user_id and userlist[1].user_name == 'savetester2') def test_synonym(self): class User(object): def _get_name(self): return "User:" + self.user_name def _set_name(self, name): self.user_name = name + ":User" name = property(_get_name, _set_name) mapper(User, users, properties={ 'name':synonym('user_name') }) u = User() u.name = "some name" assert u.name == 'User:some name:User' Session.save(u) Session.flush() Session.clear() u = Session.query(User).first() assert u.name == 'User:some name:User' def test_lazyattr_commit(self):
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -