⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 compiler.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 3 页
字号:
# compiler.py# Copyright (C) 2005, 2006, 2007, 2008 Michael Bayer mike_mp@zzzcomputing.com## This module is part of SQLAlchemy and is released under# the MIT License: http://www.opensource.org/licenses/mit-license.php"""Base SQL and DDL compiler implementations.Provides the [sqlalchemy.sql.compiler#DefaultCompiler] class, which isresponsible for generating all SQL query strings, as well as[sqlalchemy.sql.compiler#SchemaGenerator] and [sqlalchemy.sql.compiler#SchemaDropper]which issue CREATE and DROP DDL for tables, sequences, and indexes.The elements in this module are used by public-facing constructs like[sqlalchemy.sql.expression#ClauseElement] and [sqlalchemy.engine#Engine].While dialect authors will want to be familiar with this module for the purpose ofcreating database-specific compilers and schema generators, the moduleis otherwise internal to SQLAlchemy."""import string, refrom sqlalchemy import schema, engine, util, exceptionsfrom sqlalchemy.sql import operators, functionsfrom sqlalchemy.sql import expression as sqlRESERVED_WORDS = util.Set([    'all', 'analyse', 'analyze', 'and', 'any', 'array',    'as', 'asc', 'asymmetric', 'authorization', 'between',    'binary', 'both', 'case', 'cast', 'check', 'collate',    'column', 'constraint', 'create', 'cross', 'current_date',    'current_role', 'current_time', 'current_timestamp',    'current_user', 'default', 'deferrable', 'desc',    'distinct', 'do', 'else', 'end', 'except', 'false',    'for', 'foreign', 'freeze', 'from', 'full', 'grant',    'group', 'having', 'ilike', 'in', 'initially', 'inner',    'intersect', 'into', 'is', 'isnull', 'join', 'leading',    'left', 'like', 'limit', 'localtime', 'localtimestamp',    'natural', 'new', 'not', 'notnull', 'null', 'off', 'offset',    'old', 'on', 'only', 'or', 'order', 'outer', 'overlaps',    'placing', 'primary', 'references', 'right', 'select',    'session_user', 'set', 'similar', 'some', 'symmetric', 'table',    'then', 'to', 'trailing', 'true', 'union', 'unique', 'user',    'using', 'verbose', 'when', 'where'])LEGAL_CHARACTERS = re.compile(r'^[A-Z0-9_$]+$', re.I)ILLEGAL_INITIAL_CHARACTERS = re.compile(r'[0-9$]')BIND_PARAMS = re.compile(r'(?<![:\w\$\x5c]):([\w\$]+)(?![:\w\$])', re.UNICODE)BIND_PARAMS_ESC = re.compile(r'\x5c(:[\w\$]+)(?![:\w\$])', re.UNICODE)ANONYMOUS_LABEL = re.compile(r'{ANON (-?\d+) (.*?)}')BIND_TEMPLATES = {    'pyformat':"%%(%(name)s)s",    'qmark':"?",    'format':"%%s",    'numeric':"%(position)s",    'named':":%(name)s"}OPERATORS =  {    operators.and_ : 'AND',    operators.or_ : 'OR',    operators.inv : 'NOT',    operators.add : '+',    operators.mul : '*',    operators.sub : '-',    operators.div : '/',    operators.mod : '%',    operators.truediv : '/',    operators.lt : '<',    operators.le : '<=',    operators.ne : '!=',    operators.gt : '>',    operators.ge : '>=',    operators.eq : '=',    operators.distinct_op : 'DISTINCT',    operators.concat_op : '||',    operators.like_op : 'LIKE',    operators.notlike_op : 'NOT LIKE',    operators.ilike_op : lambda x, y: "lower(%s) LIKE lower(%s)" % (x, y),    operators.notilike_op : lambda x, y: "lower(%s) NOT LIKE lower(%s)" % (x, y),    operators.between_op : 'BETWEEN',    operators.in_op : 'IN',    operators.notin_op : 'NOT IN',    operators.comma_op : ', ',    operators.desc_op : 'DESC',    operators.asc_op : 'ASC',    operators.from_ : 'FROM',    operators.as_ : 'AS',    operators.exists : 'EXISTS',    operators.is_ : 'IS',    operators.isnot : 'IS NOT'}FUNCTIONS = {    functions.coalesce : 'coalesce%(expr)s',    functions.current_date: 'CURRENT_DATE',    functions.current_time: 'CURRENT_TIME',    functions.current_timestamp: 'CURRENT_TIMESTAMP',    functions.current_user: 'CURRENT_USER',    functions.localtime: 'LOCALTIME',    functions.localtimestamp: 'LOCALTIMESTAMP',    functions.sysdate: 'sysdate',    functions.session_user :'SESSION_USER',    functions.user: 'USER'}class DefaultCompiler(engine.Compiled):    """Default implementation of Compiled.    Compiles ClauseElements into SQL strings.   Uses a similar visit    paradigm as visitors.ClauseVisitor but implements its own traversal.    """    __traverse_options__ = {'column_collections':False, 'entry':True}    operators = OPERATORS    functions = FUNCTIONS    def __init__(self, dialect, statement, column_keys=None, inline=False, **kwargs):        """Construct a new ``DefaultCompiler`` object.        dialect          Dialect to be used        statement          ClauseElement to be compiled        column_keys          a list of column names to be compiled into an INSERT or UPDATE          statement.        """        super(DefaultCompiler, self).__init__(dialect, statement, column_keys, **kwargs)        # if we are insert/update/delete.  set to true when we visit an INSERT, UPDATE or DELETE        self.isdelete = self.isinsert = self.isupdate = False        # compile INSERT/UPDATE defaults/sequences inlined (no pre-execute)        self.inline = inline or getattr(statement, 'inline', False)        # a dictionary of bind parameter keys to _BindParamClause instances.        self.binds = {}        # a dictionary of _BindParamClause instances to "compiled" names that are        # actually present in the generated SQL        self.bind_names = {}        # a stack.  what recursive compiler doesn't have a stack ? :)        self.stack = []        # relates label names in the final SQL to        # a tuple of local column/label name, ColumnElement object (if any) and TypeEngine.        # ResultProxy uses this for type processing and column targeting        self.result_map = {}        # a dictionary of ClauseElement subclasses to counters, which are used to        # generate truncated identifier names or "anonymous" identifiers such as        # for aliases        self.generated_ids = {}        # paramstyle from the dialect (comes from DB-API)        self.paramstyle = self.dialect.paramstyle        # true if the paramstyle is positional        self.positional = self.dialect.positional        self.bindtemplate = BIND_TEMPLATES[self.paramstyle]        # a list of the compiled's bind parameter names, used to help        # formulate a positional argument list        self.positiontup = []        # an IdentifierPreparer that formats the quoting of identifiers        self.preparer = self.dialect.identifier_preparer    def compile(self):        self.string = self.process(self.statement)    def process(self, obj, stack=None, **kwargs):        if stack:            self.stack.append(stack)        try:            meth = getattr(self, "visit_%s" % obj.__visit_name__, None)            if meth:                return meth(obj, **kwargs)        finally:            if stack:                self.stack.pop(-1)    def is_subquery(self, select):        return self.stack and self.stack[-1].get('is_subquery')    def get_whereclause(self, obj):        """given a FROM clause, return an additional WHERE condition that should be        applied to a SELECT.        Currently used by Oracle to provide WHERE criterion for JOIN and OUTER JOIN        constructs in non-ansi mode.        """        return None    def construct_params(self, params=None):        """return a dictionary of bind parameter keys and values"""        if params:            pd = {}            for bindparam, name in self.bind_names.iteritems():                for paramname in (bindparam, bindparam.key, bindparam.shortname, name):                    if paramname in params:                        pd[name] = params[paramname]                        break                else:                    pd[name] = bindparam.value            return pd        else:            return dict([(self.bind_names[bindparam], bindparam.value) for bindparam in self.bind_names])    params = property(construct_params)    def default_from(self):        """Called when a SELECT statement has no froms, and no FROM clause is to be appended.        Gives Oracle a chance to tack on a ``FROM DUAL`` to the string output.        """        return ""    def visit_grouping(self, grouping, **kwargs):        return "(" + self.process(grouping.elem) + ")"    def visit_label(self, label, result_map=None):        labelname = self._truncated_identifier("colident", label.name)        if result_map is not None:            result_map[labelname.lower()] = (label.name, (label, label.obj, labelname), label.obj.type)        return " ".join([self.process(label.obj), self.operator_string(operators.as_), self.preparer.format_label(label, labelname)])    def visit_column(self, column, result_map=None, use_schema=False, **kwargs):        # there is actually somewhat of a ruleset when you would *not* necessarily        # want to truncate a column identifier, if its mapped to the name of a        # physical column.  but thats very hard to identify at this point, and        # the identifier length should be greater than the id lengths of any physical        # columns so should not matter.        if use_schema and getattr(column, 'table', None) and getattr(column.table, 'schema', None):            schema_prefix = self.preparer.quote(column.table, column.table.schema) + '.'        else:            schema_prefix = ''        if not column.is_literal:            name = self._truncated_identifier("colident", column.name)        else:            name = column.name        if result_map is not None:            result_map[name.lower()] = (name, (column, ), column.type)        if column._is_oid:            n = self.dialect.oid_column_name(column)            if n is not None:                if column.table is None or not column.table.named_with_column:                    return n                else:                    return schema_prefix + self.preparer.quote(column.table, ANONYMOUS_LABEL.sub(self._process_anon, column.table.name)) + "." + n            elif len(column.table.primary_key) != 0:                pk = list(column.table.primary_key)[0]                return self.visit_column(pk, result_map=result_map, use_schema=use_schema, **kwargs)            else:                return None        elif column.table is None or not column.table.named_with_column:            if getattr(column, "is_literal", False):                return self.escape_literal_column(name)            else:                return self.preparer.quote(column, name)        else:            if getattr(column, "is_literal", False):                return schema_prefix + self.preparer.quote(column.table, ANONYMOUS_LABEL.sub(self._process_anon, column.table.name)) + "." + self.escape_literal_column(name)            else:                return schema_prefix + self.preparer.quote(column.table, ANONYMOUS_LABEL.sub(self._process_anon, column.table.name)) + "." + self.preparer.quote(column, name)    def escape_literal_column(self, text):        """provide escaping for the literal_column() construct."""        # TODO: some dialects might need different behavior here        return text.replace('%', '%%')    def visit_fromclause(self, fromclause, **kwargs):        return fromclause.name    def visit_index(self, index, **kwargs):        return index.name    def visit_typeclause(self, typeclause, **kwargs):        return typeclause.type.dialect_impl(self.dialect).get_col_spec()    def visit_textclause(self, textclause, **kwargs):        if textclause.typemap is not None:            for colname, type_ in textclause.typemap.iteritems():                self.result_map[colname.lower()] = (colname, None, type_)        def do_bindparam(m):            name = m.group(1)            if name in textclause.bindparams:                return self.process(textclause.bindparams[name])            else:                return self.bindparam_string(name)        # un-escape any \:params        return BIND_PARAMS_ESC.sub(lambda m: m.group(1),            BIND_PARAMS.sub(do_bindparam, textclause.text)        )    def visit_null(self, null, **kwargs):        return 'NULL'    def visit_clauselist(self, clauselist, **kwargs):        sep = clauselist.operator        if sep is None:            sep = " "        elif sep == operators.comma_op:            sep = ', '        else:            sep = " " + self.operator_string(clauselist.operator) + " "        return sep.join([s for s in [self.process(c) for c in clauselist.clauses] if s is not None])    def visit_calculatedclause(self, clause, **kwargs):        return self.process(clause.clause_expr)    def visit_cast(self, cast, **kwargs):        return "CAST(%s AS %s)" % (self.process(cast.clause), self.process(cast.typeclause))    def visit_function(self, func, result_map=None, **kwargs):        if result_map is not None:            result_map[func.name.lower()] = (func.name, None, func.type)        name = self.function_string(func)        if callable(name):            return name(*[self.process(x) for x in func.clause_expr])        else:            return ".".join(func.packagenames + [name]) % {'expr':self.function_argspec(func)}    def function_argspec(self, func):        return self.process(func.clause_expr)    def function_string(self, func):        return self.functions.get(func.__class__, func.name + "%(expr)s")    def visit_compound_select(self, cs, asfrom=False, parens=True, **kwargs):        stack_entry = {'select':cs}        if asfrom:            stack_entry['is_subquery'] = True        elif self.stack and self.stack[-1].get('select'):            stack_entry['is_subquery'] = True        self.stack.append(stack_entry)        text = string.join([self.process(c, asfrom=asfrom, parens=False) for c in cs.selects], " " + cs.keyword + " ")        group_by = self.process(cs._group_by_clause, asfrom=asfrom)        if group_by:            text += " GROUP BY " + group_by        text += self.order_by_clause(cs)        text += (cs._limit or cs._offset) and self.limit_clause(cs) or ""        self.stack.pop(-1)        if asfrom and parens:            return "(" + text + ")"        else:            return text

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -