📄 query.py
字号:
assert [User(id=8), User(id=9)] == create_session().query(User).filter(User.name.endswith('ed')).all() def test_contains(self): """test comparing a collection to an object instance.""" sess = create_session() address = sess.query(Address).get(3) assert [User(id=8)] == sess.query(User).filter(User.addresses.contains(address)).all() try: sess.query(User).filter(User.addresses == address) assert False except exceptions.InvalidRequestError: assert True assert [User(id=10)] == sess.query(User).filter(User.addresses==None).all() try: assert [User(id=7), User(id=9), User(id=10)] == sess.query(User).filter(User.addresses!=address).all() assert False except exceptions.InvalidRequestError: assert True #assert [User(id=7), User(id=9), User(id=10)] == sess.query(User).filter(User.addresses!=address).all() def test_any(self): sess = create_session() assert [User(id=8), User(id=9)] == sess.query(User).filter(User.addresses.any(Address.email_address.like('%ed%'))).all() assert [User(id=8)] == sess.query(User).filter(User.addresses.any(Address.email_address.like('%ed%'), id=4)).all() assert [User(id=8)] == sess.query(User).filter(User.addresses.any(Address.email_address.like('%ed%'))).\ filter(User.addresses.any(id=4)).all() assert [User(id=9)] == sess.query(User).filter(User.addresses.any(email_address='fred@fred.com')).all() @testing.unsupported('maxdb') # can core def test_has(self): sess = create_session() assert [Address(id=5)] == sess.query(Address).filter(Address.user.has(name='fred')).all() assert [Address(id=2), Address(id=3), Address(id=4), Address(id=5)] == sess.query(Address).filter(Address.user.has(User.name.like('%ed%'))).all() assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(Address).filter(Address.user.has(User.name.like('%ed%'), id=8)).all() def test_contains_m2m(self): sess = create_session() item = sess.query(Item).get(3) assert [Order(id=1), Order(id=2), Order(id=3)] == sess.query(Order).filter(Order.items.contains(item)).all() assert [Order(id=4), Order(id=5)] == sess.query(Order).filter(~Order.items.contains(item)).all() def test_comparison(self): """test scalar comparison to an object instance""" sess = create_session() user = sess.query(User).get(8) assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(Address).filter(Address.user==user).all() assert [Address(id=1), Address(id=5)] == sess.query(Address).filter(Address.user!=user).all() # generates an IS NULL assert [] == sess.query(Address).filter(Address.user == None).all() assert [Order(id=5)] == sess.query(Order).filter(Order.address == None).all() def test_filter_by(self): sess = create_session() user = sess.query(User).get(8) assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(Address).filter_by(user=user).all() # many to one generates IS NULL assert [] == sess.query(Address).filter_by(user = None).all() # one to many generates WHERE NOT EXISTS assert [User(name='chuck')] == sess.query(User).filter_by(addresses = None).all()class AggregateTest(QueryTest): def test_sum(self): sess = create_session() orders = sess.query(Order).filter(Order.id.in_([2, 3, 4])) assert orders.sum(Order.user_id * Order.address_id) == 79 def test_apply(self): sess = create_session() assert sess.query(Order).apply_sum(Order.user_id * Order.address_id).filter(Order.id.in_([2, 3, 4])).one() == 79 def test_having(self): sess = create_session() assert [User(name=u'ed',id=8)] == sess.query(User).group_by([c for c in User.c]).join('addresses').having(func.count(Address.c.id)> 2).all() assert [User(name=u'jack',id=7), User(name=u'fred',id=9)] == sess.query(User).group_by([c for c in User.c]).join('addresses').having(func.count(Address.c.id)< 2).all()class CountTest(QueryTest): def test_basic(self): assert 4 == create_session().query(User).count() assert 2 == create_session().query(User).filter(users.c.name.endswith('ed')).count()class DistinctTest(QueryTest): def test_basic(self): assert [User(id=7), User(id=8), User(id=9),User(id=10)] == create_session().query(User).distinct().all() assert [User(id=7), User(id=9), User(id=8),User(id=10)] == create_session().query(User).distinct().order_by(desc(User.name)).all() def test_joined(self): """test that orderbys from a joined table get placed into the columns clause when DISTINCT is used""" sess = create_session() q = sess.query(User).join('addresses').distinct().order_by(desc(Address.email_address)) assert [User(id=7), User(id=9), User(id=8)] == q.all() sess.clear() # test that it works on embedded eagerload/LIMIT subquery q = sess.query(User).join('addresses').distinct().options(eagerload('addresses')).order_by(desc(Address.email_address)).limit(2) def go(): assert [ User(id=7, addresses=[ Address(id=1) ]), User(id=9, addresses=[ Address(id=5) ]), ] == q.all() self.assert_sql_count(testing.db, go, 1)class YieldTest(QueryTest): def test_basic(self): import gc sess = create_session() q = iter(sess.query(User).yield_per(1).from_statement("select * from users")) ret = [] self.assertEquals(len(sess.identity_map), 0) ret.append(q.next()) ret.append(q.next()) self.assertEquals(len(sess.identity_map), 2) ret.append(q.next()) ret.append(q.next()) self.assertEquals(len(sess.identity_map), 4) try: q.next() assert False except StopIteration: passclass TextTest(QueryTest): def test_fulltext(self): assert [User(id=7), User(id=8), User(id=9),User(id=10)] == create_session().query(User).from_statement("select * from users").all() def test_fragment(self): assert [User(id=8), User(id=9)] == create_session().query(User).filter("id in (8, 9)").all() assert [User(id=9)] == create_session().query(User).filter("name='fred'").filter("id=9").all() assert [User(id=9)] == create_session().query(User).filter("name='fred'").filter(User.id==9).all() def test_binds(self): assert [User(id=8), User(id=9)] == create_session().query(User).filter("id in (:id1, :id2)").params(id1=8, id2=9).all()class ParentTest(QueryTest): def test_o2m(self): sess = create_session() q = sess.query(User) u1 = q.filter_by(name='jack').one() # test auto-lookup of property o = sess.query(Order).with_parent(u1).all() assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o # test with explicit property o = sess.query(Order).with_parent(u1, property='orders').all() assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o # test static method o = Query.query_from_parent(u1, property='orders', session=sess).all() assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o # test generative criterion o = sess.query(Order).with_parent(u1).filter(orders.c.id>2).all() assert [Order(description="order 3"), Order(description="order 5")] == o # test against None for parent? this can't be done with the current API since we don't know # what mapper to use #assert sess.query(Order).with_parent(None, property='addresses').all() == [Order(description="order 5")] def test_noparent(self): sess = create_session() q = sess.query(User) u1 = q.filter_by(name='jack').one() try: q = sess.query(Item).with_parent(u1) assert False except exceptions.InvalidRequestError, e: assert str(e) == "Could not locate a property which relates instances of class 'Item' to instances of class 'User'" def test_m2m(self): sess = create_session() i1 = sess.query(Item).filter_by(id=2).one() k = sess.query(Keyword).with_parent(i1).all() assert [Keyword(name='red'), Keyword(name='small'), Keyword(name='square')] == kclass JoinTest(QueryTest): def test_getjoinable_tables(self): sess = create_session() sel1 = select([users]).alias() sel2 = select([users], from_obj=users.join(addresses)).alias() j1 = sel1.join(users, sel1.c.id==users.c.id) j2 = j1.join(addresses) for from_obj, assert_cond in ( (users, [users]), (users.join(addresses), [users, addresses]), (sel1, [sel1]), (sel2, [sel2]), (sel1.join(users, sel1.c.id==users.c.id), [sel1, users]), (sel2.join(users, sel2.c.id==users.c.id), [sel2, users]), (j2, [j1, j2, sel1, users, addresses]) ): ret = set(sess.query(User).select_from(from_obj)._get_joinable_tables()) self.assertEquals(ret, set(assert_cond).union([from_obj]), [x.description for x in ret]) def test_overlapping_paths(self): for aliased in (True,False): # load a user who has an order that contains item id 3 and address id 1 (order 3, owned by jack) result = create_session().query(User).join(['orders', 'items'], aliased=aliased).filter_by(id=3).join(['orders','address'], aliased=aliased).filter_by(id=1).all() assert [User(id=7, name='jack')] == result def test_overlapping_paths_outerjoin(self): result = create_session().query(User).outerjoin(['orders', 'items']).filter_by(id=3).outerjoin(['orders','address']).filter_by(id=1).all() assert [User(id=7, name='jack')] == result def test_reset_joinpoint(self): for aliased in (True, False): # load a user who has an order that contains item id 3 and address id 1 (order 3, owned by jack) result = create_session().query(User).join(['orders', 'items'], aliased=aliased).filter_by(id=3).reset_joinpoint().join(['orders','address'], aliased=aliased).filter_by(id=1).all() assert [User(id=7, name='jack')] == result result = create_session().query(User).outerjoin(['orders', 'items'], aliased=aliased).filter_by(id=3).reset_joinpoint().outerjoin(['orders','address'], aliased=aliased).filter_by(id=1).all() assert [User(id=7, name='jack')] == result def test_overlap_with_aliases(self): oalias = orders.alias('oalias') result = create_session().query(User).select_from(users.join(oalias)).filter(oalias.c.description.in_(["order 1", "order 2", "order 3"])).join(['orders', 'items']).all() assert [User(id=7, name='jack'), User(id=9, name='fred')] == result result = create_session().query(User).select_from(users.join(oalias)).filter(oalias.c.description.in_(["order 1", "order 2", "order 3"])).join(['orders', 'items']).filter_by(id=4).all() assert [User(id=7, name='jack')] == result def test_aliased(self): """test automatic generation of aliased joins.""" sess = create_session() # test a basic aliasized path q = sess.query(User).join('addresses', aliased=True).filter_by(email_address='jack@bean.com') assert [User(id=7)] == q.all() q = sess.query(User).join('addresses', aliased=True).filter(Address.email_address=='jack@bean.com') assert [User(id=7)] == q.all() q = sess.query(User).join('addresses', aliased=True).filter(or_(Address.email_address=='jack@bean.com', Address.email_address=='fred@fred.com')) assert [User(id=7), User(id=9)] == q.all() # test two aliasized paths, one to 'orders' and the other to 'orders','items'. # one row is returned because user 7 has order 3 and also has order 1 which has item 1 # this tests a o2m join and a m2m join. q = sess.query(User).join('orders', aliased=True).filter(Order.description=="order 3").join(['orders', 'items'], aliased=True).filter(Item.description=="item 1") assert q.count() == 1 assert [User(id=7)] == q.all() # test the control version - same joins but not aliased. rows are not returned because order 3 does not have item 1 # addtionally by placing this test after the previous one, test that the "aliasing" step does not corrupt the # join clauses that are cached by the relationship. q = sess.query(User).join('orders').filter(Order.description=="order 3").join(['orders', 'items']).filter(Order.description=="item 1") assert [] == q.all() assert q.count() == 0 q = sess.query(User).join('orders', aliased=True).filter(Order.items.any(Item.description=='item 4')) assert [User(id=7)] == q.all() def test_aliased_add_entity(self): """test the usage of aliased joins with add_entity()""" sess = create_session() q = sess.query(User).join('orders', aliased=True, id='order1').filter(Order.description=="order 3").join(['orders', 'items'], aliased=True, id='item1').filter(Item.description=="item 1") try: q.add_entity(Order, id='fakeid').compile() assert False except exceptions.InvalidRequestError, e: assert str(e) == "Query has no alias identified by 'fakeid'" try: q.add_entity(Order, id='fakeid').instances(None) assert False except exceptions.InvalidRequestError, e: assert str(e) == "Query has no alias identified by 'fakeid'"
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -