📄 compiler.py
字号:
# compiler.py# Copyright (C) 2005, 2006, 2007, 2008 Michael Bayer mike_mp@zzzcomputing.com## This module is part of SQLAlchemy and is released under# the MIT License: http://www.opensource.org/licenses/mit-license.php"""Base SQL and DDL compiler implementations.Provides the [sqlalchemy.sql.compiler#DefaultCompiler] class, which isresponsible for generating all SQL query strings, as well as[sqlalchemy.sql.compiler#SchemaGenerator] and [sqlalchemy.sql.compiler#SchemaDropper]which issue CREATE and DROP DDL for tables, sequences, and indexes.The elements in this module are used by public-facing constructs like[sqlalchemy.sql.expression#ClauseElement] and [sqlalchemy.engine#Engine].While dialect authors will want to be familiar with this module for the purpose ofcreating database-specific compilers and schema generators, the moduleis otherwise internal to SQLAlchemy."""import string, refrom sqlalchemy import schema, engine, util, exceptionsfrom sqlalchemy.sql import operators, functionsfrom sqlalchemy.sql import expression as sqlRESERVED_WORDS = util.Set([ 'all', 'analyse', 'analyze', 'and', 'any', 'array', 'as', 'asc', 'asymmetric', 'authorization', 'between', 'binary', 'both', 'case', 'cast', 'check', 'collate', 'column', 'constraint', 'create', 'cross', 'current_date', 'current_role', 'current_time', 'current_timestamp', 'current_user', 'default', 'deferrable', 'desc', 'distinct', 'do', 'else', 'end', 'except', 'false', 'for', 'foreign', 'freeze', 'from', 'full', 'grant', 'group', 'having', 'ilike', 'in', 'initially', 'inner', 'intersect', 'into', 'is', 'isnull', 'join', 'leading', 'left', 'like', 'limit', 'localtime', 'localtimestamp', 'natural', 'new', 'not', 'notnull', 'null', 'off', 'offset', 'old', 'on', 'only', 'or', 'order', 'outer', 'overlaps', 'placing', 'primary', 'references', 'right', 'select', 'session_user', 'set', 'similar', 'some', 'symmetric', 'table', 'then', 'to', 'trailing', 'true', 'union', 'unique', 'user', 'using', 'verbose', 'when', 'where'])LEGAL_CHARACTERS = re.compile(r'^[A-Z0-9_$]+$', re.I)ILLEGAL_INITIAL_CHARACTERS = re.compile(r'[0-9$]')BIND_PARAMS = re.compile(r'(?<![:\w\$\x5c]):([\w\$]+)(?![:\w\$])', re.UNICODE)BIND_PARAMS_ESC = re.compile(r'\x5c(:[\w\$]+)(?![:\w\$])', re.UNICODE)ANONYMOUS_LABEL = re.compile(r'{ANON (-?\d+) (.*?)}')BIND_TEMPLATES = { 'pyformat':"%%(%(name)s)s", 'qmark':"?", 'format':"%%s", 'numeric':"%(position)s", 'named':":%(name)s"}OPERATORS = { operators.and_ : 'AND', operators.or_ : 'OR', operators.inv : 'NOT', operators.add : '+', operators.mul : '*', operators.sub : '-', operators.div : '/', operators.mod : '%', operators.truediv : '/', operators.lt : '<', operators.le : '<=', operators.ne : '!=', operators.gt : '>', operators.ge : '>=', operators.eq : '=', operators.distinct_op : 'DISTINCT', operators.concat_op : '||', operators.like_op : 'LIKE', operators.notlike_op : 'NOT LIKE', operators.ilike_op : lambda x, y: "lower(%s) LIKE lower(%s)" % (x, y), operators.notilike_op : lambda x, y: "lower(%s) NOT LIKE lower(%s)" % (x, y), operators.between_op : 'BETWEEN', operators.in_op : 'IN', operators.notin_op : 'NOT IN', operators.comma_op : ', ', operators.desc_op : 'DESC', operators.asc_op : 'ASC', operators.from_ : 'FROM', operators.as_ : 'AS', operators.exists : 'EXISTS', operators.is_ : 'IS', operators.isnot : 'IS NOT'}FUNCTIONS = { functions.coalesce : 'coalesce%(expr)s', functions.current_date: 'CURRENT_DATE', functions.current_time: 'CURRENT_TIME', functions.current_timestamp: 'CURRENT_TIMESTAMP', functions.current_user: 'CURRENT_USER', functions.localtime: 'LOCALTIME', functions.localtimestamp: 'LOCALTIMESTAMP', functions.sysdate: 'sysdate', functions.session_user :'SESSION_USER', functions.user: 'USER'}class DefaultCompiler(engine.Compiled): """Default implementation of Compiled. Compiles ClauseElements into SQL strings. Uses a similar visit paradigm as visitors.ClauseVisitor but implements its own traversal. """ __traverse_options__ = {'column_collections':False, 'entry':True} operators = OPERATORS functions = FUNCTIONS def __init__(self, dialect, statement, column_keys=None, inline=False, **kwargs): """Construct a new ``DefaultCompiler`` object. dialect Dialect to be used statement ClauseElement to be compiled column_keys a list of column names to be compiled into an INSERT or UPDATE statement. """ super(DefaultCompiler, self).__init__(dialect, statement, column_keys, **kwargs) # if we are insert/update/delete. set to true when we visit an INSERT, UPDATE or DELETE self.isdelete = self.isinsert = self.isupdate = False # compile INSERT/UPDATE defaults/sequences inlined (no pre-execute) self.inline = inline or getattr(statement, 'inline', False) # a dictionary of bind parameter keys to _BindParamClause instances. self.binds = {} # a dictionary of _BindParamClause instances to "compiled" names that are # actually present in the generated SQL self.bind_names = {} # a stack. what recursive compiler doesn't have a stack ? :) self.stack = [] # relates label names in the final SQL to # a tuple of local column/label name, ColumnElement object (if any) and TypeEngine. # ResultProxy uses this for type processing and column targeting self.result_map = {} # a dictionary of ClauseElement subclasses to counters, which are used to # generate truncated identifier names or "anonymous" identifiers such as # for aliases self.generated_ids = {} # paramstyle from the dialect (comes from DB-API) self.paramstyle = self.dialect.paramstyle # true if the paramstyle is positional self.positional = self.dialect.positional self.bindtemplate = BIND_TEMPLATES[self.paramstyle] # a list of the compiled's bind parameter names, used to help # formulate a positional argument list self.positiontup = [] # an IdentifierPreparer that formats the quoting of identifiers self.preparer = self.dialect.identifier_preparer def compile(self): self.string = self.process(self.statement) def process(self, obj, stack=None, **kwargs): if stack: self.stack.append(stack) try: meth = getattr(self, "visit_%s" % obj.__visit_name__, None) if meth: return meth(obj, **kwargs) finally: if stack: self.stack.pop(-1) def is_subquery(self, select): return self.stack and self.stack[-1].get('is_subquery') def get_whereclause(self, obj): """given a FROM clause, return an additional WHERE condition that should be applied to a SELECT. Currently used by Oracle to provide WHERE criterion for JOIN and OUTER JOIN constructs in non-ansi mode. """ return None def construct_params(self, params=None): """return a dictionary of bind parameter keys and values""" if params: pd = {} for bindparam, name in self.bind_names.iteritems(): for paramname in (bindparam, bindparam.key, bindparam.shortname, name): if paramname in params: pd[name] = params[paramname] break else: pd[name] = bindparam.value return pd else: return dict([(self.bind_names[bindparam], bindparam.value) for bindparam in self.bind_names]) params = property(construct_params) def default_from(self): """Called when a SELECT statement has no froms, and no FROM clause is to be appended. Gives Oracle a chance to tack on a ``FROM DUAL`` to the string output. """ return "" def visit_grouping(self, grouping, **kwargs): return "(" + self.process(grouping.elem) + ")" def visit_label(self, label, result_map=None): labelname = self._truncated_identifier("colident", label.name) if result_map is not None: result_map[labelname.lower()] = (label.name, (label, label.obj, labelname), label.obj.type) return " ".join([self.process(label.obj), self.operator_string(operators.as_), self.preparer.format_label(label, labelname)]) def visit_column(self, column, result_map=None, use_schema=False, **kwargs): # there is actually somewhat of a ruleset when you would *not* necessarily # want to truncate a column identifier, if its mapped to the name of a # physical column. but thats very hard to identify at this point, and # the identifier length should be greater than the id lengths of any physical # columns so should not matter. if use_schema and getattr(column, 'table', None) and getattr(column.table, 'schema', None): schema_prefix = self.preparer.quote(column.table, column.table.schema) + '.' else: schema_prefix = '' if not column.is_literal: name = self._truncated_identifier("colident", column.name) else: name = column.name if result_map is not None: result_map[name.lower()] = (name, (column, ), column.type) if column._is_oid: n = self.dialect.oid_column_name(column) if n is not None: if column.table is None or not column.table.named_with_column: return n else: return schema_prefix + self.preparer.quote(column.table, ANONYMOUS_LABEL.sub(self._process_anon, column.table.name)) + "." + n elif len(column.table.primary_key) != 0: pk = list(column.table.primary_key)[0] return self.visit_column(pk, result_map=result_map, use_schema=use_schema, **kwargs) else: return None elif column.table is None or not column.table.named_with_column: if getattr(column, "is_literal", False): return self.escape_literal_column(name) else: return self.preparer.quote(column, name) else: if getattr(column, "is_literal", False): return schema_prefix + self.preparer.quote(column.table, ANONYMOUS_LABEL.sub(self._process_anon, column.table.name)) + "." + self.escape_literal_column(name) else: return schema_prefix + self.preparer.quote(column.table, ANONYMOUS_LABEL.sub(self._process_anon, column.table.name)) + "." + self.preparer.quote(column, name) def escape_literal_column(self, text): """provide escaping for the literal_column() construct.""" # TODO: some dialects might need different behavior here return text.replace('%', '%%') def visit_fromclause(self, fromclause, **kwargs): return fromclause.name def visit_index(self, index, **kwargs): return index.name def visit_typeclause(self, typeclause, **kwargs): return typeclause.type.dialect_impl(self.dialect).get_col_spec() def visit_textclause(self, textclause, **kwargs): if textclause.typemap is not None: for colname, type_ in textclause.typemap.iteritems(): self.result_map[colname.lower()] = (colname, None, type_) def do_bindparam(m): name = m.group(1) if name in textclause.bindparams: return self.process(textclause.bindparams[name]) else: return self.bindparam_string(name) # un-escape any \:params return BIND_PARAMS_ESC.sub(lambda m: m.group(1), BIND_PARAMS.sub(do_bindparam, textclause.text) ) def visit_null(self, null, **kwargs): return 'NULL' def visit_clauselist(self, clauselist, **kwargs): sep = clauselist.operator if sep is None: sep = " " elif sep == operators.comma_op: sep = ', ' else: sep = " " + self.operator_string(clauselist.operator) + " " return sep.join([s for s in [self.process(c) for c in clauselist.clauses] if s is not None]) def visit_calculatedclause(self, clause, **kwargs): return self.process(clause.clause_expr) def visit_cast(self, cast, **kwargs): return "CAST(%s AS %s)" % (self.process(cast.clause), self.process(cast.typeclause)) def visit_function(self, func, result_map=None, **kwargs): if result_map is not None: result_map[func.name.lower()] = (func.name, None, func.type) name = self.function_string(func) if callable(name): return name(*[self.process(x) for x in func.clause_expr]) else: return ".".join(func.packagenames + [name]) % {'expr':self.function_argspec(func)} def function_argspec(self, func): return self.process(func.clause_expr) def function_string(self, func): return self.functions.get(func.__class__, func.name + "%(expr)s") def visit_compound_select(self, cs, asfrom=False, parens=True, **kwargs): stack_entry = {'select':cs} if asfrom: stack_entry['is_subquery'] = True elif self.stack and self.stack[-1].get('select'): stack_entry['is_subquery'] = True self.stack.append(stack_entry) text = string.join([self.process(c, asfrom=asfrom, parens=False) for c in cs.selects], " " + cs.keyword + " ") group_by = self.process(cs._group_by_clause, asfrom=asfrom) if group_by: text += " GROUP BY " + group_by text += self.order_by_clause(cs) text += (cs._limit or cs._offset) and self.limit_clause(cs) or "" self.stack.pop(-1) if asfrom and parens: return "(" + text + ")" else: return text
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -