📄 expr.py
字号:
for i, other in enumerate(others): if not isinstance(other, (Expr, Variable)): others[i] = variable_factory(value=other) return In(self, others) def like(self, other, escape=Undef): if not isinstance(other, (Expr, Variable)): other = getattr(self, "variable_factory", Variable)(value=other) return Like(self, other, escape) def lower(self): return Lower(self) def upper(self): return Upper(self)class ComparableExpr(Expr, Comparable): passclass BinaryExpr(ComparableExpr): def __init__(self, expr1, expr2): self.expr1 = expr1 self.expr2 = expr2class CompoundExpr(ComparableExpr): def __init__(self, *exprs): self.exprs = exprs# --------------------------------------------------------------------# Statement expressionsdef has_tables(state, expr): return (expr.tables is not Undef or expr.default_tables is not Undef or state.auto_tables)def build_tables(compile, state, tables, default_tables): """Compile provided tables. Tables will be built from either C{tables}, C{state.auto_tables}, or C{default_tables}. If C{tables} is not C{Undef}, it will be used. If C{tables} is C{Undef} and C{state.auto_tables} is available, that's used instead. If neither C{tables} nor C{state.auto_tables} are available, C{default_tables} is tried as a last resort. If none of them are available, C{NoTableError} is raised. """ if tables is Undef: if state.auto_tables: tables = state.auto_tables elif default_tables is not Undef: tables = default_tables else: tables = None # If we have no elements, it's an error. if not tables: raise NoTableError("Couldn't find any tables") # If it's a single element, it's trivial. if type(tables) not in (list, tuple) or len(tables) == 1: return compile(state, tables, raw=True) # If we have no joins, it's trivial as well. for elem in tables: if isinstance(elem, JoinExpr): break else: if tables is state.auto_tables: tables = set(compile(state, table, raw=True) for table in tables) return ", ".join(sorted(tables)) else: return compile(state, tables, raw=True) # Ok, now we have to be careful. # If we're dealing with auto_tables, we have to take care of # duplicated tables, join ordering, and so on. if tables is state.auto_tables: table_stmts = set() join_stmts = set() half_join_stmts = set() # push a join_tables onto the state: compile calls below will # populate this set so that we know what tables not to include. state.push("join_tables", set()) for elem in tables: statement = compile(state, elem, raw=True) if isinstance(elem, JoinExpr): if elem.left is Undef: half_join_stmts.add(statement) else: join_stmts.add(statement) else: table_stmts.add(statement) # Remove tables that were seen in join statements. table_stmts -= state.join_tables state.pop() result = ", ".join(sorted(table_stmts)+sorted(join_stmts)) if half_join_stmts: result += " " + " ".join(sorted(half_join_stmts)) return "".join(result) # Otherwise, it's just a matter of putting it together. result = [] for elem in tables: if result: if isinstance(elem, JoinExpr) and elem.left is Undef: #half-join result.append(" ") else: result.append(", ") result.append(compile(state, elem, raw=True)) return "".join(result)class Select(Expr): def __init__(self, columns, where=Undef, tables=Undef, default_tables=Undef, order_by=Undef, group_by=Undef, limit=Undef, offset=Undef, distinct=False): self.columns = columns self.where = where self.tables = tables self.default_tables = default_tables self.order_by = order_by self.group_by = group_by self.limit = limit self.offset = offset self.distinct = distinct@compile.when(Select)def compile_select(compile, state, select): tokens = ["SELECT "] if select.distinct: tokens.append("DISTINCT ") state.push("auto_tables", []) state.push("context", COLUMN) tokens.append(compile(state, select.columns)) tables_pos = len(tokens) parameters_pos = len(state.parameters) state.context = EXPR if select.where is not Undef: tokens.append(" WHERE ") tokens.append(compile(state, select.where, raw=True)) if select.order_by is not Undef: tokens.append(" ORDER BY ") tokens.append(compile(state, select.order_by, raw=True)) if select.group_by is not Undef: tokens.append(" GROUP BY ") tokens.append(compile(state, select.group_by, raw=True)) if select.limit is not Undef: tokens.append(" LIMIT %d" % select.limit) if select.offset is not Undef: tokens.append(" OFFSET %d" % select.offset) if has_tables(state, select): state.context = TABLE state.push("parameters", []) tokens.insert(tables_pos, " FROM ") tokens.insert(tables_pos+1, build_tables(compile, state, select.tables, select.default_tables)) parameters = state.parameters state.pop() state.parameters[parameters_pos:parameters_pos] = parameters state.pop() state.pop() return "".join(tokens)class Insert(Expr): def __init__(self, columns, values, table=Undef, default_table=Undef): self.columns = columns self.values = values self.table = table self.default_table = default_table@compile.when(Insert)def compile_insert(compile, state, insert): state.push("context", COLUMN_NAME) columns = compile(state, insert.columns) state.context = TABLE table = build_tables(compile, state, insert.table, insert.default_table) state.context = EXPR values = compile(state, insert.values) state.pop() return "".join(["INSERT INTO ", table, " (", columns, ") VALUES (", values, ")"])class Update(Expr): def __init__(self, set, where=Undef, table=Undef, default_table=Undef): self.set = set self.where = where self.table = table self.default_table = default_table@compile.when(Update)def compile_update(compile, state, update): set = update.set state.push("context", COLUMN_NAME) sets = ["%s=%s" % (compile(state, col), compile(state, set[col])) for col in set] state.context = TABLE tokens = ["UPDATE ", build_tables(compile, state, update.table, update.default_table), " SET ", ", ".join(sets)] if update.where is not Undef: state.context = EXPR tokens.append(" WHERE ") tokens.append(compile(state, update.where, raw=True)) state.pop() return "".join(tokens)class Delete(Expr): def __init__(self, where=Undef, table=Undef, default_table=Undef): self.where = where self.table = table self.default_table = default_table@compile.when(Delete)def compile_delete(compile, state, delete): tokens = ["DELETE FROM ", None] state.push("context", EXPR) if delete.where is not Undef: tokens.append(" WHERE ") tokens.append(compile(state, delete.where, raw=True)) # Compile later for auto_tables support. state.context = TABLE tokens[1] = build_tables(compile, state, delete.table, delete.default_table) state.pop() return "".join(tokens)# --------------------------------------------------------------------# Columnsclass Column(ComparableExpr): """Representation of a column in some table. @ivar name: Column name. @ivar table: Column table (maybe another expression). @ivar primary: Integer representing the primary key position of this column, or 0 if it's not a primary key. May be provided as a bool. @ivar variable_factory: Factory producing C{Variable} instances typed according to this column. """ def __init__(self, name=Undef, table=Undef, primary=False, variable_factory=None): self.name = name self.table = table self.primary = int(primary) self.variable_factory = variable_factory or Variable@compile.when(Column)def compile_column(compile, state, column): if column.table is not Undef: state.auto_tables.append(column.table) if column.table is Undef or state.context is COLUMN_NAME: if state.aliases is not None: # See compile_set_expr(). alias = state.aliases.get(column) if alias is not None: return alias.name return column.name state.push("context", COLUMN_PREFIX) table = compile(state, column.table, raw=True) state.pop() return "%s.%s" % (table, column.name)@compile_python.when(Column)def compile_python_column(compile, state, column): return "get_column(%s)" % repr(column.name)# --------------------------------------------------------------------# Alias expressionsclass Alias(ComparableExpr): """A representation of "AS" alias clauses. e.g., SELECT foo AS bar. """ auto_counter = 0 def __init__(self, expr, name=Undef): """Create alias of C{expr} AS C{name}. If C{name} is not given, then a name will automatically be generated. """ self.expr = expr if name is Undef: Alias.auto_counter += 1 name = "_%x" % Alias.auto_counter self.name = name@compile.when(Alias)def compile_alias(compile, state, alias): if state.context is COLUMN or state.context is TABLE: return "%s AS %s" % (compile(state, alias.expr), alias.name) return alias.name# --------------------------------------------------------------------# From expressionsclass FromExpr(Expr): passclass Table(FromExpr): def __init__(self, name): self.name = name@compile.when(Table)def compile_table(compile, state, table): return table.nameclass JoinExpr(FromExpr): left = on = Undef oper = "(unknown)" def __init__(self, arg1, arg2=Undef, on=Undef): # http://www.postgresql.org/docs/8.1/interactive/explicit-joins.html if arg2 is Undef: self.right = arg1 if on is not Undef: self.on = on elif not isinstance(arg2, Expr) or isinstance(arg2, (FromExpr, Alias)): self.left = arg1 self.right = arg2 if on is not Undef: self.on = on else: self.right = arg1 self.on = arg2 if on is not Undef: raise ExprError("Improper join arguments: (%r, %r, %r)" % (arg1, arg2, on))@compile.when(JoinExpr)def compile_join(compile, state, expr): result = [] # Ensure that nested JOINs get parentheses. state.precedence += 0.5 if expr.left is not Undef: statement = compile(state, expr.left, raw=True) result.append(statement) if state.join_tables is not None: state.join_tables.add(statement) result.append(expr.oper) statement = compile(state, expr.right, raw=True) result.append(statement) if state.join_tables is not None: state.join_tables.add(statement) if expr.on is not Undef: state.push("context", EXPR) result.append("ON") result.append(compile(state, expr.on, raw=True)) state.pop() return " ".join(result)class Join(JoinExpr): oper = "JOIN"class LeftJoin(JoinExpr): oper = "LEFT JOIN"class RightJoin(JoinExpr): oper = "RIGHT JOIN"class NaturalJoin(JoinExpr): oper = "NATURAL JOIN"class NaturalLeftJoin(JoinExpr): oper = "NATURAL LEFT JOIN"class NaturalRightJoin(JoinExpr): oper = "NATURAL RIGHT JOIN"# --------------------------------------------------------------------# Operatorsclass BinaryOper(BinaryExpr): oper = " (unknown) "@compile.when(BinaryOper)@compile_python.when(BinaryOper)def compile_binary_oper(compile, state, oper): return "%s%s%s" % (compile(state, oper.expr1), oper.oper, compile(state, oper.expr2))
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -