📄 mapper.py
字号:
self.assert_result(l, User, *user_result[0:2]) def test_mappingtojoinnopk(self): metadata = MetaData() account_ids_table = Table('account_ids', metadata, Column('account_id', Integer, primary_key=True), Column('username', String(20))) account_stuff_table = Table('account_stuff', metadata, Column('account_id', Integer, ForeignKey('account_ids.account_id')), Column('credit', Numeric)) class A(object):pass m = mapper(A, account_ids_table.join(account_stuff_table)) m.compile() assert account_ids_table in m._pks_by_table assert account_stuff_table not in m._pks_by_table metadata.create_all(testing.db) try: sess = create_session(bind=testing.db) a = A() sess.save(a) sess.flush() assert testing.db.execute(account_ids_table.count()).scalar() == 1 assert testing.db.execute(account_stuff_table.count()).scalar() == 0 finally: metadata.drop_all(testing.db) def test_mappingtoouterjoin(self): """test mapping to an outer join, with a composite primary key that allows nulls""" result = [ {'user_id' : 7, 'address_id' : 1}, {'user_id' : 8, 'address_id' : 2}, {'user_id' : 8, 'address_id' : 3}, {'user_id' : 8, 'address_id' : 4}, {'user_id' : 9, 'address_id':None} ] j = join(users, addresses, isouter=True) m = mapper(User, j, allow_null_pks=True, primary_key=[users.c.user_id, addresses.c.address_id]) q = create_session().query(m) l = q.all() self.assert_result(l, User, *result) def test_customjoin(self): """Tests that select_from totally replace the FROM parameters.""" m = mapper(User, users, properties={ 'orders':relation(mapper(Order, orders, properties={ 'items':relation(mapper(Item, orderitems)) })) }) q = create_session().query(m) l = (q.select_from(users.join(orders).join(orderitems)). filter(orderitems.c.item_name=='item 4')) self.assert_result(l, User, user_result[0]) @testing.uses_deprecated('//select') def test_customjoin_deprecated(self): """test that the from_obj parameter to query.select() can be used to totally replace the FROM parameters of the generated query.""" m = mapper(User, users, properties={ 'orders':relation(mapper(Order, orders, properties={ 'items':relation(mapper(Item, orderitems)) })) }) q = create_session().query(m) l = q.select((orderitems.c.item_name=='item 4'), from_obj=[users.join(orders).join(orderitems)]) self.assert_result(l, User, user_result[0]) def test_orderby(self): """test ordering at the mapper and query level""" # TODO: make a unit test out of these various combinations #m = mapper(User, users, order_by=desc(users.c.user_name)) mapper(User, users, order_by=None) #mapper(User, users) #l = create_session().query(User).select(order_by=[desc(users.c.user_name), asc(users.c.user_id)]) l = create_session().query(User).all() #l = create_session().query(User).select(order_by=[]) #l = create_session().query(User).select(order_by=None) @testing.unsupported('firebird') def test_function(self): """Test mapping to a SELECT statement that has functions in it.""" s = select([users, (users.c.user_id * 2).label('concat'), func.count(addresses.c.address_id).label('count')], users.c.user_id == addresses.c.user_id, group_by=[c for c in users.c]).alias('myselect') mapper(User, s) sess = create_session() l = sess.query(User).all() for u in l: print "User", u.user_id, u.user_name, u.concat, u.count assert l[0].concat == l[0].user_id * 2 == 14 assert l[1].concat == l[1].user_id * 2 == 16 @testing.unsupported('firebird') def test_count(self): """test the count function on Query. (why doesnt this work on firebird?)""" mapper(User, users) q = create_session().query(User) self.assert_(q.count()==3) self.assert_(q.count(users.c.user_id.in_([8,9]))==2) @testing.unsupported('firebird') @testing.uses_deprecated('//count_by', '//join_by', '//join_via') def test_count_by_deprecated(self): mapper(User, users) q = create_session().query(User) self.assert_(q.count_by(user_name='fred')==1) def test_manytomany_count(self): mapper(Item, orderitems, properties = dict( keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy = True), )) q = create_session().query(Item) assert q.join('keywords').distinct().count(Keyword.c.name=="red") == 2 def test_override(self): # assert that overriding a column raises an error try: m = mapper(User, users, properties = { 'user_name' : relation(mapper(Address, addresses)), }).compile() self.assert_(False, "should have raised ArgumentError") except exceptions.ArgumentError, e: self.assert_(True) clear_mappers() # assert that allow_column_override cancels the error m = mapper(User, users, properties = { 'user_name' : relation(mapper(Address, addresses)) }, allow_column_override=True) clear_mappers() # assert that the column being named else where also cancels the error m = mapper(User, users, properties = { 'user_name' : relation(mapper(Address, addresses)), 'foo' : users.c.user_name, }) def test_synonym(self): sess = create_session() assert_col = [] class User(object): def _get_user_name(self): assert_col.append(('get', self.user_name)) return self.user_name def _set_user_name(self, name): assert_col.append(('set', name)) self.user_name = name uname = property(_get_user_name, _set_user_name) mapper(User, users, properties = dict( addresses = relation(mapper(Address, addresses), lazy = True), uname = synonym('user_name'), adlist = synonym('addresses', proxy=True), adname = synonym('addresses') )) assert hasattr(User, 'adlist') assert hasattr(User, 'adname') # as of 0.4.2, synonyms always create a property # test compile assert not isinstance(User.uname == 'jack', bool) u = sess.query(User).filter(User.uname=='jack').one() self.assert_result(u.adlist, Address, *(user_address_result[0]['addresses'][1])) addr = sess.query(Address).filter_by(address_id=user_address_result[0]['addresses'][1][0]['address_id']).one() u = sess.query(User).filter_by(adname=addr).one() u2 = sess.query(User).filter_by(adlist=addr).one() assert u is u2 assert u not in sess.dirty u.uname = "some user name" assert len(assert_col) > 0 assert assert_col == [('set', 'some user name')], str(assert_col) assert u.uname == "some user name" assert assert_col == [('set', 'some user name'), ('get', 'some user name')], str(assert_col) assert u.user_name == "some user name" assert u in sess.dirty def test_column_synonyms(self): """test new-style synonyms which automatically instrument properties, set up aliased column, etc.""" sess = create_session() assert_col = [] class User(object): def _get_user_name(self): assert_col.append(('get', self._user_name)) return self._user_name def _set_user_name(self, name): assert_col.append(('set', name)) self._user_name = name user_name = property(_get_user_name, _set_user_name) mapper(Address, addresses) try: mapper(User, users, properties = { 'addresses':relation(Address, lazy=True), 'not_user_name':synonym('_user_name', map_column=True) }) User.not_user_name assert False except exceptions.ArgumentError, e: assert str(e) == "Can't compile synonym '_user_name': no column on table 'users' named 'not_user_name'" clear_mappers() mapper(Address, addresses) mapper(User, users, properties = { 'addresses':relation(Address, lazy=True), 'user_name':synonym('_user_name', map_column=True) }) # test compile assert not isinstance(User.user_name == 'jack', bool) assert hasattr(User, 'user_name') assert hasattr(User, '_user_name') u = sess.query(User).filter(User.user_name == 'jack').one() assert u.user_name == 'jack' u.user_name = 'foo' assert u.user_name == 'foo' assert assert_col == [('get', 'jack'), ('set', 'foo'), ('get', 'foo')]class OptionsTest(MapperSuperTest): @testing.fails_on('maxdb') def test_synonymoptions(self): sess = create_session() mapper(User, users, properties = dict( addresses = relation(mapper(Address, addresses), lazy = True), adlist = synonym('addresses', proxy=True) )) def go(): u = sess.query(User).options(eagerload('adlist')).filter_by(user_name='jack').one() self.assert_result(u.adlist, Address, *(user_address_result[0]['addresses'][1])) self.assert_sql_count(testing.db, go, 1) @testing.uses_deprecated('//select_by') def test_extension_options(self): sess = create_session() class ext1(MapperExtension): def populate_instance(self, mapper, selectcontext, row, instance, **flags): """test options at the Mapper._instance level""" instance.TEST = "hello world" return EXT_CONTINUE mapper(User, users, extension=ext1(), properties={ 'addresses':relation(mapper(Address, addresses), lazy=False) }) class testext(MapperExtension): def select_by(self, *args, **kwargs): """test options at the Query level""" return "HI" def populate_instance(self, mapper, selectcontext, row, instance, **flags): """test options at the Mapper._instance level""" instance.TEST_2 = "also hello world" return EXT_CONTINUE l = sess.query(User).options(extension(testext())).select_by(x=5) assert l == "HI" l = sess.query(User).options(extension(testext())).get(7) assert l.user_id == 7 assert l.TEST == "hello world" assert l.TEST_2 == "also hello world" assert not hasattr(l.addresses[0], 'TEST') assert not hasattr(l.addresses[0], 'TEST2') def test_eageroptions(self): """tests that a lazy relation can be upgraded to an eager relation via the options method""" sess = create_session() mapper(User, users, properties = dict( addresses = relation(mapper(Address, addresses)) )) l = sess.query(User).options(eagerload('addresses')).all() def go(): self.assert_result(l, User, *user_address_result) self.assert_sql_count(testing.db, go, 0) @testing.fails_on('maxdb') def test_eageroptionswithlimit(self): sess = create_session() mapper(User, users, properties = dict( addresses = relation(mapper(Address, addresses), lazy = True) )) u = sess.query(User).options(eagerload('addresses')).filter_by(user_id=8).one() def go(): assert u.user_id == 8 assert len(u.addresses) == 3 self.assert_sql_count(testing.db, go, 0) sess.clear() # test that eager loading doesnt modify parent mapper def go(): u = sess.query(User).filter_by(user_id=8).one() assert u.user_id == 8 assert len(u.addresses) == 3 assert "tbl_row_count" not in self.capture_sql(testing.db, go) @testing.fails_on('maxdb') def test_lazyoptionswithlimit(self): sess = create_session() mapper(User, users, properties = dict( addresses = relation(mapper(Address, addresses), lazy=False) )) u = sess.query(User).options(lazyload('addresses')).filter_by(user_id=8).one() def go(): assert u.user_id == 8 assert len(u.addresses) == 3 self.assert_sql_count(testing.db, go, 1) def test_eagerdegrade(self): """tests that an eager relation automatically degrades to a lazy relation if eager columns are not available""" sess = create_session() usermapper = mapper(User, users, properties = dict( addresses = relation(mapper(Address, addresses), lazy=False) )) # first test straight eager load, 1 statement def go(): l = sess.query(usermapper).all() self.assert_result(l, User, *user_address_result) self.assert_sql_count(testing.db, go, 1) sess.clear() # then select just from users. run it into instances. # then assert the data, which will launch 3 more lazy loads # (previous users in session fell out of scope and were removed from session's identity map) def go(): r = users.select().execute() l = sess.query(usermapper).instances(r) self.assert_result(l, User, *user_address_result) self.assert_sql_count(testing.db, go, 4) clear_mappers() sess.clear() # test with a deeper set of eager loads. when we first load the three # users, they will have no addresses or orders. the number of lazy loads when # traversing the whole thing will be three for the addresses and three for the # orders. # (previous users in session fell out of scope and were removed from session's identity map) usermapper = mapper(User, users, properties = { 'addresses':relation(mapper(Address, addresses), lazy=False), 'orders': relation(mapper(Order, orders, properties = { 'items' : relation(mapper(Item, orderitems, properties = { 'keywords' : relation(mapper(Keyword, keywords), itemkeywords, lazy=False) }), lazy=False) }), lazy=False) }) sess.clear() # first test straight eager load, 1 statement def go(): l = sess.query(usermapper).all() self.assert_result(l, User, *user_all_result) self.assert_sql_count(testing.db, go, 1) sess.clear() # then select just from users. run it into instances. # then assert the data, which will launch 6 more lazy loads def go(): r = users.select().execute() l = sess.query(usermapper).instances(r) self.assert_result(l, User, *user_all_result)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -