⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 oracle.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 2 页
字号:
            else:                return None        else:            if owner is not None:                for row in rows:                    if owner.upper() in row[0]:                        return row                else:                    if raiseerr:                        raise exceptions.AssertionError("Specified owner %s does not own table %s" % (owner, name))                    else:                        return None            else:                if len(rows)==1:                    return rows[0]                else:                    if raiseerr:                        raise exceptions.AssertionError("There are multiple tables with name '%s' visible to the schema, you must specifiy owner" % name)                    else:                        return None    def _resolve_table_owner(self, connection, name, table, dblink=''):        """Locate the given table in the ``ALL_TAB_COLUMNS`` view,        including searching for equivalent synonyms and dblinks.        """        c = connection.execute ("select distinct OWNER from ALL_TAB_COLUMNS%(dblink)s where TABLE_NAME = :table_name" % {'dblink':dblink}, {'table_name':name})        rows = c.fetchall()        try:            row = self._locate_owner_row(table.owner, name, rows, raiseerr=True)            return name, row['OWNER'], ''        except exceptions.SQLAlchemyError:            # locate synonyms            c = connection.execute ("""select OWNER, TABLE_OWNER, TABLE_NAME, DB_LINK                                       from   ALL_SYNONYMS%(dblink)s                                       where  SYNONYM_NAME = :synonym_name                                       and (DB_LINK IS NOT NULL                                               or ((TABLE_NAME, TABLE_OWNER) in                                                    (select TABLE_NAME, OWNER from ALL_TAB_COLUMNS%(dblink)s)))""" % {'dblink':dblink},                                    {'synonym_name':name})            rows = c.fetchall()            row = self._locate_owner_row(table.owner, name, rows)            if row is None:                row = self._locate_owner_row("PUBLIC", name, rows)            if row is not None:                owner, name, dblink = row['TABLE_OWNER'], row['TABLE_NAME'], row['DB_LINK']                if dblink:                    dblink = '@' + dblink                    if not owner:                        # re-resolve table owner using new dblink variable                        t1, owner, t2 = self._resolve_table_owner(connection, name, table, dblink=dblink)                else:                    dblink = ''                return name, owner, dblink            raise    def _normalize_name(self, name):        if name is None:            return None        elif name.upper() == name and not self.identifier_preparer._requires_quotes(name.lower().decode(self.encoding)):            return name.lower().decode(self.encoding)        else:            return name.decode(self.encoding)    def _denormalize_name(self, name):        if name is None:            return None        elif name.lower() == name and not self.identifier_preparer._requires_quotes(name.lower()):            return name.upper().encode(self.encoding)        else:            return name.encode(self.encoding)    def get_default_schema_name(self,connection):        try:            return self._default_schema_name        except AttributeError:            name = self._default_schema_name = \                connection.execute('SELECT USER FROM DUAL').scalar()            return name    def table_names(self, connection, schema):        # note that table_names() isnt loading DBLINKed or synonym'ed tables        if schema is None:            s = "select table_name from all_tables where tablespace_name NOT IN ('SYSTEM', 'SYSAUX')"            cursor = connection.execute(s)        else:            s = "select table_name from all_tables where tablespace_name NOT IN ('SYSTEM','SYSAUX') AND OWNER = :owner"            cursor = connection.execute(s,{'owner':self._denormalize_name(schema)})        return [self._normalize_name(row[0]) for row in cursor]    def reflecttable(self, connection, table, include_columns):        preparer = self.identifier_preparer        # search for table, including across synonyms and dblinks.        # locate the actual name of the table, the real owner, and any dblink clause needed.        actual_name, owner, dblink = self._resolve_table_owner(connection, self._denormalize_name(table.name), table)        c = connection.execute ("select COLUMN_NAME, DATA_TYPE, DATA_LENGTH, DATA_PRECISION, DATA_SCALE, NULLABLE, DATA_DEFAULT from ALL_TAB_COLUMNS%(dblink)s where TABLE_NAME = :table_name and OWNER = :owner" % {'dblink':dblink}, {'table_name':actual_name, 'owner':owner})        while True:            row = c.fetchone()            if row is None:                break            found_table = True            #print "ROW:" , row            (colname, coltype, length, precision, scale, nullable, default) = (self._normalize_name(row[0]), row[1], row[2], row[3], row[4], row[5]=='Y', row[6])            if include_columns and colname not in include_columns:                continue            # INTEGER if the scale is 0 and precision is null            # NUMBER if the scale and precision are both null            # NUMBER(9,2) if the precision is 9 and the scale is 2            # NUMBER(3) if the precision is 3 and scale is 0            #length is ignored except for CHAR and VARCHAR2            if coltype=='NUMBER' :                if precision is None and scale is None:                    coltype = OracleNumeric                elif precision is None and scale == 0  :                    coltype = OracleInteger                else :                    coltype = OracleNumeric(precision, scale)            elif coltype=='CHAR' or coltype=='VARCHAR2':                coltype = ischema_names.get(coltype, OracleString)(length)            else:                coltype = re.sub(r'\(\d+\)', '', coltype)                try:                    coltype = ischema_names[coltype]                except KeyError:                    util.warn("Did not recognize type '%s' of column '%s'" %                              (coltype, colname))                    coltype = sqltypes.NULLTYPE            colargs = []            if default is not None:                colargs.append(schema.PassiveDefault(sql.text(default)))            table.append_column(schema.Column(colname, coltype, nullable=nullable, *colargs))        if not table.columns:           raise exceptions.AssertionError("Couldn't find any column information for table %s" % actual_name)        c = connection.execute("""SELECT             ac.constraint_name,             ac.constraint_type,             loc.column_name AS local_column,             rem.table_name AS remote_table,             rem.column_name AS remote_column,             rem.owner AS remote_owner           FROM all_constraints%(dblink)s ac,             all_cons_columns%(dblink)s loc,             all_cons_columns%(dblink)s rem           WHERE ac.table_name = :table_name           AND ac.constraint_type IN ('R','P')           AND ac.owner = :owner           AND ac.owner = loc.owner           AND ac.constraint_name = loc.constraint_name           AND ac.r_owner = rem.owner(+)           AND ac.r_constraint_name = rem.constraint_name(+)           -- order multiple primary keys correctly           ORDER BY ac.constraint_name, loc.position, rem.position"""         % {'dblink':dblink}, {'table_name' : actual_name, 'owner' : owner})        fks = {}        while True:            row = c.fetchone()            if row is None:                break            #print "ROW:" , row            (cons_name, cons_type, local_column, remote_table, remote_column, remote_owner) = row[0:2] + tuple([self._normalize_name(x) for x in row[2:]])            if cons_type == 'P':                table.primary_key.add(table.c[local_column])            elif cons_type == 'R':                try:                    fk = fks[cons_name]                except KeyError:                   fk = ([], [])                   fks[cons_name] = fk                if remote_table is None:                    # ticket 363                    util.warn(                        ("Got 'None' querying 'table_name' from "                         "all_cons_columns%(dblink)s - does the user have "                         "proper rights to the table?") % {'dblink':dblink})                    continue                refspec = ".".join([remote_table, remote_column])                schema.Table(remote_table, table.metadata, autoload=True, autoload_with=connection, owner=remote_owner)                if local_column not in fk[0]:                    fk[0].append(local_column)                if refspec not in fk[1]:                    fk[1].append(refspec)        for name, value in fks.iteritems():            table.append_constraint(schema.ForeignKeyConstraint(value[0], value[1], name=name))OracleDialect.logger = logging.class_logger(OracleDialect)class _OuterJoinColumn(sql.ClauseElement):    __visit_name__ = 'outer_join_column'    def __init__(self, column):        self.column = columnclass OracleCompiler(compiler.DefaultCompiler):    """Oracle compiler modifies the lexical structure of Select    statements to work under non-ANSI configured Oracle databases, if    the use_ansi flag is False.    """    operators = compiler.DefaultCompiler.operators.copy()    operators.update(        {            sql_operators.mod : lambda x, y:"mod(%s, %s)" % (x, y)        }    )    functions = compiler.DefaultCompiler.functions.copy()    functions.update (        {            sql_functions.now : 'CURRENT_TIMESTAMP'        }    )    def __init__(self, *args, **kwargs):        super(OracleCompiler, self).__init__(*args, **kwargs)        self.__wheres = {}    def default_from(self):        """Called when a ``SELECT`` statement has no froms, and no ``FROM`` clause is to be appended.        The Oracle compiler tacks a "FROM DUAL" to the statement.        """        return " FROM DUAL"    def apply_function_parens(self, func):        return len(func.clauses) > 0    def visit_join(self, join, **kwargs):        if self.dialect.use_ansi:            return compiler.DefaultCompiler.visit_join(self, join, **kwargs)        (where, parentjoin) = self.__wheres.get(join, (None, None))        class VisitOn(visitors.ClauseVisitor):            def visit_binary(s, binary):                if binary.operator == sql_operators.eq:                    if binary.left.table is join.right:                        binary.left = _OuterJoinColumn(binary.left)                    elif binary.right.table is join.right:                        binary.right = _OuterJoinColumn(binary.right)        if join.isouter:            if where is not None:                self.__wheres[join.left] = self.__wheres[parentjoin] = (sql.and_(VisitOn().traverse(join.onclause, clone=True), where), parentjoin)            else:                self.__wheres[join.left] = self.__wheres[join] = (VisitOn().traverse(join.onclause, clone=True), join)        else:            if where is not None:                self.__wheres[join.left] = self.__wheres[parentjoin] = (sql.and_(join.onclause, where), parentjoin)            else:                self.__wheres[join.left] = self.__wheres[join] = (join.onclause, join)        return self.process(join.left, asfrom=True) + ", " + self.process(join.right, asfrom=True)    def get_whereclause(self, f):        if f in self.__wheres:            return self.__wheres[f][0]        else:            return None    def visit_outer_join_column(self, vc):        return self.process(vc.column) + "(+)"    def visit_sequence(self, seq):        return self.dialect.identifier_preparer.format_sequence(seq) + ".nextval"    def visit_alias(self, alias, asfrom=False, **kwargs):        """Oracle doesn't like ``FROM table AS alias``.  Is the AS standard SQL??"""        if asfrom:            return self.process(alias.original, asfrom=asfrom, **kwargs) + " " + self.preparer.format_alias(alias, self._anonymize(alias.name))        else:            return self.process(alias.original, **kwargs)    def _TODO_visit_compound_select(self, select):        """Need to determine how to get ``LIMIT``/``OFFSET`` into a ``UNION`` for Oracle."""        pass    def visit_select(self, select, **kwargs):        """Look for ``LIMIT`` and OFFSET in a select statement, and if        so tries to wrap it in a subquery with ``row_number()`` criterion.        """        if not getattr(select, '_oracle_visit', None) and (select._limit is not None or select._offset is not None):            # to use ROW_NUMBER(), an ORDER BY is required.            orderby = self.process(select._order_by_clause)            if not orderby:                orderby = list(select.oid_column.proxies)[0]                orderby = self.process(orderby)            oldselect = select            select = select.column(sql.literal_column("ROW_NUMBER() OVER (ORDER BY %s)" % orderby).label("ora_rn")).order_by(None)            select._oracle_visit = True            limitselect = sql.select([c for c in select.c if c.key!='ora_rn'])            if select._offset is not None:                limitselect.append_whereclause("ora_rn>%d" % select._offset)                if select._limit is not None:                    limitselect.append_whereclause("ora_rn<=%d" % (select._limit + select._offset))            else:                limitselect.append_whereclause("ora_rn<=%d" % select._limit)            return self.process(limitselect, iswrapper=True, **kwargs)        else:            return compiler.DefaultCompiler.visit_select(self, select, **kwargs)    def limit_clause(self, select):        return ""    def for_update_clause(self, select):        if select.for_update=="nowait":            return " FOR UPDATE NOWAIT"        else:            return super(OracleCompiler, self).for_update_clause(select)class OracleSchemaGenerator(compiler.SchemaGenerator):    def get_column_specification(self, column, **kwargs):        colspec = self.preparer.format_column(column)        colspec += " " + column.type.dialect_impl(self.dialect, _for_ddl=column).get_col_spec()        default = self.get_column_default_string(column)        if default is not None:            colspec += " DEFAULT " + default        if not column.nullable:            colspec += " NOT NULL"        return colspec    def visit_sequence(self, sequence):        if not self.checkfirst  or not self.dialect.has_sequence(self.connection, sequence.name):            self.append("CREATE SEQUENCE %s" % self.preparer.format_sequence(sequence))            self.execute()class OracleSchemaDropper(compiler.SchemaDropper):    def visit_sequence(self, sequence):        if not self.checkfirst or self.dialect.has_sequence(self.connection, sequence.name):            self.append("DROP SEQUENCE %s" % self.preparer.format_sequence(sequence))            self.execute()class OracleDefaultRunner(base.DefaultRunner):    def visit_sequence(self, seq):        return self.execute_string("SELECT " + self.dialect.identifier_preparer.format_sequence(seq) + ".nextval FROM DUAL", {})class OracleIdentifierPreparer(compiler.IdentifierPreparer):    def format_savepoint(self, savepoint):        name = re.sub(r'^_+', '', savepoint.ident)        return super(OracleIdentifierPreparer, self).format_savepoint(savepoint, name)dialect = OracleDialectdialect.statement_compiler = OracleCompilerdialect.schemagenerator = OracleSchemaGeneratordialect.schemadropper = OracleSchemaDropperdialect.preparer = OracleIdentifierPreparerdialect.defaultrunner = OracleDefaultRunner

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -