⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 oracle.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 2 页
字号:
# oracle.py# Copyright (C) 2005, 2006, 2007, 2008 Michael Bayer mike_mp@zzzcomputing.com## This module is part of SQLAlchemy and is released under# the MIT License: http://www.opensource.org/licenses/mit-license.phpimport datetime, random, refrom sqlalchemy import util, sql, schema, exceptions, loggingfrom sqlalchemy.engine import default, basefrom sqlalchemy.sql import compiler, visitorsfrom sqlalchemy.sql import operators as sql_operators, functions as sql_functionsfrom sqlalchemy import types as sqltypesclass OracleNumeric(sqltypes.Numeric):    def get_col_spec(self):        if self.precision is None:            return "NUMERIC"        else:            return "NUMERIC(%(precision)s, %(length)s)" % {'precision': self.precision, 'length' : self.length}class OracleInteger(sqltypes.Integer):    def get_col_spec(self):        return "INTEGER"class OracleSmallInteger(sqltypes.Smallinteger):    def get_col_spec(self):        return "SMALLINT"class OracleDate(sqltypes.Date):    def get_col_spec(self):        return "DATE"    def bind_processor(self, dialect):        return None    def result_processor(self, dialect):        def process(value):            if not isinstance(value, datetime.datetime):                return value            else:                return value.date()        return processclass OracleDateTime(sqltypes.DateTime):    def get_col_spec(self):        return "DATE"    def result_processor(self, dialect):        def process(value):            if value is None or isinstance(value,datetime.datetime):                return value            else:                # convert cx_oracle datetime object returned pre-python 2.4                return datetime.datetime(value.year,value.month,                    value.day,value.hour, value.minute, value.second)        return process# Note:# Oracle DATE == DATETIME# Oracle does not allow milliseconds in DATE# Oracle does not support TIME columns# only if cx_oracle contains TIMESTAMPclass OracleTimestamp(sqltypes.TIMESTAMP):    def get_col_spec(self):        return "TIMESTAMP"    def get_dbapi_type(self, dialect):        return dialect.TIMESTAMP    def result_processor(self, dialect):        def process(value):            if value is None or isinstance(value,datetime.datetime):                return value            else:                # convert cx_oracle datetime object returned pre-python 2.4                return datetime.datetime(value.year,value.month,                    value.day,value.hour, value.minute, value.second)        return processclass OracleString(sqltypes.String):    def get_col_spec(self):        return "VARCHAR(%(length)s)" % {'length' : self.length}class OracleText(sqltypes.Text):    def get_dbapi_type(self, dbapi):        return dbapi.CLOB    def get_col_spec(self):        return "CLOB"    def result_processor(self, dialect):        super_process = super(OracleText, self).result_processor(dialect)        lob = dialect.dbapi.LOB        def process(value):            if isinstance(value, lob):                if super_process:                    return super_process(value.read())                else:                    return value.read()            else:                if super_process:                    return super_process(value)                else:                    return value        return processclass OracleRaw(sqltypes.Binary):    def get_col_spec(self):        return "RAW(%(length)s)" % {'length' : self.length}class OracleChar(sqltypes.CHAR):    def get_col_spec(self):        return "CHAR(%(length)s)" % {'length' : self.length}class OracleBinary(sqltypes.Binary):    def get_dbapi_type(self, dbapi):        return dbapi.BLOB    def get_col_spec(self):        return "BLOB"    def bind_processor(self, dialect):        return None    def result_processor(self, dialect):        lob = dialect.dbapi.LOB        def process(value):            if isinstance(value, lob):                return value.read()            else:                return value        return processclass OracleBoolean(sqltypes.Boolean):    def get_col_spec(self):        return "SMALLINT"    def result_processor(self, dialect):        def process(value):            if value is None:                return None            return value and True or False        return process    def bind_processor(self, dialect):        def process(value):            if value is True:                return 1            elif value is False:                return 0            elif value is None:                return None            else:                return value and True or False        return processcolspecs = {    sqltypes.Integer : OracleInteger,    sqltypes.Smallinteger : OracleSmallInteger,    sqltypes.Numeric : OracleNumeric,    sqltypes.Float : OracleNumeric,    sqltypes.DateTime : OracleDateTime,    sqltypes.Date : OracleDate,    sqltypes.String : OracleString,    sqltypes.Binary : OracleBinary,    sqltypes.Boolean : OracleBoolean,    sqltypes.Text : OracleText,    sqltypes.TIMESTAMP : OracleTimestamp,    sqltypes.CHAR: OracleChar,}ischema_names = {    'VARCHAR2' : OracleString,    'DATE' : OracleDateTime,    'DATETIME' : OracleDateTime,    'NUMBER' : OracleNumeric,    'BLOB' : OracleBinary,    'CLOB' : OracleText,    'TIMESTAMP' : OracleTimestamp,    'RAW' : OracleRaw,    'FLOAT' : OracleNumeric,    'DOUBLE PRECISION' : OracleNumeric,    'LONG' : OracleText,}def descriptor():    return {'name':'oracle',    'description':'Oracle',    'arguments':[        ('dsn', 'Data Source Name', None),        ('user', 'Username', None),        ('password', 'Password', None)    ]}class OracleExecutionContext(default.DefaultExecutionContext):    def pre_exec(self):        super(OracleExecutionContext, self).pre_exec()        if self.dialect.auto_setinputsizes:            self.set_input_sizes()        if self.compiled_parameters is not None and len(self.compiled_parameters) == 1:            for key in self.compiled.binds:                bindparam = self.compiled.binds[key]                name = self.compiled.bind_names[bindparam]                value = self.compiled_parameters[0][name]                if bindparam.isoutparam:                    dbtype = bindparam.type.dialect_impl(self.dialect).get_dbapi_type(self.dialect.dbapi)                    if not hasattr(self, 'out_parameters'):                        self.out_parameters = {}                    self.out_parameters[name] = self.cursor.var(dbtype)                    self.parameters[0][name] = self.out_parameters[name]    def get_result_proxy(self):        if hasattr(self, 'out_parameters'):            if self.compiled_parameters is not None and len(self.compiled_parameters) == 1:                 for bind, name in self.compiled.bind_names.iteritems():                     if name in self.out_parameters:                         type = bind.type                         self.out_parameters[name] = type.dialect_impl(self.dialect).result_processor(self.dialect)(self.out_parameters[name].getvalue())            else:                 for k in self.out_parameters:                     self.out_parameters[k] = self.out_parameters[k].getvalue()        if self.cursor.description is not None:            for column in self.cursor.description:                type_code = column[1]                if type_code in self.dialect.ORACLE_BINARY_TYPES:                    return base.BufferedColumnResultProxy(self)        return base.ResultProxy(self)class OracleDialect(default.DefaultDialect):    supports_alter = True    supports_unicode_statements = False    max_identifier_length = 30    supports_sane_rowcount = True    supports_sane_multi_rowcount = False    preexecute_pk_sequences = True    supports_pk_autoincrement = False    def __init__(self, use_ansi=True, auto_setinputsizes=True, auto_convert_lobs=True, threaded=True, allow_twophase=True, **kwargs):        default.DefaultDialect.__init__(self, default_paramstyle='named', **kwargs)        self.use_ansi = use_ansi        self.threaded = threaded        self.allow_twophase = allow_twophase        self.supports_timestamp = self.dbapi is None or hasattr(self.dbapi, 'TIMESTAMP' )        self.auto_setinputsizes = auto_setinputsizes        self.auto_convert_lobs = auto_convert_lobs        if self.dbapi is None or not self.auto_convert_lobs or not 'CLOB' in self.dbapi.__dict__:            self.dbapi_type_map = {}            self.ORACLE_BINARY_TYPES = []        else:            # only use this for LOB objects.  using it for strings, dates            # etc. leads to a little too much magic, reflection doesn't know if it should            # expect encoded strings or unicodes, etc.            self.dbapi_type_map = {                self.dbapi.CLOB: OracleText(),                self.dbapi.BLOB: OracleBinary(),                self.dbapi.BINARY: OracleRaw(),            }            self.ORACLE_BINARY_TYPES = [getattr(self.dbapi, k) for k in ["BFILE", "CLOB", "NCLOB", "BLOB"] if hasattr(self.dbapi, k)]    def dbapi(cls):        import cx_Oracle        return cx_Oracle    dbapi = classmethod(dbapi)    def create_connect_args(self, url):        dialect_opts = dict(url.query)        for opt in ('use_ansi', 'auto_setinputsizes', 'auto_convert_lobs',                    'threaded', 'allow_twophase'):            if opt in dialect_opts:                util.coerce_kw_type(dialect_opts, opt, bool)                setattr(self, opt, dialect_opts[opt])        if url.database:            # if we have a database, then we have a remote host            port = url.port            if port:                port = int(port)            else:                port = 1521            dsn = self.dbapi.makedsn(url.host, port, url.database)        else:            # we have a local tnsname            dsn = url.host        opts = dict(            user=url.username,            password=url.password,            dsn=dsn,            threaded=self.threaded,            twophase=self.allow_twophase,            )        if 'mode' in url.query:            opts['mode'] = url.query['mode']            if isinstance(opts['mode'], basestring):                mode = opts['mode'].upper()                if mode == 'SYSDBA':                    opts['mode'] = self.dbapi.SYSDBA                elif mode == 'SYSOPER':                    opts['mode'] = self.dbapi.SYSOPER                else:                    util.coerce_kw_type(opts, 'mode', int)        # Can't set 'handle' or 'pool' via URL query args, use connect_args        return ([], opts)    def is_disconnect(self, e):        if isinstance(e, self.dbapi.InterfaceError):            return "not connected" in str(e)        else:            return "ORA-03114" in str(e) or "ORA-03113" in str(e)    def type_descriptor(self, typeobj):        return sqltypes.adapt_type(typeobj, colspecs)    def oid_column_name(self, column):        if not isinstance(column.table, (sql.TableClause, sql.Select)):            return None        else:            return "rowid"    def create_xid(self):        """create a two-phase transaction ID.        this id will be passed to do_begin_twophase(), do_rollback_twophase(),        do_commit_twophase().  its format is unspecified."""        id = random.randint(0,2**128)        return (0x1234, "%032x" % 9, "%032x" % id)    def do_release_savepoint(self, connection, name):        # Oracle does not support RELEASE SAVEPOINT        pass    def do_begin_twophase(self, connection, xid):        connection.connection.begin(*xid)    def do_prepare_twophase(self, connection, xid):        connection.connection.prepare()    def do_rollback_twophase(self, connection, xid, is_prepared=True, recover=False):        self.do_rollback(connection.connection)    def do_commit_twophase(self, connection, xid, is_prepared=True, recover=False):        self.do_commit(connection.connection)    def do_recover_twophase(self, connection):        pass    def create_execution_context(self, *args, **kwargs):        return OracleExecutionContext(self, *args, **kwargs)    def has_table(self, connection, table_name, schema=None):        cursor = connection.execute("""select table_name from all_tables where table_name=:name""", {'name':self._denormalize_name(table_name)})        return bool( cursor.fetchone() is not None )    def has_sequence(self, connection, sequence_name):        cursor = connection.execute("""select sequence_name from all_sequences where sequence_name=:name""", {'name':self._denormalize_name(sequence_name)})        return bool( cursor.fetchone() is not None )    def _locate_owner_row(self, owner, name, rows, raiseerr=False):        """return the row in the given list of rows which references the given table name and owner name."""        if not rows:            if raiseerr:                raise exceptions.NoSuchTableError(name)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -