📄 db.py
字号:
"""Database API(part of web.py)"""# todo:# - test with sqlite# - a store function?__all__ = [ "UnknownParamstyle", "UnknownDB", "sqllist", "sqlors", "aparam", "reparam", "SQLQuery", "sqlquote", "SQLLiteral", "sqlliteral", "connect", "TransactionError", "transaction", "transact", "commit", "rollback", "query", "select", "insert", "update", "delete"]import timetry: import datetimeexcept ImportError: datetime = Nonefrom utils import storage, iters, iterbetterimport webapi as webtry: from DBUtils import PooledDB web.config._hasPooling = Trueexcept ImportError: web.config._hasPooling = Falseclass _ItplError(ValueError): def __init__(self, text, pos): ValueError.__init__(self) self.text = text self.pos = pos def __str__(self): return "unfinished expression in %s at char %d" % ( repr(self.text), self.pos)def _interpolate(format): """ Takes a format string and returns a list of 2-tuples of the form (boolean, string) where boolean says whether string should be evaled or not. from <http://lfw.org/python/Itpl.py> (public domain, Ka-Ping Yee) """ from tokenize import tokenprog def matchorfail(text, pos): match = tokenprog.match(text, pos) if match is None: raise _ItplError(text, pos) return match, match.end() namechars = "abcdefghijklmnopqrstuvwxyz" \ "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_"; chunks = [] pos = 0 while 1: dollar = format.find("$", pos) if dollar < 0: break nextchar = format[dollar + 1] if nextchar == "{": chunks.append((0, format[pos:dollar])) pos, level = dollar + 2, 1 while level: match, pos = matchorfail(format, pos) tstart, tend = match.regs[3] token = format[tstart:tend] if token == "{": level = level + 1 elif token == "}": level = level - 1 chunks.append((1, format[dollar + 2:pos - 1])) elif nextchar in namechars: chunks.append((0, format[pos:dollar])) match, pos = matchorfail(format, dollar + 1) while pos < len(format): if format[pos] == "." and \ pos + 1 < len(format) and format[pos + 1] in namechars: match, pos = matchorfail(format, pos + 1) elif format[pos] in "([": pos, level = pos + 1, 1 while level: match, pos = matchorfail(format, pos) tstart, tend = match.regs[3] token = format[tstart:tend] if token[0] in "([": level = level + 1 elif token[0] in ")]": level = level - 1 else: break chunks.append((1, format[dollar + 1:pos])) else: chunks.append((0, format[pos:dollar + 1])) pos = dollar + 1 + (nextchar == "$") if pos < len(format): chunks.append((0, format[pos:])) return chunksclass UnknownParamstyle(Exception): """ raised for unsupported db paramstyles (currently supported: qmark, numeric, format, pyformat) """ passdef aparam(): """ Returns the appropriate string to be used to interpolate a value with the current `web.ctx.db_module` or simply %s if there isn't one. >>> aparam() '%s' """ if hasattr(web.ctx, 'db_module'): style = web.ctx.db_module.paramstyle else: style = 'pyformat' if style == 'qmark': return '?' elif style == 'numeric': return ':1' elif style in ['format', 'pyformat']: return '%s' raise UnknownParamstyle, styledef reparam(string_, dictionary): """ Takes a string and a dictionary and interpolates the string using values from the dictionary. Returns an `SQLQuery` for the result. >>> reparam("s = $s", dict(s=True)) <sql: "s = 't'"> """ vals = [] result = [] for live, chunk in _interpolate(string_): if live: result.append(aparam()) vals.append(eval(chunk, dictionary)) else: result.append(chunk) return SQLQuery(''.join(result), vals)def sqlify(obj): """ converts `obj` to its proper SQL version >>> sqlify(None) 'NULL' >>> sqlify(True) "'t'" >>> sqlify(3) '3' """ # because `1 == True and hash(1) == hash(True)` # we have to do this the hard way... if obj is None: return 'NULL' elif obj is True: return "'t'" elif obj is False: return "'f'" elif datetime and isinstance(obj, datetime.datetime): return repr(obj.isoformat()) else: return repr(obj)class SQLQuery: """ You can pass this sort of thing as a clause in any db function. Otherwise, you can pass a dictionary to the keyword argument `vars` and the function will call reparam for you. """ # tested in sqlquote's docstring def __init__(self, s='', v=()): self.s, self.v = str(s), tuple(v) def __getitem__(self, key): # for backwards-compatibility return [self.s, self.v][key] def __add__(self, other): if isinstance(other, str): self.s += other elif isinstance(other, SQLQuery): self.s += other.s self.v += other.v return self def __radd__(self, other): if isinstance(other, str): self.s = other + self.s return self else: return NotImplemented def __str__(self): try: return self.s % tuple([sqlify(x) for x in self.v]) except (ValueError, TypeError): return self.s def __repr__(self): return '<sql: %s>' % repr(str(self))class SQLLiteral: """ Protects a string from `sqlquote`. >>> insert('foo', time=SQLLiteral('NOW()'), _test=True) <sql: 'INSERT INTO foo (time) VALUES (NOW())'> """ def __init__(self, v): self.v = v def __repr__(self): return self.vsqlliteral = SQLLiteraldef sqlquote(a): """ Ensures `a` is quoted properly for use in a SQL query. >>> 'WHERE x = ' + sqlquote(True) + ' AND y = ' + sqlquote(3) <sql: "WHERE x = 't' AND y = 3"> """ return SQLQuery(aparam(), (a,))class UnknownDB(Exception): """raised for unsupported dbms""" passdef connect(dbn, **keywords): """ Connects to the specified database. `dbn` currently must be "postgres", "mysql", or "sqlite". If DBUtils is installed, connection pooling will be used. """ if dbn == "postgres": try: import psycopg2 as db except ImportError: try: import psycopg as db except ImportError: import pgdb as db if 'pw' in keywords: keywords['password'] = keywords['pw'] del keywords['pw'] keywords['database'] = keywords['db'] del keywords['db'] elif dbn == "mysql": import MySQLdb as db if 'pw' in keywords: keywords['passwd'] = keywords['pw'] del keywords['pw'] db.paramstyle = 'pyformat' # it's both, like psycopg elif dbn == "sqlite": try: import sqlite3 as db db.paramstyle = 'qmark' except ImportError: try: from pysqlite2 import dbapi2 as db db.paramstyle = 'qmark' except ImportError: import sqlite as db web.config._hasPooling = False keywords['database'] = keywords['db'] del keywords['db'] elif dbn == "firebird": import kinterbasdb as db if 'pw' in keywords: keywords['passwd'] = keywords['pw'] del keywords['pw'] keywords['database'] = keywords['db'] del keywords['db'] else: raise UnknownDB, dbn web.ctx.db_name = dbn web.ctx.db_module = db web.ctx.db_transaction = 0 web.ctx.db = keywords def _PooledDB(db, keywords): # In DBUtils 0.9.3, `dbapi` argument is renamed as `creator` # see Bug#122112 if PooledDB.__version__.split('.') < '0.9.3'.split('.'): return PooledDB.PooledDB(dbapi=db, **keywords) else: return PooledDB.PooledDB(creator=db, **keywords) def db_cursor(): if isinstance(web.ctx.db, dict): keywords = web.ctx.db if web.config._hasPooling: if 'db' not in globals(): globals()['db'] = _PooledDB(db, keywords) web.ctx.db = globals()['db'].connection() else: web.ctx.db = db.connect(**keywords) return web.ctx.db.cursor() web.ctx.db_cursor = db_cursor web.ctx.dbq_count = 0 def db_execute(cur, sql_query, dorollback=True): """executes an sql query""" web.ctx.dbq_count += 1 try: a = time.time() out = cur.execute(sql_query.s, sql_query.v) b = time.time() except: if web.config.get('db_printing'): print >> web.debug, 'ERR:', str(sql_query) if dorollback: rollback(care=False) raise if web.config.get('db_printing'): print >> web.debug, '%s (%s): %s' % (round(b-a, 2), web.ctx.dbq_count, str(sql_query)) return out web.ctx.db_execute = db_execute return web.ctx.db
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -