⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mapper.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 4 页
字号:
        self.assert_result(l, User, *user_result[0:2])    def test_mappingtojoinnopk(self):        metadata = MetaData()        account_ids_table = Table('account_ids', metadata,                Column('account_id', Integer, primary_key=True),                Column('username', String(20)))        account_stuff_table = Table('account_stuff', metadata,                Column('account_id', Integer, ForeignKey('account_ids.account_id')),                Column('credit', Numeric))        class A(object):pass        m = mapper(A, account_ids_table.join(account_stuff_table))        m.compile()        assert account_ids_table in m._pks_by_table        assert account_stuff_table not in m._pks_by_table        metadata.create_all(testing.db)        try:            sess = create_session(bind=testing.db)            a = A()            sess.save(a)            sess.flush()            assert testing.db.execute(account_ids_table.count()).scalar() == 1            assert testing.db.execute(account_stuff_table.count()).scalar() == 0        finally:            metadata.drop_all(testing.db)    def test_mappingtoouterjoin(self):        """test mapping to an outer join, with a composite primary key that allows nulls"""        result = [        {'user_id' : 7, 'address_id' : 1},        {'user_id' : 8, 'address_id' : 2},        {'user_id' : 8, 'address_id' : 3},        {'user_id' : 8, 'address_id' : 4},        {'user_id' : 9, 'address_id':None}        ]        j = join(users, addresses, isouter=True)        m = mapper(User, j, allow_null_pks=True, primary_key=[users.c.user_id, addresses.c.address_id])        q = create_session().query(m)        l = q.all()        self.assert_result(l, User, *result)    def test_customjoin(self):        """Tests that select_from totally replace the FROM parameters."""        m = mapper(User, users, properties={            'orders':relation(mapper(Order, orders, properties={                'items':relation(mapper(Item, orderitems))            }))        })        q = create_session().query(m)        l = (q.select_from(users.join(orders).join(orderitems)).             filter(orderitems.c.item_name=='item 4'))        self.assert_result(l, User, user_result[0])    @testing.uses_deprecated('//select')    def test_customjoin_deprecated(self):        """test that the from_obj parameter to query.select() can be used        to totally replace the FROM parameters of the generated query."""        m = mapper(User, users, properties={            'orders':relation(mapper(Order, orders, properties={                'items':relation(mapper(Item, orderitems))            }))        })        q = create_session().query(m)        l = q.select((orderitems.c.item_name=='item 4'), from_obj=[users.join(orders).join(orderitems)])        self.assert_result(l, User, user_result[0])    def test_orderby(self):        """test ordering at the mapper and query level"""        # TODO: make a unit test out of these various combinations        #m = mapper(User, users, order_by=desc(users.c.user_name))        mapper(User, users, order_by=None)        #mapper(User, users)        #l = create_session().query(User).select(order_by=[desc(users.c.user_name), asc(users.c.user_id)])        l = create_session().query(User).all()        #l = create_session().query(User).select(order_by=[])        #l = create_session().query(User).select(order_by=None)    @testing.unsupported('firebird')    def test_function(self):        """Test mapping to a SELECT statement that has functions in it."""        s = select([users,                    (users.c.user_id * 2).label('concat'),                    func.count(addresses.c.address_id).label('count')],                   users.c.user_id == addresses.c.user_id,                   group_by=[c for c in users.c]).alias('myselect')        mapper(User, s)        sess = create_session()        l = sess.query(User).all()        for u in l:            print "User", u.user_id, u.user_name, u.concat, u.count        assert l[0].concat == l[0].user_id * 2 == 14        assert l[1].concat == l[1].user_id * 2 == 16    @testing.unsupported('firebird')    def test_count(self):        """test the count function on Query.        (why doesnt this work on firebird?)"""        mapper(User, users)        q = create_session().query(User)        self.assert_(q.count()==3)        self.assert_(q.count(users.c.user_id.in_([8,9]))==2)    @testing.unsupported('firebird')    @testing.uses_deprecated('//count_by', '//join_by', '//join_via')    def test_count_by_deprecated(self):        mapper(User, users)        q = create_session().query(User)        self.assert_(q.count_by(user_name='fred')==1)    def test_manytomany_count(self):        mapper(Item, orderitems, properties = dict(                keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy = True),            ))        q = create_session().query(Item)        assert q.join('keywords').distinct().count(Keyword.c.name=="red") == 2    def test_override(self):        # assert that overriding a column raises an error        try:            m = mapper(User, users, properties = {                    'user_name' : relation(mapper(Address, addresses)),                }).compile()            self.assert_(False, "should have raised ArgumentError")        except exceptions.ArgumentError, e:            self.assert_(True)        clear_mappers()        # assert that allow_column_override cancels the error        m = mapper(User, users, properties = {                'user_name' : relation(mapper(Address, addresses))            }, allow_column_override=True)        clear_mappers()        # assert that the column being named else where also cancels the error        m = mapper(User, users, properties = {                'user_name' : relation(mapper(Address, addresses)),                'foo' : users.c.user_name,            })    def test_synonym(self):        sess = create_session()        assert_col = []        class User(object):            def _get_user_name(self):                assert_col.append(('get', self.user_name))                return self.user_name            def _set_user_name(self, name):                assert_col.append(('set', name))                self.user_name = name            uname = property(_get_user_name, _set_user_name)        mapper(User, users, properties = dict(            addresses = relation(mapper(Address, addresses), lazy = True),            uname = synonym('user_name'),            adlist = synonym('addresses', proxy=True),            adname = synonym('addresses')        ))        assert hasattr(User, 'adlist')        assert hasattr(User, 'adname')  # as of 0.4.2, synonyms always create a property        # test compile        assert not isinstance(User.uname == 'jack', bool)        u = sess.query(User).filter(User.uname=='jack').one()        self.assert_result(u.adlist, Address, *(user_address_result[0]['addresses'][1]))        addr = sess.query(Address).filter_by(address_id=user_address_result[0]['addresses'][1][0]['address_id']).one()        u = sess.query(User).filter_by(adname=addr).one()        u2 = sess.query(User).filter_by(adlist=addr).one()        assert u is u2        assert u not in sess.dirty        u.uname = "some user name"        assert len(assert_col) > 0        assert assert_col == [('set', 'some user name')], str(assert_col)        assert u.uname == "some user name"        assert assert_col == [('set', 'some user name'), ('get', 'some user name')], str(assert_col)        assert u.user_name == "some user name"        assert u in sess.dirty    def test_column_synonyms(self):        """test new-style synonyms which automatically instrument properties, set up aliased column, etc."""        sess = create_session()        assert_col = []        class User(object):            def _get_user_name(self):                assert_col.append(('get', self._user_name))                return self._user_name            def _set_user_name(self, name):                assert_col.append(('set', name))                self._user_name = name            user_name = property(_get_user_name, _set_user_name)        mapper(Address, addresses)        try:            mapper(User, users, properties = {                'addresses':relation(Address, lazy=True),                'not_user_name':synonym('_user_name', map_column=True)            })            User.not_user_name            assert False        except exceptions.ArgumentError, e:            assert str(e) == "Can't compile synonym '_user_name': no column on table 'users' named 'not_user_name'"        clear_mappers()        mapper(Address, addresses)        mapper(User, users, properties = {            'addresses':relation(Address, lazy=True),            'user_name':synonym('_user_name', map_column=True)        })        # test compile        assert not isinstance(User.user_name == 'jack', bool)        assert hasattr(User, 'user_name')        assert hasattr(User, '_user_name')        u = sess.query(User).filter(User.user_name == 'jack').one()        assert u.user_name == 'jack'        u.user_name = 'foo'        assert u.user_name == 'foo'        assert assert_col == [('get', 'jack'), ('set', 'foo'), ('get', 'foo')]class OptionsTest(MapperSuperTest):    @testing.fails_on('maxdb')    def test_synonymoptions(self):        sess = create_session()        mapper(User, users, properties = dict(            addresses = relation(mapper(Address, addresses), lazy = True),            adlist = synonym('addresses', proxy=True)        ))        def go():            u = sess.query(User).options(eagerload('adlist')).filter_by(user_name='jack').one()            self.assert_result(u.adlist, Address, *(user_address_result[0]['addresses'][1]))        self.assert_sql_count(testing.db, go, 1)    @testing.uses_deprecated('//select_by')    def test_extension_options(self):        sess  = create_session()        class ext1(MapperExtension):            def populate_instance(self, mapper, selectcontext, row, instance, **flags):                """test options at the Mapper._instance level"""                instance.TEST = "hello world"                return EXT_CONTINUE        mapper(User, users, extension=ext1(), properties={            'addresses':relation(mapper(Address, addresses), lazy=False)        })        class testext(MapperExtension):            def select_by(self, *args, **kwargs):                """test options at the Query level"""                return "HI"            def populate_instance(self, mapper, selectcontext, row, instance, **flags):                """test options at the Mapper._instance level"""                instance.TEST_2 = "also hello world"                return EXT_CONTINUE        l = sess.query(User).options(extension(testext())).select_by(x=5)        assert l == "HI"        l = sess.query(User).options(extension(testext())).get(7)        assert l.user_id == 7        assert l.TEST == "hello world"        assert l.TEST_2 == "also hello world"        assert not hasattr(l.addresses[0], 'TEST')        assert not hasattr(l.addresses[0], 'TEST2')    def test_eageroptions(self):        """tests that a lazy relation can be upgraded to an eager relation via the options method"""        sess = create_session()        mapper(User, users, properties = dict(            addresses = relation(mapper(Address, addresses))        ))        l = sess.query(User).options(eagerload('addresses')).all()        def go():            self.assert_result(l, User, *user_address_result)        self.assert_sql_count(testing.db, go, 0)    @testing.fails_on('maxdb')    def test_eageroptionswithlimit(self):        sess = create_session()        mapper(User, users, properties = dict(            addresses = relation(mapper(Address, addresses), lazy = True)        ))        u = sess.query(User).options(eagerload('addresses')).filter_by(user_id=8).one()        def go():            assert u.user_id == 8            assert len(u.addresses) == 3        self.assert_sql_count(testing.db, go, 0)        sess.clear()        # test that eager loading doesnt modify parent mapper        def go():            u = sess.query(User).filter_by(user_id=8).one()            assert u.user_id == 8            assert len(u.addresses) == 3        assert "tbl_row_count" not in self.capture_sql(testing.db, go)    @testing.fails_on('maxdb')    def test_lazyoptionswithlimit(self):        sess = create_session()        mapper(User, users, properties = dict(            addresses = relation(mapper(Address, addresses), lazy=False)        ))        u = sess.query(User).options(lazyload('addresses')).filter_by(user_id=8).one()        def go():            assert u.user_id == 8            assert len(u.addresses) == 3        self.assert_sql_count(testing.db, go, 1)    def test_eagerdegrade(self):        """tests that an eager relation automatically degrades to a lazy relation if eager columns are not available"""        sess = create_session()        usermapper = mapper(User, users, properties = dict(            addresses = relation(mapper(Address, addresses), lazy=False)        ))        # first test straight eager load, 1 statement        def go():            l = sess.query(usermapper).all()            self.assert_result(l, User, *user_address_result)        self.assert_sql_count(testing.db, go, 1)        sess.clear()        # then select just from users.  run it into instances.        # then assert the data, which will launch 3 more lazy loads        # (previous users in session fell out of scope and were removed from session's identity map)        def go():            r = users.select().execute()            l = sess.query(usermapper).instances(r)            self.assert_result(l, User, *user_address_result)        self.assert_sql_count(testing.db, go, 4)        clear_mappers()        sess.clear()        # test with a deeper set of eager loads.  when we first load the three        # users, they will have no addresses or orders.  the number of lazy loads when        # traversing the whole thing will be three for the addresses and three for the        # orders.        # (previous users in session fell out of scope and were removed from session's identity map)        usermapper = mapper(User, users,            properties = {                'addresses':relation(mapper(Address, addresses), lazy=False),                'orders': relation(mapper(Order, orders, properties = {                    'items' : relation(mapper(Item, orderitems, properties = {                        'keywords' : relation(mapper(Keyword, keywords), itemkeywords, lazy=False)                    }), lazy=False)                }), lazy=False)            })        sess.clear()        # first test straight eager load, 1 statement        def go():            l = sess.query(usermapper).all()            self.assert_result(l, User, *user_all_result)        self.assert_sql_count(testing.db, go, 1)        sess.clear()        # then select just from users.  run it into instances.        # then assert the data, which will launch 6 more lazy loads        def go():            r = users.select().execute()            l = sess.query(usermapper).instances(r)            self.assert_result(l, User, *user_all_result)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -