⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mssql.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 3 页
字号:
class MSSQLExecutionContext_pyodbc (MSSQLExecutionContext):    def pre_exec(self):        """where appropriate, issue "select scope_identity()" in the same statement"""        super(MSSQLExecutionContext_pyodbc, self).pre_exec()        if self.compiled.isinsert and self.HASIDENT and (not self.IINSERT) \                and len(self.parameters) == 1 and self.dialect.use_scope_identity:            self.statement += "; select scope_identity()"    def post_exec(self):        if self.compiled.isinsert and self.HASIDENT and (not self.IINSERT) and self.dialect.use_scope_identity:            # do nothing - id was fetched in dialect.do_execute()            pass        else:            super(MSSQLExecutionContext_pyodbc, self).post_exec()class MSSQLDialect(default.DefaultDialect):    colspecs = {        sqltypes.Unicode : MSNVarchar,        sqltypes.Integer : MSInteger,        sqltypes.Smallinteger: MSSmallInteger,        sqltypes.Numeric : MSNumeric,        sqltypes.Float : MSFloat,        sqltypes.DateTime : MSDateTime,        sqltypes.Date : MSDate,        sqltypes.Time : MSTime,        sqltypes.String : MSString,        sqltypes.Binary : MSBinary,        sqltypes.Boolean : MSBoolean,        sqltypes.Text : MSText,        sqltypes.CHAR: MSChar,        sqltypes.NCHAR: MSNChar,        sqltypes.TIMESTAMP: MSTimeStamp,    }    ischema_names = {        'int' : MSInteger,        'bigint': MSBigInteger,        'smallint' : MSSmallInteger,        'tinyint' : MSTinyInteger,        'varchar' : MSString,        'nvarchar' : MSNVarchar,        'char' : MSChar,        'nchar' : MSNChar,        'text' : MSText,        'ntext' : MSText,        'decimal' : MSNumeric,        'numeric' : MSNumeric,        'float' : MSFloat,        'datetime' : MSDateTime,        'smalldatetime' : MSDate,        'binary' : MSBinary,        'varbinary' : MSBinary,        'bit': MSBoolean,        'real' : MSFloat,        'image' : MSBinary,        'timestamp': MSTimeStamp,        'money': MSMoney,        'smallmoney': MSSmallMoney,        'uniqueidentifier': MSUniqueIdentifier,        'sql_variant': MSVariant,    }    def __new__(cls, dbapi=None, *args, **kwargs):        if cls != MSSQLDialect:            return super(MSSQLDialect, cls).__new__(cls, *args, **kwargs)        if dbapi:            dialect = dialect_mapping.get(dbapi.__name__)            return dialect(*args, **kwargs)        else:            return object.__new__(cls, *args, **kwargs)    def __init__(self, auto_identity_insert=True, **params):        super(MSSQLDialect, self).__init__(**params)        self.auto_identity_insert = auto_identity_insert        self.text_as_varchar = False        self.use_scope_identity = False        self.has_window_funcs = False        self.set_default_schema_name("dbo")    def dbapi(cls, module_name=None):        if module_name:            try:                dialect_cls = dialect_mapping[module_name]                return dialect_cls.import_dbapi()            except KeyError:                raise exceptions.InvalidRequestError("Unsupported MSSQL module '%s' requested (must be adodbpi, pymssql or pyodbc)" % module_name)        else:            for dialect_cls in [MSSQLDialect_pyodbc, MSSQLDialect_pymssql, MSSQLDialect_adodbapi]:                try:                    return dialect_cls.import_dbapi()                except ImportError, e:                    pass            else:                raise ImportError('No DBAPI module detected for MSSQL - please install pyodbc, pymssql, or adodbapi')    dbapi = classmethod(dbapi)    def create_connect_args(self, url):        opts = url.translate_connect_args(username='user')        opts.update(url.query)        if 'auto_identity_insert' in opts:            self.auto_identity_insert = bool(int(opts.pop('auto_identity_insert')))        if 'query_timeout' in opts:            self.query_timeout = int(opts.pop('query_timeout'))        if 'text_as_varchar' in opts:            self.text_as_varchar = bool(int(opts.pop('text_as_varchar')))        if 'use_scope_identity' in opts:            self.use_scope_identity = bool(int(opts.pop('use_scope_identity')))        if 'has_window_funcs' in opts:            self.has_window_funcs =  bool(int(opts.pop('has_window_funcs')))        return self.make_connect_string(opts)    def create_execution_context(self, *args, **kwargs):        return MSSQLExecutionContext(self, *args, **kwargs)    def type_descriptor(self, typeobj):        newobj = sqltypes.adapt_type(typeobj, self.colspecs)        # Some types need to know about the dialect        if isinstance(newobj, (MSText, MSNVarchar)):            newobj.dialect = self        return newobj    def last_inserted_ids(self):        return self.context.last_inserted_ids    def get_default_schema_name(self, connection):        return self.schema_name    def set_default_schema_name(self, schema_name):        self.schema_name = schema_name    def last_inserted_ids(self):        return self.context.last_inserted_ids    def do_execute(self, cursor, statement, params, context=None, **kwargs):        if params == {}:            params = ()        try:            super(MSSQLDialect, self).do_execute(cursor, statement, params, context=context, **kwargs)        finally:            if context.IINSERT:                cursor.execute("SET IDENTITY_INSERT %s OFF" % self.identifier_preparer.format_table(context.compiled.statement.table))    def do_executemany(self, cursor, statement, params, context=None, **kwargs):        try:            super(MSSQLDialect, self).do_executemany(cursor, statement, params, context=context, **kwargs)        finally:            if context.IINSERT:                cursor.execute("SET IDENTITY_INSERT %s OFF" % self.identifier_preparer.format_table(context.compiled.statement.table))    def _execute(self, c, statement, parameters):        try:            if parameters == {}:                parameters = ()            c.execute(statement, parameters)            self.context.rowcount = c.rowcount            c.DBPROP_COMMITPRESERVE = "Y"        except Exception, e:            raise exceptions.DBAPIError.instance(statement, parameters, e)    def table_names(self, connection, schema):        from sqlalchemy.databases import information_schema as ischema        return ischema.table_names(connection, schema)    def raw_connection(self, connection):        """Pull the raw pymmsql connection out--sensative to "pool.ConnectionFairy" and pymssql.pymssqlCnx Classes"""        try:            # TODO: probably want to move this to individual dialect subclasses to            # save on the exception throw + simplify            return connection.connection.__dict__['_pymssqlCnx__cnx']        except:            return connection.connection.adoConn    def uppercase_table(self, t):        # convert all names to uppercase -- fixes refs to INFORMATION_SCHEMA for case-senstive DBs, and won't matter for case-insensitive        t.name = t.name.upper()        if t.schema:            t.schema = t.schema.upper()        for c in t.columns:            c.name = c.name.upper()        return t    def has_table(self, connection, tablename, schema=None):        import sqlalchemy.databases.information_schema as ischema        current_schema = schema or self.get_default_schema_name(connection)        columns = self.uppercase_table(ischema.columns)        s = sql.select([columns],                   current_schema                       and sql.and_(columns.c.table_name==tablename, columns.c.table_schema==current_schema)                       or columns.c.table_name==tablename,                   )        c = connection.execute(s)        row  = c.fetchone()        return row is not None    def reflecttable(self, connection, table, include_columns):        import sqlalchemy.databases.information_schema as ischema        # Get base columns        if table.schema is not None:            current_schema = table.schema        else:            current_schema = self.get_default_schema_name(connection)        columns = self.uppercase_table(ischema.columns)        s = sql.select([columns],                   current_schema                       and sql.and_(columns.c.table_name==table.name, columns.c.table_schema==current_schema)                       or columns.c.table_name==table.name,                   order_by=[columns.c.ordinal_position])        c = connection.execute(s)        found_table = False        while True:            row = c.fetchone()            if row is None:                break            found_table = True            (name, type, nullable, charlen, numericprec, numericscale, default) = (                row[columns.c.column_name],                row[columns.c.data_type],                row[columns.c.is_nullable] == 'YES',                row[columns.c.character_maximum_length],                row[columns.c.numeric_precision],                row[columns.c.numeric_scale],                row[columns.c.column_default]            )            if include_columns and name not in include_columns:                continue            args = []            for a in (charlen, numericprec, numericscale):                if a is not None:                    args.append(a)            coltype = self.ischema_names.get(type, None)            if coltype == MSString and charlen == -1:                coltype = MSText()            else:                if coltype is None:                    util.warn("Did not recognize type '%s' of column '%s'" %                              (type, name))                    coltype = sqltypes.NULLTYPE                elif coltype in (MSNVarchar, AdoMSNVarchar) and charlen == -1:                    args[0] = None                coltype = coltype(*args)            colargs= []            if default is not None:                colargs.append(schema.PassiveDefault(sql.text(default)))            table.append_column(schema.Column(name, coltype, nullable=nullable, autoincrement=False, *colargs))        if not found_table:            raise exceptions.NoSuchTableError(table.name)        # We also run an sp_columns to check for identity columns:        cursor = connection.execute("sp_columns @table_name = '%s', @table_owner = '%s'" % (table.name, current_schema))        ic = None        while True:            row = cursor.fetchone()            if row is None:                break            col_name, type_name = row[3], row[5]            if type_name.endswith("identity"):                ic = table.c[col_name]                ic.autoincrement = True                # setup a psuedo-sequence to represent the identity attribute - we interpret this at table.create() time as the identity attribute                ic.sequence = schema.Sequence(ic.name + '_identity')                # MSSQL: only one identity per table allowed                cursor.close()                break        if not ic is None:            try:                cursor = connection.execute("select ident_seed(?), ident_incr(?)", table.fullname, table.fullname)                row = cursor.fetchone()                cursor.close()                if not row is None:                    ic.sequence.start=int(row[0])                    ic.sequence.increment=int(row[1])            except:                # ignoring it, works just like before                pass        # Add constraints        RR = self.uppercase_table(ischema.ref_constraints)    #information_schema.referential_constraints        TC = self.uppercase_table(ischema.constraints)        #information_schema.table_constraints        C  = self.uppercase_table(ischema.pg_key_constraints).alias('C') #information_schema.constraint_column_usage: the constrained column        R  = self.uppercase_table(ischema.pg_key_constraints).alias('R') #information_schema.constraint_column_usage: the referenced column        # Primary key constraints        s = sql.select([C.c.column_name, TC.c.constraint_type], sql.and_(TC.c.constraint_name == C.c.constraint_name,                                                                         C.c.table_name == table.name))        c = connection.execute(s)        for row in c:            if 'PRIMARY' in row[TC.c.constraint_type.name]:                table.primary_key.add(table.c[row[0]])        # Foreign key constraints        s = sql.select([C.c.column_name,                        R.c.table_schema, R.c.table_name, R.c.column_name,                        RR.c.constraint_name, RR.c.match_option, RR.c.update_rule, RR.c.delete_rule],                       sql.and_(C.c.table_name == table.name,                                C.c.table_schema == current_schema,                                C.c.constraint_name == RR.c.constraint_name,                                R.c.constraint_name == RR.c.unique_constraint_name,                                C.c.ordinal_position == R.c.ordinal_position                                ),                       order_by = [RR.c.constraint_name, R.c.ordinal_position])        rows = connection.execute(s).fetchall()        # group rows by constraint ID, to handle multi-column FKs        fknm, scols, rcols = (None, [], [])        for r in rows:            scol, rschema, rtbl, rcol, rfknm, fkmatch, fkuprule, fkdelrule = r            if rfknm != fknm:                if fknm:                    table.append_constraint(schema.ForeignKeyConstraint(scols, ['%s.%s' % (t,c) for (s,t,c) in rcols], fknm))                fknm, scols, rcols = (rfknm, [], [])            if (not scol in scols): scols.append(scol)            if (not (rschema, rtbl, rcol) in rcols): rcols.append((rschema, rtbl, rcol))        if fknm and scols:            table.append_constraint(schema.ForeignKeyConstraint(scols, ['%s.%s' % (t,c) for (s,t,c) in rcols], fknm))class MSSQLDialect_pymssql(MSSQLDialect):    supports_sane_rowcount = False    max_identifier_length = 30    def import_dbapi(cls):        import pymssql as module        # pymmsql doesn't have a Binary method.  we use string        # TODO: monkeypatching here is less than ideal        module.Binary = lambda st: str(st)        return module    import_dbapi = classmethod(import_dbapi)    colspecs = MSSQLDialect.colspecs.copy()    colspecs[sqltypes.Date] = MSDate_pymssql    ischema_names = MSSQLDialect.ischema_names.copy()    ischema_names['smalldatetime'] = MSDate_pymssql    def __init__(self, **params):        super(MSSQLDialect_pymssql, self).__init__(**params)        self.use_scope_identity = True        # pymssql understands only ascii        if self.convert_unicode:            self.encoding = params.get('encoding', 'ascii')

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -