⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sqlite.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 2 页
字号:
    def oid_column_name(self, column):        return "oid"    def is_disconnect(self, e):        return isinstance(e, self.dbapi.ProgrammingError) and "Cannot operate on a closed database." in str(e)    def table_names(self, connection, schema):        if schema is not None:            qschema = self.identifier_preparer.quote_identifier(schema)            master = '%s.sqlite_master' % qschema            s = ("SELECT name FROM %s "                 "WHERE type='table' ORDER BY name") % (master,)            rs = connection.execute(s)        else:            try:                s = ("SELECT name FROM "                     " (SELECT * FROM sqlite_master UNION ALL "                     "  SELECT * FROM sqlite_temp_master) "                     "WHERE type='table' ORDER BY name")                rs = connection.execute(s)            except exceptions.DBAPIError:                raise                s = ("SELECT name FROM sqlite_master "                     "WHERE type='table' ORDER BY name")                rs = connection.execute(s)        return [row[0] for row in rs]    def has_table(self, connection, table_name, schema=None):        quote = self.identifier_preparer.quote_identifier        if schema is not None:            pragma = "PRAGMA %s." % quote(schema)        else:            pragma = "PRAGMA "        qtable = quote(table_name)        cursor = connection.execute("%stable_info(%s)" % (pragma, qtable))        row = cursor.fetchone()        # consume remaining rows, to work around        # http://www.sqlite.org/cvstrac/tktview?tn=1884        while cursor.fetchone() is not None:            pass        return (row is not None)    def reflecttable(self, connection, table, include_columns):        preparer = self.identifier_preparer        if table.schema is None:            pragma = "PRAGMA "        else:            pragma = "PRAGMA %s." % preparer.quote_identifier(table.schema)        qtable = preparer.format_table(table, False)        c = connection.execute("%stable_info(%s)" % (pragma, qtable))        found_table = False        while True:            row = c.fetchone()            if row is None:                break            found_table = True            (name, type_, nullable, has_default, primary_key) = (row[1], row[2].upper(), not row[3], row[4] is not None, row[5])            name = re.sub(r'^\"|\"$', '', name)            if include_columns and name not in include_columns:                continue            match = re.match(r'(\w+)(\(.*?\))?', type_)            if match:                coltype = match.group(1)                args = match.group(2)            else:                coltype = "VARCHAR"                args = ''            try:                coltype = ischema_names[coltype]            except KeyError:                util.warn("Did not recognize type '%s' of column '%s'" %                          (coltype, name))                coltype = sqltypes.NullType            if args is not None:                args = re.findall(r'(\d+)', args)                coltype = coltype(*[int(a) for a in args])            colargs= []            if has_default:                colargs.append(PassiveDefault('?'))            table.append_column(schema.Column(name, coltype, primary_key = primary_key, nullable = nullable, *colargs))        if not found_table:            raise exceptions.NoSuchTableError(table.name)        c = connection.execute("%sforeign_key_list(%s)" % (pragma, qtable))        fks = {}        while True:            row = c.fetchone()            if row is None:                break            (constraint_name, tablename, localcol, remotecol) = (row[0], row[2], row[3], row[4])            tablename = re.sub(r'^\"|\"$', '', tablename)            localcol = re.sub(r'^\"|\"$', '', localcol)            remotecol = re.sub(r'^\"|\"$', '', remotecol)            try:                fk = fks[constraint_name]            except KeyError:                fk = ([],[])                fks[constraint_name] = fk            # look up the table based on the given table's engine, not 'self',            # since it could be a ProxyEngine            remotetable = schema.Table(tablename, table.metadata, autoload=True, autoload_with=connection)            constrained_column = table.c[localcol].name            refspec = ".".join([tablename, remotecol])            if constrained_column not in fk[0]:                fk[0].append(constrained_column)            if refspec not in fk[1]:                fk[1].append(refspec)        for name, value in fks.iteritems():            table.append_constraint(schema.ForeignKeyConstraint(value[0], value[1]))        # check for UNIQUE indexes        c = connection.execute("%sindex_list(%s)" % (pragma, qtable))        unique_indexes = []        while True:            row = c.fetchone()            if row is None:                break            if (row[2] == 1):                unique_indexes.append(row[1])        # loop thru unique indexes for one that includes the primary key        for idx in unique_indexes:            c = connection.execute("%sindex_info(%s)" % (pragma, idx))            cols = []            while True:                row = c.fetchone()                if row is None:                    break                cols.append(row[2])class SQLiteCompiler(compiler.DefaultCompiler):    functions = compiler.DefaultCompiler.functions.copy()    functions.update (        {            sql_functions.now: 'CURRENT_TIMESTAMP'        }    )    def visit_cast(self, cast, **kwargs):        if self.dialect.supports_cast:            return super(SQLiteCompiler, self).visit_cast(cast)        else:            return self.process(cast.clause)    def limit_clause(self, select):        text = ""        if select._limit is not None:            text +=  " \n LIMIT " + str(select._limit)        if select._offset is not None:            if select._limit is None:                text += " \n LIMIT -1"            text += " OFFSET " + str(select._offset)        else:            text += " OFFSET 0"        return text    def for_update_clause(self, select):        # sqlite has no "FOR UPDATE" AFAICT        return ''    def visit_insert(self, insert_stmt):        self.isinsert = True        colparams = self._get_colparams(insert_stmt)        preparer = self.preparer        if not colparams:            return "INSERT INTO %s DEFAULT VALUES" % (                (preparer.format_table(insert_stmt.table),))        else:            return ("INSERT INTO %s (%s) VALUES (%s)" %                    (preparer.format_table(insert_stmt.table),                     ', '.join([preparer.format_column(c[0])                                for c in colparams]),                     ', '.join([c[1] for c in colparams])))class SQLiteSchemaGenerator(compiler.SchemaGenerator):    def get_column_specification(self, column, **kwargs):        colspec = self.preparer.format_column(column) + " " + column.type.dialect_impl(self.dialect, _for_ddl=column).get_col_spec()        default = self.get_column_default_string(column)        if default is not None:            colspec += " DEFAULT " + default        if not column.nullable:            colspec += " NOT NULL"        return colspec    # this doesnt seem to be needed, although i suspect older versions of sqlite might still    # not directly support composite primary keys    #def visit_primary_key_constraint(self, constraint):    #    if len(constraint) > 1:    #        self.append(", \n")    #        # put all PRIMARY KEYS in a UNIQUE index    #        self.append("\tUNIQUE (%s)" % string.join([c.name for c in constraint],', '))    #    else:    #        super(SQLiteSchemaGenerator, self).visit_primary_key_constraint(constraint)class SQLiteSchemaDropper(compiler.SchemaDropper):    passclass SQLiteIdentifierPreparer(compiler.IdentifierPreparer):    reserved_words = util.Set([        'add', 'after', 'all', 'alter', 'analyze', 'and', 'as', 'asc',        'attach', 'autoincrement', 'before', 'begin', 'between', 'by',        'cascade', 'case', 'cast', 'check', 'collate', 'column', 'commit',        'conflict', 'constraint', 'create', 'cross', 'current_date',        'current_time', 'current_timestamp', 'database', 'default',        'deferrable', 'deferred', 'delete', 'desc', 'detach', 'distinct',        'drop', 'each', 'else', 'end', 'escape', 'except', 'exclusive',        'explain', 'false', 'fail', 'for', 'foreign', 'from', 'full', 'glob',        'group', 'having', 'if', 'ignore', 'immediate', 'in', 'index',        'initially', 'inner', 'insert', 'instead', 'intersect', 'into', 'is',        'isnull', 'join', 'key', 'left', 'like', 'limit', 'match', 'natural',        'not', 'notnull', 'null', 'of', 'offset', 'on', 'or', 'order', 'outer',        'plan', 'pragma', 'primary', 'query', 'raise', 'references',        'reindex', 'rename', 'replace', 'restrict', 'right', 'rollback',        'row', 'select', 'set', 'table', 'temp', 'temporary', 'then', 'to',        'transaction', 'trigger', 'true', 'union', 'unique', 'update', 'using',        'vacuum', 'values', 'view', 'virtual', 'when', 'where',        ])    def __init__(self, dialect):        super(SQLiteIdentifierPreparer, self).__init__(dialect)dialect = SQLiteDialectdialect.poolclass = pool.SingletonThreadPooldialect.statement_compiler = SQLiteCompilerdialect.schemagenerator = SQLiteSchemaGeneratordialect.schemadropper = SQLiteSchemaDropperdialect.preparer = SQLiteIdentifierPreparer

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -