⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 maxdb.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 3 页
字号:
# maxdb.py
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php

"""Support for the MaxDB database.

TODO: More module docs!  MaxDB support is currently experimental.

Overview
--------

The ``maxdb`` dialect is **experimental** and has only been tested on 7.6.03.007
and 7.6.00.037.  Of these, **only 7.6.03.007 will work** with SQLAlchemy's ORM.
The earlier version has severe ``LEFT JOIN`` limitations and will return
incorrect results from even very simple ORM queries.

Only the native Python DB-API is currently supported.  ODBC driver support
is a future enhancement.

Connecting
----------

The username is case-sensitive.  If you usually connect to the
database with sqlcli and other tools in lower case, you likely need to
use upper case for DB-API.

Implementation Notes
--------------------

Also check the DatabaseNotes page on the wiki for detailed information.

With the 7.6.00.37 driver and Python 2.5, it seems that all DB-API
generated exceptions are broken and can cause Python to crash.

For 'somecol.in_([])' to work, the IN operator's generation must be changed
to cast 'NULL' to a numeric, i.e. NUM(NULL).  The DB-API doesn't accept a
bind parameter there, so that particular generation must inline the NULL value,
which depends on [ticket:807].

The DB-API is very picky about where bind params may be used in queries.

Bind params for some functions (e.g. MOD) need type information supplied.
The dialect does not yet do this automatically.

Max will occasionally throw up 'bad sql, compile again' exceptions for
perfectly valid SQL.  The dialect does not currently handle these, more
research is needed.

MaxDB 7.5 and Sap DB <= 7.4 reportedly do not support schemas.  A very
slightly different version of this dialect would be required to support
those versions, and can easily be added if there is demand.  Some other
required components such as a Max-aware 'old oracle style' join compiler
(thetas with (+) outer indicators) are already done and available for
integration- email the devel list if you're interested in working on
this.

"""

import datetime, itertools, re

from sqlalchemy import exceptions, schema, sql, util
from sqlalchemy.sql import operators as sql_operators, expression as sql_expr
from sqlalchemy.sql import compiler, visitors
from sqlalchemy.engine import base as engine_base, default
from sqlalchemy import types as sqltypes


__all__ = [
    'MaxString', 'MaxUnicode', 'MaxChar', 'MaxText', 'MaxInteger',
    'MaxSmallInteger', 'MaxNumeric', 'MaxFloat', 'MaxTimestamp',
    'MaxDate', 'MaxTime', 'MaxBoolean', 'MaxBlob',
    ]


def _unsupported_datetimeformat(dialect):
    """Build the error for a ``dialect.datetimeformat`` that is not handled.

    The date/time processors below only know the 'internal' and 'iso'
    formats; every other value ends up here.  The exception is returned,
    not raised, so call sites read ``raise _unsupported_datetimeformat(d)``.
    """
    return exceptions.InvalidRequestError(
        "datetimeformat '%s' is not supported." % (
        dialect.datetimeformat,))


class _StringType(sqltypes.String):
    """Common base of the MaxDB character types.

    Subclasses set ``_type`` to the SQL type name used when a length is
    given.  ``encoding`` is an optional modifier appended to the column
    spec; the value 'unicode' also disables bind-side encoding.
    """

    _type = None

    def __init__(self, length=None, encoding=None, **kw):
        super(_StringType, self).__init__(length=length, **kw)
        self.encoding = encoding

    def get_col_spec(self):
        """Return the DDL type: 'LONG' without a length, else '<_type>(n)'.

        A non-None ``encoding`` is appended upper-cased.
        """
        if self.length is None:
            spec = 'LONG'
        else:
            spec = '%s(%s)' % (self._type, self.length)

        if self.encoding is not None:
            spec = ' '.join([spec, self.encoding.upper()])
        return spec

    def bind_processor(self, dialect):
        """Return a bind converter, or None for 'unicode'-encoded columns.

        For every other encoding, unicode values are encoded with
        ``dialect.encoding``; all other values pass through untouched.
        """
        if self.encoding == 'unicode':
            return None
        else:
            def process(value):
                if isinstance(value, unicode):
                    return value.encode(dialect.encoding)
                else:
                    return value
            return process

    def result_processor(self, dialect):
        """Return a converter for values fetched from a result row."""
        def process(value):
            # Loop so that a stream-like value can be read into a plain
            # string and then re-examined by the branches above it.
            while True:
                if value is None:
                    return None
                elif isinstance(value, unicode):
                    return value
                elif isinstance(value, str):
                    if self.convert_unicode or dialect.convert_unicode:
                        return value.decode(dialect.encoding)
                    else:
                        return value
                elif hasattr(value, 'read'):
                    # some sort of LONG, snarf and retry
                    value = value.read(value.remainingLength())
                    continue
                else:
                    # unexpected type, return as-is
                    return value
        return process


class MaxString(_StringType):
    """VARCHAR column type."""

    _type = 'VARCHAR'

    def __init__(self, *a, **kw):
        super(MaxString, self).__init__(*a, **kw)


class MaxUnicode(_StringType):
    """VARCHAR column type whose encoding is fixed to 'unicode'."""

    _type = 'VARCHAR'

    def __init__(self, length=None, **kw):
        # NOTE(review): extra keyword arguments are accepted but not
        # forwarded to the base class; confirm this is intentional.
        super(MaxUnicode, self).__init__(length=length, encoding='unicode')


class MaxChar(_StringType):
    """CHAR column type."""

    _type = 'CHAR'


class MaxText(_StringType):
    """LONG column type; the length is never part of the column spec."""

    _type = 'LONG'

    def __init__(self, *a, **kw):
        super(MaxText, self).__init__(*a, **kw)

    def get_col_spec(self):
        """Return 'LONG', optionally followed by an encoding modifier.

        An explicit ``encoding`` wins; otherwise ``convert_unicode``
        selects 'UNICODE'.
        """
        spec = 'LONG'
        if self.encoding is not None:
            # Upper-cased so the output agrees with
            # _StringType.get_col_spec and with the literal 'UNICODE' below.
            spec = ' '.join((spec, self.encoding.upper()))
        elif self.convert_unicode:
            spec = ' '.join((spec, 'UNICODE'))

        return spec


class MaxInteger(sqltypes.Integer):
    """INTEGER column type."""

    def get_col_spec(self):
        return 'INTEGER'


class MaxSmallInteger(MaxInteger):
    """SMALLINT column type."""

    def get_col_spec(self):
        return 'SMALLINT'


class MaxNumeric(sqltypes.Numeric):
    """The FIXED (also NUMERIC, DECIMAL) data type."""

    def __init__(self, precision=None, length=None, **kw):
        # Decimal results are the default unless the caller overrides it.
        kw.setdefault('asdecimal', True)
        super(MaxNumeric, self).__init__(length=length, precision=precision,
                                         **kw)

    def bind_processor(self, dialect):
        """No bind-side conversion is applied."""
        return None

    def get_col_spec(self):
        """Return 'FIXED(p, l)', 'FIXED(p)' or 'INTEGER'.

        The choice depends on which of ``precision`` and ``length`` are
        truthy; without a precision the column falls back to INTEGER.
        """
        if self.length and self.precision:
            return 'FIXED(%s, %s)' % (self.precision, self.length)
        elif self.precision:
            return 'FIXED(%s)' % self.precision
        else:
            return 'INTEGER'


class MaxFloat(sqltypes.Float):
    """The FLOAT data type."""

    def get_col_spec(self):
        if self.precision is None:
            return 'FLOAT'
        else:
            return 'FLOAT(%s)' % (self.precision,)


class MaxTimestamp(sqltypes.DateTime):
    """TIMESTAMP column type.

    Values travel as strings laid out according to
    ``dialect.datetimeformat`` ('internal' or 'iso').
    """

    def get_col_spec(self):
        return 'TIMESTAMP'

    def bind_processor(self, dialect):
        """Return a converter that renders date/time objects as strings.

        None and string values are passed through unchanged.
        """
        def process(value):
            if value is None:
                return None
            elif isinstance(value, basestring):
                return value
            elif dialect.datetimeformat == 'internal':
                # The six microsecond digits are spliced into the format
                # string itself; objects without the attribute get zeros.
                ms = getattr(value, 'microsecond', 0)
                return value.strftime("%Y%m%d%H%M%S" + ("%06u" % ms))
            elif dialect.datetimeformat == 'iso':
                ms = getattr(value, 'microsecond', 0)
                return value.strftime("%Y-%m-%d %H:%M:%S." + ("%06u" % ms))
            else:
                raise _unsupported_datetimeformat(dialect)
        return process

    def result_processor(self, dialect):
        """Return a converter that parses strings into ``datetime``."""
        def process(value):
            if value is None:
                return None
            elif dialect.datetimeformat == 'internal':
                # Fixed-width fields with no separators.
                return datetime.datetime(
                    *[int(v)
                      for v in (value[0:4], value[4:6], value[6:8],
                                value[8:10], value[10:12], value[12:14],
                                value[14:])])
            elif dialect.datetimeformat == 'iso':
                # Same fields, each followed by one separator character.
                return datetime.datetime(
                    *[int(v)
                      for v in (value[0:4], value[5:7], value[8:10],
                                value[11:13], value[14:16], value[17:19],
                                value[20:])])
            else:
                raise _unsupported_datetimeformat(dialect)
        return process


class MaxDate(sqltypes.Date):
    """DATE column type.

    Values travel as strings laid out according to
    ``dialect.datetimeformat`` ('internal' or 'iso').
    """

    def get_col_spec(self):
        return 'DATE'

    def bind_processor(self, dialect):
        """Return a converter that renders date objects as strings.

        None and string values are passed through unchanged.
        """
        def process(value):
            if value is None:
                return None
            elif isinstance(value, basestring):
                return value
            elif dialect.datetimeformat == 'internal':
                return value.strftime("%Y%m%d")
            elif dialect.datetimeformat == 'iso':
                return value.strftime("%Y-%m-%d")
            else:
                raise _unsupported_datetimeformat(dialect)
        return process

    def result_processor(self, dialect):
        """Return a converter that parses strings into ``date``."""
        def process(value):
            if value is None:
                return None
            elif dialect.datetimeformat == 'internal':
                return datetime.date(
                    *[int(v) for v in (value[0:4], value[4:6], value[6:8])])
            elif dialect.datetimeformat == 'iso':
                return datetime.date(
                    *[int(v) for v in (value[0:4], value[5:7], value[8:10])])
            else:
                raise _unsupported_datetimeformat(dialect)
        return process


class MaxTime(sqltypes.Time):
    """TIME column type.

    Values travel as strings laid out according to
    ``dialect.datetimeformat`` ('internal' or 'iso').
    """

    def get_col_spec(self):
        return 'TIME'

    def bind_processor(self, dialect):
        """Return a converter that renders time objects as strings.

        None and string values are passed through unchanged.
        """
        def process(value):
            if value is None:
                return None
            elif isinstance(value, basestring):
                return value
            elif dialect.datetimeformat == 'internal':
                return value.strftime("%H%M%S")
            elif dialect.datetimeformat == 'iso':
                # NOTE(review): fields are joined with '-' here; confirm
                # this is the separator the server expects for times.
                return value.strftime("%H-%M-%S")
            else:
                raise _unsupported_datetimeformat(dialect)
        return process

    def result_processor(self, dialect):
        """Return a converter that parses strings into ``time``."""
        def process(value):
            # NOTE(review): both branches read the hour from a
            # four-character slice (value[0:4]), whereas bind_processor
            # emits a two-digit hour; confirm against the strings the
            # driver actually returns.
            if value is None:
                return None
            elif dialect.datetimeformat == 'internal':
                return datetime.time(
                    *[int(v) for v in (value[0:4], value[4:6], value[6:8])])
            elif dialect.datetimeformat == 'iso':
                return datetime.time(
                    *[int(v) for v in (value[0:4], value[5:7], value[8:10])])
            else:
                raise _unsupported_datetimeformat(dialect)
        return process


class MaxBoolean(sqltypes.Boolean):
    """BOOLEAN column type."""

    def get_col_spec(self):
        return 'BOOLEAN'


class MaxBlob(sqltypes.Binary):
    """LONG BYTE column type."""

    def get_col_spec(self):
        return 'LONG BYTE'

    def bind_processor(self, dialect):
        """Return a converter that passes non-None values through ``str``."""
        def process(value):
            if value is None:
                return None
            else:
                return str(value)
        return process

    def result_processor(self, dialect):
        """Return a converter that reads the fetched value in full.

        Non-None values must offer ``read`` and ``remainingLength``.
        """
        def process(value):
            if value is None:
                return None
            else:
                return value.read(value.remainingLength())
        return process


# Generic SQLAlchemy types mapped to their MaxDB implementations.
colspecs = {
    sqltypes.Integer: MaxInteger,
    sqltypes.Smallinteger: MaxSmallInteger,
    sqltypes.Numeric: MaxNumeric,
    sqltypes.Float: MaxFloat,
    sqltypes.DateTime: MaxTimestamp,
    sqltypes.Date: MaxDate,
    sqltypes.Time: MaxTime,
    sqltypes.String: MaxString,
    sqltypes.Binary: MaxBlob,
    sqltypes.Boolean: MaxBoolean,
    sqltypes.Text: MaxText,
    sqltypes.CHAR: MaxChar,
    sqltypes.TIMESTAMP: MaxTimestamp,
    sqltypes.BLOB: MaxBlob,
    sqltypes.Unicode: MaxUnicode,
    }

# Lower-case type names mapped to the MaxDB type classes.
ischema_names = {
    'boolean': MaxBoolean,
    'char': MaxChar,
    'character': MaxChar,
    'date': MaxDate,
    'fixed': MaxNumeric,
    'float': MaxFloat,
    'int': MaxInteger,
    # NOTE(review): the visible listing is cut off after this entry; any
    # further mappings lie outside this excerpt and are not reproduced.
    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -