📄 query.py
字号:
import testenv; testenv.configure_for_tests()import operatorfrom sqlalchemy import *from sqlalchemy import exceptionsfrom sqlalchemy.sql import compilerfrom sqlalchemy.engine import defaultfrom sqlalchemy.orm import *from testlib import *from testlib import enginesfrom testlib.fixtures import *class QueryTest(FixtureTest): keep_mappers = True keep_data = True def setUpAll(self): super(QueryTest, self).setUpAll() self.setup_mappers() def setup_mappers(self): mapper(User, users, properties={ 'addresses':relation(Address, backref='user'), 'orders':relation(Order, backref='user'), # o2m, m2o }) mapper(Address, addresses) mapper(Order, orders, properties={ 'items':relation(Item, secondary=order_items, order_by=items.c.id), #m2m 'address':relation(Address), # m2o }) mapper(Item, items, properties={ 'keywords':relation(Keyword, secondary=item_keywords) #m2m }) mapper(Keyword, keywords)class UnicodeSchemaTest(QueryTest): keep_mappers = False def setup_mappers(self): pass def define_tables(self, metadata): super(UnicodeSchemaTest, self).define_tables(metadata) global uni_meta, uni_users uni_meta = MetaData() uni_users = Table(u'users', uni_meta, Column(u'id', Integer, primary_key=True), Column(u'name', String(30), nullable=False)) def test_get(self): mapper(User, uni_users) assert User(id=7) == create_session(bind=testing.db).query(User).get(7)class GetTest(QueryTest): def test_get(self): s = create_session() assert s.query(User).get(19) is None u = s.query(User).get(7) u2 = s.query(User).get(7) assert u is u2 s.clear() u2 = s.query(User).get(7) assert u is not u2 def test_no_criterion(self): """test that get()/load() does not use preexisting filter/etc. criterion""" s = create_session() try: s.query(User).join('addresses').filter(Address.user_id==8).get(7) assert False except exceptions.SAWarning, e: assert str(e) == "Query.get() being called on a Query with existing criterion; criterion is being ignored." @testing.emits_warning('Query.*') def warns(): assert s.query(User).filter(User.id==7).get(19) is None u = s.query(User).get(7) assert s.query(User).filter(User.id==9).get(7) is u s.clear() assert s.query(User).filter(User.id==9).get(7).id == u.id # user 10 has no addresses u = s.query(User).get(10) assert s.query(User).join('addresses').get(10) is u s.clear() assert s.query(User).join('addresses').get(10).id == u.id u = s.query(User).get(7) assert s.query(User).join('addresses').filter(Address.user_id==8).filter(User.id==7).first() is None assert s.query(User).join('addresses').filter(Address.user_id==8).get(7) is u s.clear() assert s.query(User).join('addresses').filter(Address.user_id==8).get(7).id == u.id assert s.query(User).join('addresses').filter(Address.user_id==8).load(7).id == u.id warns() def test_unique_param_names(self): class SomeUser(object): pass s = users.select(users.c.id!=12).alias('users') m = mapper(SomeUser, s) print s.primary_key print m.primary_key assert s.primary_key == m.primary_key row = s.select(use_labels=True).execute().fetchone() print row[s.primary_key[0]] sess = create_session() assert sess.query(SomeUser).get(7).name == 'jack' def test_load(self): s = create_session() try: assert s.query(User).load(19) is None assert False except exceptions.InvalidRequestError: assert True u = s.query(User).load(7) u2 = s.query(User).load(7) assert u is u2 s.clear() u2 = s.query(User).load(7) assert u is not u2 u2.name = 'some name' a = Address(email_address='some other name') u2.addresses.append(a) assert u2 in s.dirty assert a in u2.addresses s.query(User).load(7) assert u2 not in s.dirty assert u2.name =='jack' assert a not in u2.addresses @testing.exclude('mysql', '<', (4, 1)) def test_unicode(self): """test that Query.get properly sets up the type for the bind parameter. using unicode would normally fail on postgres, mysql and oracle unless it is converted to an encoded string""" metadata = MetaData(engines.utf8_engine()) table = Table('unicode_data', metadata, Column('id', Unicode(40), primary_key=True), Column('data', Unicode(40))) try: metadata.create_all() ustring = 'petit voix m\xe2\x80\x99a'.decode('utf-8') table.insert().execute(id=ustring, data=ustring) class LocalFoo(Base): pass mapper(LocalFoo, table) self.assertEquals(create_session().query(LocalFoo).get(ustring), LocalFoo(id=ustring, data=ustring)) finally: metadata.drop_all() def test_populate_existing(self): s = create_session() userlist = s.query(User).all() u = userlist[0] u.name = 'foo' a = Address(name='ed') u.addresses.append(a) self.assert_(a in u.addresses) s.query(User).populate_existing().all() self.assert_(u not in s.dirty) self.assert_(u.name == 'jack') self.assert_(a not in u.addresses) u.addresses[0].email_address = 'lala' u.orders[1].items[2].description = 'item 12' # test that lazy load doesnt change child items s.query(User).populate_existing().all() assert u.addresses[0].email_address == 'lala' assert u.orders[1].items[2].description == 'item 12' # eager load does s.query(User).options(eagerload('addresses'), eagerload_all('orders.items')).populate_existing().all() assert u.addresses[0].email_address == 'jack@bean.com' assert u.orders[1].items[2].description == 'item 5'class OperatorTest(QueryTest): """test sql.Comparator implementation for MapperProperties""" def _test(self, clause, expected): c = str(clause.compile(dialect = default.DefaultDialect())) assert c == expected, "%s != %s" % (c, expected) def test_arithmetic(self): create_session().query(User) for (py_op, sql_op) in ((operator.add, '+'), (operator.mul, '*'), (operator.sub, '-'), (operator.div, '/'), ): for (lhs, rhs, res) in ( (5, User.id, ':users_id_1 %s users.id'), (5, literal(6), ':param_1 %s :param_2'), (User.id, 5, 'users.id %s :users_id_1'), (User.id, literal('b'), 'users.id %s :param_1'), (User.id, User.id, 'users.id %s users.id'), (literal(5), 'b', ':param_1 %s :param_2'), (literal(5), User.id, ':param_1 %s users.id'), (literal(5), literal(6), ':param_1 %s :param_2'), ): self._test(py_op(lhs, rhs), res % sql_op) def test_comparison(self): create_session().query(User) for (py_op, fwd_op, rev_op) in ((operator.lt, '<', '>'), (operator.gt, '>', '<'), (operator.eq, '=', '='), (operator.ne, '!=', '!='), (operator.le, '<=', '>='), (operator.ge, '>=', '<=')): for (lhs, rhs, l_sql, r_sql) in ( ('a', User.id, ':users_id_1', 'users.id'), ('a', literal('b'), ':param_2', ':param_1'), # note swap! (User.id, 'b', 'users.id', ':users_id_1'), (User.id, literal('b'), 'users.id', ':param_1'), (User.id, User.id, 'users.id', 'users.id'), (literal('a'), 'b', ':param_1', ':param_2'), (literal('a'), User.id, ':param_1', 'users.id'), (literal('a'), literal('b'), ':param_1', ':param_2'), ): # the compiled clause should match either (e.g.): # 'a' < 'b' -or- 'b' > 'a'. compiled = str(py_op(lhs, rhs).compile(dialect=default.DefaultDialect())) fwd_sql = "%s %s %s" % (l_sql, fwd_op, r_sql) rev_sql = "%s %s %s" % (r_sql, rev_op, l_sql) self.assert_(compiled == fwd_sql or compiled == rev_sql, "\n'" + compiled + "'\n does not match\n'" + fwd_sql + "'\n or\n'" + rev_sql + "'") def test_op(self): assert str(User.name.op('ilike')('17').compile(dialect=default.DefaultDialect())) == "users.name ilike :users_name_1" def test_in(self): self._test(User.id.in_(['a', 'b']), "users.id IN (:users_id_1, :users_id_2)") def test_between(self): self._test(User.id.between('a', 'b'), "users.id BETWEEN :users_id_1 AND :users_id_2") def test_clauses(self): for (expr, compare) in ( (func.max(User.id), "max(users.id)"), (User.id.desc(), "users.id DESC"), (between(5, User.id, Address.id), ":param_1 BETWEEN users.id AND addresses.id"), # this one would require adding compile() to InstrumentedScalarAttribute. do we want this ? #(User.id, "users.id") ): c = expr.compile(dialect=default.DefaultDialect()) assert str(c) == compare, "%s != %s" % (str(c), compare)class CompileTest(QueryTest): def test_deferred(self): session = create_session() s = session.query(User).filter(and_(addresses.c.email_address == bindparam('emailad'), Address.user_id==User.id)).compile() l = session.query(User).instances(s.execute(emailad = 'jack@bean.com')) assert [User(id=7)] == lclass SliceTest(QueryTest): def test_first(self): assert User(id=7) == create_session().query(User).first() assert create_session().query(User).filter(User.id==27).first() is None # more slice tests are available in test/orm/generative.pyclass TextTest(QueryTest): def test_fulltext(self): assert [User(id=7), User(id=8), User(id=9),User(id=10)] == create_session().query(User).from_statement("select * from users").all() def test_fragment(self): assert [User(id=8), User(id=9)] == create_session().query(User).filter("id in (8, 9)").all() assert [User(id=9)] == create_session().query(User).filter("name='fred'").filter("id=9").all() assert [User(id=9)] == create_session().query(User).filter("name='fred'").filter(User.id==9).all() def test_binds(self): assert [User(id=8), User(id=9)] == create_session().query(User).filter("id in (:id1, :id2)").params(id1=8, id2=9).all()class FilterTest(QueryTest): def test_basic(self): assert [User(id=7), User(id=8), User(id=9),User(id=10)] == create_session().query(User).all() @testing.fails_on('maxdb') def test_limit(self): assert [User(id=8), User(id=9)] == create_session().query(User).limit(2).offset(1).all() assert [User(id=8), User(id=9)] == list(create_session().query(User)[1:3]) assert User(id=8) == create_session().query(User)[1] def test_onefilter(self):
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -