⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pysqlrdb.py

📁 适合于Unix/Linux下的一个持久数据库连接池
💻 PY
字号:
#! /usr/bin/env python
# Copyright (c) 2000 Roman Milner
# See the file COPYING for more information

"""DB-API 2.0 style wrapper around the SQL Relay ``CSQLRelay`` extension.

The module exposes ``connect()``, a connection class and a cursor class.
Query parameters are passed as a mapping of bind-variable name to value
(``paramstyle`` is ``'named'``).
"""

import sys
from time import localtime, time, mktime
import exceptions
import weakref

from SQLRelay import CSQLRelay

apilevel = '2.0'
threadsafety = 1
paramstyle = 'named'


class Error(exceptions.StandardError):
    """Base class of every error raised by this module."""
    pass


class Warning(exceptions.StandardError):
    """DB-API warning category (not raised by the code in this module)."""
    pass


class InterfaceError(Error):
    """DB-API category: error related to the database interface."""
    pass


class DatabaseError(Error):
    """Raised when CSQLRelay reports an error message after a query."""
    pass


class InternalError(DatabaseError):
    """DB-API category: internal database error."""
    pass


class OperationalError(DatabaseError):
    """DB-API category: error related to the database's operation."""
    pass


class ProgrammingError(DatabaseError):
    """DB-API category: programming error."""
    pass


class IntegrityError(DatabaseError):
    """DB-API category: relational integrity error."""
    pass


class DataError(DatabaseError):
    """DB-API category: problem with the processed data."""
    pass


class NotSupportedError(DatabaseError):
    """DB-API category: unsupported method or API."""
    pass


def _float_precision_scale(value):
    """Return ``(precision, scale)`` describing the float ``value``.

    ``precision`` is the number of digits in ``str(value)`` (sign and
    decimal point excluded) and ``scale`` is the number of digits after
    the decimal point.  ``(0, 0)`` is returned when ``str(value)`` is not
    plain decimal notation (exponent form, inf, nan); that is the pair
    this module has always passed for values it cannot describe.
    """
    text = str(abs(value))
    if 'e' in text or '.' not in text:
        return 0, 0
    whole, fraction = text.split('.')
    return len(whole) + len(fraction), len(fraction)


def connect(host, port, socket, user, password, retrytime=0, tries=1):
    """Create and return a new SQLRConnection.

    All arguments are forwarded unchanged to ``CSQLRelay.sqlrcon_alloc``.
    """
    return SQLRConnection(host, port, socket, user, password, retrytime, tries)


class SQLRConnection:
    """A connection handle allocated through ``CSQLRelay.sqlrcon_alloc``."""

    # Class-level default: close() and __del__ stay safe on an instance
    # whose __init__ never managed to store a handle.
    connection = None

    def __init__(self, host, port, socket, user, password, retrytime, tries):
        self.connection = CSQLRelay.sqlrcon_alloc(host, port, socket, user,
                                                  password, retrytime, tries)
        # Weak references, so tracking a cursor does not keep it alive.
        self._cursors = weakref.WeakValueDictionary()

    def __del__(self):
        self.close()

    def close(self):
        """Close every cursor still alive, then free the connection.

        Calling close() again afterwards is a no-op.
        """
        if self.connection is None:
            return
        # first we close any remaining cursors to avoid segfaults
        for cursor in list(self._cursors.values()):
            cursor.close()
        # then we close the connection
        CSQLRelay.sqlrcon_free(self.connection)
        self.connection = None

    def commit(self):
        """Commit through CSQLRelay; returns None if the connection is closed."""
        self.rowcount = 0
        if self.connection is not None:
            return CSQLRelay.commit(self.connection)

    def rollback(self):
        """Roll back through CSQLRelay; returns None if the connection is closed."""
        self.rowcount = 0
        if self.connection is not None:
            return CSQLRelay.rollback(self.connection)

    def cursor(self):
        """Return a new SQLRCursor bound to this connection."""
        cursor = SQLRCursor(self.connection)
        # Remember it (weakly) so close() can release it first.
        self._cursors[id(cursor)] = cursor
        return cursor


class SQLRCursor:
    """A cursor handle allocated through ``CSQLRelay.sqlrcur_alloc``."""

    # Class-level default: close() and __del__ stay safe on an instance
    # whose __init__ never managed to store a handle.
    cursor = None

    def __init__(self, con):
        self.cursor = CSQLRelay.sqlrcur_alloc(con)
        self.arraysize = 1
        # PEP 249 values for a cursor that has not executed anything yet.
        self.description = None
        self.rowcount = -1

    def __del__(self):
        self.close()

    def setinputsizes(self, sizes=None):
        """Accepted for DB-API compatibility; does nothing."""
        pass

    def setoutputsize(self, size=None, column=None):
        """Accepted for DB-API compatibility; does nothing."""
        pass

    def execute(self, operation, parameters=None):
        """Prepare and run ``operation``.

        ``parameters`` is an optional mapping of bind-variable name to
        value.  Raises DatabaseError when CSQLRelay reports an error
        message after execution.
        """
        self.rowcount = 0
        self.cur_row = 0
        CSQLRelay.prepareQuery(self.cursor, operation)
        self.__bind(parameters)
        self.__run()

    def executemany(self, operation, parameters=None):
        """Prepare ``operation`` once and run it for each parameter mapping.

        ``parameters`` is a sequence of mappings (an entry may be None for
        "no binds").  With ``parameters`` None this is the same as
        ``execute(operation)``.  Raises DatabaseError on the first run for
        which CSQLRelay reports an error message.
        """
        if parameters is None:
            self.execute(operation)
            return
        self.rowcount = 0
        self.cur_row = 0
        CSQLRelay.prepareQuery(self.cursor, operation)
        for binds in parameters:
            CSQLRelay.clearBinds(self.cursor)
            self.__bind(binds)
            self.__run()

    def __bind(self, parameters):
        """Bind every name/value pair of ``parameters`` as an input bind."""
        if parameters is None:
            return
        for name in parameters.keys():
            value = parameters[name]
            precision = 0
            scale = 0
            # The float test must look at the bound value.  The earlier
            # code inspected parameters.keys(), so it never matched and
            # floats were always bound with precision 0 and scale 0.
            if isinstance(value, float):
                precision, scale = _float_precision_scale(value)
            CSQLRelay.inputBind(self.cursor, name, value, precision, scale)

    def __run(self):
        """Run the prepared query, then refresh description and rowcount."""
        CSQLRelay.executeQuery(self.cursor)
        the_error = CSQLRelay.errorMessage(self.cursor)
        if the_error:
            raise DatabaseError('<pre>%s</pre>' % the_error)
        self.__set_description()
        self.rowcount = CSQLRelay.totalRows(self.cursor)
        # A positive affected-row count takes precedence over totalRows.
        affected = CSQLRelay.affectedRows(self.cursor)
        if affected > 0:
            self.rowcount = affected

    def callproc(self, procname, parameters=None):
        """Run ``procname`` exactly as execute() would."""
        self.execute(procname, parameters)

    def fetchone(self):
        """Return the row at the current position and advance by one."""
        row = self.__getRow(self.cur_row)
        self.cur_row = self.cur_row + 1
        return row

    def fetchmany(self, size=None):
        """Return up to ``size`` rows starting at the current position.

        ``size`` defaults to ``self.arraysize``.  Returns None when the
        result set has no rows at all (kept from the earlier behaviour)
        and an empty list once every row has been fetched.
        """
        if not size:
            size = self.arraysize
        num_rows = CSQLRelay.rowCount(self.cursor)
        if num_rows == 0:
            return None
        first = self.cur_row
        if first >= num_rows:
            return []
        # NOTE(review): getRowRange's second argument is treated as an
        # inclusive row index -- the earlier code clamped it to
        # num_rows-1, which suggests so; confirm against CSQLRelay.
        last = min(first + size, num_rows) - 1
        rows = self.__getRowRange(first, last)
        # Advance past what was returned.  The earlier code stored `size`
        # here, which repeated rows and never moved beyond index `size`.
        self.cur_row = last + 1
        return rows

    def fetchall(self):
        """Return every remaining row as a list (empty if none remain)."""
        rows = []
        num_rows = CSQLRelay.rowCount(self.cursor)
        while self.cur_row < num_rows:
            rows.append(self.__getRow(self.cur_row))
            self.cur_row = self.cur_row + 1
        return rows

    def close(self):
        """Free the cursor handle; calling close() again is a no-op."""
        if self.cursor is not None:
            CSQLRelay.sqlrcur_free(self.cursor)
            self.cursor = None

    def __set_description(self):
        """Rebuild ``self.description`` from the current result set.

        Stores and returns a tuple of ``(name, type, length)`` tuples, one
        per column.  With no columns, ``description`` becomes an empty
        list and None is returned.
        """
        col_c = CSQLRelay.colCount(self.cursor)
        if not col_c:
            self.description = []
            return None
        desc = []
        for field in range(col_c):
            desc.append((CSQLRelay.getColumnName(self.cursor, field),
                         CSQLRelay.getColumnType(self.cursor, field),
                         CSQLRelay.getColumnLength(self.cursor, field)))
        self.description = tuple(desc)
        return tuple(desc)

    def __getRow(self, row):
        """Return row number ``row`` as provided by ``CSQLRelay.getRow``."""
        # FIXME: go through each field, if the column type is a date/time
        # column, then create a DateTime object and return it instead of
        # a string
        rc = CSQLRelay.getRow(self.cursor, row)
        return rc

    def __getRowRange(self, row, size):
        """Return what ``CSQLRelay.getRowRange(cursor, row, size)`` returns."""
        # FIXME: go through each field, if the column type is a date/time
        # column, then create a DateTime object and return it instead of
        # a string
        rc = CSQLRelay.getRowRange(self.cursor, row, size)
        return rc

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -