📄 dbexts.py
字号:
# $Id: dbexts.py,v 1.1 2005/10/28 23:47:46 kuzman Exp $"""This script provides platform independence by wrapping PythonDatabase API 2.0 compatible drivers to allow seamless databaseusage across implementations.In order to use the C version, you need mxODBC and mxDateTime.In order to use the Java version, you need zxJDBC.>>> import dbexts>>> d = dbexts.dbexts() # use the default db>>> d.isql('select count(*) count from player')count-------13569.01 row affected>>> r = d.raw('select count(*) count from player')>>> r([('count', 3, 17, None, 15, 0, 1)], [(13569.0,)])>>>The configuration file follows the following format in a file name dbexts.ini:[default]name=mysql[jdbc]name=mysqlurl=jdbc:mysql://localhost/ziclixuser=pwd=driver=org.gjt.mm.mysql.Driverdatahandler=com.ziclix.python.sql.handler.MySQLDataHandler[jdbc]name=pgurl=jdbc:postgresql://localhost:5432/ziclixuser=bzimmerpwd=driver=org.postgresql.Driverdatahandler=com.ziclix.python.sql.handler.PostgresqlDataHandler"""import os, refrom types import StringType__author__ = "brian zimmer (bzimmer@ziclix.com)"__version__ = "$Revision: 1.1 $"[11:-2]__OS__ = os.namechoose = lambda bool, a, b: (bool and [a] or [b])[0]def console(rows, headers=()): """Format the results into a list of strings (one for each row): <header> <headersep> <row1> <row2> ... headers may be given as list of strings. Columns are separated by colsep; the header is separated from the result set by a line of headersep characters. The function calls stringify to format the value data into a string. It defaults to calling str() and striping leading and trailing whitespace. - copied and modified from mxODBC """ # Check row entry lengths output = [] headers = map(lambda header: header.upper(), list(map(lambda x: x or "", headers))) collen = map(len,headers) output.append(headers) if rows and len(rows) > 0: for row in rows: row = map(lambda x: str(x), row) for i in range(len(row)): entry = row[i] if collen[i] < len(entry): collen[i] = len(entry) output.append(row) if len(output) == 1: affected = "0 rows affected" elif len(output) == 2: affected = "1 row affected" else: affected = "%d rows affected" % (len(output) - 1) # Format output for i in range(len(output)): row = output[i] l = [] for j in range(len(row)): l.append('%-*s' % (collen[j],row[j])) output[i] = " | ".join(l) # Insert header separator totallen = len(output[0]) output[1:1] = ["-"*(totallen/len("-"))] output.append("\n" + affected) return outputdef html(rows, headers=()): output = [] output.append('<table class="results">') output.append('<tr class="headers">') headers = map(lambda x: '<td class="header">%s</td>' % (x.upper()), list(headers)) map(output.append, headers) output.append('</tr>') if rows and len(rows) > 0: for row in rows: output.append('<tr class="row">') row = map(lambda x: '<td class="value">%s</td>' % (x), row) map(output.append, row) output.append('</tr>') output.append('</table>') return outputcomments = lambda x: re.compile("{.*?}", re.S).sub("", x, 0)class mxODBCProxy: """Wraps mxODBC to provide proxy support for zxJDBC's additional parameters.""" def __init__(self, c): self.c = c def __getattr__(self, name): if name == "execute": return self.execute elif name == "gettypeinfo": return self.gettypeinfo else: return getattr(self.c, name) def execute(self, sql, params=None, bindings=None, maxrows=None): if params: self.c.execute(sql, params) else: self.c.execute(sql) def gettypeinfo(self, typeid=None): if typeid: self.c.gettypeinfo(typeid)class executor: """Handles the insertion of values given dynamic data.""" def __init__(self, table, cols): self.cols = cols self.table = table if self.cols: self.sql = "insert into %s (%s) values (%s)" % (table, ",".join(self.cols), ",".join(("?",) * len(self.cols))) else: self.sql = "insert into %s values (%%s)" % (table) def execute(self, db, rows, bindings): assert rows and len(rows) > 0, "must have at least one row" if self.cols: sql = self.sql else: sql = self.sql % (",".join(("?",) * len(rows[0]))) db.raw(sql, rows, bindings)def connect(dbname): return dbexts(dbname)def lookup(dbname): return dbexts(jndiname=dbname)class dbexts: def __init__(self, dbname=None, cfg=None, formatter=console, autocommit=0, jndiname=None, out=None): self.verbose = 1 self.results = [] self.headers = [] self.autocommit = autocommit self.formatter = formatter self.out = out self.lastrowid = None self.updatecount = None if not jndiname: if cfg == None: fn = os.path.join(os.path.split(__file__)[0], "dbexts.ini") if not os.path.exists(fn): fn = os.path.join(os.environ['HOME'], ".dbexts") self.dbs = IniParser(fn) elif isinstance(cfg, IniParser): self.dbs = cfg else: self.dbs = IniParser(cfg) if dbname == None: dbname = self.dbs[("default", "name")] if __OS__ == 'java': from com.ziclix.python.sql import zxJDBC database = zxJDBC if not jndiname: t = self.dbs[("jdbc", dbname)] self.dburl, dbuser, dbpwd, jdbcdriver = t['url'], t['user'], t['pwd'], t['driver'] if t.has_key('datahandler'): self.datahandler = [] for dh in t['datahandler'].split(','): classname = dh.split(".")[-1] datahandlerclass = __import__(dh, globals(), locals(), classname) self.datahandler.append(datahandlerclass) keys = [x for x in t.keys() if x not in ['url', 'user', 'pwd', 'driver', 'datahandler', 'name']] props = {} for a in keys: props[a] = t[a] self.db = apply(database.connect, (self.dburl, dbuser, dbpwd, jdbcdriver), props) else: self.db = database.lookup(jndiname) self.db.autocommit = self.autocommit elif __OS__ == 'nt': for modname in ["mx.ODBC.Windows", "ODBC.Windows"]: try: database = __import__(modname, globals(), locals(), "Windows") break except: continue else: raise ImportError("unable to find appropriate mxODBC module") t = self.dbs[("odbc", dbname)] self.dburl, dbuser, dbpwd = t['url'], t['user'], t['pwd'] self.db = database.Connect(self.dburl, dbuser, dbpwd, clear_auto_commit=1) self.dbname = dbname for a in database.sqltype.keys(): setattr(self, database.sqltype[a], a) for a in dir(database): try: p = getattr(database, a) if issubclass(p, Exception): setattr(self, a, p) except: continue del database def __str__(self): return self.dburl def __repr__(self): return self.dburl def __getattr__(self, name): if "cfg" == name: return self.dbs.cfg raise AttributeError("'dbexts' object has no attribute '%s'" % (name)) def close(self): """ close the connection to the database """ self.db.close() def begin(self, style=None): """ reset ivars and return a new cursor, possibly binding an auxiliary datahandler """ self.headers, self.results = [], [] if style: c = self.db.cursor(style) else: c = self.db.cursor() if __OS__ == 'java': if hasattr(self, 'datahandler'): for dh in self.datahandler: c.datahandler = dh(c.datahandler) else: c = mxODBCProxy(c) return c def commit(self, cursor=None, close=1): """ commit the cursor and create the result set """ if cursor and cursor.description: self.headers = cursor.description self.results = cursor.fetchall() if hasattr(cursor, "nextset"): s = cursor.nextset() while s: self.results += cursor.fetchall() s = cursor.nextset() if hasattr(cursor, "lastrowid"): self.lastrowid = cursor.lastrowid if hasattr(cursor, "updatecount"): self.updatecount = cursor.updatecount if not self.autocommit or cursor is None: if not self.db.autocommit: self.db.commit() if cursor and close: cursor.close() def rollback(self): """ rollback the cursor """ self.db.rollback() def prepare(self, sql): """ prepare the sql statement """ cur = self.begin() try: return cur.prepare(sql) finally: self.commit(cur) def display(self): """ using the formatter, display the results """ if self.formatter and self.verbose > 0: res = self.results if res: print >> self.out, "" for a in self.formatter(res, map(lambda x: x[0], self.headers)): print >> self.out, a print >> self.out, "" def __execute__(self, sql, params=None, bindings=None, maxrows=None): """ the primary execution method """ cur = self.begin() try: if bindings: cur.execute(sql, params, bindings, maxrows=maxrows) elif params: cur.execute(sql, params, maxrows=maxrows) else: cur.execute(sql, maxrows=maxrows) finally: self.commit(cur, close=isinstance(sql, StringType)) def isql(self, sql, params=None, bindings=None, maxrows=None): """ execute and display the sql """ self.raw(sql, params, bindings, maxrows=maxrows) self.display() def raw(self, sql, params=None, bindings=None, delim=None, comments=comments, maxrows=None): """ execute the sql and return a tuple of (headers, results) """ if delim: headers = [] results = [] if type(sql) == type(StringType): if comments: sql = comments(sql) statements = filter(lambda x: len(x) > 0, map(lambda statement: statement.strip(), sql.split(delim))) else: statements = [sql] for a in statements: self.__execute__(a, params, bindings, maxrows=maxrows) headers.append(self.headers) results.append(self.results) self.headers = headers self.results = results else: self.__execute__(sql, params, bindings, maxrows=maxrows) return (self.headers, self.results) def callproc(self, procname, params=None, bindings=None, maxrows=None): """ execute a stored procedure """ cur = self.begin() try: cur.callproc(procname, params=params, bindings=bindings, maxrows=maxrows) finally: self.commit(cur) self.display() def pk(self, table, owner=None, schema=None):
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -