⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 postgres.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 3 页
字号:
                if match is not None:                    # the default is related to a Sequence                    sch = table.schema                    if '.' not in match.group(2) and sch is not None:                        # unconditionally quote the schema name.  this could                        # later be enhanced to obey quoting rules / "quote schema"                        default = match.group(1) + ('"%s"' % sch) + '.' + match.group(2) + match.group(3)                colargs.append(schema.PassiveDefault(sql.text(default)))            table.append_column(schema.Column(name, coltype, nullable=nullable, *colargs))        # Primary keys        PK_SQL = """          SELECT attname FROM pg_attribute          WHERE attrelid = (             SELECT indexrelid FROM pg_index i             WHERE i.indrelid = :table             AND i.indisprimary = 't')          ORDER BY attnum        """        t = sql.text(PK_SQL, typemap={'attname':sqltypes.Unicode})        c = connection.execute(t, table=table_oid)        for row in c.fetchall():            pk = row[0]            col = table.c[pk]            table.primary_key.add(col)            if col.default is None:                col.autoincrement=False        # Foreign keys        FK_SQL = """          SELECT conname, pg_catalog.pg_get_constraintdef(oid, true) as condef          FROM  pg_catalog.pg_constraint r          WHERE r.conrelid = :table AND r.contype = 'f'          ORDER BY 1        """        t = sql.text(FK_SQL, typemap={'conname':sqltypes.Unicode, 'condef':sqltypes.Unicode})        c = connection.execute(t, table=table_oid)        for conname, condef in c.fetchall():            m = re.search('FOREIGN KEY \((.*?)\) REFERENCES (?:(.*?)\.)?(.*?)\((.*?)\)', condef).groups()            (constrained_columns, referred_schema, referred_table, referred_columns) = m            constrained_columns = [preparer._unquote_identifier(x) for x in re.split(r'\s*,\s*', constrained_columns)]            if referred_schema:                referred_schema = preparer._unquote_identifier(referred_schema)            elif table.schema is not None and table.schema == self.get_default_schema_name(connection):                # no schema (i.e. its the default schema), and the table we're                # reflecting has the default schema explicit, then use that.                # i.e. try to use the user's conventions                referred_schema = table.schema            referred_table = preparer._unquote_identifier(referred_table)            referred_columns = [preparer._unquote_identifier(x) for x in re.split(r'\s*,\s', referred_columns)]            refspec = []            if referred_schema is not None:                schema.Table(referred_table, table.metadata, autoload=True, schema=referred_schema,                            autoload_with=connection)                for column in referred_columns:                    refspec.append(".".join([referred_schema, referred_table, column]))            else:                schema.Table(referred_table, table.metadata, autoload=True, autoload_with=connection)                for column in referred_columns:                    refspec.append(".".join([referred_table, column]))            table.append_constraint(schema.ForeignKeyConstraint(constrained_columns, refspec, conname))    def _load_domains(self, connection):        ## Load data types for domains:        SQL_DOMAINS = """            SELECT t.typname as "name",                   pg_catalog.format_type(t.typbasetype, t.typtypmod) as "attype",                   not t.typnotnull as "nullable",                   t.typdefault as "default",                   pg_catalog.pg_type_is_visible(t.oid) as "visible",                   n.nspname as "schema"            FROM pg_catalog.pg_type t                 LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace                 LEFT JOIN pg_catalog.pg_constraint r ON t.oid = r.contypid            WHERE t.typtype = 'd'        """        s = sql.text(SQL_DOMAINS, typemap={'attname':sqltypes.Unicode})        c = connection.execute(s)        domains = {}        for domain in c.fetchall():            ## strip (30) from character varying(30)            attype = re.search('([^\(]+)', domain['attype']).group(1)            if domain['visible']:                # 'visible' just means whether or not the domain is in a                # schema that's on the search path -- or not overriden by                # a schema with higher presedence. If it's not visible,                # it will be prefixed with the schema-name when it's used.                name = domain['name']            else:                name = "%s.%s" % (domain['schema'], domain['name'])            domains[name] = {'attype':attype, 'nullable': domain['nullable'], 'default': domain['default']}        return domainsclass PGCompiler(compiler.DefaultCompiler):    operators = compiler.DefaultCompiler.operators.copy()    operators.update(        {            sql_operators.mod : '%%',            sql_operators.ilike_op: 'ILIKE',            sql_operators.notilike_op: 'NOT ILIKE'        }    )    def visit_sequence(self, seq):        if seq.optional:            return None        else:            return "nextval('%s')" % self.preparer.format_sequence(seq)    def limit_clause(self, select):        text = ""        if select._limit is not None:            text +=  " \n LIMIT " + str(select._limit)        if select._offset is not None:            if select._limit is None:                text += " \n LIMIT ALL"            text += " OFFSET " + str(select._offset)        return text    def get_select_precolumns(self, select):        if select._distinct:            if isinstance(select._distinct, bool):                return "DISTINCT "            elif isinstance(select._distinct, (list, tuple)):                return "DISTINCT ON (" + ', '.join(                    [(isinstance(col, basestring) and col or self.process(col)) for col in select._distinct]                )+ ") "            else:                return "DISTINCT ON (" + unicode(select._distinct) + ") "        else:            return ""    def for_update_clause(self, select):        if select.for_update == 'nowait':            return " FOR UPDATE NOWAIT"        else:            return super(PGCompiler, self).for_update_clause(select)    def _append_returning(self, text, stmt):        returning_cols = stmt.kwargs['postgres_returning']        def flatten_columnlist(collist):            for c in collist:                if isinstance(c, expression.Selectable):                    for co in c.columns:                        yield co                else:                    yield c        columns = [self.process(c) for c in flatten_columnlist(returning_cols)]        text += ' RETURNING ' + string.join(columns, ', ')        return text    def visit_update(self, update_stmt):        text = super(PGCompiler, self).visit_update(update_stmt)        if 'postgres_returning' in update_stmt.kwargs:            return self._append_returning(text, update_stmt)        else:            return text    def visit_insert(self, insert_stmt):        text = super(PGCompiler, self).visit_insert(insert_stmt)        if 'postgres_returning' in insert_stmt.kwargs:            return self._append_returning(text, insert_stmt)        else:            return textclass PGSchemaGenerator(compiler.SchemaGenerator):    def get_column_specification(self, column, **kwargs):        colspec = self.preparer.format_column(column)        if column.primary_key and len(column.foreign_keys)==0 and column.autoincrement and isinstance(column.type, sqltypes.Integer) and not isinstance(column.type, sqltypes.SmallInteger) and (column.default is None or (isinstance(column.default, schema.Sequence) and column.default.optional)):            if isinstance(column.type, PGBigInteger):                colspec += " BIGSERIAL"            else:                colspec += " SERIAL"        else:            colspec += " " + column.type.dialect_impl(self.dialect, _for_ddl=column).get_col_spec()            default = self.get_column_default_string(column)            if default is not None:                colspec += " DEFAULT " + default        if not column.nullable:            colspec += " NOT NULL"        return colspec    def visit_sequence(self, sequence):        if not sequence.optional and (not self.checkfirst or not self.dialect.has_sequence(self.connection, sequence.name)):            self.append("CREATE SEQUENCE %s" % self.preparer.format_sequence(sequence))            self.execute()    def visit_index(self, index):        preparer = self.preparer        self.append("CREATE ")        if index.unique:            self.append("UNIQUE ")        self.append("INDEX %s ON %s (%s)" \                    % (preparer.format_index(index),                       preparer.format_table(index.table),                       string.join([preparer.format_column(c) for c in index.columns], ', ')))        whereclause = index.kwargs.get('postgres_where', None)        if whereclause is not None:            compiler = self._compile(whereclause, None)            # this might belong to the compiler class            inlined_clause = str(compiler) % dict(                [(key,bind.value) for key,bind in compiler.binds.iteritems()])            self.append(" WHERE " + inlined_clause)        self.execute()class PGSchemaDropper(compiler.SchemaDropper):    def visit_sequence(self, sequence):        if not sequence.optional and (not self.checkfirst or self.dialect.has_sequence(self.connection, sequence.name)):            self.append("DROP SEQUENCE %s" % self.preparer.format_sequence(sequence))            self.execute()class PGDefaultRunner(base.DefaultRunner):    def get_column_default(self, column, isinsert=True):        if column.primary_key:            # pre-execute passive defaults on primary keys            if isinstance(column.default, schema.PassiveDefault):                return self.execute_string("select %s" % column.default.arg)            elif (isinstance(column.type, sqltypes.Integer) and column.autoincrement) and (column.default is None or (isinstance(column.default, schema.Sequence) and column.default.optional)):                sch = column.table.schema                # TODO: this has to build into the Sequence object so we can get the quoting                # logic from it                if sch is not None:                    exc = "select nextval('\"%s\".\"%s_%s_seq\"')" % (sch, column.table.name, column.name)                else:                    exc = "select nextval('\"%s_%s_seq\"')" % (column.table.name, column.name)                return self.execute_string(exc.encode(self.dialect.encoding))        return super(PGDefaultRunner, self).get_column_default(column)    def visit_sequence(self, seq):        if not seq.optional:            return self.execute_string(("select nextval('%s')" % self.dialect.identifier_preparer.format_sequence(seq)))        else:            return Noneclass PGIdentifierPreparer(compiler.IdentifierPreparer):    def _unquote_identifier(self, value):        if value[0] == self.initial_quote:            value = value[1:-1].replace('""','"')        return valuedialect = PGDialectdialect.statement_compiler = PGCompilerdialect.schemagenerator = PGSchemaGeneratordialect.schemadropper = PGSchemaDropperdialect.preparer = PGIdentifierPreparerdialect.defaultrunner = PGDefaultRunner

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -