📄 db.py
字号:
class TransactionError(Exception):
    """Raised when commit/rollback is called outside of a transaction."""
    pass


class transaction:
    """
    A context that can be used in conjunction with "with" statements
    to implement SQL transactions. Starts a transaction on enter,
    rolls it back if there's an error; otherwise it commits it at the
    end.
    """
    def __enter__(self):
        transact()

    def __exit__(self, exctype, excvalue, traceback):
        # Returns None, so any exception raised inside the `with` body
        # still propagates after the rollback.
        if exctype is not None:
            rollback()
        else:
            commit()


def transact():
    """
    Start a transaction.

    `web.ctx.db_transaction` is used as a nesting counter: the outermost
    call commits pending work, nested calls create a savepoint named
    after the current nesting depth.
    """
    if not web.ctx.db_transaction:
        # commit everything up to now, so we don't rollback it later
        if hasattr(web.ctx.db, 'commit'):
            web.ctx.db.commit()
    else:
        db_cursor = web.ctx.db_cursor()
        web.ctx.db_execute(
            db_cursor,
            SQLQuery("SAVEPOINT webpy_sp_%s" % web.ctx.db_transaction))
    web.ctx.db_transaction += 1


def commit():
    """
    Commits a transaction.

    The outermost commit calls `web.ctx.db.commit()` (when available);
    a nested commit releases the savepoint created by the matching
    `transact()`.

    Raises `TransactionError` if no transaction is open.
    """
    web.ctx.db_transaction -= 1
    if web.ctx.db_transaction < 0:
        # Restore the counter so a stray commit does not leave it
        # negative (a later transact() would otherwise try to create a
        # savepoint instead of starting a fresh transaction).
        web.ctx.db_transaction = 0
        raise TransactionError("not in a transaction")

    if not web.ctx.db_transaction:
        if hasattr(web.ctx.db, 'commit'):
            web.ctx.db.commit()
    else:
        db_cursor = web.ctx.db_cursor()
        web.ctx.db_execute(
            db_cursor,
            SQLQuery("RELEASE SAVEPOINT webpy_sp_%s"
                     % web.ctx.db_transaction))


def rollback(care=True):
    """
    Rolls back a transaction.

    The outermost rollback calls `web.ctx.db.rollback()` (when
    available); a nested rollback rolls back to the savepoint created
    by the matching `transact()`.

    If no transaction is open, raises `TransactionError` when `care` is
    true and silently returns otherwise.
    """
    web.ctx.db_transaction -= 1
    if web.ctx.db_transaction < 0:
        # Reset the counter on the request context (not on the `web`
        # module itself) so it never stays negative.
        web.ctx.db_transaction = 0
        if care:
            raise TransactionError("not in a transaction")
        else:
            return

    if not web.ctx.db_transaction:
        if hasattr(web.ctx.db, 'rollback'):
            web.ctx.db.rollback()
    else:
        db_cursor = web.ctx.db_cursor()
        web.ctx.db_execute(
            db_cursor,
            SQLQuery("ROLLBACK TO SAVEPOINT webpy_sp_%s"
                     % web.ctx.db_transaction),
            dorollback=False)


def query(sql_query, vars=None, processed=False, _test=False):
    """
    Execute SQL query `sql_query` using dictionary `vars` to interpolate it.
    If `processed=True`, `vars` is a `reparam`-style list to use
    instead of interpolating.

    Returns an iterator of `storage` rows when the statement produced a
    result set, otherwise the cursor's `rowcount`. With `_test=True` the
    query object is returned without being executed.

        >>> query("SELECT * FROM foo", _test=True)
        <sql: 'SELECT * FROM foo'>
        >>> query("SELECT * FROM foo WHERE x = $x", vars=dict(x='f'), _test=True)
        <sql: "SELECT * FROM foo WHERE x = 'f'">
        >>> query("SELECT * FROM foo WHERE x = " + sqlquote('f'), _test=True)
        <sql: "SELECT * FROM foo WHERE x = 'f'">
    """
    if vars is None:
        vars = {}

    if not processed and not isinstance(sql_query, SQLQuery):
        sql_query = reparam(sql_query, vars)

    if _test:
        return sql_query

    db_cursor = web.ctx.db_cursor()
    web.ctx.db_execute(db_cursor, sql_query)

    if db_cursor.description:
        # The first item of each description entry is the column name.
        names = [x[0] for x in db_cursor.description]

        def iterwrapper():
            row = db_cursor.fetchone()
            while row:
                yield storage(dict(zip(names, row)))
                row = db_cursor.fetchone()

        out = iterbetter(iterwrapper())
        # The length shortcut is not installed for sqlite.
        if web.ctx.db_name != "sqlite":
            out.__len__ = lambda: int(db_cursor.rowcount)
        out.list = lambda: [storage(dict(zip(names, x)))
                            for x in db_cursor.fetchall()]
    else:
        out = db_cursor.rowcount

    # Outside an explicit transaction every statement is committed
    # immediately.
    if not web.ctx.db_transaction:
        web.ctx.db.commit()
    return out


def sqllist(lst):
    """
    Converts the arguments for use in something like a WHERE clause.

        >>> sqllist(['a', 'b'])
        'a, b'
        >>> sqllist('a')
        'a'
    """
    if isinstance(lst, str):
        return lst
    else:
        return ', '.join(lst)


def sqlors(left, lst):
    """
    `left` is a SQL clause like `tablename.arg = `
    and `lst` is a list of values. Returns a reparam-style
    pair featuring the SQL that ORs together the clause
    for each item in the lst.

        >>> sqlors('foo = ', [])
        <sql: '2+2=5'>
        >>> sqlors('foo = ', [1])
        <sql: 'foo = 1'>
        >>> sqlors('foo = ', 1)
        <sql: 'foo = 1'>
        >>> sqlors('foo = ', [1,2,3])
        <sql: '(foo = 1 OR foo = 2 OR foo = 3)'>
    """
    if isinstance(lst, iters):
        lst = list(lst)
        ln = len(lst)
        if ln == 0:
            # No candidates: emit a condition that is always false.
            return SQLQuery("2+2=5", [])
        if ln == 1:
            # A single value is handled like a scalar below.
            lst = lst[0]

    if isinstance(lst, iters):
        placeholders = [aparam() for _ in lst]
        return SQLQuery(
            '(' + left + (' OR ' + left).join(placeholders) + ")", lst)
    else:
        return SQLQuery(left + aparam(), [lst])


def sqlwhere(dictionary, grouping=' AND '):
    """
    Converts a `dictionary` to an SQL WHERE clause `SQLQuery`.
    Each key becomes a `key = <placeholder>` term and the terms are
    joined with `grouping`.

        >>> sqlwhere({'cust_id': 2, 'order_id':3})
        <sql: 'order_id = 3 AND cust_id = 2'>
        >>> sqlwhere({'cust_id': 2, 'order_id':3}, grouping=', ')
        <sql: 'order_id = 3, cust_id = 2'>
    """
    return SQLQuery(
        grouping.join(['%s = %s' % (k, aparam())
                       for k in dictionary.keys()]),
        dictionary.values())


def select(tables, vars=None, what='*', where=None, order=None, group=None,
           limit=None, offset=None, _test=False):
    """
    Selects `what` from `tables` with clauses `where`, `order`,
    `group`, `limit`, and `offset`. Uses vars to interpolate.
    Otherwise, each clause can be a SQLQuery.

    An integer `where` is treated as a lookup on the `id` column.
    On firebird, `limit`/`offset` are emitted as `FIRST`/`SKIP` right
    after `SELECT` instead of trailing `LIMIT`/`OFFSET` clauses.

        >>> select('foo', _test=True)
        <sql: 'SELECT * FROM foo'>
        >>> select(['foo', 'bar'], where="foo.bar_id = bar.id", limit=5, _test=True)
        <sql: 'SELECT * FROM foo, bar WHERE foo.bar_id = bar.id LIMIT 5'>
    """
    if vars is None:
        vars = {}
    qout = ""

    def gen_clause(sql, val):
        # Build " <sql> <val>" for one clause, or "" when `val` is empty.
        if isinstance(val, (int, long)):
            if sql == 'WHERE':
                nout = 'id = ' + sqlquote(val)
            else:
                nout = SQLQuery(val)
        elif isinstance(val, (list, tuple)) and len(val) == 2:
            nout = SQLQuery(val[0], val[1])  # backwards-compatibility
        elif isinstance(val, SQLQuery):
            nout = val
        elif val:
            nout = reparam(val, vars)
        else:
            return ""

        out = ""
        # Separate from whatever has been accumulated so far.
        if qout:
            out += " "
        out += sql + " " + nout
        return out

    if web.ctx.get('db_name') == "firebird":
        for (sql, val) in (
            ('FIRST', limit),
            ('SKIP', offset)
        ):
            qout += gen_clause(sql, val)
        if qout:
            SELECT = 'SELECT ' + qout
        else:
            SELECT = 'SELECT'
        qout = ""
        sql_clauses = (
            (SELECT, what),
            ('FROM', sqllist(tables)),
            ('WHERE', where),
            ('GROUP BY', group),
            ('ORDER BY', order)
        )
    else:
        sql_clauses = (
            ('SELECT', what),
            ('FROM', sqllist(tables)),
            ('WHERE', where),
            ('GROUP BY', group),
            ('ORDER BY', order),
            ('LIMIT', limit),
            ('OFFSET', offset)
        )

    for (sql, val) in sql_clauses:
        qout += gen_clause(sql, val)

    if _test:
        return qout
    return query(qout, processed=True)


def insert(tablename, seqname=None, _test=False, **values):
    """
    Inserts `values` into `tablename`. Returns current sequence ID.
    Set `seqname` to the ID if it's not the default, or to `False`
    if there isn't one.

        >>> insert('foo', joe='bob', a=2, _test=True)
        <sql: "INSERT INTO foo (a, joe) VALUES (2, 'bob')">
    """
    if values:
        sql_query = SQLQuery("INSERT INTO %s (%s) VALUES (%s)" % (
            tablename,
            ", ".join(values.keys()),
            ', '.join([aparam() for _ in values])
        ), values.values())
    else:
        sql_query = SQLQuery("INSERT INTO %s DEFAULT VALUES" % tablename)

    if _test:
        return sql_query

    db_cursor = web.ctx.db_cursor()
    if seqname is False:
        pass
    elif web.ctx.db_name == "postgres":
        if seqname is None:
            seqname = tablename + "_id_seq"
        # Fetch the new id in the same round trip as the insert.
        sql_query += "; SELECT currval('%s')" % seqname
    elif web.ctx.db_name == "mysql":
        web.ctx.db_execute(db_cursor, sql_query)
        sql_query = SQLQuery("SELECT last_insert_id()")
    elif web.ctx.db_name == "sqlite":
        web.ctx.db_execute(db_cursor, sql_query)
        # not really the same...
        sql_query = SQLQuery("SELECT last_insert_rowid()")

    web.ctx.db_execute(db_cursor, sql_query)
    # Best effort: when the statement yields no row to read an id from
    # (e.g. `seqname=False`), report None instead of failing.
    try:
        out = db_cursor.fetchone()[0]
    except Exception:
        out = None

    if not web.ctx.db_transaction:
        web.ctx.db.commit()

    return out


def update(tables, where, vars=None, _test=False, **values):
    """
    Update `tables` with clause `where` (interpolated using `vars`)
    and setting `values`. Returns the cursor's `rowcount`.

    An integer `where` is treated as a lookup on the `id` column.

        >>> joe = 'Joseph'
        >>> update('foo', where='name = $joe', name='bob', age=5,
        ...   vars=locals(), _test=True)
        <sql: "UPDATE foo SET age = 5, name = 'bob' WHERE name = 'Joseph'">
    """
    if vars is None:
        vars = {}

    if isinstance(where, (int, long)):
        where = "id = " + sqlquote(where)
    elif isinstance(where, (list, tuple)) and len(where) == 2:
        where = SQLQuery(where[0], where[1])
    elif isinstance(where, SQLQuery):
        pass
    else:
        where = reparam(where, vars)

    # Named `sql_query` so the module-level `query` function is not
    # shadowed inside this function.
    sql_query = (
        "UPDATE " + sqllist(tables) +
        " SET " + sqlwhere(values, ', ') +
        " WHERE " + where)

    if _test:
        return sql_query

    db_cursor = web.ctx.db_cursor()
    web.ctx.db_execute(db_cursor, sql_query)

    if not web.ctx.db_transaction:
        web.ctx.db.commit()
    return db_cursor.rowcount


def delete(table, where=None, using=None, vars=None, _test=False):
    """
    Deletes from `table` with clauses `where` and `using`.
    Returns the cursor's `rowcount`.

    An integer `where` is treated as a lookup on the `id` column.
    `using` is ignored on firebird.

        >>> name = 'Joe'
        >>> delete('foo', where='name = $name', vars=locals(), _test=True)
        <sql: "DELETE FROM foo WHERE name = 'Joe'">
    """
    if vars is None:
        vars = {}

    if isinstance(where, (int, long)):
        where = "id = " + sqlquote(where)
    elif isinstance(where, (list, tuple)) and len(where) == 2:
        where = SQLQuery(where[0], where[1])
    elif isinstance(where, SQLQuery):
        pass
    elif where is None:
        pass
    else:
        where = reparam(where, vars)

    q = 'DELETE FROM ' + table
    # USING has to precede WHERE in a DELETE statement.
    if using and web.ctx.get('db_name') != "firebird":
        q += ' USING ' + sqllist(using)
    if where:
        q += ' WHERE ' + where

    if _test:
        return q

    db_cursor = web.ctx.db_cursor()
    web.ctx.db_execute(db_cursor, q)

    if not web.ctx.db_transaction:
        web.ctx.db.commit()
    return db_cursor.rowcount


if __name__ == "__main__":
    import doctest
    doctest.testmod()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -