⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 db.py

📁 适合于Unix/Linux下的一个持久数据库连接池
💻 PY
字号:
import SQLRelay, DateTimefrom SQLRelay import PySQLRDBfrom Shared.DC.ZRDB.TM import TMimport string, sysfrom string import strip, split, findfrom time import timefailures=0calls=0last_call_time=time()class DB(TM):    Database_Error = PySQLRDB.DatabaseError    Database_Connection=PySQLRDB.connect    def __init__(self,connection):        info = string.split(connection)        if len(info)==4:            self.host = info[0]            self.port = int(info[1])            self.socket = ""            self.user = info[2]            self.password = info[3]        elif len(info)==3:            self.host = ""            self.port = 0            self.socket = info[0]            self.user = info[1]            self.password = info[2]        else:            raise self.Database_Error, ('Invalid connection string, <code>%s</code>')    def str(self,v, StringType=type('')):        if v is None: return ''        r=str(v)        if r[-1:]=='L' and type(v) is not StringType: r=r[:-1]        return r    def _begin(self):        self.con = PySQLRDB.connect(self.host, self.port, self.socket, self.user, self.password,0,1)        self.cur = self.con.cursor()    def _finish(self, *ignored):        self.con.commit()        self.con.close()    def _abort(self, *ignored):        self.con.rollback()        self.con.close()            def tables(self, rdb=0, _care=('TABLE', 'VIEW')):        r=[]        a=r.append        for name, typ in self.db.objects():            if typ in _care:                a({'TABLE_NAME': name, 'TABLE_TYPE': typ})        return r    def columns(self, table_name):        try:             r=self.cur.execute('select * from %s' % table_name)        except:             return ()        desc=self.cur.description        r=[]        a=r.append        for name, type, width, ds, p, scale, null_ok in desc:            if type=='NUMBER' and scale==0: type='INTEGER'            a({ 'Name': name,                'Type': type,                'Precision': p,                'Scale': scale,                'Nullable': null_ok,                })        return r    def query(self, query_string, max_rows=9999999):        global failures, calls, last_call_time        calls=calls+1        desc=None        result=()        self._register()        try:            for qs in filter(None, map(strip,split(query_string, '\0'))):                r=self.cur.execute(qs)                if r is None:                    if desc is not None:                        if self.cur.description != desc:                            raise 'Query Error', (                                'Multiple select schema are not allowed'                                )                        if 'append' not in dir(result):                             result = list(result)                        if max_rows:                            for row in self.cur.fetchmany(max_rows-len(result)):                                result.append(row)                    else:                        desc=self.cur.description                        if max_rows:                            if max_rows==1:                                result=(self.cur.fetchone(),)                            else:                                result=self.cur.fetchmany(max_rows)            failures=0            last_call_time=time()        except self.Database_Error, mess:            failures=failures+1            if (failures > 1000 or time()-last_call_time > 600):                # Hm. maybe the db is hosed.  Let's try again.                failures=0                last_call_time=time()                return self.query(query_string, max_rows)            else: raise sys.exc_type, sys.exc_value, sys.exc_traceback        if desc is None:            return (),()        # Above, fetchmany returns None for empty result sets.  Maybe it        # should return (), but it doesn't and Zope expects () so we'll fix        # that here        if result is None:            result=()        items=[]        for name, type, length in desc:            if type=='NUMBER':                type='n'            elif type=='DATE':                type='d'            else: type='s'            items.append({                'name': name,                'type': type,})        return items, result

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -