⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 compiler.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 3 页
字号:
            def visit_foreign_key_constraint(self, constraint):                if constraint.use_alter and constraint.table in tables:                    alterables.append(constraint)        findalterables = FindAlterables()        for table in tables:            for c in table.constraints:                findalterables.traverse(c)        return alterablesclass SchemaGenerator(DDLBase):    def __init__(self, dialect, connection, checkfirst=False, tables=None, **kwargs):        super(SchemaGenerator, self).__init__(connection, **kwargs)        self.checkfirst = checkfirst        self.tables = tables and util.Set(tables) or None        self.preparer = dialect.identifier_preparer        self.dialect = dialect    def get_column_specification(self, column, first_pk=False):        raise NotImplementedError()    def visit_metadata(self, metadata):        collection = [t for t in metadata.table_iterator(reverse=False, tables=self.tables) if (not self.checkfirst or not self.dialect.has_table(self.connection, t.name, schema=t.schema))]        for table in collection:            self.traverse_single(table)        if self.dialect.supports_alter:            for alterable in self.find_alterables(collection):                self.add_foreignkey(alterable)    def visit_table(self, table):        for listener in table.ddl_listeners['before-create']:            listener('before-create', table, self.connection)        for column in table.columns:            if column.default is not None:                self.traverse_single(column.default)        self.append("\nCREATE TABLE " + self.preparer.format_table(table) + " (")        separator = "\n"        # if only one primary key, specify it along with the column        first_pk = False        for column in table.columns:            self.append(separator)            separator = ", \n"            self.append("\t" + self.get_column_specification(column, first_pk=column.primary_key and not first_pk))            if column.primary_key:                first_pk = True            for constraint in column.constraints:                self.traverse_single(constraint)        # On some DB order is significant: visit PK first, then the        # other constraints (engine.ReflectionTest.testbasic failed on FB2)        if table.primary_key:            self.traverse_single(table.primary_key)        for constraint in [c for c in table.constraints if c is not table.primary_key]:            self.traverse_single(constraint)        self.append("\n)%s\n\n" % self.post_create_table(table))        self.execute()        if hasattr(table, 'indexes'):            for index in table.indexes:                self.traverse_single(index)        for listener in table.ddl_listeners['after-create']:            listener('after-create', table, self.connection)    def post_create_table(self, table):        return ''    def get_column_default_string(self, column):        if isinstance(column.default, schema.PassiveDefault):            if isinstance(column.default.arg, basestring):                return "'%s'" % column.default.arg            else:                return unicode(self._compile(column.default.arg, None))        else:            return None    def _compile(self, tocompile, parameters):        """compile the given string/parameters using this SchemaGenerator's dialect."""        compiler = self.dialect.statement_compiler(self.dialect, tocompile, parameters)        compiler.compile()        return compiler    def visit_check_constraint(self, constraint):        self.append(", \n\t")        if constraint.name is not None:            self.append("CONSTRAINT %s " %                        self.preparer.format_constraint(constraint))        self.append(" CHECK (%s)" % constraint.sqltext)        self.define_constraint_deferrability(constraint)    def visit_column_check_constraint(self, constraint):        self.append(" CHECK (%s)" % constraint.sqltext)        self.define_constraint_deferrability(constraint)    def visit_primary_key_constraint(self, constraint):        if len(constraint) == 0:            return        self.append(", \n\t")        if constraint.name is not None:            self.append("CONSTRAINT %s " % self.preparer.format_constraint(constraint))        self.append("PRIMARY KEY ")        self.append("(%s)" % ', '.join([self.preparer.quote(c, c.name) for c in constraint]))        self.define_constraint_deferrability(constraint)    def visit_foreign_key_constraint(self, constraint):        if constraint.use_alter and self.dialect.supports_alter:            return        self.append(", \n\t ")        self.define_foreign_key(constraint)    def add_foreignkey(self, constraint):        self.append("ALTER TABLE %s ADD " % self.preparer.format_table(constraint.table))        self.define_foreign_key(constraint)        self.execute()    def define_foreign_key(self, constraint):        preparer = self.preparer        if constraint.name is not None:            self.append("CONSTRAINT %s " %                        preparer.format_constraint(constraint))        table = list(constraint.elements)[0].column.table        self.append("FOREIGN KEY(%s) REFERENCES %s (%s)" % (            ', '.join([preparer.quote(f.parent, f.parent.name) for f in constraint.elements]),            preparer.format_table(table),            ', '.join([preparer.quote(f.column, f.column.name) for f in constraint.elements])        ))        if constraint.ondelete is not None:            self.append(" ON DELETE %s" % constraint.ondelete)        if constraint.onupdate is not None:            self.append(" ON UPDATE %s" % constraint.onupdate)        self.define_constraint_deferrability(constraint)    def visit_unique_constraint(self, constraint):        self.append(", \n\t")        if constraint.name is not None:            self.append("CONSTRAINT %s " %                        self.preparer.format_constraint(constraint))        self.append(" UNIQUE (%s)" % (', '.join([self.preparer.quote(c, c.name) for c in constraint])))        self.define_constraint_deferrability(constraint)    def define_constraint_deferrability(self, constraint):        if constraint.deferrable is not None:            if constraint.deferrable:                self.append(" DEFERRABLE")            else:                self.append(" NOT DEFERRABLE")        if constraint.initially is not None:            self.append(" INITIALLY %s" % constraint.initially)    def visit_column(self, column):        pass    def visit_index(self, index):        preparer = self.preparer        self.append("CREATE ")        if index.unique:            self.append("UNIQUE ")        self.append("INDEX %s ON %s (%s)" \                    % (preparer.format_index(index),                       preparer.format_table(index.table),                       string.join([preparer.quote(c, c.name) for c in index.columns], ', ')))        self.execute()class SchemaDropper(DDLBase):    def __init__(self, dialect, connection, checkfirst=False, tables=None, **kwargs):        super(SchemaDropper, self).__init__(connection, **kwargs)        self.checkfirst = checkfirst        self.tables = tables        self.preparer = dialect.identifier_preparer        self.dialect = dialect    def visit_metadata(self, metadata):        collection = [t for t in metadata.table_iterator(reverse=True, tables=self.tables) if (not self.checkfirst or  self.dialect.has_table(self.connection, t.name, schema=t.schema))]        if self.dialect.supports_alter:            for alterable in self.find_alterables(collection):                self.drop_foreignkey(alterable)        for table in collection:            self.traverse_single(table)    def visit_index(self, index):        self.append("\nDROP INDEX " + self.preparer.format_index(index))        self.execute()    def drop_foreignkey(self, constraint):        self.append("ALTER TABLE %s DROP CONSTRAINT %s" % (            self.preparer.format_table(constraint.table),            self.preparer.format_constraint(constraint)))        self.execute()    def visit_table(self, table):        for listener in table.ddl_listeners['before-drop']:            listener('before-drop', table, self.connection)        for column in table.columns:            if column.default is not None:                self.traverse_single(column.default)        self.append("\nDROP TABLE " + self.preparer.format_table(table))        self.execute()        for listener in table.ddl_listeners['after-drop']:            listener('after-drop', table, self.connection)class IdentifierPreparer(object):    """Handle quoting and case-folding of identifiers based on options."""    reserved_words = RESERVED_WORDS    legal_characters = LEGAL_CHARACTERS    illegal_initial_characters = ILLEGAL_INITIAL_CHARACTERS    def __init__(self, dialect, initial_quote='"', final_quote=None, omit_schema=False):        """Construct a new ``IdentifierPreparer`` object.        initial_quote          Character that begins a delimited identifier.        final_quote          Character that ends a delimited identifier. Defaults to `initial_quote`.        omit_schema          Prevent prepending schema name. Useful for databases that do          not support schemae.        """        self.dialect = dialect        self.initial_quote = initial_quote        self.final_quote = final_quote or self.initial_quote        self.omit_schema = omit_schema        self.__strings = {}    def _escape_identifier(self, value):        """Escape an identifier.        Subclasses should override this to provide database-dependent        escaping behavior.        """        return value.replace('"', '""')    def _unescape_identifier(self, value):        """Canonicalize an escaped identifier.        Subclasses should override this to provide database-dependent        unescaping behavior that reverses _escape_identifier.        """        return value.replace('""', '"')    def quote_identifier(self, value):        """Quote an identifier.        Subclasses should override this to provide database-dependent        quoting behavior.        """        return self.initial_quote + self._escape_identifier(value) + self.final_quote    def _requires_quotes(self, value):        """Return True if the given identifier requires quoting."""        lc_value = value.lower()        return (lc_value in self.reserved_words                or self.illegal_initial_characters.match(value[0])                or not self.legal_characters.match(unicode(value))                or (lc_value != value))    def quote(self, obj, ident):        if getattr(obj, 'quote', False):            return self.quote_identifier(ident)        if ident in self.__strings:            return self.__strings[ident]        else:            if self._requires_quotes(ident):                self.__strings[ident] = self.quote_identifier(ident)            else:                self.__strings[ident] = ident            return self.__strings[ident]    def should_quote(self, object):        return object.quote or self._requires_quotes(object.name)    def format_sequence(self, sequence, use_schema=True):        name = self.quote(sequence, sequence.name)        if not self.omit_schema and use_schema and sequence.schema is not None:            name = self.quote(sequence, sequence.schema) + "." + name        return name    def format_label(self, label, name=None):        return self.quote(label, name or label.name)    def format_alias(self, alias, name=None):        return self.quote(alias, name or alias.name)    def format_savepoint(self, savepoint, name=None):        return self.quote(savepoint, name or savepoint.ident)    def format_constraint(self, constraint):        return self.quote(constraint, constraint.name)    def format_index(self, index):        return self.quote(index, index.name)    def format_table(self, table, use_schema=True, name=None):        """Prepare a quoted table and schema name."""        if name is None:            name = table.name        result = self.quote(table, name)        if not self.omit_schema and use_schema and getattr(table, "schema", None):            result = self.quote(table, table.schema) + "." + result        return result    def format_column(self, column, use_table=False, name=None, table_name=None):        """Prepare a quoted column name.        deprecated.  use preparer.quote(col, column.name) or combine with format_table()        """        if name is None:            name = column.name        if not getattr(column, 'is_literal', False):            if use_table:                return self.format_table(column.table, use_schema=False, name=table_name) + "." + self.quote(column, name)            else:                return self.quote(column, name)        else:            # literal textual elements get stuck into ColumnClause alot, which shouldnt get quoted            if use_table:                return self.format_table(column.table, use_schema=False, name=table_name) + "." + name            else:                return name    def format_table_seq(self, table, use_schema=True):        """Format table name and schema as a tuple."""        # Dialects with more levels in their fully qualified references        # ('database', 'owner', etc.) could override this and return        # a longer sequence.        if not self.omit_schema and use_schema and getattr(table, 'schema', None):            return (self.quote_identifier(table.schema),                    self.format_table(table, use_schema=False))        else:            return (self.format_table(table, use_schema=False), )    def unformat_identifiers(self, identifiers):        """Unpack 'schema.table.column'-like strings into components."""        try:            r = self._r_identifiers        except AttributeError:            initial, final, escaped_final = \                     [re.escape(s) for s in                      (self.initial_quote, self.final_quote,                       self._escape_identifier(self.final_quote))]            r = re.compile(                r'(?:'                r'(?:%(initial)s((?:%(escaped)s|[^%(final)s])+)%(final)s'                r'|([^\.]+))(?=\.|$))+' %                { 'initial': initial,                  'final': final,                  'escaped': escaped_final })            self._r_identifiers = r        return [self._unescape_identifier(i)                for i in [a or b for a, b in r.findall(identifiers)]]

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -