📄 query.py
字号:
import testenv; testenv.configure_for_tests()import datetimefrom sqlalchemy import *from sqlalchemy import exceptions, sqlfrom sqlalchemy.engine import defaultfrom testlib import *class QueryTest(TestBase): def setUpAll(self): global users, addresses, metadata metadata = MetaData(testing.db) users = Table('query_users', metadata, Column('user_id', INT, primary_key = True), Column('user_name', VARCHAR(20)), ) addresses = Table('query_addresses', metadata, Column('address_id', Integer, primary_key=True), Column('user_id', Integer, ForeignKey('query_users.user_id')), Column('address', String(30))) metadata.create_all() def tearDown(self): addresses.delete().execute() users.delete().execute() def tearDownAll(self): metadata.drop_all() def test_insert(self): users.insert().execute(user_id = 7, user_name = 'jack') assert users.count().scalar() == 1 def test_insert_heterogeneous_params(self): users.insert().execute( {'user_id':7, 'user_name':'jack'}, {'user_id':8, 'user_name':'ed'}, {'user_id':9} ) assert users.select().execute().fetchall() == [(7, 'jack'), (8, 'ed'), (9, None)] def test_update(self): users.insert().execute(user_id = 7, user_name = 'jack') assert users.count().scalar() == 1 users.update(users.c.user_id == 7).execute(user_name = 'fred') assert users.select(users.c.user_id==7).execute().fetchone()['user_name'] == 'fred' def test_lastrow_accessor(self): """Tests the last_inserted_ids() and lastrow_has_id() functions.""" def insert_values(table, values): """ Inserts a row into a table, returns the full list of values INSERTed including defaults that fired off on the DB side and detects rows that had defaults and post-fetches. """ result = table.insert().execute(**values) ret = values.copy() for col, id in zip(table.primary_key, result.last_inserted_ids()): ret[col.key] = id if result.lastrow_has_defaults(): criterion = and_(*[col==id for col, id in zip(table.primary_key, result.last_inserted_ids())]) row = table.select(criterion).execute().fetchone() for c in table.c: ret[c.key] = row[c] return ret for supported, table, values, assertvalues in [ ( {'unsupported':['sqlite']}, Table("t1", metadata, Column('id', Integer, Sequence('t1_id_seq', optional=True), primary_key=True), Column('foo', String(30), primary_key=True)), {'foo':'hi'}, {'id':1, 'foo':'hi'} ), ( {'unsupported':['sqlite']}, Table("t2", metadata, Column('id', Integer, Sequence('t2_id_seq', optional=True), primary_key=True), Column('foo', String(30), primary_key=True), Column('bar', String(30), PassiveDefault('hi')) ), {'foo':'hi'}, {'id':1, 'foo':'hi', 'bar':'hi'} ), ( {'unsupported':[]}, Table("t3", metadata, Column("id", String(40), primary_key=True), Column('foo', String(30), primary_key=True), Column("bar", String(30)) ), {'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"}, {'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"} ), ( {'unsupported':[]}, Table("t4", metadata, Column('id', Integer, Sequence('t4_id_seq', optional=True), primary_key=True), Column('foo', String(30), primary_key=True), Column('bar', String(30), PassiveDefault('hi')) ), {'foo':'hi', 'id':1}, {'id':1, 'foo':'hi', 'bar':'hi'} ), ( {'unsupported':[]}, Table("t5", metadata, Column('id', String(10), primary_key=True), Column('bar', String(30), PassiveDefault('hi')) ), {'id':'id1'}, {'id':'id1', 'bar':'hi'}, ), ]: if testing.db.name in supported['unsupported']: continue try: table.create() i = insert_values(table, values) assert i == assertvalues, repr(i) + " " + repr(assertvalues) finally: table.drop() def test_row_iteration(self): users.insert().execute( {'user_id':7, 'user_name':'jack'}, {'user_id':8, 'user_name':'ed'}, {'user_id':9, 'user_name':'fred'}, ) r = users.select().execute() l = [] for row in r: l.append(row) self.assert_(len(l) == 3) def test_anonymous_rows(self): users.insert().execute( {'user_id':7, 'user_name':'jack'}, {'user_id':8, 'user_name':'ed'}, {'user_id':9, 'user_name':'fred'}, ) sel = select([users.c.user_id]).where(users.c.user_name=='jack').as_scalar() for row in select([sel + 1, sel + 3], bind=users.bind).execute(): assert row['anon_1'] == 8 assert row['anon_2'] == 10 def test_row_comparison(self): users.insert().execute(user_id = 7, user_name = 'jack') rp = users.select().execute().fetchone() self.assert_(rp == rp) self.assert_(not(rp != rp)) equal = (7, 'jack') self.assert_(rp == equal) self.assert_(equal == rp) self.assert_(not (rp != equal)) self.assert_(not (equal != equal)) def test_fetchmany(self): users.insert().execute(user_id = 7, user_name = 'jack') users.insert().execute(user_id = 8, user_name = 'ed') users.insert().execute(user_id = 9, user_name = 'fred') r = users.select().execute() l = [] for row in r.fetchmany(size=2): l.append(row) self.assert_(len(l) == 2, "fetchmany(size=2) got %s rows" % len(l)) def test_ilike(self): users.insert().execute( {'user_id':1, 'user_name':'one'}, {'user_id':2, 'user_name':'TwO'}, {'user_id':3, 'user_name':'ONE'}, {'user_id':4, 'user_name':'OnE'}, ) self.assertEquals(select([users.c.user_id]).where(users.c.user_name.ilike('one')).execute().fetchall(), [(1, ), (3, ), (4, )]) self.assertEquals(select([users.c.user_id]).where(users.c.user_name.ilike('TWO')).execute().fetchall(), [(2, )]) if testing.against('postgres'): self.assertEquals(select([users.c.user_id]).where(users.c.user_name.like('one')).execute().fetchall(), [(1, )]) self.assertEquals(select([users.c.user_id]).where(users.c.user_name.like('TWO')).execute().fetchall(), []) def test_compiled_execute(self): users.insert().execute(user_id = 7, user_name = 'jack') s = select([users], users.c.user_id==bindparam('id')).compile() c = testing.db.connect() assert c.execute(s, id=7).fetchall()[0]['user_id'] == 7 def test_compiled_insert_execute(self): users.insert().compile().execute(user_id = 7, user_name = 'jack') s = select([users], users.c.user_id==bindparam('id')).compile() c = testing.db.connect() assert c.execute(s, id=7).fetchall()[0]['user_id'] == 7 def test_repeated_bindparams(self): """Tests that a BindParam can be used more than once. This should be run for DB-APIs with both positional and named paramstyles. """ users.insert().execute(user_id = 7, user_name = 'jack') users.insert().execute(user_id = 8, user_name = 'fred') u = bindparam('userid') s = users.select(and_(users.c.user_name==u, users.c.user_name==u)) r = s.execute(userid='fred').fetchall() assert len(r) == 1 u = bindparam('userid', unique=True) s = users.select(and_(users.c.user_name==u, users.c.user_name==u)) r = s.execute({u:'fred'}).fetchall() assert len(r) == 1 def test_bindparams_in_params(self): """test that a _BindParamClause itself can be a key in the params dict""" users.insert().execute(user_id = 7, user_name = 'jack') users.insert().execute(user_id = 8, user_name = 'fred') u = bindparam('userid') r = users.select(users.c.user_name==u).execute({u:'fred'}).fetchall() assert len(r) == 1 def test_bindparam_shortname(self): """test the 'shortname' field on BindParamClause.""" users.insert().execute(user_id = 7, user_name = 'jack') users.insert().execute(user_id = 8, user_name = 'fred') u = bindparam('userid', shortname='someshortname') s = users.select(users.c.user_name==u) r = s.execute(someshortname='fred').fetchall() assert len(r) == 1 def test_bindparam_detection(self): dialect = default.DefaultDialect(default_paramstyle='qmark') prep = lambda q: str(sql.text(q).compile(dialect=dialect)) def a_eq(got, wanted): if got != wanted: print "Wanted %s" % wanted print "Received %s" % got self.assert_(got == wanted, got) a_eq(prep('select foo'), 'select foo') a_eq(prep("time='12:30:00'"), "time='12:30:00'") a_eq(prep(u"time='12:30:00'"), u"time='12:30:00'") a_eq(prep(":this:that"), ":this:that") a_eq(prep(":this :that"), "? ?") a_eq(prep("(:this),(:that :other)"), "(?),(? ?)") a_eq(prep("(:this),(:that:other)"), "(?),(:that:other)") a_eq(prep("(:this),(:that,:other)"), "(?),(?,?)") a_eq(prep("(:that_:other)"), "(:that_:other)") a_eq(prep("(:that_ :other)"), "(? ?)") a_eq(prep("(:that_other)"), "(?)") a_eq(prep("(:that$other)"), "(?)") a_eq(prep("(:that$:other)"), "(:that$:other)") a_eq(prep(".:that$ :other."), ".? ?.") a_eq(prep(r'select \foo'), r'select \foo') a_eq(prep(r"time='12\:30:00'"), r"time='12\:30:00'") a_eq(prep(":this \:that"), "? :that") a_eq(prep(r"(\:that$other)"), "(:that$other)") a_eq(prep(r".\:that$ :other."), ".:that$ ?.") def test_delete(self): users.insert().execute(user_id = 7, user_name = 'jack') users.insert().execute(user_id = 8, user_name = 'fred') print repr(users.select().execute().fetchall()) users.delete(users.c.user_name == 'fred').execute() print repr(users.select().execute().fetchall()) def test_select_limit(self): users.insert().execute(user_id=1, user_name='john') users.insert().execute(user_id=2, user_name='jack') users.insert().execute(user_id=3, user_name='ed') users.insert().execute(user_id=4, user_name='wendy') users.insert().execute(user_id=5, user_name='laura') users.insert().execute(user_id=6, user_name='ralph') users.insert().execute(user_id=7, user_name='fido') r = users.select(limit=3, order_by=[users.c.user_id]).execute().fetchall() self.assert_(r == [(1, 'john'), (2, 'jack'), (3, 'ed')], repr(r)) @testing.unsupported('mssql') @testing.fails_on('maxdb') def test_select_limit_offset(self): users.insert().execute(user_id=1, user_name='john') users.insert().execute(user_id=2, user_name='jack') users.insert().execute(user_id=3, user_name='ed') users.insert().execute(user_id=4, user_name='wendy') users.insert().execute(user_id=5, user_name='laura') users.insert().execute(user_id=6, user_name='ralph') users.insert().execute(user_id=7, user_name='fido') r = users.select(limit=3, offset=2, order_by=[users.c.user_id]).execute().fetchall() self.assert_(r==[(3, 'ed'), (4, 'wendy'), (5, 'laura')]) r = users.select(offset=5, order_by=[users.c.user_id]).execute().fetchall() self.assert_(r==[(6, 'ralph'), (7, 'fido')]) @testing.exclude('mysql', '<', (5, 0, 37)) def test_scalar_select(self): """test that scalar subqueries with labels get their type propigated to the result set.""" # mysql and/or mysqldb has a bug here, type isn't propagated for scalar # subquery. datetable = Table('datetable', metadata, Column('id', Integer, primary_key=True), Column('today', DateTime)) datetable.create() try: datetable.insert().execute(id=1, today=datetime.datetime(2006, 5, 12, 12, 0, 0)) s = select([datetable.alias('x').c.today]).as_scalar() s2 = select([datetable.c.id, s.label('somelabel')]) #print s2.c.somelabel.type assert isinstance(s2.execute().fetchone()['somelabel'], datetime.datetime) finally: datetable.drop() def test_order_by(self): """Exercises ORDER BY clause generation. Tests simple, compound, aliased and DESC clauses. """ users.insert().execute(user_id=1, user_name='c') users.insert().execute(user_id=2, user_name='b') users.insert().execute(user_id=3, user_name='a') def a_eq(executable, wanted): got = list(executable.execute()) self.assertEquals(got, wanted) for labels in False, True: a_eq(users.select(order_by=[users.c.user_id], use_labels=labels), [(1, 'c'), (2, 'b'), (3, 'a')]) a_eq(users.select(order_by=[users.c.user_name, users.c.user_id], use_labels=labels), [(3, 'a'), (2, 'b'), (1, 'c')]) a_eq(select([users.c.user_id.label('foo')], use_labels=labels, order_by=[users.c.user_id]), [(1,), (2,), (3,)]) a_eq(select([users.c.user_id.label('foo'), users.c.user_name],
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -