📄 query.py
字号:
use_labels=labels, order_by=[users.c.user_name, users.c.user_id]), [(3, 'a'), (2, 'b'), (1, 'c')]) a_eq(users.select(distinct=True, use_labels=labels, order_by=[users.c.user_id]), [(1, 'c'), (2, 'b'), (3, 'a')]) a_eq(select([users.c.user_id.label('foo')], distinct=True, use_labels=labels, order_by=[users.c.user_id]), [(1,), (2,), (3,)]) a_eq(select([users.c.user_id.label('a'), users.c.user_id.label('b'), users.c.user_name], use_labels=labels, order_by=[users.c.user_id]), [(1, 1, 'c'), (2, 2, 'b'), (3, 3, 'a')]) a_eq(users.select(distinct=True, use_labels=labels, order_by=[desc(users.c.user_id)]), [(3, 'a'), (2, 'b'), (1, 'c')]) a_eq(select([users.c.user_id.label('foo')], distinct=True, use_labels=labels, order_by=[users.c.user_id.desc()]), [(3,), (2,), (1,)]) def test_column_accessor(self): users.insert().execute(user_id=1, user_name='john') users.insert().execute(user_id=2, user_name='jack') addresses.insert().execute(address_id=1, user_id=2, address='foo@bar.com') r = users.select(users.c.user_id==2).execute().fetchone() self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2) self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') r = text("select * from query_users where user_id=2", bind=testing.db).execute().fetchone() self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2) self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') # test slices r = text("select * from query_addresses", bind=testing.db).execute().fetchone() self.assert_(r[0:1] == (1,)) self.assert_(r[1:] == (2, 'foo@bar.com')) self.assert_(r[:-1] == (1, 2)) # test a little sqlite weirdness - with the UNION, cols come back as "query_users.user_id" in cursor.description r = text("select query_users.user_id, query_users.user_name from query_users " "UNION select query_users.user_id, query_users.user_name from query_users", bind=testing.db).execute().fetchone() self.assert_(r['user_id']) == 1 self.assert_(r['user_name']) == "john" # test using literal tablename.colname r = text('select query_users.user_id AS "query_users.user_id", query_users.user_name AS "query_users.user_name" from query_users', bind=testing.db).execute().fetchone() self.assert_(r['query_users.user_id']) == 1 self.assert_(r['query_users.user_name']) == "john" def test_ambiguous_column(self): users.insert().execute(user_id=1, user_name='john') r = users.outerjoin(addresses).select().execute().fetchone() try: print r['user_id'] assert False except exceptions.InvalidRequestError, e: assert str(e) == "Ambiguous column name 'user_id' in result set! try 'use_labels' option on select statement." or \ str(e) == "Ambiguous column name 'USER_ID' in result set! try 'use_labels' option on select statement." def test_column_label_targeting(self): users.insert().execute(user_id=7, user_name='ed') for s in ( users.select().alias('foo'), users.select().alias(users.name), ): row = s.select(use_labels=True).execute().fetchone() assert row[s.c.user_id] == 7 assert row[s.c.user_name] == 'ed' def test_keys(self): users.insert().execute(user_id=1, user_name='foo') r = users.select().execute().fetchone() self.assertEqual([x.lower() for x in r.keys()], ['user_id', 'user_name']) def test_items(self): users.insert().execute(user_id=1, user_name='foo') r = users.select().execute().fetchone() self.assertEqual([(x[0].lower(), x[1]) for x in r.items()], [('user_id', 1), ('user_name', 'foo')]) def test_len(self): users.insert().execute(user_id=1, user_name='foo') r = users.select().execute().fetchone() self.assertEqual(len(r), 2) r.close() r = testing.db.execute('select user_name, user_id from query_users', {}).fetchone() self.assertEqual(len(r), 2) r.close() r = testing.db.execute('select user_name from query_users', {}).fetchone() self.assertEqual(len(r), 1) r.close() def test_cant_execute_join(self): try: users.join(addresses).execute() except exceptions.ArgumentError, e: assert str(e).startswith('Not an executable clause: ') def test_column_order_with_simple_query(self): # should return values in column definition order users.insert().execute(user_id=1, user_name='foo') r = users.select(users.c.user_id==1).execute().fetchone() self.assertEqual(r[0], 1) self.assertEqual(r[1], 'foo') self.assertEqual([x.lower() for x in r.keys()], ['user_id', 'user_name']) self.assertEqual(r.values(), [1, 'foo']) def test_column_order_with_text_query(self): # should return values in query order users.insert().execute(user_id=1, user_name='foo') r = testing.db.execute('select user_name, user_id from query_users', {}).fetchone() self.assertEqual(r[0], 'foo') self.assertEqual(r[1], 1) self.assertEqual([x.lower() for x in r.keys()], ['user_name', 'user_id']) self.assertEqual(r.values(), ['foo', 1]) @testing.unsupported('oracle', 'firebird', 'maxdb') def test_column_accessor_shadow(self): meta = MetaData(testing.db) shadowed = Table('test_shadowed', meta, Column('shadow_id', INT, primary_key = True), Column('shadow_name', VARCHAR(20)), Column('parent', VARCHAR(20)), Column('row', VARCHAR(40)), Column('__parent', VARCHAR(20)), Column('__row', VARCHAR(20)), ) shadowed.create(checkfirst=True) try: shadowed.insert().execute(shadow_id=1, shadow_name='The Shadow', parent='The Light', row='Without light there is no shadow', __parent='Hidden parent', __row='Hidden row') r = shadowed.select(shadowed.c.shadow_id==1).execute().fetchone() self.assert_(r.shadow_id == r['shadow_id'] == r[shadowed.c.shadow_id] == 1) self.assert_(r.shadow_name == r['shadow_name'] == r[shadowed.c.shadow_name] == 'The Shadow') self.assert_(r.parent == r['parent'] == r[shadowed.c.parent] == 'The Light') self.assert_(r.row == r['row'] == r[shadowed.c.row] == 'Without light there is no shadow') self.assert_(r['__parent'] == 'Hidden parent') self.assert_(r['__row'] == 'Hidden row') try: print r.__parent, r.__row self.fail('Should not allow access to private attributes') except AttributeError: pass # expected r.close() finally: shadowed.drop(checkfirst=True) @testing.fails_on('maxdb') def test_in_filtering(self): """test the behavior of the in_() function.""" users.insert().execute(user_id = 7, user_name = 'jack') users.insert().execute(user_id = 8, user_name = 'fred') users.insert().execute(user_id = 9, user_name = None) s = users.select(users.c.user_name.in_([])) r = s.execute().fetchall() # No username is in empty set assert len(r) == 0 s = users.select(not_(users.c.user_name.in_([]))) r = s.execute().fetchall() # All usernames with a value are outside an empty set assert len(r) == 2 s = users.select(users.c.user_name.in_(['jack','fred'])) r = s.execute().fetchall() assert len(r) == 2 s = users.select(not_(users.c.user_name.in_(['jack','fred']))) r = s.execute().fetchall() # Null values are not outside any set assert len(r) == 0 u = bindparam('search_key') s = users.select(u.in_([])) r = s.execute(search_key='john').fetchall() assert len(r) == 0 r = s.execute(search_key=None).fetchall() assert len(r) == 0 s = users.select(not_(u.in_([]))) r = s.execute(search_key='john').fetchall() assert len(r) == 3 r = s.execute(search_key=None).fetchall() assert len(r) == 0 s = users.select(users.c.user_name.in_([]) == True) r = s.execute().fetchall() assert len(r) == 0 s = users.select(users.c.user_name.in_([]) == False) r = s.execute().fetchall() assert len(r) == 2 s = users.select(users.c.user_name.in_([]) == None) r = s.execute().fetchall() assert len(r) == 1class CompoundTest(TestBase): """test compound statements like UNION, INTERSECT, particularly their ability to nest on different databases.""" def setUpAll(self): global metadata, t1, t2, t3 metadata = MetaData(testing.db) t1 = Table('t1', metadata, Column('col1', Integer, Sequence('t1pkseq'), primary_key=True), Column('col2', String(30)), Column('col3', String(40)), Column('col4', String(30)) ) t2 = Table('t2', metadata, Column('col1', Integer, Sequence('t2pkseq'), primary_key=True), Column('col2', String(30)), Column('col3', String(40)), Column('col4', String(30))) t3 = Table('t3', metadata, Column('col1', Integer, Sequence('t3pkseq'), primary_key=True), Column('col2', String(30)), Column('col3', String(40)), Column('col4', String(30))) metadata.create_all() t1.insert().execute([ dict(col2="t1col2r1", col3="aaa", col4="aaa"), dict(col2="t1col2r2", col3="bbb", col4="bbb"), dict(col2="t1col2r3", col3="ccc", col4="ccc"), ]) t2.insert().execute([ dict(col2="t2col2r1", col3="aaa", col4="bbb"), dict(col2="t2col2r2", col3="bbb", col4="ccc"), dict(col2="t2col2r3", col3="ccc", col4="aaa"), ]) t3.insert().execute([ dict(col2="t3col2r1", col3="aaa", col4="ccc"), dict(col2="t3col2r2", col3="bbb", col4="aaa"), dict(col2="t3col2r3", col3="ccc", col4="bbb"), ]) def tearDownAll(self): metadata.drop_all() def _fetchall_sorted(self, executed): return sorted([tuple(row) for row in executed.fetchall()]) def test_union(self): (s1, s2) = ( select([t1.c.col3.label('col3'), t1.c.col4.label('col4')], t1.c.col2.in_(["t1col2r1", "t1col2r2"])), select([t2.c.col3.label('col3'), t2.c.col4.label('col4')], t2.c.col2.in_(["t2col2r2", "t2col2r3"])) ) u = union(s1, s2) wanted = [('aaa', 'aaa'), ('bbb', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')] found1 = self._fetchall_sorted(u.execute()) self.assertEquals(found1, wanted) found2 = self._fetchall_sorted(u.alias('bar').select().execute()) self.assertEquals(found2, wanted) def test_union_ordered(self): (s1, s2) = ( select([t1.c.col3.label('col3'), t1.c.col4.label('col4')], t1.c.col2.in_(["t1col2r1", "t1col2r2"])), select([t2.c.col3.label('col3'), t2.c.col4.label('col4')], t2.c.col2.in_(["t2col2r2", "t2col2r3"])) ) u = union(s1, s2, order_by=['col3', 'col4']) wanted = [('aaa', 'aaa'), ('bbb', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')] self.assertEquals(u.execute().fetchall(), wanted) @testing.fails_on('maxdb') def test_union_ordered_alias(self): (s1, s2) = ( select([t1.c.col3.label('col3'), t1.c.col4.label('col4')], t1.c.col2.in_(["t1col2r1", "t1col2r2"])), select([t2.c.col3.label('col3'), t2.c.col4.label('col4')], t2.c.col2.in_(["t2col2r2", "t2col2r3"])) ) u = union(s1, s2, order_by=['col3', 'col4']) wanted = [('aaa', 'aaa'), ('bbb', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')] self.assertEquals(u.alias('bar').select().execute().fetchall(), wanted) @testing.unsupported('sqlite', 'mysql', 'oracle') def test_union_all(self): e = union_all( select([t1.c.col3]), union( select([t1.c.col3]), select([t1.c.col3]), ) ) wanted = [('aaa',),('aaa',),('bbb',), ('bbb',), ('ccc',),('ccc',)] found1 = self._fetchall_sorted(e.execute()) self.assertEquals(found1, wanted) found2 = self._fetchall_sorted(e.alias('foo').select().execute()) self.assertEquals(found2, wanted) @testing.unsupported('firebird', 'mysql', 'sybase') def test_intersect(self): i = intersect( select([t2.c.col3, t2.c.col4]), select([t2.c.col3, t2.c.col4], t2.c.col4==t3.c.col3) ) wanted = [('aaa', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')] found1 = self._fetchall_sorted(i.execute()) self.assertEquals(found1, wanted) found2 = self._fetchall_sorted(i.alias('bar').select().execute()) self.assertEquals(found2, wanted) @testing.unsupported('firebird', 'mysql', 'oracle', 'sybase') def test_except_style1(self): e = except_(union( select([t1.c.col3, t1.c.col4]), select([t2.c.col3, t2.c.col4]), select([t3.c.col3, t3.c.col4]), ), select([t2.c.col3, t2.c.col4])) wanted = [('aaa', 'aaa'), ('aaa', 'ccc'), ('bbb', 'aaa'), ('bbb', 'bbb'), ('ccc', 'bbb'), ('ccc', 'ccc')] found = self._fetchall_sorted(e.alias('bar').select().execute()) self.assertEquals(found, wanted) @testing.unsupported('firebird', 'mysql', 'oracle', 'sybase') def test_except_style2(self): e = except_(union( select([t1.c.col3, t1.c.col4]), select([t2.c.col3, t2.c.col4]), select([t3.c.col3, t3.c.col4]), ).alias('foo').select(), select([t2.c.col3, t2.c.col4]))
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -