📄 oracle.py
字号:
else: return None else: if owner is not None: for row in rows: if owner.upper() in row[0]: return row else: if raiseerr: raise exceptions.AssertionError("Specified owner %s does not own table %s" % (owner, name)) else: return None else: if len(rows)==1: return rows[0] else: if raiseerr: raise exceptions.AssertionError("There are multiple tables with name '%s' visible to the schema, you must specifiy owner" % name) else: return None def _resolve_table_owner(self, connection, name, table, dblink=''): """Locate the given table in the ``ALL_TAB_COLUMNS`` view, including searching for equivalent synonyms and dblinks. """ c = connection.execute ("select distinct OWNER from ALL_TAB_COLUMNS%(dblink)s where TABLE_NAME = :table_name" % {'dblink':dblink}, {'table_name':name}) rows = c.fetchall() try: row = self._locate_owner_row(table.owner, name, rows, raiseerr=True) return name, row['OWNER'], '' except exceptions.SQLAlchemyError: # locate synonyms c = connection.execute ("""select OWNER, TABLE_OWNER, TABLE_NAME, DB_LINK from ALL_SYNONYMS%(dblink)s where SYNONYM_NAME = :synonym_name and (DB_LINK IS NOT NULL or ((TABLE_NAME, TABLE_OWNER) in (select TABLE_NAME, OWNER from ALL_TAB_COLUMNS%(dblink)s)))""" % {'dblink':dblink}, {'synonym_name':name}) rows = c.fetchall() row = self._locate_owner_row(table.owner, name, rows) if row is None: row = self._locate_owner_row("PUBLIC", name, rows) if row is not None: owner, name, dblink = row['TABLE_OWNER'], row['TABLE_NAME'], row['DB_LINK'] if dblink: dblink = '@' + dblink if not owner: # re-resolve table owner using new dblink variable t1, owner, t2 = self._resolve_table_owner(connection, name, table, dblink=dblink) else: dblink = '' return name, owner, dblink raise def _normalize_name(self, name): if name is None: return None elif name.upper() == name and not self.identifier_preparer._requires_quotes(name.lower().decode(self.encoding)): return name.lower().decode(self.encoding) else: return name.decode(self.encoding) def _denormalize_name(self, name): if name is None: return None elif name.lower() == name and not self.identifier_preparer._requires_quotes(name.lower()): return name.upper().encode(self.encoding) else: return name.encode(self.encoding) def get_default_schema_name(self,connection): try: return self._default_schema_name except AttributeError: name = self._default_schema_name = \ connection.execute('SELECT USER FROM DUAL').scalar() return name def table_names(self, connection, schema): # note that table_names() isnt loading DBLINKed or synonym'ed tables if schema is None: s = "select table_name from all_tables where tablespace_name NOT IN ('SYSTEM', 'SYSAUX')" cursor = connection.execute(s) else: s = "select table_name from all_tables where tablespace_name NOT IN ('SYSTEM','SYSAUX') AND OWNER = :owner" cursor = connection.execute(s,{'owner':self._denormalize_name(schema)}) return [self._normalize_name(row[0]) for row in cursor] def reflecttable(self, connection, table, include_columns): preparer = self.identifier_preparer # search for table, including across synonyms and dblinks. # locate the actual name of the table, the real owner, and any dblink clause needed. actual_name, owner, dblink = self._resolve_table_owner(connection, self._denormalize_name(table.name), table) c = connection.execute ("select COLUMN_NAME, DATA_TYPE, DATA_LENGTH, DATA_PRECISION, DATA_SCALE, NULLABLE, DATA_DEFAULT from ALL_TAB_COLUMNS%(dblink)s where TABLE_NAME = :table_name and OWNER = :owner" % {'dblink':dblink}, {'table_name':actual_name, 'owner':owner}) while True: row = c.fetchone() if row is None: break found_table = True #print "ROW:" , row (colname, coltype, length, precision, scale, nullable, default) = (self._normalize_name(row[0]), row[1], row[2], row[3], row[4], row[5]=='Y', row[6]) if include_columns and colname not in include_columns: continue # INTEGER if the scale is 0 and precision is null # NUMBER if the scale and precision are both null # NUMBER(9,2) if the precision is 9 and the scale is 2 # NUMBER(3) if the precision is 3 and scale is 0 #length is ignored except for CHAR and VARCHAR2 if coltype=='NUMBER' : if precision is None and scale is None: coltype = OracleNumeric elif precision is None and scale == 0 : coltype = OracleInteger else : coltype = OracleNumeric(precision, scale) elif coltype=='CHAR' or coltype=='VARCHAR2': coltype = ischema_names.get(coltype, OracleString)(length) else: coltype = re.sub(r'\(\d+\)', '', coltype) try: coltype = ischema_names[coltype] except KeyError: util.warn("Did not recognize type '%s' of column '%s'" % (coltype, colname)) coltype = sqltypes.NULLTYPE colargs = [] if default is not None: colargs.append(schema.PassiveDefault(sql.text(default))) table.append_column(schema.Column(colname, coltype, nullable=nullable, *colargs)) if not table.columns: raise exceptions.AssertionError("Couldn't find any column information for table %s" % actual_name) c = connection.execute("""SELECT ac.constraint_name, ac.constraint_type, loc.column_name AS local_column, rem.table_name AS remote_table, rem.column_name AS remote_column, rem.owner AS remote_owner FROM all_constraints%(dblink)s ac, all_cons_columns%(dblink)s loc, all_cons_columns%(dblink)s rem WHERE ac.table_name = :table_name AND ac.constraint_type IN ('R','P') AND ac.owner = :owner AND ac.owner = loc.owner AND ac.constraint_name = loc.constraint_name AND ac.r_owner = rem.owner(+) AND ac.r_constraint_name = rem.constraint_name(+) -- order multiple primary keys correctly ORDER BY ac.constraint_name, loc.position, rem.position""" % {'dblink':dblink}, {'table_name' : actual_name, 'owner' : owner}) fks = {} while True: row = c.fetchone() if row is None: break #print "ROW:" , row (cons_name, cons_type, local_column, remote_table, remote_column, remote_owner) = row[0:2] + tuple([self._normalize_name(x) for x in row[2:]]) if cons_type == 'P': table.primary_key.add(table.c[local_column]) elif cons_type == 'R': try: fk = fks[cons_name] except KeyError: fk = ([], []) fks[cons_name] = fk if remote_table is None: # ticket 363 util.warn( ("Got 'None' querying 'table_name' from " "all_cons_columns%(dblink)s - does the user have " "proper rights to the table?") % {'dblink':dblink}) continue refspec = ".".join([remote_table, remote_column]) schema.Table(remote_table, table.metadata, autoload=True, autoload_with=connection, owner=remote_owner) if local_column not in fk[0]: fk[0].append(local_column) if refspec not in fk[1]: fk[1].append(refspec) for name, value in fks.iteritems(): table.append_constraint(schema.ForeignKeyConstraint(value[0], value[1], name=name))OracleDialect.logger = logging.class_logger(OracleDialect)class _OuterJoinColumn(sql.ClauseElement): __visit_name__ = 'outer_join_column' def __init__(self, column): self.column = columnclass OracleCompiler(compiler.DefaultCompiler): """Oracle compiler modifies the lexical structure of Select statements to work under non-ANSI configured Oracle databases, if the use_ansi flag is False. """ operators = compiler.DefaultCompiler.operators.copy() operators.update( { sql_operators.mod : lambda x, y:"mod(%s, %s)" % (x, y) } ) functions = compiler.DefaultCompiler.functions.copy() functions.update ( { sql_functions.now : 'CURRENT_TIMESTAMP' } ) def __init__(self, *args, **kwargs): super(OracleCompiler, self).__init__(*args, **kwargs) self.__wheres = {} def default_from(self): """Called when a ``SELECT`` statement has no froms, and no ``FROM`` clause is to be appended. The Oracle compiler tacks a "FROM DUAL" to the statement. """ return " FROM DUAL" def apply_function_parens(self, func): return len(func.clauses) > 0 def visit_join(self, join, **kwargs): if self.dialect.use_ansi: return compiler.DefaultCompiler.visit_join(self, join, **kwargs) (where, parentjoin) = self.__wheres.get(join, (None, None)) class VisitOn(visitors.ClauseVisitor): def visit_binary(s, binary): if binary.operator == sql_operators.eq: if binary.left.table is join.right: binary.left = _OuterJoinColumn(binary.left) elif binary.right.table is join.right: binary.right = _OuterJoinColumn(binary.right) if join.isouter: if where is not None: self.__wheres[join.left] = self.__wheres[parentjoin] = (sql.and_(VisitOn().traverse(join.onclause, clone=True), where), parentjoin) else: self.__wheres[join.left] = self.__wheres[join] = (VisitOn().traverse(join.onclause, clone=True), join) else: if where is not None: self.__wheres[join.left] = self.__wheres[parentjoin] = (sql.and_(join.onclause, where), parentjoin) else: self.__wheres[join.left] = self.__wheres[join] = (join.onclause, join) return self.process(join.left, asfrom=True) + ", " + self.process(join.right, asfrom=True) def get_whereclause(self, f): if f in self.__wheres: return self.__wheres[f][0] else: return None def visit_outer_join_column(self, vc): return self.process(vc.column) + "(+)" def visit_sequence(self, seq): return self.dialect.identifier_preparer.format_sequence(seq) + ".nextval" def visit_alias(self, alias, asfrom=False, **kwargs): """Oracle doesn't like ``FROM table AS alias``. Is the AS standard SQL??""" if asfrom: return self.process(alias.original, asfrom=asfrom, **kwargs) + " " + self.preparer.format_alias(alias, self._anonymize(alias.name)) else: return self.process(alias.original, **kwargs) def _TODO_visit_compound_select(self, select): """Need to determine how to get ``LIMIT``/``OFFSET`` into a ``UNION`` for Oracle.""" pass def visit_select(self, select, **kwargs): """Look for ``LIMIT`` and OFFSET in a select statement, and if so tries to wrap it in a subquery with ``row_number()`` criterion. """ if not getattr(select, '_oracle_visit', None) and (select._limit is not None or select._offset is not None): # to use ROW_NUMBER(), an ORDER BY is required. orderby = self.process(select._order_by_clause) if not orderby: orderby = list(select.oid_column.proxies)[0] orderby = self.process(orderby) oldselect = select select = select.column(sql.literal_column("ROW_NUMBER() OVER (ORDER BY %s)" % orderby).label("ora_rn")).order_by(None) select._oracle_visit = True limitselect = sql.select([c for c in select.c if c.key!='ora_rn']) if select._offset is not None: limitselect.append_whereclause("ora_rn>%d" % select._offset) if select._limit is not None: limitselect.append_whereclause("ora_rn<=%d" % (select._limit + select._offset)) else: limitselect.append_whereclause("ora_rn<=%d" % select._limit) return self.process(limitselect, iswrapper=True, **kwargs) else: return compiler.DefaultCompiler.visit_select(self, select, **kwargs) def limit_clause(self, select): return "" def for_update_clause(self, select): if select.for_update=="nowait": return " FOR UPDATE NOWAIT" else: return super(OracleCompiler, self).for_update_clause(select)class OracleSchemaGenerator(compiler.SchemaGenerator): def get_column_specification(self, column, **kwargs): colspec = self.preparer.format_column(column) colspec += " " + column.type.dialect_impl(self.dialect, _for_ddl=column).get_col_spec() default = self.get_column_default_string(column) if default is not None: colspec += " DEFAULT " + default if not column.nullable: colspec += " NOT NULL" return colspec def visit_sequence(self, sequence): if not self.checkfirst or not self.dialect.has_sequence(self.connection, sequence.name): self.append("CREATE SEQUENCE %s" % self.preparer.format_sequence(sequence)) self.execute()class OracleSchemaDropper(compiler.SchemaDropper): def visit_sequence(self, sequence): if not self.checkfirst or self.dialect.has_sequence(self.connection, sequence.name): self.append("DROP SEQUENCE %s" % self.preparer.format_sequence(sequence)) self.execute()class OracleDefaultRunner(base.DefaultRunner): def visit_sequence(self, seq): return self.execute_string("SELECT " + self.dialect.identifier_preparer.format_sequence(seq) + ".nextval FROM DUAL", {})class OracleIdentifierPreparer(compiler.IdentifierPreparer): def format_savepoint(self, savepoint): name = re.sub(r'^_+', '', savepoint.ident) return super(OracleIdentifierPreparer, self).format_savepoint(savepoint, name)dialect = OracleDialectdialect.statement_compiler = OracleCompilerdialect.schemagenerator = OracleSchemaGeneratordialect.schemadropper = OracleSchemaDropperdialect.preparer = OracleIdentifierPreparerdialect.defaultrunner = OracleDefaultRunner
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -