⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sybase.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 3 页
字号:
                return None            return datetime.datetime(1970, 1, 1, value.hour, value.minute, value.second, value.microsecond)        return processclass SybaseText(sqltypes.Text):    def get_col_spec(self):        return "TEXT"class SybaseString(sqltypes.String):    def get_col_spec(self):        return "VARCHAR(%(length)s)" % {'length' : self.length}class SybaseChar(sqltypes.CHAR):    def get_col_spec(self):        return "CHAR(%(length)s)" % {'length' : self.length}class SybaseBinary(sqltypes.Binary):    def get_col_spec(self):        return "IMAGE"class SybaseBoolean(sqltypes.Boolean):    def get_col_spec(self):        return "BIT"    def result_processor(self, dialect):        def process(value):            if value is None:                return None            return value and True or False        return process    def bind_processor(self, dialect):        def process(value):            if value is True:                return 1            elif value is False:                return 0            elif value is None:                return None            else:                return value and True or False        return processclass SybaseTimeStamp(sqltypes.TIMESTAMP):    def get_col_spec(self):        return "TIMESTAMP"class SybaseMoney(sqltypes.TypeEngine):    def get_col_spec(self):        return "MONEY"class SybaseSmallMoney(SybaseMoney):    def get_col_spec(self):        return "SMALLMONEY"class SybaseUniqueIdentifier(sqltypes.TypeEngine):    def get_col_spec(self):        return "UNIQUEIDENTIFIER"def descriptor():    return {'name':'sybase',    'description':'SybaseSQL',    'arguments':[        ('user',"Database Username",None),        ('password',"Database Password",None),        ('db',"Database Name",None),        ('host',"Hostname", None),    ]}class SybaseSQLExecutionContext(default.DefaultExecutionContext):    passclass SybaseSQLExecutionContext_mxodbc(SybaseSQLExecutionContext):    def __init__(self, dialect, connection, compiled=None, statement=None, parameters=None):        super(SybaseSQLExecutionContext_mxodbc, self).__init__(dialect, connection, compiled, statement, parameters)    def pre_exec(self):        super(SybaseSQLExecutionContext_mxodbc, self).pre_exec()    def post_exec(self):        if self.compiled.isinsert:            table = self.compiled.statement.table            # get the inserted values of the primary key            # get any sequence IDs first (using @@identity)            self.cursor.execute("SELECT @@identity AS lastrowid")            row = self.cursor.fetchone()            lastrowid = int(row[0])            if lastrowid > 0:                # an IDENTITY was inserted, fetch it                # FIXME: always insert in front ? This only works if the IDENTITY is the first column, no ?!                if not hasattr(self, '_last_inserted_ids') or self._last_inserted_ids is None:                    self._last_inserted_ids = [lastrowid]                else:                    self._last_inserted_ids = [lastrowid] + self._last_inserted_ids[1:]        super(SybaseSQLExecutionContext_mxodbc, self).post_exec()class SybaseSQLExecutionContext_pyodbc(SybaseSQLExecutionContext):    def __init__(self, dialect, connection, compiled=None, statement=None, parameters=None):        super(SybaseSQLExecutionContext_pyodbc, self).__init__(dialect, connection, compiled, statement, parameters)    def pre_exec(self):        super(SybaseSQLExecutionContext_pyodbc, self).pre_exec()    def post_exec(self):        if self.compiled.isinsert:            table = self.compiled.statement.table            # get the inserted values of the primary key            # get any sequence IDs first (using @@identity)            self.cursor.execute("SELECT @@identity AS lastrowid")            row = self.cursor.fetchone()            lastrowid = int(row[0])            if lastrowid > 0:                # an IDENTITY was inserted, fetch it                # FIXME: always insert in front ? This only works if the IDENTITY is the first column, no ?!                if not hasattr(self, '_last_inserted_ids') or self._last_inserted_ids is None:                    self._last_inserted_ids = [lastrowid]                else:                    self._last_inserted_ids = [lastrowid] + self._last_inserted_ids[1:]        super(SybaseSQLExecutionContext_pyodbc, self).post_exec()class SybaseSQLDialect(default.DefaultDialect):    colspecs = {        # FIXME: unicode support        #sqltypes.Unicode : SybaseUnicode,        sqltypes.Integer : SybaseInteger,        sqltypes.SmallInteger : SybaseSmallInteger,        sqltypes.Numeric : SybaseNumeric,        sqltypes.Float : SybaseFloat,        sqltypes.String : SybaseString,        sqltypes.Binary : SybaseBinary,        sqltypes.Boolean : SybaseBoolean,        sqltypes.Text : SybaseText,        sqltypes.CHAR : SybaseChar,        sqltypes.TIMESTAMP : SybaseTimeStamp,        sqltypes.FLOAT : SybaseFloat,    }    ischema_names = {        'integer' : SybaseInteger,        'unsigned int' : SybaseInteger,        'unsigned smallint' : SybaseInteger,        'unsigned bigint' : SybaseInteger,        'bigint': SybaseBigInteger,        'smallint' : SybaseSmallInteger,        'tinyint' : SybaseTinyInteger,        'varchar' : SybaseString,        'long varchar' : SybaseText,        'char' : SybaseChar,        'decimal' : SybaseNumeric,        'numeric' : SybaseNumeric,        'float' : SybaseFloat,        'double' : SybaseFloat,        'binary' : SybaseBinary,        'long binary' : SybaseBinary,        'varbinary' : SybaseBinary,        'bit': SybaseBoolean,        'image' : SybaseBinary,        'timestamp': SybaseTimeStamp,        'money': SybaseMoney,        'smallmoney': SybaseSmallMoney,        'uniqueidentifier': SybaseUniqueIdentifier,        'java.lang.Object' : SybaseTypeError,        'java serialization' : SybaseTypeError,    }    # Sybase backend peculiarities    supports_unicode_statements = False    supports_sane_rowcount = False    supports_sane_multi_rowcount = False    def __new__(cls, dbapi=None, *args, **kwargs):        if cls != SybaseSQLDialect:            return super(SybaseSQLDialect, cls).__new__(cls, *args, **kwargs)        if dbapi:            print dbapi.__name__            dialect = dialect_mapping.get(dbapi.__name__)            return dialect(*args, **kwargs)        else:            return object.__new__(cls, *args, **kwargs)    def __init__(self, **params):        super(SybaseSQLDialect, self).__init__(**params)        self.text_as_varchar = False        # FIXME: what is the default schema for sybase connections (DBA?) ?        self.set_default_schema_name("dba")    def dbapi(cls, module_name=None):        if module_name:            try:                dialect_cls = dialect_mapping[module_name]                return dialect_cls.import_dbapi()            except KeyError:                raise exceptions.InvalidRequestError("Unsupported SybaseSQL module '%s' requested (must be " + " or ".join([x for x in dialect_mapping.keys()]) + ")" % module_name)        else:            for dialect_cls in dialect_mapping.values():                try:                    return dialect_cls.import_dbapi()                except ImportError, e:                    pass            else:                raise ImportError('No DBAPI module detected for SybaseSQL - please install mxodbc')    dbapi = classmethod(dbapi)    def create_execution_context(self, *args, **kwargs):        return SybaseSQLExecutionContext(self, *args, **kwargs)    def type_descriptor(self, typeobj):        newobj = sqltypes.adapt_type(typeobj, self.colspecs)        return newobj    def last_inserted_ids(self):        return self.context.last_inserted_ids    def get_default_schema_name(self, connection):        return self.schema_name    def set_default_schema_name(self, schema_name):        self.schema_name = schema_name    def do_execute(self, cursor, statement, params, **kwargs):        params = tuple(params)        super(SybaseSQLDialect, self).do_execute(cursor, statement, params, **kwargs)    # FIXME: remove ?    def _execute(self, c, statement, parameters):        try:            if parameters == {}:                parameters = ()            c.execute(statement, parameters)            self.context.rowcount = c.rowcount            c.DBPROP_COMMITPRESERVE = "Y"        except Exception, e:            raise exceptions.DBAPIError.instance(statement, parameters, e)    def table_names(self, connection, schema):        """Ignore the schema and the charset for now."""        s = sql.select([tables.c.table_name],                       sql.not_(tables.c.table_name.like("SYS%")) and                       tables.c.creator >= 100                       )        rp = connection.execute(s)        return [row[0] for row in rp.fetchall()]    def has_table(self, connection, tablename, schema=None):        # FIXME: ignore schemas for sybase        s = sql.select([tables.c.table_name], tables.c.table_name == tablename)        c = connection.execute(s)        row = c.fetchone()        print "has_table: " + tablename + ": " + str(bool(row is not None))        return row is not None    def reflecttable(self, connection, table, include_columns):        # Get base columns        if table.schema is not None:            current_schema = table.schema        else:            current_schema = self.get_default_schema_name(connection)        s = sql.select([columns, domains], tables.c.table_name==table.name, from_obj=[columns.join(tables).join(domains)], order_by=[columns.c.column_id])        c = connection.execute(s)        found_table = False        # makes sure we append the columns in the correct order        while True:            row = c.fetchone()            if row is None:                break            found_table = True            (name, type, nullable, charlen, numericprec, numericscale, default, primary_key, max_identity, table_id, column_id) = (                row[columns.c.column_name],                row[domains.c.domain_name],                row[columns.c.nulls] == 'Y',                row[columns.c.width],                row[domains.c.precision],                row[columns.c.scale],                row[columns.c.default],                row[columns.c.pkey] == 'Y',                row[columns.c.max_identity],                row[tables.c.table_id],                row[columns.c.column_id],            )            if include_columns and name not in include_columns:                continue            # FIXME: else problems with SybaseBinary(size)            if numericscale == 0:                numericscale = None

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -