⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sqlite.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 2 页
字号:
# sqlite.py# Copyright (C) 2005, 2006, 2007, 2008 Michael Bayer mike_mp@zzzcomputing.com## This module is part of SQLAlchemy and is released under# the MIT License: http://www.opensource.org/licenses/mit-license.phpimport datetime, re, timefrom sqlalchemy import schema, exceptions, pool, PassiveDefaultfrom sqlalchemy.engine import defaultimport sqlalchemy.types as sqltypesimport sqlalchemy.util as utilfrom sqlalchemy.sql import compiler, functions as sql_functionsSELECT_REGEXP = re.compile(r'\s*(?:SELECT|PRAGMA)', re.I | re.UNICODE)class SLNumeric(sqltypes.Numeric):    def bind_processor(self, dialect):        type_ = self.asdecimal and str or float        def process(value):            if value is not None:                return type_(value)            else:                return value        return process    def get_col_spec(self):        if self.precision is None:            return "NUMERIC"        else:            return "NUMERIC(%(precision)s, %(length)s)" % {'precision': self.precision, 'length' : self.length}class SLInteger(sqltypes.Integer):    def get_col_spec(self):        return "INTEGER"class SLSmallInteger(sqltypes.Smallinteger):    def get_col_spec(self):        return "SMALLINT"class DateTimeMixin(object):    __format__ = "%Y-%m-%d %H:%M:%S"    def bind_processor(self, dialect):        def process(value):            if isinstance(value, basestring):                # pass string values thru                return value            elif value is not None:                if self.__microsecond__ and getattr(value, 'microsecond', None) is not None:                    return value.strftime(self.__format__ + "." + str(value.microsecond))                else:                    return value.strftime(self.__format__)            else:                return None        return process    def _cvt(self, value, dialect):        if value is None:            return None        try:            (value, microsecond) = value.split('.')            microsecond = int(microsecond)        except ValueError:            microsecond = 0        return time.strptime(value, self.__format__)[0:6] + (microsecond,)class SLDateTime(DateTimeMixin,sqltypes.DateTime):    __format__ = "%Y-%m-%d %H:%M:%S"    __microsecond__ = True    def get_col_spec(self):        return "TIMESTAMP"    def result_processor(self, dialect):        def process(value):            tup = self._cvt(value, dialect)            return tup and datetime.datetime(*tup)        return processclass SLDate(DateTimeMixin, sqltypes.Date):    __format__ = "%Y-%m-%d"    __microsecond__ = False    def get_col_spec(self):        return "DATE"    def result_processor(self, dialect):        def process(value):            tup = self._cvt(value, dialect)            return tup and datetime.date(*tup[0:3])        return processclass SLTime(DateTimeMixin, sqltypes.Time):    __format__ = "%H:%M:%S"    __microsecond__ = True    def get_col_spec(self):        return "TIME"    def result_processor(self, dialect):        def process(value):            tup = self._cvt(value, dialect)            return tup and datetime.time(*tup[3:7])        return processclass SLText(sqltypes.Text):    def get_col_spec(self):        return "TEXT"class SLString(sqltypes.String):    def get_col_spec(self):        return "VARCHAR(%(length)s)" % {'length' : self.length}class SLChar(sqltypes.CHAR):    def get_col_spec(self):        return "CHAR(%(length)s)" % {'length' : self.length}class SLBinary(sqltypes.Binary):    def get_col_spec(self):        return "BLOB"class SLBoolean(sqltypes.Boolean):    def get_col_spec(self):        return "BOOLEAN"    def bind_processor(self, dialect):        def process(value):            if value is None:                return None            return value and 1 or 0        return process    def result_processor(self, dialect):        def process(value):            if value is None:                return None            return value and True or False        return processcolspecs = {    sqltypes.Binary: SLBinary,    sqltypes.Boolean: SLBoolean,    sqltypes.CHAR: SLChar,    sqltypes.Date: SLDate,    sqltypes.DateTime: SLDateTime,    sqltypes.Float: SLNumeric,    sqltypes.Integer: SLInteger,    sqltypes.NCHAR: SLChar,    sqltypes.Numeric: SLNumeric,    sqltypes.Smallinteger: SLSmallInteger,    sqltypes.String: SLString,    sqltypes.Text: SLText,    sqltypes.Time: SLTime,}ischema_names = {    'BLOB': SLBinary,    'BOOL': SLBoolean,    'BOOLEAN': SLBoolean,    'CHAR': SLChar,    'DATE': SLDate,    'DATETIME': SLDateTime,    'DECIMAL': SLNumeric,    'FLOAT': SLNumeric,    'INT': SLInteger,    'INTEGER': SLInteger,    'NUMERIC': SLNumeric,    'REAL': SLNumeric,    'SMALLINT': SLSmallInteger,    'TEXT': SLText,    'TIME': SLTime,    'TIMESTAMP': SLDateTime,    'VARCHAR': SLString,}def descriptor():    return {'name':'sqlite',    'description':'SQLite',    'arguments':[        ('database', "Database Filename",None)    ]}class SQLiteExecutionContext(default.DefaultExecutionContext):    def post_exec(self):        if self.compiled.isinsert and not self.executemany:            if not len(self._last_inserted_ids) or self._last_inserted_ids[0] is None:                self._last_inserted_ids = [self.cursor.lastrowid] + self._last_inserted_ids[1:]    def returns_rows_text(self, statement):        return SELECT_REGEXP.match(statement)class SQLiteDialect(default.DefaultDialect):    supports_alter = False    supports_unicode_statements = True    def __init__(self, **kwargs):        default.DefaultDialect.__init__(self, default_paramstyle='qmark', **kwargs)        def vers(num):            return tuple([int(x) for x in num.split('.')])        if self.dbapi is not None:            sqlite_ver = self.dbapi.version_info            if sqlite_ver < (2,1,'3'):                util.warn(                    ("The installed version of pysqlite2 (%s) is out-dated "                     "and will cause errors in some cases.  Version 2.1.3 "                     "or greater is recommended.") %                    '.'.join([str(subver) for subver in sqlite_ver]))        self.supports_cast = (self.dbapi is None or vers(self.dbapi.sqlite_version) >= vers("3.2.3"))    def dbapi(cls):        try:            from pysqlite2 import dbapi2 as sqlite        except ImportError, e:            try:                from sqlite3 import dbapi2 as sqlite #try the 2.5+ stdlib name.            except ImportError:                raise e        return sqlite    dbapi = classmethod(dbapi)    def server_version_info(self, connection):        return self.dbapi.sqlite_version_info    def create_connect_args(self, url):        filename = url.database or ':memory:'        opts = url.query.copy()        util.coerce_kw_type(opts, 'timeout', float)        util.coerce_kw_type(opts, 'isolation_level', str)        util.coerce_kw_type(opts, 'detect_types', int)        util.coerce_kw_type(opts, 'check_same_thread', bool)        util.coerce_kw_type(opts, 'cached_statements', int)        return ([filename], opts)    def type_descriptor(self, typeobj):        return sqltypes.adapt_type(typeobj, colspecs)    def create_execution_context(self, connection, **kwargs):        return SQLiteExecutionContext(self, connection, **kwargs)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -