📄 expr.py
字号:
class NonAssocBinaryOper(BinaryOper): oper = " (unknown) "@compile.when(NonAssocBinaryOper)@compile_python.when(NonAssocBinaryOper)def compile_non_assoc_binary_oper(compile, state, oper): expr1 = compile(state, oper.expr1) state.precedence += 0.5 # Enforce parentheses. expr2 = compile(state, oper.expr2) return "%s%s%s" % (expr1, oper.oper, expr2)class CompoundOper(CompoundExpr): oper = " (unknown) "@compile.when(CompoundOper)def compile_compound_oper(compile, state, oper): return compile(state, oper.exprs, oper.oper)@compile_python.when(CompoundOper)def compile_compound_oper(compile, state, oper): return compile(state, oper.exprs, oper.oper.lower())class Eq(BinaryOper): oper = " = "@compile.when(Eq)def compile_eq(compile, state, eq): if eq.expr2 is None: return "%s IS NULL" % compile(state, eq.expr1) return "%s = %s" % (compile(state, eq.expr1), compile(state, eq.expr2))@compile_python.when(Eq)def compile_eq(compile, state, eq): return "%s == %s" % (compile(state, eq.expr1), compile(state, eq.expr2))class Ne(BinaryOper): oper = " != "@compile.when(Ne)def compile_ne(compile, state, ne): if ne.expr2 is None: return "%s IS NOT NULL" % compile(state, ne.expr1) return "%s != %s" % (compile(state, ne.expr1), compile(state, ne.expr2))class Gt(BinaryOper): oper = " > "class Ge(BinaryOper): oper = " >= "class Lt(BinaryOper): oper = " < "class Le(BinaryOper): oper = " <= "class RShift(BinaryOper): oper = ">>"class LShift(BinaryOper): oper = "<<"class Like(BinaryOper): oper = " LIKE " def __init__(self, expr1, expr2, escape=Undef): self.expr1 = expr1 self.expr2 = expr2 self.escape = escape@compile.when(Like)def compile_binary_oper(compile, state, like): statement = "%s%s%s" % (compile(state, like.expr1), like.oper, compile(state, like.expr2)) if like.escape is not Undef: statement = "%s ESCAPE %s" % (statement, compile(state, like.escape)) return statement# It's easy to support it. Later.compile_python.when(Like)(compile_python_unsupported)class In(BinaryOper): oper = " IN "@compile.when(In)def compile_in(compile, state, expr): expr1 = compile(state, expr.expr1) state.precedence = 0 # We're forcing parenthesis here. return "%s IN (%s)" % (expr1, compile(state, expr.expr2))@compile_python.when(In)def compile_in(compile, state, expr): expr1 = compile(state, expr.expr1) state.precedence = 0 # We're forcing parenthesis here. return "%s in (%s,)" % (expr1, compile(state, expr.expr2))class Add(CompoundOper): oper = "+"class Sub(NonAssocBinaryOper): oper = "-"class Mul(CompoundOper): oper = "*"class Div(NonAssocBinaryOper): oper = "/"class Mod(NonAssocBinaryOper): oper = "%"class And(CompoundOper): oper = " AND "class Or(CompoundOper): oper = " OR "@compile.when(And, Or)def compile_compound_oper(compile, state, oper): return compile(state, oper.exprs, oper.oper, raw=True)# --------------------------------------------------------------------# Set expressions.class SetExpr(Expr): oper = " (unknown) " def __init__(self, *exprs, **kwargs): self.exprs = exprs self.all = kwargs.get("all", False) self.order_by = kwargs.get("order_by", Undef) self.limit = kwargs.get("limit", Undef) self.offset = kwargs.get("offset", Undef)@compile.when(SetExpr)def compile_set_expr(compile, state, expr): if expr.order_by is not Undef: # When ORDER BY is present, databases usually have trouble using # fully qualified column names. Because of that, we transform # pure column names into aliases, and use them in the ORDER BY. aliases = {} for subexpr in expr.exprs: if isinstance(subexpr, Select): columns = subexpr.columns if not isinstance(columns, (tuple, list)): columns = [columns] else: columns = list(columns) for i, column in enumerate(columns): if column not in aliases: if isinstance(column, Column): aliases[column] = columns[i] = Alias(column) elif isinstance(column, Alias): aliases[column.expr] = column subexpr.columns = columns state.push("context", SELECT) # In the statement: # SELECT foo UNION SELECT bar LIMIT 1 # The LIMIT 1 applies to the union results, not the SELECT bar # This ensures that parentheses will be placed around the # sub-selects in the expression. state.precedence += 0.5 oper = expr.oper if expr.all: oper += "ALL " statement = compile(state, expr.exprs, oper) state.precedence -= 0.5 if expr.order_by is not Undef: state.context = COLUMN_NAME if state.aliases is None: state.push("aliases", aliases) else: # Previously defined aliases have precedence. aliases.update(state.aliases) state.aliases = aliases aliases = None statement += " ORDER BY " + compile(state, expr.order_by) if aliases is not None: state.pop() if expr.limit is not Undef: statement += " LIMIT %d" % expr.limit if expr.offset is not Undef: statement += " OFFSET %d" % expr.offset state.pop() return statementclass Union(SetExpr): oper = " UNION "class Except(SetExpr): oper = " EXCEPT "class Intersect(SetExpr): oper = " INTERSECT "# --------------------------------------------------------------------# Functionsclass FuncExpr(ComparableExpr): name = "(unknown)"class Count(FuncExpr): name = "COUNT" def __init__(self, column=Undef, distinct=False): if distinct and column is Undef: raise ValueError("Must specify column when using distinct count") self.column = column self.distinct = distinct@compile.when(Count)def compile_count(compile, state, count): if count.column is not Undef: if count.distinct: return "COUNT(DISTINCT %s)" % compile(state, count.column) return "COUNT(%s)" % compile(state, count.column) return "COUNT(*)"class Func(FuncExpr): def __init__(self, name, *args): self.name = name self.args = argsclass NamedFunc(FuncExpr): def __init__(self, *args): self.args = args@compile.when(Func, NamedFunc)def compile_func(compile, state, func): return "%s(%s)" % (func.name, compile(state, func.args))class Max(NamedFunc): name = "MAX"class Min(NamedFunc): name = "MIN"class Avg(NamedFunc): name = "AVG"class Sum(NamedFunc): name = "SUM"class Lower(NamedFunc): name = "LOWER"class Upper(NamedFunc): name = "UPPER"# --------------------------------------------------------------------# Prefix and suffix expressionsclass PrefixExpr(Expr): prefix = "(unknown)" def __init__(self, expr): self.expr = expr@compile.when(PrefixExpr)def compile_prefix_expr(compile, state, expr): return "%s %s" % (expr.prefix, compile(state, expr.expr))class SuffixExpr(Expr): suffix = "(unknown)" def __init__(self, expr): self.expr = expr@compile.when(SuffixExpr)def compile_suffix_expr(compile, state, expr): return "%s %s" % (compile(state, expr.expr, raw=True), expr.suffix)class Not(PrefixExpr): prefix = "NOT"class Exists(PrefixExpr): prefix = "EXISTS"class Asc(SuffixExpr): suffix = "ASC"class Desc(SuffixExpr): suffix = "DESC"# --------------------------------------------------------------------# Plain SQL expressions.class SQLRaw(str): """Subtype to mark a string as something that shouldn't be compiled. This is handled internally by the compiler. """class SQLToken(str): """Marker for strings the should be considered as a single SQL token. In the future, these strings will be quoted, when needed. This is handled internally by the compiler. """class SQL(ComparableExpr): def __init__(self, expr, params=Undef, tables=Undef): self.expr = expr self.params = params self.tables = tables@compile.when(SQL)def compile_sql(compile, state, expr): if expr.params is not Undef: if type(expr.params) not in (tuple, list): raise CompileError("Parameters should be a list or a tuple, " "not %r" % type(expr.params)) for param in expr.params: state.parameters.append(param) if expr.tables is not Undef: state.auto_tables.append(expr.tables) return expr.expr# --------------------------------------------------------------------# Utility functions.def compare_columns(columns, values): if not columns: return Undef equals = [] if len(columns) == 1: value = values[0] if not isinstance(value, (Expr, Variable)) and value is not None: value = columns[0].variable_factory(value=value) return Eq(columns[0], value) else: for column, value in zip(columns, values): if not isinstance(value, (Expr, Variable)) and value is not None: value = column.variable_factory(value=value) equals.append(Eq(column, value)) return And(*equals)# --------------------------------------------------------------------# Auto tableclass AutoTable(Expr): """This class will inject an entry in state.auto_tables. If the constructor is passed replace=True, it will also discard any auto_table entries injected by compiling the given expression. """ def __init__(self, expr, table, replace=False): self.expr = expr self.table = table self.replace = replace@compile.when(AutoTable)def compile_auto_table(compile, state, expr): if expr.replace: state.push("auto_tables", []) statement = compile(state, expr.expr) if expr.replace: state.pop() state.auto_tables.append(expr.table) return statement# --------------------------------------------------------------------# Set operator precedences.compile.set_precedence(10, Select, Insert, Update, Delete)compile.set_precedence(10, Join, LeftJoin, RightJoin)compile.set_precedence(10, NaturalJoin, NaturalLeftJoin, NaturalRightJoin)compile.set_precedence(10, Union, Except, Intersect)compile.set_precedence(20, SQL)compile.set_precedence(30, Or)compile.set_precedence(40, And)compile.set_precedence(50, Eq, Ne, Gt, Ge, Lt, Le, Like, In)compile.set_precedence(60, LShift, RShift)compile.set_precedence(70, Add, Sub)compile.set_precedence(80, Mul, Div, Mod)compile_python.set_precedence(10, Or)compile_python.set_precedence(20, And)compile_python.set_precedence(30, Eq, Ne, Gt, Ge, Lt, Le, Like, In)compile_python.set_precedence(40, LShift, RShift)compile_python.set_precedence(50, Add, Sub)compile_python.set_precedence(60, Mul, Div, Mod)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -