⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 firebird.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 2 页
字号:
# firebird.py# Copyright (C) 2005, 2006, 2007, 2008 Michael Bayer mike_mp@zzzcomputing.com## This module is part of SQLAlchemy and is released under# the MIT License: http://www.opensource.org/licenses/mit-license.php"""Firebird backend================This module implements the Firebird backend, thru the kinterbasdb_DBAPI module.Firebird dialects-----------------Firebird offers two distinct dialects_ (not to be confused with theSA ``Dialect`` thing):dialect 1  This is the old syntax and behaviour, inherited from Interbase pre-6.0.dialect 3  This is the newer and supported syntax, introduced in Interbase 6.0.From the user point of view, the biggest change is in date/timehandling: under dialect 1, there's a single kind of field, ``DATE``with a synonim ``DATETIME``, that holds a `timestamp` value, that is adate with hour, minute, second. Under dialect 3 there are three kinds,a ``DATE`` that holds a date, a ``TIME`` that holds a *time of theday* value and a ``TIMESTAMP``, equivalent to the old ``DATE``.The problem is that the dialect of a Firebird database is a propertyof the database itself [#]_ (that is, any single database has beencreated with one dialect or the other: there is no way to change theafter creation). SQLAlchemy has a single instance of the class thatcontrols all the connections to a particular kind of database, so itcannot easily differentiate between the two modes, and in particularit **cannot** simultaneously talk with two distinct Firebird databaseswith different dialects.By default this module is biased toward dialect 3, but you can easilytweak it to handle dialect 1 if needed::  from sqlalchemy import types as sqltypes  from sqlalchemy.databases.firebird import FBDate, colspecs, ischema_names  # Adjust the mapping of the timestamp kind  ischema_names['TIMESTAMP'] = FBDate  colspecs[sqltypes.DateTime] = FBDate,Other aspects may be version-specific. You can use the ``server_version_info()`` methodon the ``FBDialect`` class to do whatever is needed::  from sqlalchemy.databases.firebird import FBCompiler  if engine.dialect.server_version_info(connection) < (2,0):      # Change the name of the function ``length`` to use the UDF version      # instead of ``char_length``      FBCompiler.LENGTH_FUNCTION_NAME = 'strlen'Pooling connections-------------------The default strategy used by SQLAlchemy to pool the database connectionsin particular cases may raise an ``OperationalError`` with a message`"object XYZ is in use"`. This happens on Firebird when there are twoconnections to the database, one is using, or has used, a particular tableand the other tries to drop or alter the same table. To garantee DDLoperations success Firebird recommend doing them as the single connected user.In case your SA application effectively needs to do DDL operations while otherconnections are active, the following setting may alleviate the problem::  from sqlalchemy import pool  from sqlalchemy.databases.firebird import dialect  # Force SA to use a single connection per thread  dialect.poolclass = pool.SingletonThreadPool.. [#] Well, that is not the whole story, as the client may still ask       a different (lower) dialect..... _dialects: http://mc-computing.com/Databases/Firebird/SQL_Dialect.html.. _kinterbasdb: http://sourceforge.net/projects/kinterbasdb"""import datetimefrom sqlalchemy import exceptions, schema, types as sqltypes, sql, utilfrom sqlalchemy.engine import base, default_initialized_kb = Falseclass FBNumeric(sqltypes.Numeric):    """Handle ``NUMERIC(precision,length)`` datatype."""    def get_col_spec(self):        if self.precision is None:            return "NUMERIC"        else:            return "NUMERIC(%(precision)s, %(length)s)" % { 'precision': self.precision,                                                            'length' : self.length }    def bind_processor(self, dialect):        return None    def result_processor(self, dialect):        if self.asdecimal:            return None        else:            def process(value):                if isinstance(value, util.decimal_type):                    return float(value)                else:                    return value            return processclass FBFloat(sqltypes.Float):    """Handle ``FLOAT(precision)`` datatype."""    def get_col_spec(self):        if not self.precision:            return "FLOAT"        else:            return "FLOAT(%(precision)s)" % {'precision': self.precision}class FBInteger(sqltypes.Integer):    """Handle ``INTEGER`` datatype."""    def get_col_spec(self):        return "INTEGER"class FBSmallInteger(sqltypes.Smallinteger):    """Handle ``SMALLINT`` datatype."""    def get_col_spec(self):        return "SMALLINT"class FBDateTime(sqltypes.DateTime):    """Handle ``TIMESTAMP`` datatype."""    def get_col_spec(self):        return "TIMESTAMP"    def bind_processor(self, dialect):        def process(value):            if value is None or isinstance(value, datetime.datetime):                return value            else:                return datetime.datetime(year=value.year,                                         month=value.month,                                         day=value.day)        return processclass FBDate(sqltypes.DateTime):    """Handle ``DATE`` datatype."""    def get_col_spec(self):        return "DATE"class FBTime(sqltypes.Time):    """Handle ``TIME`` datatype."""    def get_col_spec(self):        return "TIME"class FBText(sqltypes.Text):    """Handle ``BLOB SUB_TYPE 1`` datatype (aka *textual* blob)."""    def get_col_spec(self):        return "BLOB SUB_TYPE 1"class FBString(sqltypes.String):    """Handle ``VARCHAR(length)`` datatype."""    def get_col_spec(self):        return "VARCHAR(%(length)s)" % {'length' : self.length}class FBChar(sqltypes.CHAR):    """Handle ``CHAR(length)`` datatype."""    def get_col_spec(self):        return "CHAR(%(length)s)" % {'length' : self.length}class FBBinary(sqltypes.Binary):    """Handle ``BLOB SUB_TYPE 0`` datatype (aka *binary* blob)."""    def get_col_spec(self):        return "BLOB SUB_TYPE 0"class FBBoolean(sqltypes.Boolean):    """Handle boolean values as a ``SMALLINT`` datatype."""    def get_col_spec(self):        return "SMALLINT"colspecs = {    sqltypes.Integer : FBInteger,    sqltypes.Smallinteger : FBSmallInteger,    sqltypes.Numeric : FBNumeric,    sqltypes.Float : FBFloat,    sqltypes.DateTime : FBDateTime,    sqltypes.Date : FBDate,    sqltypes.Time : FBTime,    sqltypes.String : FBString,    sqltypes.Binary : FBBinary,    sqltypes.Boolean : FBBoolean,    sqltypes.Text : FBText,    sqltypes.CHAR: FBChar,}ischema_names = {      'SHORT': lambda r: FBSmallInteger(),       'LONG': lambda r: FBInteger(),       'QUAD': lambda r: FBFloat(),      'FLOAT': lambda r: FBFloat(),       'DATE': lambda r: FBDate(),       'TIME': lambda r: FBTime(),       'TEXT': lambda r: FBString(r['flen']),      'INT64': lambda r: FBNumeric(precision=r['fprec'], length=r['fscale'] * -1), # This generically handles NUMERIC()     'DOUBLE': lambda r: FBFloat(),  'TIMESTAMP': lambda r: FBDateTime(),    'VARYING': lambda r: FBString(r['flen']),    'CSTRING': lambda r: FBChar(r['flen']),       'BLOB': lambda r: r['stype']==1 and FBText() or FBBinary()      }def descriptor():    return {'name':'firebird',    'description':'Firebird',    'arguments':[        ('host', 'Host Server Name', None),        ('database', 'Database Name', None),        ('user', 'Username', None),        ('password', 'Password', None)    ]}class FBExecutionContext(default.DefaultExecutionContext):    passclass FBDialect(default.DefaultDialect):    """Firebird dialect"""    supports_sane_rowcount = False    supports_sane_multi_rowcount = False    max_identifier_length = 31    preexecute_pk_sequences = True    supports_pk_autoincrement = False    def __init__(self, type_conv=200, concurrency_level=1, **kwargs):        default.DefaultDialect.__init__(self, **kwargs)        self.type_conv = type_conv        self.concurrency_level= concurrency_level    def dbapi(cls):        import kinterbasdb        return kinterbasdb    dbapi = classmethod(dbapi)    def create_connect_args(self, url):        opts = url.translate_connect_args(username='user')        if opts.get('port'):            opts['host'] = "%s/%s" % (opts['host'], opts['port'])            del opts['port']        opts.update(url.query)        type_conv = opts.pop('type_conv', self.type_conv)        concurrency_level = opts.pop('concurrency_level', self.concurrency_level)        global _initialized_kb        if not _initialized_kb and self.dbapi is not None:            _initialized_kb = True            self.dbapi.init(type_conv=type_conv, concurrency_level=concurrency_level)        return ([], opts)    def create_execution_context(self, *args, **kwargs):        return FBExecutionContext(self, *args, **kwargs)    def type_descriptor(self, typeobj):        return sqltypes.adapt_type(typeobj, colspecs)    def server_version_info(self, connection):        """Get the version of the Firebird server used by a connection.        Returns a tuple of (`major`, `minor`, `build`), three integers        representing the version of the attached server.        """        # This is the simpler approach (the other uses the services api),        # that for backward compatibility reasons returns a string like        #   LI-V6.3.3.12981 Firebird 2.0        # where the first version is a fake one resembling the old        # Interbase signature. This is more than enough for our purposes,        # as this is mainly (only?) used by the testsuite.        from re import match        fbconn = connection.connection.connection        version = fbconn.server_version        m = match('\w+-V(\d+)\.(\d+)\.(\d+)\.(\d+) \w+ (\d+)\.(\d+)', version)        if not m:            raise exceptions.AssertionError("Could not determine version from string '%s'" % version)        return tuple([int(x) for x in m.group(5, 6, 4)])    def _normalize_name(self, name):        """Convert the name to lowercase if it is possible"""        # Remove trailing spaces: FB uses a CHAR() type,        # that is padded with spaces        name = name and name.rstrip()        if name is None:            return None        elif name.upper() == name and not self.identifier_preparer._requires_quotes(name.lower()):            return name.lower()        else:            return name    def _denormalize_name(self, name):        """Revert a *normalized* name to its uppercase equivalent"""        if name is None:            return None        elif name.lower() == name and not self.identifier_preparer._requires_quotes(name.lower()):            return name.upper()        else:            return name    def table_names(self, connection, schema):        """Return a list of *normalized* table names omitting system relations."""        s = """        SELECT r.rdb$relation_name        FROM rdb$relations r        WHERE r.rdb$system_flag=0        """

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -