📄 select.py
字号:
assert False except exceptions.InvalidRequestError, err: assert str(err) == "Select objects don't have a type. Call as_scalar() on this Select object to return a 'scalar' version of this Select.", str(err) s = select([table1.c.myid], scalar=True, correlate=False) self.assert_compile(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable) AS anon_1 FROM mytable") s = select([table1.c.myid], scalar=True) self.assert_compile(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) AS anon_1 FROM myothertable") s = select([table1.c.myid]).correlate(None).as_scalar() self.assert_compile(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable) AS anon_1 FROM mytable") s = select([table1.c.myid]).as_scalar() self.assert_compile(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) AS anon_1 FROM myothertable") # test expressions against scalar selects self.assert_compile(select([s - literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) - :param_1 AS anon_1") self.assert_compile(select([select([table1.c.name]).as_scalar() + literal('x')]), "SELECT (SELECT mytable.name FROM mytable) || :param_1 AS anon_1") self.assert_compile(select([s > literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) > :param_1 AS anon_1") self.assert_compile(select([select([table1.c.name]).label('foo')]), "SELECT (SELECT mytable.name FROM mytable) AS foo") # scalar selects should not have any attributes on their 'c' or 'columns' attribute s = select([table1.c.myid]).as_scalar() try: s.c.foo except exceptions.InvalidRequestError, err: assert str(err) == 'Scalar Select expression has no columns; use this object directly within a column-level expression.' try: s.columns.foo except exceptions.InvalidRequestError, err: assert str(err) == 'Scalar Select expression has no columns; use this object directly within a column-level expression.' zips = table('zips', column('zipcode'), column('latitude'), column('longitude'), ) places = table('places', column('id'), column('nm') ) zip = '12345' qlat = select([zips.c.latitude], zips.c.zipcode == zip).correlate(None).as_scalar() qlng = select([zips.c.longitude], zips.c.zipcode == zip).correlate(None).as_scalar() q = select([places.c.id, places.c.nm, zips.c.zipcode, func.latlondist(qlat, qlng).label('dist')], zips.c.zipcode==zip, order_by = ['dist', places.c.nm] ) self.assert_compile(q,"SELECT places.id, places.nm, zips.zipcode, latlondist((SELECT zips.latitude FROM zips WHERE " "zips.zipcode = :zips_zipcode_1), (SELECT zips.longitude FROM zips WHERE zips.zipcode = :zips_zipcode_2)) AS dist " "FROM places, zips WHERE zips.zipcode = :zips_zipcode_3 ORDER BY dist, places.nm") zalias = zips.alias('main_zip') qlat = select([zips.c.latitude], zips.c.zipcode == zalias.c.zipcode, scalar=True) qlng = select([zips.c.longitude], zips.c.zipcode == zalias.c.zipcode, scalar=True) q = select([places.c.id, places.c.nm, zalias.c.zipcode, func.latlondist(qlat, qlng).label('dist')], order_by = ['dist', places.c.nm] ) self.assert_compile(q, "SELECT places.id, places.nm, main_zip.zipcode, latlondist((SELECT zips.latitude FROM zips WHERE zips.zipcode = main_zip.zipcode), (SELECT zips.longitude FROM zips WHERE zips.zipcode = main_zip.zipcode)) AS dist FROM places, zips AS main_zip ORDER BY dist, places.nm") a1 = table2.alias('t2alias') s1 = select([a1.c.otherid], table1.c.myid==a1.c.otherid, scalar=True) j1 = table1.join(table2, table1.c.myid==table2.c.otherid) s2 = select([table1, s1], from_obj=j1) self.assert_compile(s2, "SELECT mytable.myid, mytable.name, mytable.description, (SELECT t2alias.otherid FROM myothertable AS t2alias WHERE mytable.myid = t2alias.otherid) AS anon_1 FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid") def test_label_comparison(self): x = func.lala(table1.c.myid).label('foo') self.assert_compile(select([x], x==5), "SELECT lala(mytable.myid) AS foo FROM mytable WHERE lala(mytable.myid) = :param_1") def test_conjunctions(self): self.assert_compile( and_(table1.c.myid == 12, table1.c.name=='asdf', table2.c.othername == 'foo', "sysdate() = today()"), "mytable.myid = :mytable_myid_1 AND mytable.name = :mytable_name_1 "\ "AND myothertable.othername = :myothertable_othername_1 AND sysdate() = today()" ) self.assert_compile( and_( table1.c.myid == 12, or_(table2.c.othername=='asdf', table2.c.othername == 'foo', table2.c.otherid == 9), "sysdate() = today()", ), "mytable.myid = :mytable_myid_1 AND (myothertable.othername = :myothertable_othername_1 OR "\ "myothertable.othername = :myothertable_othername_2 OR myothertable.otherid = :myothertable_otherid_1) AND sysdate() = today()", checkparams = {'myothertable_othername_1': 'asdf', 'myothertable_othername_2':'foo', 'myothertable_otherid_1': 9, 'mytable_myid_1': 12} ) def test_distinct(self): self.assert_compile( select([table1.c.myid.distinct()]), "SELECT DISTINCT mytable.myid FROM mytable" ) self.assert_compile( select([distinct(table1.c.myid)]), "SELECT DISTINCT mytable.myid FROM mytable" ) self.assert_compile( select([table1.c.myid]).distinct(), "SELECT DISTINCT mytable.myid FROM mytable" ) self.assert_compile( select([func.count(table1.c.myid.distinct())]), "SELECT count(DISTINCT mytable.myid) AS count_1 FROM mytable" ) self.assert_compile( select([func.count(distinct(table1.c.myid))]), "SELECT count(DISTINCT mytable.myid) AS count_1 FROM mytable" ) def test_operators(self): for (py_op, sql_op) in ((operator.add, '+'), (operator.mul, '*'), (operator.sub, '-'), (operator.div, '/'), ): for (lhs, rhs, res) in ( (5, table1.c.myid, ':mytable_myid_1 %s mytable.myid'), (5, literal(5), ':param_1 %s :param_2'), (table1.c.myid, 'b', 'mytable.myid %s :mytable_myid_1'), (table1.c.myid, literal(2.7), 'mytable.myid %s :param_1'), (table1.c.myid, table1.c.myid, 'mytable.myid %s mytable.myid'), (literal(5), 8, ':param_1 %s :param_2'), (literal(6), table1.c.myid, ':param_1 %s mytable.myid'), (literal(7), literal(5.5), ':param_1 %s :param_2'), ): self.assert_compile(py_op(lhs, rhs), res % sql_op) dt = datetime.datetime.today() # exercise comparison operators for (py_op, fwd_op, rev_op) in ((operator.lt, '<', '>'), (operator.gt, '>', '<'), (operator.eq, '=', '='), (operator.ne, '!=', '!='), (operator.le, '<=', '>='), (operator.ge, '>=', '<=')): for (lhs, rhs, l_sql, r_sql) in ( ('a', table1.c.myid, ':mytable_myid_1', 'mytable.myid'), ('a', literal('b'), ':param_2', ':param_1'), # note swap! (table1.c.myid, 'b', 'mytable.myid', ':mytable_myid_1'), (table1.c.myid, literal('b'), 'mytable.myid', ':param_1'), (table1.c.myid, table1.c.myid, 'mytable.myid', 'mytable.myid'), (literal('a'), 'b', ':param_1', ':param_2'), (literal('a'), table1.c.myid, ':param_1', 'mytable.myid'), (literal('a'), literal('b'), ':param_1', ':param_2'), (dt, literal('b'), ':param_2', ':param_1'), (literal('b'), dt, ':param_1', ':param_2'), ): # the compiled clause should match either (e.g.): # 'a' < 'b' -or- 'b' > 'a'. compiled = str(py_op(lhs, rhs)) fwd_sql = "%s %s %s" % (l_sql, fwd_op, r_sql) rev_sql = "%s %s %s" % (r_sql, rev_op, l_sql) self.assert_(compiled == fwd_sql or compiled == rev_sql, "\n'" + compiled + "'\n does not match\n'" + fwd_sql + "'\n or\n'" + rev_sql + "'") self.assert_compile( table1.select((table1.c.myid != 12) & ~(table1.c.name=='john')), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid_1 AND mytable.name != :mytable_name_1" ) self.assert_compile( table1.select((table1.c.myid != 12) & ~(table1.c.name.between('jack','john'))), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid_1 AND "\ "NOT (mytable.name BETWEEN :mytable_name_1 AND :mytable_name_2)" ) self.assert_compile( table1.select((table1.c.myid != 12) & ~and_(table1.c.name=='john', table1.c.name=='ed', table1.c.name=='fred')), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid_1 AND "\ "NOT (mytable.name = :mytable_name_1 AND mytable.name = :mytable_name_2 AND mytable.name = :mytable_name_3)" ) self.assert_compile( table1.select((table1.c.myid != 12) & ~table1.c.name), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid_1 AND NOT mytable.name" ) self.assert_compile( literal("a") + literal("b") * literal("c"), ":param_1 || :param_2 * :param_3" ) # test the op() function, also that its results are further usable in expressions self.assert_compile( table1.select(table1.c.myid.op('hoho')(12)==14), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (mytable.myid hoho :mytable_myid_1) = :param_1" ) # test that clauses can be pickled (operators need to be module-level, etc.) clause = (table1.c.myid == 12) & table1.c.myid.between(15, 20) & table1.c.myid.like('hoho') assert str(clause) == str(util.pickle.loads(util.pickle.dumps(clause))) # ILIKE stmt = table1.select(table1.c.name.ilike('%something%')) self.assert_compile(stmt, "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE lower(mytable.name) LIKE lower(:mytable_name_1)") self.assert_compile(stmt, "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.name ILIKE %(mytable_name_1)s", dialect=postgres.PGDialect()) stmt = table1.select(~table1.c.name.ilike('%something%')) self.assert_compile(stmt, "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE lower(mytable.name) NOT LIKE lower(:mytable_name_1)") self.assert_compile(stmt, "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.name NOT ILIKE %(mytable_name_1)s", dialect=postgres.PGDialect()) def test_composed_string_comparators(self): self.assert_compile( table1.c.name.contains('jo'), "mytable.name LIKE '%%' || :mytable_name_1 || '%%'" , checkparams = {'mytable_name_1': u'jo'}, ) self.assert_compile( table1.c.name.contains('jo'), "mytable.name LIKE concat(concat('%%', %s), '%%')" , checkparams = {'mytable_name_1': u'jo'}, dialect=mysql.dialect() ) self.assert_compile( table1.c.name.endswith('hn'), "mytable.name LIKE '%%' || :mytable_name_1", checkparams = {'mytable_name_1': u'hn'}, ) self.assert_compile( table1.c.name.endswith('hn'), "mytable.name LIKE concat('%%', %s)", checkparams = {'mytable_name_1': u'hn'}, dialect=mysql.dialect() ) self.assert_compile( table1.c.name.startswith(u"hi \xf6 \xf5"), "mytable.name LIKE :mytable_name_1 || '%%'", checkparams = {'mytable_name_1': u'hi \xf6 \xf5'}, ) self.assert_compile(column('name').endswith(text("'foo'")), "name LIKE '%%' || 'foo'" ) self.assert_compile(column('name').endswith(literal_column("'foo'")), "name LIKE '%%' || 'foo'" ) self.assert_compile(column('name').startswith(text("'foo'")), "name LIKE 'foo' || '%%'" ) self.assert_compile(column('name').startswith(text("'foo'")), "name LIKE concat('foo', '%%')", dialect=mysql.dialect()) self.assert_compile(column('name').startswith(literal_column("'foo'")), "name LIKE 'foo' || '%%'" ) self.assert_compile(column('name').startswith(literal_column("'foo'")), "name LIKE concat('foo', '%%')", dialect=mysql.dialect()) def test_multiple_col_binds(self): self.assert_compile(
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -