⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 query.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 3 页
字号:
import testenv; testenv.configure_for_tests()import datetimefrom sqlalchemy import *from sqlalchemy import exceptions, sqlfrom sqlalchemy.engine import defaultfrom testlib import *class QueryTest(TestBase):    def setUpAll(self):        global users, addresses, metadata        metadata = MetaData(testing.db)        users = Table('query_users', metadata,            Column('user_id', INT, primary_key = True),            Column('user_name', VARCHAR(20)),        )        addresses = Table('query_addresses', metadata,            Column('address_id', Integer, primary_key=True),            Column('user_id', Integer, ForeignKey('query_users.user_id')),            Column('address', String(30)))        metadata.create_all()    def tearDown(self):        addresses.delete().execute()        users.delete().execute()    def tearDownAll(self):        metadata.drop_all()    def test_insert(self):        users.insert().execute(user_id = 7, user_name = 'jack')        assert users.count().scalar() == 1    def test_insert_heterogeneous_params(self):        users.insert().execute(            {'user_id':7, 'user_name':'jack'},            {'user_id':8, 'user_name':'ed'},            {'user_id':9}        )        assert users.select().execute().fetchall() == [(7, 'jack'), (8, 'ed'), (9, None)]    def test_update(self):        users.insert().execute(user_id = 7, user_name = 'jack')        assert users.count().scalar() == 1        users.update(users.c.user_id == 7).execute(user_name = 'fred')        assert users.select(users.c.user_id==7).execute().fetchone()['user_name'] == 'fred'    def test_lastrow_accessor(self):        """Tests the last_inserted_ids() and lastrow_has_id() functions."""        def insert_values(table, values):            """            Inserts a row into a table, returns the full list of values            INSERTed including defaults that fired off on the DB side and            detects rows that had defaults and post-fetches.            """            result = table.insert().execute(**values)            ret = values.copy()            for col, id in zip(table.primary_key, result.last_inserted_ids()):                ret[col.key] = id            if result.lastrow_has_defaults():                criterion = and_(*[col==id for col, id in zip(table.primary_key, result.last_inserted_ids())])                row = table.select(criterion).execute().fetchone()                for c in table.c:                    ret[c.key] = row[c]            return ret        for supported, table, values, assertvalues in [            (                {'unsupported':['sqlite']},                Table("t1", metadata,                    Column('id', Integer, Sequence('t1_id_seq', optional=True), primary_key=True),                    Column('foo', String(30), primary_key=True)),                {'foo':'hi'},                {'id':1, 'foo':'hi'}            ),            (                {'unsupported':['sqlite']},                Table("t2", metadata,                    Column('id', Integer, Sequence('t2_id_seq', optional=True), primary_key=True),                    Column('foo', String(30), primary_key=True),                    Column('bar', String(30), PassiveDefault('hi'))                ),                {'foo':'hi'},                {'id':1, 'foo':'hi', 'bar':'hi'}            ),            (                {'unsupported':[]},                Table("t3", metadata,                    Column("id", String(40), primary_key=True),                    Column('foo', String(30), primary_key=True),                    Column("bar", String(30))                    ),                    {'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"},                    {'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"}            ),            (                {'unsupported':[]},                Table("t4", metadata,                    Column('id', Integer, Sequence('t4_id_seq', optional=True), primary_key=True),                    Column('foo', String(30), primary_key=True),                    Column('bar', String(30), PassiveDefault('hi'))                ),                {'foo':'hi', 'id':1},                {'id':1, 'foo':'hi', 'bar':'hi'}            ),            (                {'unsupported':[]},                Table("t5", metadata,                    Column('id', String(10), primary_key=True),                    Column('bar', String(30), PassiveDefault('hi'))                ),                {'id':'id1'},                {'id':'id1', 'bar':'hi'},            ),        ]:            if testing.db.name in supported['unsupported']:                continue            try:                table.create()                i = insert_values(table, values)                assert i == assertvalues, repr(i) + " " + repr(assertvalues)            finally:                table.drop()    def test_row_iteration(self):        users.insert().execute(            {'user_id':7, 'user_name':'jack'},            {'user_id':8, 'user_name':'ed'},            {'user_id':9, 'user_name':'fred'},        )        r = users.select().execute()        l = []        for row in r:            l.append(row)        self.assert_(len(l) == 3)    def test_anonymous_rows(self):        users.insert().execute(            {'user_id':7, 'user_name':'jack'},            {'user_id':8, 'user_name':'ed'},            {'user_id':9, 'user_name':'fred'},        )        sel = select([users.c.user_id]).where(users.c.user_name=='jack').as_scalar()        for row in select([sel + 1, sel + 3], bind=users.bind).execute():            assert row['anon_1'] == 8            assert row['anon_2'] == 10    def test_row_comparison(self):        users.insert().execute(user_id = 7, user_name = 'jack')        rp = users.select().execute().fetchone()        self.assert_(rp == rp)        self.assert_(not(rp != rp))        equal = (7, 'jack')        self.assert_(rp == equal)        self.assert_(equal == rp)        self.assert_(not (rp != equal))        self.assert_(not (equal != equal))    def test_fetchmany(self):        users.insert().execute(user_id = 7, user_name = 'jack')        users.insert().execute(user_id = 8, user_name = 'ed')        users.insert().execute(user_id = 9, user_name = 'fred')        r = users.select().execute()        l = []        for row in r.fetchmany(size=2):            l.append(row)        self.assert_(len(l) == 2, "fetchmany(size=2) got %s rows" % len(l))    def test_ilike(self):        users.insert().execute(            {'user_id':1, 'user_name':'one'},            {'user_id':2, 'user_name':'TwO'},            {'user_id':3, 'user_name':'ONE'},            {'user_id':4, 'user_name':'OnE'},        )        self.assertEquals(select([users.c.user_id]).where(users.c.user_name.ilike('one')).execute().fetchall(), [(1, ), (3, ), (4, )])        self.assertEquals(select([users.c.user_id]).where(users.c.user_name.ilike('TWO')).execute().fetchall(), [(2, )])        if testing.against('postgres'):            self.assertEquals(select([users.c.user_id]).where(users.c.user_name.like('one')).execute().fetchall(), [(1, )])            self.assertEquals(select([users.c.user_id]).where(users.c.user_name.like('TWO')).execute().fetchall(), [])    def test_compiled_execute(self):        users.insert().execute(user_id = 7, user_name = 'jack')        s = select([users], users.c.user_id==bindparam('id')).compile()        c = testing.db.connect()        assert c.execute(s, id=7).fetchall()[0]['user_id'] == 7    def test_compiled_insert_execute(self):        users.insert().compile().execute(user_id = 7, user_name = 'jack')        s = select([users], users.c.user_id==bindparam('id')).compile()        c = testing.db.connect()        assert c.execute(s, id=7).fetchall()[0]['user_id'] == 7    def test_repeated_bindparams(self):        """Tests that a BindParam can be used more than once.        This should be run for DB-APIs with both positional and named        paramstyles.        """        users.insert().execute(user_id = 7, user_name = 'jack')        users.insert().execute(user_id = 8, user_name = 'fred')        u = bindparam('userid')        s = users.select(and_(users.c.user_name==u, users.c.user_name==u))        r = s.execute(userid='fred').fetchall()        assert len(r) == 1        u = bindparam('userid', unique=True)        s = users.select(and_(users.c.user_name==u, users.c.user_name==u))        r = s.execute({u:'fred'}).fetchall()        assert len(r) == 1    def test_bindparams_in_params(self):        """test that a _BindParamClause itself can be a key in the params dict"""        users.insert().execute(user_id = 7, user_name = 'jack')        users.insert().execute(user_id = 8, user_name = 'fred')        u = bindparam('userid')        r = users.select(users.c.user_name==u).execute({u:'fred'}).fetchall()        assert len(r) == 1    def test_bindparam_shortname(self):        """test the 'shortname' field on BindParamClause."""        users.insert().execute(user_id = 7, user_name = 'jack')        users.insert().execute(user_id = 8, user_name = 'fred')        u = bindparam('userid', shortname='someshortname')        s = users.select(users.c.user_name==u)        r = s.execute(someshortname='fred').fetchall()        assert len(r) == 1    def test_bindparam_detection(self):        dialect = default.DefaultDialect(default_paramstyle='qmark')        prep = lambda q: str(sql.text(q).compile(dialect=dialect))        def a_eq(got, wanted):            if got != wanted:                print "Wanted %s" % wanted                print "Received %s" % got            self.assert_(got == wanted, got)        a_eq(prep('select foo'), 'select foo')        a_eq(prep("time='12:30:00'"), "time='12:30:00'")        a_eq(prep(u"time='12:30:00'"), u"time='12:30:00'")        a_eq(prep(":this:that"), ":this:that")        a_eq(prep(":this :that"), "? ?")        a_eq(prep("(:this),(:that :other)"), "(?),(? ?)")        a_eq(prep("(:this),(:that:other)"), "(?),(:that:other)")        a_eq(prep("(:this),(:that,:other)"), "(?),(?,?)")        a_eq(prep("(:that_:other)"), "(:that_:other)")        a_eq(prep("(:that_ :other)"), "(? ?)")        a_eq(prep("(:that_other)"), "(?)")        a_eq(prep("(:that$other)"), "(?)")        a_eq(prep("(:that$:other)"), "(:that$:other)")        a_eq(prep(".:that$ :other."), ".? ?.")        a_eq(prep(r'select \foo'), r'select \foo')        a_eq(prep(r"time='12\:30:00'"), r"time='12\:30:00'")        a_eq(prep(":this \:that"), "? :that")        a_eq(prep(r"(\:that$other)"), "(:that$other)")        a_eq(prep(r".\:that$ :other."), ".:that$ ?.")    def test_delete(self):        users.insert().execute(user_id = 7, user_name = 'jack')        users.insert().execute(user_id = 8, user_name = 'fred')        print repr(users.select().execute().fetchall())        users.delete(users.c.user_name == 'fred').execute()        print repr(users.select().execute().fetchall())    def test_select_limit(self):        users.insert().execute(user_id=1, user_name='john')        users.insert().execute(user_id=2, user_name='jack')        users.insert().execute(user_id=3, user_name='ed')        users.insert().execute(user_id=4, user_name='wendy')        users.insert().execute(user_id=5, user_name='laura')        users.insert().execute(user_id=6, user_name='ralph')        users.insert().execute(user_id=7, user_name='fido')        r = users.select(limit=3, order_by=[users.c.user_id]).execute().fetchall()        self.assert_(r == [(1, 'john'), (2, 'jack'), (3, 'ed')], repr(r))    @testing.unsupported('mssql')    @testing.fails_on('maxdb')    def test_select_limit_offset(self):        users.insert().execute(user_id=1, user_name='john')        users.insert().execute(user_id=2, user_name='jack')        users.insert().execute(user_id=3, user_name='ed')        users.insert().execute(user_id=4, user_name='wendy')        users.insert().execute(user_id=5, user_name='laura')        users.insert().execute(user_id=6, user_name='ralph')        users.insert().execute(user_id=7, user_name='fido')        r = users.select(limit=3, offset=2, order_by=[users.c.user_id]).execute().fetchall()        self.assert_(r==[(3, 'ed'), (4, 'wendy'), (5, 'laura')])        r = users.select(offset=5, order_by=[users.c.user_id]).execute().fetchall()        self.assert_(r==[(6, 'ralph'), (7, 'fido')])    @testing.exclude('mysql', '<', (5, 0, 37))    def test_scalar_select(self):        """test that scalar subqueries with labels get their type propigated to the result set."""        # mysql and/or mysqldb has a bug here, type isn't propagated for scalar        # subquery.        datetable = Table('datetable', metadata,            Column('id', Integer, primary_key=True),            Column('today', DateTime))        datetable.create()        try:            datetable.insert().execute(id=1, today=datetime.datetime(2006, 5, 12, 12, 0, 0))            s = select([datetable.alias('x').c.today]).as_scalar()            s2 = select([datetable.c.id, s.label('somelabel')])            #print s2.c.somelabel.type            assert isinstance(s2.execute().fetchone()['somelabel'], datetime.datetime)        finally:            datetable.drop()    def test_order_by(self):        """Exercises ORDER BY clause generation.        Tests simple, compound, aliased and DESC clauses.        """        users.insert().execute(user_id=1, user_name='c')        users.insert().execute(user_id=2, user_name='b')        users.insert().execute(user_id=3, user_name='a')        def a_eq(executable, wanted):            got = list(executable.execute())            self.assertEquals(got, wanted)        for labels in False, True:            a_eq(users.select(order_by=[users.c.user_id],                              use_labels=labels),                 [(1, 'c'), (2, 'b'), (3, 'a')])            a_eq(users.select(order_by=[users.c.user_name, users.c.user_id],                              use_labels=labels),                 [(3, 'a'), (2, 'b'), (1, 'c')])            a_eq(select([users.c.user_id.label('foo')],                        use_labels=labels,                        order_by=[users.c.user_id]),                 [(1,), (2,), (3,)])            a_eq(select([users.c.user_id.label('foo'), users.c.user_name],

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -