⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 expr.py

📁 Python的一个ORM,现在很火
💻 PY
📖 第 1 页 / 共 3 页
字号:
            for i, other in enumerate(others):                if not isinstance(other, (Expr, Variable)):                    others[i] = variable_factory(value=other)        return In(self, others)    def like(self, other, escape=Undef):        if not isinstance(other, (Expr, Variable)):            other = getattr(self, "variable_factory", Variable)(value=other)        return Like(self, other, escape)    def lower(self):        return Lower(self)    def upper(self):        return Upper(self)class ComparableExpr(Expr, Comparable):    passclass BinaryExpr(ComparableExpr):    def __init__(self, expr1, expr2):        self.expr1 = expr1        self.expr2 = expr2class CompoundExpr(ComparableExpr):    def __init__(self, *exprs):        self.exprs = exprs# --------------------------------------------------------------------# Statement expressionsdef has_tables(state, expr):    return (expr.tables is not Undef or            expr.default_tables is not Undef or            state.auto_tables)def build_tables(compile, state, tables, default_tables):    """Compile provided tables.    Tables will be built from either C{tables}, C{state.auto_tables}, or    C{default_tables}.  If C{tables} is not C{Undef}, it will be used. If    C{tables} is C{Undef} and C{state.auto_tables} is available, that's used    instead. If neither C{tables} nor C{state.auto_tables} are available,    C{default_tables} is tried as a last resort. If none of them are available,    C{NoTableError} is raised.    """    if tables is Undef:        if state.auto_tables:            tables = state.auto_tables        elif default_tables is not Undef:            tables = default_tables        else:            tables = None    # If we have no elements, it's an error.    if not tables:        raise NoTableError("Couldn't find any tables")    # If it's a single element, it's trivial.    if type(tables) not in (list, tuple) or len(tables) == 1:        return compile(state, tables, raw=True)        # If we have no joins, it's trivial as well.    for elem in tables:        if isinstance(elem, JoinExpr):            break    else:        if tables is state.auto_tables:            tables = set(compile(state, table, raw=True) for table in tables)            return ", ".join(sorted(tables))        else:            return compile(state, tables, raw=True)    # Ok, now we have to be careful.    # If we're dealing with auto_tables, we have to take care of    # duplicated tables, join ordering, and so on.    if tables is state.auto_tables:        table_stmts = set()        join_stmts = set()        half_join_stmts = set()        # push a join_tables onto the state: compile calls below will        # populate this set so that we know what tables not to include.        state.push("join_tables", set())        for elem in tables:            statement = compile(state, elem, raw=True)            if isinstance(elem, JoinExpr):                if elem.left is Undef:                    half_join_stmts.add(statement)                else:                    join_stmts.add(statement)            else:                table_stmts.add(statement)        # Remove tables that were seen in join statements.        table_stmts -= state.join_tables        state.pop()        result = ", ".join(sorted(table_stmts)+sorted(join_stmts))        if half_join_stmts:            result += " " + " ".join(sorted(half_join_stmts))        return "".join(result)    # Otherwise, it's just a matter of putting it together.    result = []    for elem in tables:        if result:            if isinstance(elem, JoinExpr) and elem.left is Undef: #half-join                result.append(" ")            else:                result.append(", ")        result.append(compile(state, elem, raw=True))    return "".join(result)class Select(Expr):    def __init__(self, columns, where=Undef,                 tables=Undef, default_tables=Undef,                 order_by=Undef, group_by=Undef,                 limit=Undef, offset=Undef, distinct=False):        self.columns = columns        self.where = where        self.tables = tables        self.default_tables = default_tables        self.order_by = order_by        self.group_by = group_by        self.limit = limit        self.offset = offset        self.distinct = distinct@compile.when(Select)def compile_select(compile, state, select):    tokens = ["SELECT "]    if select.distinct:        tokens.append("DISTINCT ")    state.push("auto_tables", [])    state.push("context", COLUMN)    tokens.append(compile(state, select.columns))    tables_pos = len(tokens)    parameters_pos = len(state.parameters)    state.context = EXPR    if select.where is not Undef:        tokens.append(" WHERE ")        tokens.append(compile(state, select.where, raw=True))    if select.order_by is not Undef:        tokens.append(" ORDER BY ")        tokens.append(compile(state, select.order_by, raw=True))    if select.group_by is not Undef:        tokens.append(" GROUP BY ")        tokens.append(compile(state, select.group_by, raw=True))    if select.limit is not Undef:        tokens.append(" LIMIT %d" % select.limit)    if select.offset is not Undef:        tokens.append(" OFFSET %d" % select.offset)    if has_tables(state, select):        state.context = TABLE        state.push("parameters", [])        tokens.insert(tables_pos, " FROM ")        tokens.insert(tables_pos+1, build_tables(compile, state, select.tables,                                                 select.default_tables))        parameters = state.parameters        state.pop()        state.parameters[parameters_pos:parameters_pos] = parameters    state.pop()    state.pop()    return "".join(tokens)class Insert(Expr):    def __init__(self, columns, values, table=Undef, default_table=Undef):        self.columns = columns        self.values = values        self.table = table        self.default_table = default_table@compile.when(Insert)def compile_insert(compile, state, insert):    state.push("context", COLUMN_NAME)    columns = compile(state, insert.columns)    state.context = TABLE    table = build_tables(compile, state, insert.table, insert.default_table)    state.context = EXPR    values = compile(state, insert.values)    state.pop()    return "".join(["INSERT INTO ", table, " (", columns,                    ") VALUES (", values, ")"])class Update(Expr):    def __init__(self, set, where=Undef, table=Undef, default_table=Undef):        self.set = set        self.where = where        self.table = table        self.default_table = default_table@compile.when(Update)def compile_update(compile, state, update):    set = update.set    state.push("context", COLUMN_NAME)    sets = ["%s=%s" % (compile(state, col), compile(state, set[col]))            for col in set]    state.context = TABLE    tokens = ["UPDATE ", build_tables(compile, state, update.table,                                      update.default_table),              " SET ", ", ".join(sets)]    if update.where is not Undef:        state.context = EXPR        tokens.append(" WHERE ")        tokens.append(compile(state, update.where, raw=True))    state.pop()    return "".join(tokens)class Delete(Expr):    def __init__(self, where=Undef, table=Undef, default_table=Undef):        self.where = where        self.table = table        self.default_table = default_table@compile.when(Delete)def compile_delete(compile, state, delete):    tokens = ["DELETE FROM ", None]    state.push("context", EXPR)    if delete.where is not Undef:        tokens.append(" WHERE ")        tokens.append(compile(state, delete.where, raw=True))    # Compile later for auto_tables support.    state.context = TABLE    tokens[1] = build_tables(compile, state, delete.table,                             delete.default_table)    state.pop()    return "".join(tokens)# --------------------------------------------------------------------# Columnsclass Column(ComparableExpr):    """Representation of a column in some table.    @ivar name: Column name.    @ivar table: Column table (maybe another expression).    @ivar primary: Integer representing the primary key position of        this column, or 0 if it's not a primary key. May be provided as        a bool.    @ivar variable_factory: Factory producing C{Variable} instances typed        according to this column.    """    def __init__(self, name=Undef, table=Undef, primary=False,                 variable_factory=None):        self.name = name        self.table = table        self.primary = int(primary)        self.variable_factory = variable_factory or Variable@compile.when(Column)def compile_column(compile, state, column):    if column.table is not Undef:        state.auto_tables.append(column.table)    if column.table is Undef or state.context is COLUMN_NAME:        if state.aliases is not None:            # See compile_set_expr().            alias = state.aliases.get(column)            if alias is not None:                return alias.name        return column.name    state.push("context", COLUMN_PREFIX)    table = compile(state, column.table, raw=True)    state.pop()    return "%s.%s" % (table, column.name)@compile_python.when(Column)def compile_python_column(compile, state, column):    return "get_column(%s)" % repr(column.name)# --------------------------------------------------------------------# Alias expressionsclass Alias(ComparableExpr):    """A representation of "AS" alias clauses. e.g., SELECT foo AS bar.    """    auto_counter = 0    def __init__(self, expr, name=Undef):        """Create alias of C{expr} AS C{name}.        If C{name} is not given, then a name will automatically be        generated.        """        self.expr = expr        if name is Undef:            Alias.auto_counter += 1            name = "_%x" % Alias.auto_counter        self.name = name@compile.when(Alias)def compile_alias(compile, state, alias):    if state.context is COLUMN or state.context is TABLE:        return "%s AS %s" % (compile(state, alias.expr), alias.name)    return alias.name# --------------------------------------------------------------------# From expressionsclass FromExpr(Expr):    passclass Table(FromExpr):    def __init__(self, name):        self.name = name@compile.when(Table)def compile_table(compile, state, table):    return table.nameclass JoinExpr(FromExpr):    left = on = Undef    oper = "(unknown)"    def __init__(self, arg1, arg2=Undef, on=Undef):        # http://www.postgresql.org/docs/8.1/interactive/explicit-joins.html        if arg2 is Undef:            self.right = arg1            if on is not Undef:                self.on = on        elif not isinstance(arg2, Expr) or isinstance(arg2, (FromExpr, Alias)):            self.left = arg1            self.right = arg2            if on is not Undef:                self.on = on        else:            self.right = arg1            self.on = arg2            if on is not Undef:                raise ExprError("Improper join arguments: (%r, %r, %r)" %                                (arg1, arg2, on))@compile.when(JoinExpr)def compile_join(compile, state, expr):    result = []    # Ensure that nested JOINs get parentheses.    state.precedence += 0.5    if expr.left is not Undef:        statement = compile(state, expr.left, raw=True)        result.append(statement)        if state.join_tables is not None:            state.join_tables.add(statement)    result.append(expr.oper)    statement = compile(state, expr.right, raw=True)    result.append(statement)    if state.join_tables is not None:        state.join_tables.add(statement)    if expr.on is not Undef:        state.push("context", EXPR)        result.append("ON")        result.append(compile(state, expr.on, raw=True))        state.pop()    return " ".join(result)class Join(JoinExpr):    oper = "JOIN"class LeftJoin(JoinExpr):    oper = "LEFT JOIN"class RightJoin(JoinExpr):    oper = "RIGHT JOIN"class NaturalJoin(JoinExpr):    oper = "NATURAL JOIN"class NaturalLeftJoin(JoinExpr):    oper = "NATURAL LEFT JOIN"class NaturalRightJoin(JoinExpr):    oper = "NATURAL RIGHT JOIN"# --------------------------------------------------------------------# Operatorsclass BinaryOper(BinaryExpr):    oper = " (unknown) "@compile.when(BinaryOper)@compile_python.when(BinaryOper)def compile_binary_oper(compile, state, oper):    return "%s%s%s" % (compile(state, oper.expr1), oper.oper,                       compile(state, oper.expr2))

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -