⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mssql.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 3 页
字号:
# mssql.py

"""MSSQL backend, through either the pymssql, adodbapi or pyodbc interfaces.

* ``IDENTITY`` columns are supported by using SA ``schema.Sequence()``
  objects. In other words::

    Table('test', mss_engine,
           Column('id',   Integer, Sequence('blah',100,10), primary_key=True),
           Column('name', String(20))
         ).create()

  would yield::

   CREATE TABLE test (
     id INTEGER NOT NULL IDENTITY(100,10) PRIMARY KEY,
     name VARCHAR(20)
     )

  Note that the start & increment values for sequences are optional
  and will default to 1,1.

* Support for ``SET IDENTITY_INSERT ON`` mode (automagic on / off for
  ``INSERT`` s)

* Support for auto-fetching of ``@@IDENTITY/@@SCOPE_IDENTITY()`` on ``INSERT``

* ``select._limit`` implemented as ``SELECT TOP n``

* Experimental implementation of LIMIT / OFFSET with row_number()

Known issues / TODO:

* No support for more than one ``IDENTITY`` column per table

* pymssql has problems with binary and unicode data that this module
  does **not** work around

"""

import datetime, operator, re, sys

from sqlalchemy import sql, schema, exceptions, util
from sqlalchemy.sql import compiler, expression, operators as sqlops, functions as sql_functions
from sqlalchemy.engine import default, base
from sqlalchemy import types as sqltypes
from sqlalchemy.util import Decimal as _python_Decimal

MSSQL_RESERVED_WORDS = util.Set(['function'])


class MSNumeric(sqltypes.Numeric):
    """``NUMERIC`` column type."""

    def result_processor(self, dialect):
        """Return a converter applied to values fetched from the database.

        When ``self.asdecimal`` is true, values are converted to ``Decimal``
        through their string form; otherwise they are converted to ``float``.
        ``None`` is passed through unchanged in both modes.
        """
        if self.asdecimal:
            def process(value):
                if value is not None:
                    return _python_Decimal(str(value))
                else:
                    return value
            return process
        else:
            def process(value):
                # A NULL column arrives as None; float(None) would raise
                # TypeError, so pass it through like the Decimal branch does.
                if value is None:
                    return None
                return float(value)
            return process

    def bind_processor(self, dialect):
        """Return a converter that sends bound values as strings.

        ``None`` is passed through unchanged.
        """
        def process(value):
            if value is None:
                # Not sure that this exception is needed
                return value
            else:
                return str(value)
        return process

    def get_col_spec(self):
        """Return the DDL type: ``NUMERIC`` or ``NUMERIC(precision, length)``."""
        if self.precision is None:
            return "NUMERIC"
        else:
            return "NUMERIC(%(precision)s, %(length)s)" % {'precision': self.precision, 'length' : self.length}


class MSFloat(sqltypes.Float):
    """``FLOAT`` column type."""

    def get_col_spec(self):
        """Return the DDL type: ``FLOAT`` or ``FLOAT(precision)``."""
        # Without this guard an unset precision would render the invalid
        # DDL "FLOAT(None)"; mirror the handling in MSNumeric.get_col_spec.
        if self.precision is None:
            return "FLOAT"
        return "FLOAT(%(precision)s)" % {'precision': self.precision}

    def bind_processor(self, dialect):
        """Return a converter that sends bound values as strings."""
        def process(value):
            """By converting to string, we can use Decimal types round-trip."""
            if value is not None:
                return str(value)
            return None
        return process


class MSInteger(sqltypes.Integer):
    """``INTEGER`` column type."""

    def get_col_spec(self):
        return "INTEGER"


class MSBigInteger(MSInteger):
    """``BIGINT`` column type."""

    def get_col_spec(self):
        return "BIGINT"


class MSTinyInteger(MSInteger):
    """``TINYINT`` column type."""

    def get_col_spec(self):
        return "TINYINT"


class MSSmallInteger(MSInteger):
    """``SMALLINT`` column type."""

    def get_col_spec(self):
        return "SMALLINT"


class MSDateTime(sqltypes.DateTime):
    """``DATETIME`` column type."""

    def __init__(self, *a, **kw):
        # Caller arguments are accepted but ignored; the base type is always
        # initialised with False (presumably its timezone flag -- confirm
        # against sqltypes.DateTime).
        super(MSDateTime, self).__init__(False)

    def get_col_spec(self):
        return "DATETIME"


class MSDate(sqltypes.Date):
    """Date column type, stored as ``SMALLDATETIME``."""

    def __init__(self, *a, **kw):
        # Caller arguments are accepted but ignored (see MSDateTime.__init__).
        super(MSDate, self).__init__(False)

    def get_col_spec(self):
        return "SMALLDATETIME"


class MSTime(sqltypes.Time):
    """Time-of-day column type, stored as a ``DATETIME`` on 1900-01-01."""

    # Date part attached to every time value sent to the database.
    __zero_date = datetime.date(1900, 1, 1)

    def __init__(self, *a, **kw):
        # Caller arguments are accepted but ignored (see MSDateTime.__init__).
        super(MSTime, self).__init__(False)

    def get_col_spec(self):
        return "DATETIME"

    def bind_processor(self, dialect):
        """Return a converter that turns times into datetimes on the zero date.

        A ``datetime.datetime`` has its date part replaced by the zero date;
        a ``datetime.time`` is combined with the zero date.  Any other value
        is passed through unchanged.
        """
        def process(value):
            # Exact type checks: only these two concrete types are rewritten.
            if type(value) is datetime.datetime:
                value = datetime.datetime.combine(self.__zero_date, value.time())
            elif type(value) is datetime.time:
                value = datetime.datetime.combine(self.__zero_date, value)
            return value
        return process

    def result_processor(self, dialect):
        """Return a converter that reduces fetched values to ``datetime.time``.

        A ``datetime.datetime`` yields its time part; a plain
        ``datetime.date`` yields midnight.  Other values pass through.
        """
        def process(value):
            if type(value) is datetime.datetime:
                return value.time()
            elif type(value) is datetime.date:
                return datetime.time(0, 0, 0)
            return value
        return process


class MSDateTime_adodbapi(MSDateTime):
    """``DATETIME`` type with result handling for adodbapi."""

    def result_processor(self, dialect):
        def process(value):
            # adodbapi will return datetimes with empty time values as datetime.date() objects.
            # Promote them back to full datetime.datetime()
            if type(value) is datetime.date:
                return datetime.datetime(value.year, value.month, value.day)
            return value
        return process


class MSDateTime_pyodbc(MSDateTime):
    """``DATETIME`` type with bind handling for pyodbc."""

    def bind_processor(self, dialect):
        def process(value):
            # Promote a plain date to a midnight datetime before binding.
            # type() is used rather than isinstance() because
            # datetime.datetime is itself a subclass of datetime.date.
            if type(value) is datetime.date:
                return datetime.datetime(value.year, value.month, value.day)
            return value
        return process


class MSDate_pyodbc(MSDate):
    """Date type with bind and result handling for pyodbc."""

    def bind_processor(self, dialect):
        def process(value):
            # Promote a plain date to a midnight datetime before binding.
            if type(value) is datetime.date:
                return datetime.datetime(value.year, value.month, value.day)
            return value
        return process

    def result_processor(self, dialect):
        def process(value):
            # pyodbc returns SMALLDATETIME values as datetime.datetime().
            # truncate it back to datetime.date()
            if type(value) is datetime.datetime:
                return value.date()
            return value
        return process


class MSDate_pymssql(MSDate):
    """Date type with result handling for pymssql."""

    def result_processor(self, dialect):
        def process(value):
            # pymssql will return SMALLDATETIME values as datetime.datetime(), truncate it back to datetime.date()
            if type(value) is datetime.datetime:
                return value.date()
            return value
        return process


class MSText(sqltypes.Text):
    """Text column type: ``VARCHAR(max)`` or ``TEXT``."""

    def get_col_spec(self):
        # self.dialect is not assigned anywhere in this block; presumably it
        # is attached by the dialect machinery -- verify against the caller.
        if self.dialect.text_as_varchar:
            return "VARCHAR(max)"
        else:
            return "TEXT"


class MSString(sqltypes.String):
    """``VARCHAR(length)`` column type."""

    def get_col_spec(self):
        return "VARCHAR(%(length)s)" % {'length' : self.length}


class MSNVarchar(sqltypes.Unicode):
    """Unicode column type: ``NVARCHAR(length)``, ``NVARCHAR(max)`` or ``NTEXT``."""

    def get_col_spec(self):
        if self.length:
            return "NVARCHAR(%(length)s)" % {'length' : self.length}
        elif self.dialect.text_as_varchar:
            return "NVARCHAR(max)"
        else:
            return "NTEXT"


class AdoMSNVarchar(MSNVarchar):
    """Overrides bindparam/result processing to not convert any unicode strings."""

    def bind_processor(self, dialect):
        return None

    def result_processor(self, dialect):
        return None


class MSChar(sqltypes.CHAR):
    """``CHAR(length)`` column type."""

    def get_col_spec(self):
        return "CHAR(%(length)s)" % {'length' : self.length}


class MSNChar(sqltypes.NCHAR):
    """``NCHAR(length)`` column type."""

    def get_col_spec(self):
        return "NCHAR(%(length)s)" % {'length' : self.length}


class MSBinary(sqltypes.Binary):
    """Binary column type, stored as ``IMAGE``."""

    def get_col_spec(self):
        return "IMAGE"


class MSBoolean(sqltypes.Boolean):
    """Boolean column type, stored as ``BIT``."""

    def get_col_spec(self):
        return "BIT"

    def result_processor(self, dialect):
        """Return a converter mapping fetched values to ``True``/``False``.

        ``None`` is passed through unchanged.
        """
        def process(value):
            if value is None:
                return None
            return bool(value)
        return process

    def bind_processor(self, dialect):
        """Return a converter for bound boolean values.

        ``True`` and ``False`` are sent as ``1`` and ``0``, ``None`` is
        passed through, and any other value is reduced to its truth value.
        """
        def process(value):
            if value is True:
                return 1
            elif value is False:
                return 0
            elif value is None:
                return None
            else:
                return bool(value)
        return process


class MSTimeStamp(sqltypes.TIMESTAMP):
    """``TIMESTAMP`` column type."""

    def get_col_spec(self):
        return "TIMESTAMP"


class MSMoney(sqltypes.TypeEngine):
    """``MONEY`` column type."""

    def get_col_spec(self):
        return "MONEY"


class MSSmallMoney(MSMoney):
    """``SMALLMONEY`` column type."""

    def get_col_spec(self):
        return "SMALLMONEY"


class MSUniqueIdentifier(sqltypes.TypeEngine):
    """``UNIQUEIDENTIFIER`` column type."""

    def get_col_spec(self):
        return "UNIQUEIDENTIFIER"


class MSVariant(sqltypes.TypeEngine):
    """``SQL_VARIANT`` column type."""

    def get_col_spec(self):
        return "SQL_VARIANT"


def descriptor():
    """Return a dict describing this backend and its connection arguments."""
    return {'name':'mssql',
    'description':'MSSQL',
    'arguments':[
        ('user',"Database Username",None),
        ('password',"Database Password",None),
        ('db',"Database Name",None),
        ('host',"Hostname", None),
    ]}


class MSSQLExecutionContext(default.DefaultExecutionContext):
    """Execution context that manages ``IDENTITY`` handling for INSERTs.

    Two flags are kept per execution:

    * ``HASIDENT`` -- the target table of an INSERT has an identity column.
    * ``IINSERT`` -- ``SET IDENTITY_INSERT ... ON`` was issued for it.
    """

    def __init__(self, *args, **kwargs):
        # The flags are set before the base initialiser runs.
        self.IINSERT = self.HASIDENT = False
        super(MSSQLExecutionContext, self).__init__(*args, **kwargs)

    def _has_implicit_sequence(self, column):
        """Return True if ``column`` should be treated as an identity column.

        That is the case for an autoincrementing integer primary key with no
        foreign keys whose default is either absent or an optional
        ``schema.Sequence``.
        """
        if column.primary_key and column.autoincrement:
            if isinstance(column.type, sqltypes.Integer) and not column.foreign_keys:
                if column.default is None or (isinstance(column.default, schema.Sequence) and
                                              column.default.optional):
                    return True
        return False

    def pre_exec(self):
        """MS-SQL has a special mode for inserting non-NULL values
        into IDENTITY columns.

        Activate it if the feature is turned on and needed: for an INSERT
        whose table has an identity column, ``SET IDENTITY_INSERT <table> ON``
        is executed when ``dialect.auto_identity_insert`` is set and the
        identity column's key appears in the statement parameters.
        """
        if self.compiled.isinsert:
            tbl = self.compiled.statement.table
            # The identity column lookup is cached on the table object as
            # ``has_sequence`` so it is only computed once per table.
            if not hasattr(tbl, 'has_sequence'):
                tbl.has_sequence = None
                for column in tbl.c:
                    if getattr(column, 'sequence', False) or self._has_implicit_sequence(column):
                        tbl.has_sequence = column
                        break

            self.HASIDENT = bool(tbl.has_sequence)
            if self.dialect.auto_identity_insert and self.HASIDENT:
                # With a list of parameter sets, only the first is inspected.
                if isinstance(self.compiled_parameters, list):
                    self.IINSERT = tbl.has_sequence.key in self.compiled_parameters[0]
                else:
                    self.IINSERT = tbl.has_sequence.key in self.compiled_parameters
            else:
                self.IINSERT = False

            if self.IINSERT:
                self.cursor.execute("SET IDENTITY_INSERT %s ON" % self.dialect.identifier_preparer.format_table(self.compiled.statement.table))

        super(MSSQLExecutionContext, self).pre_exec()

    def post_exec(self):
        """Fetch the recently inserted IDENTITY value (works only for
        one column).

        After an INSERT into a table with an identity column for which
        IDENTITY_INSERT was not activated, and when no first inserted id is
        known yet, the value of ``scope_identity()`` (or ``@@identity`` when
        ``dialect.use_scope_identity`` is false) is queried and stored as the
        first entry of ``_last_inserted_ids``.

        NOTE(review): the original docstring also said this method turns
        IDENTITY_INSERT back off; no such statement is issued here, so that
        is presumably handled elsewhere -- confirm.
        """
        if self.compiled.isinsert and self.HASIDENT and not self.IINSERT:
            if not len(self._last_inserted_ids) or self._last_inserted_ids[0] is None:
                if self.dialect.use_scope_identity:
                    self.cursor.execute("SELECT scope_identity() AS lastrowid")
                else:
                    self.cursor.execute("SELECT @@identity AS lastrowid")
                row = self.cursor.fetchone()
                self._last_inserted_ids = [int(row[0])] + self._last_inserted_ids[1:]
        super(MSSQLExecutionContext, self).post_exec()

    # Matches statements that start (after optional whitespace) with SELECT,
    # sp_columns or EXEC, case-insensitively.
    _ms_is_select = re.compile(r'\s*(?:SELECT|sp_columns|EXEC)',
                               re.I | re.UNICODE)

    def returns_rows_text(self, statement):
        """Return True if the textual ``statement`` matches ``_ms_is_select``."""
        return self._ms_is_select.match(statement) is not None

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -