📄 postgres.py
字号:
if match is not None: # the default is related to a Sequence sch = table.schema if '.' not in match.group(2) and sch is not None: # unconditionally quote the schema name. this could # later be enhanced to obey quoting rules / "quote schema" default = match.group(1) + ('"%s"' % sch) + '.' + match.group(2) + match.group(3) colargs.append(schema.PassiveDefault(sql.text(default))) table.append_column(schema.Column(name, coltype, nullable=nullable, *colargs)) # Primary keys PK_SQL = """ SELECT attname FROM pg_attribute WHERE attrelid = ( SELECT indexrelid FROM pg_index i WHERE i.indrelid = :table AND i.indisprimary = 't') ORDER BY attnum """ t = sql.text(PK_SQL, typemap={'attname':sqltypes.Unicode}) c = connection.execute(t, table=table_oid) for row in c.fetchall(): pk = row[0] col = table.c[pk] table.primary_key.add(col) if col.default is None: col.autoincrement=False # Foreign keys FK_SQL = """ SELECT conname, pg_catalog.pg_get_constraintdef(oid, true) as condef FROM pg_catalog.pg_constraint r WHERE r.conrelid = :table AND r.contype = 'f' ORDER BY 1 """ t = sql.text(FK_SQL, typemap={'conname':sqltypes.Unicode, 'condef':sqltypes.Unicode}) c = connection.execute(t, table=table_oid) for conname, condef in c.fetchall(): m = re.search('FOREIGN KEY \((.*?)\) REFERENCES (?:(.*?)\.)?(.*?)\((.*?)\)', condef).groups() (constrained_columns, referred_schema, referred_table, referred_columns) = m constrained_columns = [preparer._unquote_identifier(x) for x in re.split(r'\s*,\s*', constrained_columns)] if referred_schema: referred_schema = preparer._unquote_identifier(referred_schema) elif table.schema is not None and table.schema == self.get_default_schema_name(connection): # no schema (i.e. its the default schema), and the table we're # reflecting has the default schema explicit, then use that. # i.e. try to use the user's conventions referred_schema = table.schema referred_table = preparer._unquote_identifier(referred_table) referred_columns = [preparer._unquote_identifier(x) for x in re.split(r'\s*,\s', referred_columns)] refspec = [] if referred_schema is not None: schema.Table(referred_table, table.metadata, autoload=True, schema=referred_schema, autoload_with=connection) for column in referred_columns: refspec.append(".".join([referred_schema, referred_table, column])) else: schema.Table(referred_table, table.metadata, autoload=True, autoload_with=connection) for column in referred_columns: refspec.append(".".join([referred_table, column])) table.append_constraint(schema.ForeignKeyConstraint(constrained_columns, refspec, conname)) def _load_domains(self, connection): ## Load data types for domains: SQL_DOMAINS = """ SELECT t.typname as "name", pg_catalog.format_type(t.typbasetype, t.typtypmod) as "attype", not t.typnotnull as "nullable", t.typdefault as "default", pg_catalog.pg_type_is_visible(t.oid) as "visible", n.nspname as "schema" FROM pg_catalog.pg_type t LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace LEFT JOIN pg_catalog.pg_constraint r ON t.oid = r.contypid WHERE t.typtype = 'd' """ s = sql.text(SQL_DOMAINS, typemap={'attname':sqltypes.Unicode}) c = connection.execute(s) domains = {} for domain in c.fetchall(): ## strip (30) from character varying(30) attype = re.search('([^\(]+)', domain['attype']).group(1) if domain['visible']: # 'visible' just means whether or not the domain is in a # schema that's on the search path -- or not overriden by # a schema with higher presedence. If it's not visible, # it will be prefixed with the schema-name when it's used. name = domain['name'] else: name = "%s.%s" % (domain['schema'], domain['name']) domains[name] = {'attype':attype, 'nullable': domain['nullable'], 'default': domain['default']} return domainsclass PGCompiler(compiler.DefaultCompiler): operators = compiler.DefaultCompiler.operators.copy() operators.update( { sql_operators.mod : '%%', sql_operators.ilike_op: 'ILIKE', sql_operators.notilike_op: 'NOT ILIKE' } ) def visit_sequence(self, seq): if seq.optional: return None else: return "nextval('%s')" % self.preparer.format_sequence(seq) def limit_clause(self, select): text = "" if select._limit is not None: text += " \n LIMIT " + str(select._limit) if select._offset is not None: if select._limit is None: text += " \n LIMIT ALL" text += " OFFSET " + str(select._offset) return text def get_select_precolumns(self, select): if select._distinct: if isinstance(select._distinct, bool): return "DISTINCT " elif isinstance(select._distinct, (list, tuple)): return "DISTINCT ON (" + ', '.join( [(isinstance(col, basestring) and col or self.process(col)) for col in select._distinct] )+ ") " else: return "DISTINCT ON (" + unicode(select._distinct) + ") " else: return "" def for_update_clause(self, select): if select.for_update == 'nowait': return " FOR UPDATE NOWAIT" else: return super(PGCompiler, self).for_update_clause(select) def _append_returning(self, text, stmt): returning_cols = stmt.kwargs['postgres_returning'] def flatten_columnlist(collist): for c in collist: if isinstance(c, expression.Selectable): for co in c.columns: yield co else: yield c columns = [self.process(c) for c in flatten_columnlist(returning_cols)] text += ' RETURNING ' + string.join(columns, ', ') return text def visit_update(self, update_stmt): text = super(PGCompiler, self).visit_update(update_stmt) if 'postgres_returning' in update_stmt.kwargs: return self._append_returning(text, update_stmt) else: return text def visit_insert(self, insert_stmt): text = super(PGCompiler, self).visit_insert(insert_stmt) if 'postgres_returning' in insert_stmt.kwargs: return self._append_returning(text, insert_stmt) else: return textclass PGSchemaGenerator(compiler.SchemaGenerator): def get_column_specification(self, column, **kwargs): colspec = self.preparer.format_column(column) if column.primary_key and len(column.foreign_keys)==0 and column.autoincrement and isinstance(column.type, sqltypes.Integer) and not isinstance(column.type, sqltypes.SmallInteger) and (column.default is None or (isinstance(column.default, schema.Sequence) and column.default.optional)): if isinstance(column.type, PGBigInteger): colspec += " BIGSERIAL" else: colspec += " SERIAL" else: colspec += " " + column.type.dialect_impl(self.dialect, _for_ddl=column).get_col_spec() default = self.get_column_default_string(column) if default is not None: colspec += " DEFAULT " + default if not column.nullable: colspec += " NOT NULL" return colspec def visit_sequence(self, sequence): if not sequence.optional and (not self.checkfirst or not self.dialect.has_sequence(self.connection, sequence.name)): self.append("CREATE SEQUENCE %s" % self.preparer.format_sequence(sequence)) self.execute() def visit_index(self, index): preparer = self.preparer self.append("CREATE ") if index.unique: self.append("UNIQUE ") self.append("INDEX %s ON %s (%s)" \ % (preparer.format_index(index), preparer.format_table(index.table), string.join([preparer.format_column(c) for c in index.columns], ', '))) whereclause = index.kwargs.get('postgres_where', None) if whereclause is not None: compiler = self._compile(whereclause, None) # this might belong to the compiler class inlined_clause = str(compiler) % dict( [(key,bind.value) for key,bind in compiler.binds.iteritems()]) self.append(" WHERE " + inlined_clause) self.execute()class PGSchemaDropper(compiler.SchemaDropper): def visit_sequence(self, sequence): if not sequence.optional and (not self.checkfirst or self.dialect.has_sequence(self.connection, sequence.name)): self.append("DROP SEQUENCE %s" % self.preparer.format_sequence(sequence)) self.execute()class PGDefaultRunner(base.DefaultRunner): def get_column_default(self, column, isinsert=True): if column.primary_key: # pre-execute passive defaults on primary keys if isinstance(column.default, schema.PassiveDefault): return self.execute_string("select %s" % column.default.arg) elif (isinstance(column.type, sqltypes.Integer) and column.autoincrement) and (column.default is None or (isinstance(column.default, schema.Sequence) and column.default.optional)): sch = column.table.schema # TODO: this has to build into the Sequence object so we can get the quoting # logic from it if sch is not None: exc = "select nextval('\"%s\".\"%s_%s_seq\"')" % (sch, column.table.name, column.name) else: exc = "select nextval('\"%s_%s_seq\"')" % (column.table.name, column.name) return self.execute_string(exc.encode(self.dialect.encoding)) return super(PGDefaultRunner, self).get_column_default(column) def visit_sequence(self, seq): if not seq.optional: return self.execute_string(("select nextval('%s')" % self.dialect.identifier_preparer.format_sequence(seq))) else: return Noneclass PGIdentifierPreparer(compiler.IdentifierPreparer): def _unquote_identifier(self, value): if value[0] == self.initial_quote: value = value[1:-1].replace('""','"') return valuedialect = PGDialectdialect.statement_compiler = PGCompilerdialect.schemagenerator = PGSchemaGeneratordialect.schemadropper = PGSchemaDropperdialect.preparer = PGIdentifierPreparerdialect.defaultrunner = PGDefaultRunner
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -