⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mssql.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 3 页
字号:
    def do_rollback(self, connection):        # pymssql throws an error on repeated rollbacks. Ignore it.        # TODO: this is normal behavior for most DBs.  are we sure we want to ignore it ?        try:            connection.rollback()        except:            pass    def create_connect_args(self, url):        r = super(MSSQLDialect_pymssql, self).create_connect_args(url)        if hasattr(self, 'query_timeout'):            self.dbapi._mssql.set_query_timeout(self.query_timeout)        return r    def make_connect_string(self, keys):        if keys.get('port'):            # pymssql expects port as host:port, not a separate arg            keys['host'] = ''.join([keys.get('host', ''), ':', str(keys['port'])])            del keys['port']        return [[], keys]    def is_disconnect(self, e):        return isinstance(e, self.dbapi.DatabaseError) and "Error 10054" in str(e)##    This code is leftover from the initial implementation, for reference##    def do_begin(self, connection):##        """implementations might want to put logic here for turning autocommit on/off, etc."""##        pass##    def do_rollback(self, connection):##        """implementations might want to put logic here for turning autocommit on/off, etc."""##        try:##            # connection.rollback() for pymmsql failed sometimes--the begin tran doesn't show up##            # this is a workaround that seems to be handle it.##            r = self.raw_connection(connection)##            r.query("if @@trancount > 0 rollback tran")##            r.fetch_array()##            r.query("begin tran")##            r.fetch_array()##        except:##            pass##    def do_commit(self, connection):##        """implementations might want to put logic here for turning autocommit on/off, etc.##            do_commit is set for pymmsql connections--ADO seems to handle transactions without any issue##        """##        # ADO Uses Implicit Transactions.##        # This is very pymssql specific.  We use this instead of its commit, because it hangs on failed rollbacks.##        # By using the "if" we don't assume an open transaction--much better.##        r = self.raw_connection(connection)##        r.query("if @@trancount > 0 commit tran")##        r.fetch_array()##        r.query("begin tran")##        r.fetch_array()class MSSQLDialect_pyodbc(MSSQLDialect):    supports_sane_rowcount = False    supports_sane_multi_rowcount = False    # PyODBC unicode is broken on UCS-4 builds    supports_unicode = sys.maxunicode == 65535    supports_unicode_statements = supports_unicode    def __init__(self, **params):        super(MSSQLDialect_pyodbc, self).__init__(**params)        # whether use_scope_identity will work depends on the version of pyodbc        try:            import pyodbc            self.use_scope_identity = hasattr(pyodbc.Cursor, 'nextset')        except:            pass    def import_dbapi(cls):        import pyodbc as module        return module    import_dbapi = classmethod(import_dbapi)    colspecs = MSSQLDialect.colspecs.copy()    if supports_unicode:        colspecs[sqltypes.Unicode] = AdoMSNVarchar    colspecs[sqltypes.Date] = MSDate_pyodbc    colspecs[sqltypes.DateTime] = MSDateTime_pyodbc    ischema_names = MSSQLDialect.ischema_names.copy()    if supports_unicode:        ischema_names['nvarchar'] = AdoMSNVarchar    ischema_names['smalldatetime'] = MSDate_pyodbc    ischema_names['datetime'] = MSDateTime_pyodbc    def make_connect_string(self, keys):        if 'dsn' in keys:            connectors = ['dsn=%s' % keys['dsn']]        else:            connectors = ["Driver={SQL Server}"]            if 'port' in keys:                connectors.append('Server=%s,%d' % (keys.get('host'), keys.get('port')))            else:                connectors.append('Server=%s' % keys.get('host'))            connectors.append("Database=%s" % keys.get("database"))        user = keys.get("user")        if user:            connectors.append("UID=%s" % user)            connectors.append("PWD=%s" % keys.get("password", ""))        else:            connectors.append ("TrustedConnection=Yes")        return [[";".join (connectors)], {}]    def is_disconnect(self, e):        return isinstance(e, self.dbapi.Error) and '[08S01]' in str(e)    def create_execution_context(self, *args, **kwargs):        return MSSQLExecutionContext_pyodbc(self, *args, **kwargs)    def do_execute(self, cursor, statement, parameters, context=None, **kwargs):        super(MSSQLDialect_pyodbc, self).do_execute(cursor, statement, parameters, context=context, **kwargs)        if context and context.HASIDENT and (not context.IINSERT) and context.dialect.use_scope_identity:            import pyodbc            # Fetch the last inserted id from the manipulated statement            # We may have to skip over a number of result sets with no data (due to triggers, etc.)            while True:                try:                    row = cursor.fetchone()                    break                except pyodbc.Error, e:                    cursor.nextset()            context._last_inserted_ids = [int(row[0])]class MSSQLDialect_adodbapi(MSSQLDialect):    supports_sane_rowcount = True    supports_sane_multi_rowcount = True    supports_unicode = sys.maxunicode == 65535    supports_unicode_statements = True    def import_dbapi(cls):        import adodbapi as module        return module    import_dbapi = classmethod(import_dbapi)    colspecs = MSSQLDialect.colspecs.copy()    colspecs[sqltypes.Unicode] = AdoMSNVarchar    colspecs[sqltypes.DateTime] = MSDateTime_adodbapi    ischema_names = MSSQLDialect.ischema_names.copy()    ischema_names['nvarchar'] = AdoMSNVarchar    ischema_names['datetime'] = MSDateTime_adodbapi    def make_connect_string(self, keys):        connectors = ["Provider=SQLOLEDB"]        if 'port' in keys:            connectors.append ("Data Source=%s, %s" % (keys.get("host"), keys.get("port")))        else:            connectors.append ("Data Source=%s" % keys.get("host"))        connectors.append ("Initial Catalog=%s" % keys.get("database"))        user = keys.get("user")        if user:            connectors.append("User Id=%s" % user)            connectors.append("Password=%s" % keys.get("password", ""))        else:            connectors.append("Integrated Security=SSPI")        return [[";".join (connectors)], {}]    def is_disconnect(self, e):        return isinstance(e, self.dbapi.adodbapi.DatabaseError) and "'connection failure'" in str(e)dialect_mapping = {    'pymssql':  MSSQLDialect_pymssql,    'pyodbc':   MSSQLDialect_pyodbc,    'adodbapi': MSSQLDialect_adodbapi    }class MSSQLCompiler(compiler.DefaultCompiler):    operators = compiler.OPERATORS.copy()    operators[sqlops.concat_op] = '+'    functions = compiler.DefaultCompiler.functions.copy()    functions.update (        {            sql_functions.now: 'CURRENT_TIMESTAMP'        }    )        def __init__(self, *args, **kwargs):        super(MSSQLCompiler, self).__init__(*args, **kwargs)        self.tablealiases = {}    def get_select_precolumns(self, select):        """ MS-SQL puts TOP, it's version of LIMIT here """        if not self.dialect.has_window_funcs:            s = select._distinct and "DISTINCT " or ""            if select._limit:                s += "TOP %s " % (select._limit,)            if select._offset:                raise exceptions.InvalidRequestError('MSSQL does not support LIMIT with an offset')            return s        return compiler.DefaultCompiler.get_select_precolumns(self, select)    def limit_clause(self, select):        # Limit in mssql is after the select keyword        return ""    def visit_select(self, select, **kwargs):        """Look for ``LIMIT`` and OFFSET in a select statement, and if        so tries to wrap it in a subquery with ``row_number()`` criterion.        """        if self.dialect.has_window_funcs and (not getattr(select, '_mssql_visit', None)) and (select._limit is not None or select._offset is not None):            # to use ROW_NUMBER(), an ORDER BY is required.            orderby = self.process(select._order_by_clause)            if not orderby:                orderby = list(select.oid_column.proxies)[0]                orderby = self.process(orderby)            _offset = select._offset            _limit = select._limit            select._mssql_visit = True            select = select.column(sql.literal_column("ROW_NUMBER() OVER (ORDER BY %s)" % orderby).label("mssql_rn")).order_by(None).alias()            limitselect = sql.select([c for c in select.c if c.key!='mssql_rn'])            if _offset is not None:                limitselect.append_whereclause("mssql_rn>=%d" % _offset)                if _limit is not None:                    limitselect.append_whereclause("mssql_rn<=%d" % (_limit + _offset))            else:                limitselect.append_whereclause("mssql_rn<=%d" % _limit)            return self.process(limitselect, iswrapper=True, **kwargs)        else:            return compiler.DefaultCompiler.visit_select(self, select, **kwargs)    def _schema_aliased_table(self, table):        if getattr(table, 'schema', None) is not None:            if table not in self.tablealiases:                self.tablealiases[table] = table.alias()            return self.tablealiases[table]        else:            return None    def visit_table(self, table, mssql_aliased=False, **kwargs):        if mssql_aliased:            return super(MSSQLCompiler, self).visit_table(table, **kwargs)        # alias schema-qualified tables        alias = self._schema_aliased_table(table)        if alias is not None:            return self.process(alias, mssql_aliased=True, **kwargs)        else:            return super(MSSQLCompiler, self).visit_table(table, **kwargs)    def visit_alias(self, alias, **kwargs):        # translate for schema-qualified table aliases        self.tablealiases[alias.original] = alias        kwargs['mssql_aliased'] = True        return super(MSSQLCompiler, self).visit_alias(alias, **kwargs)    def visit_column(self, column, **kwargs):        if column.table is not None and not self.isupdate and not self.isdelete:            # translate for schema-qualified table aliases            t = self._schema_aliased_table(column.table)            if t is not None:                return self.process(expression._corresponding_column_or_error(t, column))        else:            kwargs['use_schema'] = True        return super(MSSQLCompiler, self).visit_column(column, **kwargs)    def visit_binary(self, binary, **kwargs):        """Move bind parameters to the right-hand side of an operator, where possible."""        if isinstance(binary.left, expression._BindParamClause) and binary.operator == operator.eq:            return self.process(expression._BinaryExpression(binary.right, binary.left, binary.operator), **kwargs)        else:            return super(MSSQLCompiler, self).visit_binary(binary, **kwargs)    def label_select_column(self, select, column, asfrom):        if isinstance(column, expression._Function):            return column.label(None)        else:            return super(MSSQLCompiler, self).label_select_column(select, column, asfrom)    function_rewrites =  {'current_date': 'getdate',                          'length':     'len',                          }    def visit_function(self, func, **kwargs):        func.name = self.function_rewrites.get(func.name, func.name)        return super(MSSQLCompiler, self).visit_function(func, **kwargs)    def for_update_clause(self, select):        # "FOR UPDATE" is only allowed on "DECLARE CURSOR" which SQLAlchemy doesn't use        return ''    def order_by_clause(self, select):        order_by = self.process(select._order_by_clause)        # MSSQL only allows ORDER BY in subqueries if there is a LIMIT        if order_by and (not self.is_subquery(select) or select._limit):            return " ORDER BY " + order_by        else:            return ""class MSSQLSchemaGenerator(compiler.SchemaGenerator):    def get_column_specification(self, column, **kwargs):        colspec = self.preparer.format_column(column) + " " + column.type.dialect_impl(self.dialect, _for_ddl=column).get_col_spec()        # install a IDENTITY Sequence if we have an implicit IDENTITY column        if (not getattr(column.table, 'has_sequence', False)) and column.primary_key and \                column.autoincrement and isinstance(column.type, sqltypes.Integer) and not column.foreign_keys:            if column.default is None or (isinstance(column.default, schema.Sequence) and column.default.optional):                column.sequence = schema.Sequence(column.name + '_seq')        if not column.nullable:            colspec += " NOT NULL"        if hasattr(column, 'sequence'):            column.table.has_sequence = column            colspec += " IDENTITY(%s,%s)" % (column.sequence.start or 1, column.sequence.increment or 1)        else:            default = self.get_column_default_string(column)            if default is not None:                colspec += " DEFAULT " + default        return colspecclass MSSQLSchemaDropper(compiler.SchemaDropper):    def visit_index(self, index):        self.append("\nDROP INDEX %s.%s" % (            self.preparer.quote_identifier(index.table.name),            self.preparer.quote_identifier(index.name)            ))        self.execute()class MSSQLDefaultRunner(base.DefaultRunner):    # TODO: does ms-sql have standalone sequences ?    passclass MSSQLIdentifierPreparer(compiler.IdentifierPreparer):    reserved_words = compiler.IdentifierPreparer.reserved_words.union(MSSQL_RESERVED_WORDS)    def __init__(self, dialect):        super(MSSQLIdentifierPreparer, self).__init__(dialect, initial_quote='[', final_quote=']')    def _escape_identifier(self, value):        #TODO: determin MSSQL's escapeing rules        return valuedialect = MSSQLDialectdialect.statement_compiler = MSSQLCompilerdialect.schemagenerator = MSSQLSchemaGeneratordialect.schemadropper = MSSQLSchemaDropperdialect.preparer = MSSQLIdentifierPreparerdialect.defaultrunner = MSSQLDefaultRunner

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -