📄 compiler.py
字号:
def visit_foreign_key_constraint(self, constraint): if constraint.use_alter and constraint.table in tables: alterables.append(constraint) findalterables = FindAlterables() for table in tables: for c in table.constraints: findalterables.traverse(c) return alterablesclass SchemaGenerator(DDLBase): def __init__(self, dialect, connection, checkfirst=False, tables=None, **kwargs): super(SchemaGenerator, self).__init__(connection, **kwargs) self.checkfirst = checkfirst self.tables = tables and util.Set(tables) or None self.preparer = dialect.identifier_preparer self.dialect = dialect def get_column_specification(self, column, first_pk=False): raise NotImplementedError() def visit_metadata(self, metadata): collection = [t for t in metadata.table_iterator(reverse=False, tables=self.tables) if (not self.checkfirst or not self.dialect.has_table(self.connection, t.name, schema=t.schema))] for table in collection: self.traverse_single(table) if self.dialect.supports_alter: for alterable in self.find_alterables(collection): self.add_foreignkey(alterable) def visit_table(self, table): for listener in table.ddl_listeners['before-create']: listener('before-create', table, self.connection) for column in table.columns: if column.default is not None: self.traverse_single(column.default) self.append("\nCREATE TABLE " + self.preparer.format_table(table) + " (") separator = "\n" # if only one primary key, specify it along with the column first_pk = False for column in table.columns: self.append(separator) separator = ", \n" self.append("\t" + self.get_column_specification(column, first_pk=column.primary_key and not first_pk)) if column.primary_key: first_pk = True for constraint in column.constraints: self.traverse_single(constraint) # On some DB order is significant: visit PK first, then the # other constraints (engine.ReflectionTest.testbasic failed on FB2) if table.primary_key: self.traverse_single(table.primary_key) for constraint in [c for c in table.constraints if c is not table.primary_key]: self.traverse_single(constraint) self.append("\n)%s\n\n" % self.post_create_table(table)) self.execute() if hasattr(table, 'indexes'): for index in table.indexes: self.traverse_single(index) for listener in table.ddl_listeners['after-create']: listener('after-create', table, self.connection) def post_create_table(self, table): return '' def get_column_default_string(self, column): if isinstance(column.default, schema.PassiveDefault): if isinstance(column.default.arg, basestring): return "'%s'" % column.default.arg else: return unicode(self._compile(column.default.arg, None)) else: return None def _compile(self, tocompile, parameters): """compile the given string/parameters using this SchemaGenerator's dialect.""" compiler = self.dialect.statement_compiler(self.dialect, tocompile, parameters) compiler.compile() return compiler def visit_check_constraint(self, constraint): self.append(", \n\t") if constraint.name is not None: self.append("CONSTRAINT %s " % self.preparer.format_constraint(constraint)) self.append(" CHECK (%s)" % constraint.sqltext) self.define_constraint_deferrability(constraint) def visit_column_check_constraint(self, constraint): self.append(" CHECK (%s)" % constraint.sqltext) self.define_constraint_deferrability(constraint) def visit_primary_key_constraint(self, constraint): if len(constraint) == 0: return self.append(", \n\t") if constraint.name is not None: self.append("CONSTRAINT %s " % self.preparer.format_constraint(constraint)) self.append("PRIMARY KEY ") self.append("(%s)" % ', '.join([self.preparer.quote(c, c.name) for c in constraint])) self.define_constraint_deferrability(constraint) def visit_foreign_key_constraint(self, constraint): if constraint.use_alter and self.dialect.supports_alter: return self.append(", \n\t ") self.define_foreign_key(constraint) def add_foreignkey(self, constraint): self.append("ALTER TABLE %s ADD " % self.preparer.format_table(constraint.table)) self.define_foreign_key(constraint) self.execute() def define_foreign_key(self, constraint): preparer = self.preparer if constraint.name is not None: self.append("CONSTRAINT %s " % preparer.format_constraint(constraint)) table = list(constraint.elements)[0].column.table self.append("FOREIGN KEY(%s) REFERENCES %s (%s)" % ( ', '.join([preparer.quote(f.parent, f.parent.name) for f in constraint.elements]), preparer.format_table(table), ', '.join([preparer.quote(f.column, f.column.name) for f in constraint.elements]) )) if constraint.ondelete is not None: self.append(" ON DELETE %s" % constraint.ondelete) if constraint.onupdate is not None: self.append(" ON UPDATE %s" % constraint.onupdate) self.define_constraint_deferrability(constraint) def visit_unique_constraint(self, constraint): self.append(", \n\t") if constraint.name is not None: self.append("CONSTRAINT %s " % self.preparer.format_constraint(constraint)) self.append(" UNIQUE (%s)" % (', '.join([self.preparer.quote(c, c.name) for c in constraint]))) self.define_constraint_deferrability(constraint) def define_constraint_deferrability(self, constraint): if constraint.deferrable is not None: if constraint.deferrable: self.append(" DEFERRABLE") else: self.append(" NOT DEFERRABLE") if constraint.initially is not None: self.append(" INITIALLY %s" % constraint.initially) def visit_column(self, column): pass def visit_index(self, index): preparer = self.preparer self.append("CREATE ") if index.unique: self.append("UNIQUE ") self.append("INDEX %s ON %s (%s)" \ % (preparer.format_index(index), preparer.format_table(index.table), string.join([preparer.quote(c, c.name) for c in index.columns], ', '))) self.execute()class SchemaDropper(DDLBase): def __init__(self, dialect, connection, checkfirst=False, tables=None, **kwargs): super(SchemaDropper, self).__init__(connection, **kwargs) self.checkfirst = checkfirst self.tables = tables self.preparer = dialect.identifier_preparer self.dialect = dialect def visit_metadata(self, metadata): collection = [t for t in metadata.table_iterator(reverse=True, tables=self.tables) if (not self.checkfirst or self.dialect.has_table(self.connection, t.name, schema=t.schema))] if self.dialect.supports_alter: for alterable in self.find_alterables(collection): self.drop_foreignkey(alterable) for table in collection: self.traverse_single(table) def visit_index(self, index): self.append("\nDROP INDEX " + self.preparer.format_index(index)) self.execute() def drop_foreignkey(self, constraint): self.append("ALTER TABLE %s DROP CONSTRAINT %s" % ( self.preparer.format_table(constraint.table), self.preparer.format_constraint(constraint))) self.execute() def visit_table(self, table): for listener in table.ddl_listeners['before-drop']: listener('before-drop', table, self.connection) for column in table.columns: if column.default is not None: self.traverse_single(column.default) self.append("\nDROP TABLE " + self.preparer.format_table(table)) self.execute() for listener in table.ddl_listeners['after-drop']: listener('after-drop', table, self.connection)class IdentifierPreparer(object): """Handle quoting and case-folding of identifiers based on options.""" reserved_words = RESERVED_WORDS legal_characters = LEGAL_CHARACTERS illegal_initial_characters = ILLEGAL_INITIAL_CHARACTERS def __init__(self, dialect, initial_quote='"', final_quote=None, omit_schema=False): """Construct a new ``IdentifierPreparer`` object. initial_quote Character that begins a delimited identifier. final_quote Character that ends a delimited identifier. Defaults to `initial_quote`. omit_schema Prevent prepending schema name. Useful for databases that do not support schemae. """ self.dialect = dialect self.initial_quote = initial_quote self.final_quote = final_quote or self.initial_quote self.omit_schema = omit_schema self.__strings = {} def _escape_identifier(self, value): """Escape an identifier. Subclasses should override this to provide database-dependent escaping behavior. """ return value.replace('"', '""') def _unescape_identifier(self, value): """Canonicalize an escaped identifier. Subclasses should override this to provide database-dependent unescaping behavior that reverses _escape_identifier. """ return value.replace('""', '"') def quote_identifier(self, value): """Quote an identifier. Subclasses should override this to provide database-dependent quoting behavior. """ return self.initial_quote + self._escape_identifier(value) + self.final_quote def _requires_quotes(self, value): """Return True if the given identifier requires quoting.""" lc_value = value.lower() return (lc_value in self.reserved_words or self.illegal_initial_characters.match(value[0]) or not self.legal_characters.match(unicode(value)) or (lc_value != value)) def quote(self, obj, ident): if getattr(obj, 'quote', False): return self.quote_identifier(ident) if ident in self.__strings: return self.__strings[ident] else: if self._requires_quotes(ident): self.__strings[ident] = self.quote_identifier(ident) else: self.__strings[ident] = ident return self.__strings[ident] def should_quote(self, object): return object.quote or self._requires_quotes(object.name) def format_sequence(self, sequence, use_schema=True): name = self.quote(sequence, sequence.name) if not self.omit_schema and use_schema and sequence.schema is not None: name = self.quote(sequence, sequence.schema) + "." + name return name def format_label(self, label, name=None): return self.quote(label, name or label.name) def format_alias(self, alias, name=None): return self.quote(alias, name or alias.name) def format_savepoint(self, savepoint, name=None): return self.quote(savepoint, name or savepoint.ident) def format_constraint(self, constraint): return self.quote(constraint, constraint.name) def format_index(self, index): return self.quote(index, index.name) def format_table(self, table, use_schema=True, name=None): """Prepare a quoted table and schema name.""" if name is None: name = table.name result = self.quote(table, name) if not self.omit_schema and use_schema and getattr(table, "schema", None): result = self.quote(table, table.schema) + "." + result return result def format_column(self, column, use_table=False, name=None, table_name=None): """Prepare a quoted column name. deprecated. use preparer.quote(col, column.name) or combine with format_table() """ if name is None: name = column.name if not getattr(column, 'is_literal', False): if use_table: return self.format_table(column.table, use_schema=False, name=table_name) + "." + self.quote(column, name) else: return self.quote(column, name) else: # literal textual elements get stuck into ColumnClause alot, which shouldnt get quoted if use_table: return self.format_table(column.table, use_schema=False, name=table_name) + "." + name else: return name def format_table_seq(self, table, use_schema=True): """Format table name and schema as a tuple.""" # Dialects with more levels in their fully qualified references # ('database', 'owner', etc.) could override this and return # a longer sequence. if not self.omit_schema and use_schema and getattr(table, 'schema', None): return (self.quote_identifier(table.schema), self.format_table(table, use_schema=False)) else: return (self.format_table(table, use_schema=False), ) def unformat_identifiers(self, identifiers): """Unpack 'schema.table.column'-like strings into components.""" try: r = self._r_identifiers except AttributeError: initial, final, escaped_final = \ [re.escape(s) for s in (self.initial_quote, self.final_quote, self._escape_identifier(self.final_quote))] r = re.compile( r'(?:' r'(?:%(initial)s((?:%(escaped)s|[^%(final)s])+)%(final)s' r'|([^\.]+))(?=\.|$))+' % { 'initial': initial, 'final': final, 'escaped': escaped_final }) self._r_identifiers = r return [self._unescape_identifier(i) for i in [a or b for a, b in r.findall(identifiers)]]
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -