⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 db.py

📁 LINUX下
💻 PY
📖 第 1 页 / 共 2 页
字号:
class TransactionError(Exception):
    """Raised by `commit()` / `rollback()` when no transaction is open."""
    pass


class transaction:
    """
    A context that can be used in conjunction with "with" statements
    to implement SQL transactions. Starts a transaction on enter,
    rolls it back if there's an error; otherwise it commits it at the
    end.
    """
    def __enter__(self):
        transact()

    def __exit__(self, exctype, excvalue, traceback):
        # Returns None, so an exception raised inside the `with` body
        # still propagates after the rollback.
        if exctype is not None:
            rollback()
        else:
            commit()


def transact():
    """
    Start a transaction.

    `web.ctx.db_transaction` is the current nesting depth. At depth 0 any
    pending work is committed first; at deeper levels a SAVEPOINT named
    after the current depth is created. The depth is then incremented.
    """
    if not web.ctx.db_transaction:
        # commit everything up to now, so we don't rollback it later
        if hasattr(web.ctx.db, 'commit'):
            web.ctx.db.commit()
    else:
        db_cursor = web.ctx.db_cursor()
        web.ctx.db_execute(
            db_cursor,
            SQLQuery("SAVEPOINT webpy_sp_%s" % web.ctx.db_transaction))
    web.ctx.db_transaction += 1


def commit():
    """
    Commits a transaction.

    Decrements the nesting depth. The outermost level commits on the
    connection (when it has a `commit` method); nested levels release the
    matching savepoint.

    Raises `TransactionError` if no transaction is open.
    """
    web.ctx.db_transaction -= 1
    if web.ctx.db_transaction < 0:
        # Put the counter back to a sane value before complaining, so a
        # stray commit() cannot leave the depth negative for later calls.
        web.ctx.db_transaction = 0
        raise TransactionError("not in a transaction")

    if not web.ctx.db_transaction:
        if hasattr(web.ctx.db, 'commit'):
            web.ctx.db.commit()
    else:
        db_cursor = web.ctx.db_cursor()
        web.ctx.db_execute(
            db_cursor,
            SQLQuery("RELEASE SAVEPOINT webpy_sp_%s" % web.ctx.db_transaction))


def rollback(care=True):
    """
    Rolls back a transaction.

    Decrements the nesting depth. The outermost level rolls back on the
    connection (when it has a `rollback` method); nested levels roll back
    to the matching savepoint.

    If no transaction is open, raises `TransactionError` when `care` is
    true and returns silently otherwise.
    """
    web.ctx.db_transaction -= 1
    if web.ctx.db_transaction < 0:
        # Reset the counter on web.ctx (the object every other function
        # here reads), not on the `web` module itself.
        web.ctx.db_transaction = 0
        if care:
            raise TransactionError("not in a transaction")
        else:
            return

    if not web.ctx.db_transaction:
        if hasattr(web.ctx.db, 'rollback'):
            web.ctx.db.rollback()
    else:
        db_cursor = web.ctx.db_cursor()
        web.ctx.db_execute(
            db_cursor,
            SQLQuery("ROLLBACK TO SAVEPOINT webpy_sp_%s" % web.ctx.db_transaction),
            dorollback=False)


def query(sql_query, vars=None, processed=False, _test=False):
    """
    Execute SQL query `sql_query` using dictionary `vars` to interpolate it.
    If `processed=True`, `vars` is a `reparam`-style list to use
    instead of interpolating.

    When the cursor reports a description, returns an `iterbetter` that
    yields one `storage` per row (with a `list()` helper, and `__len__`
    on every backend except sqlite); otherwise returns the cursor's
    `rowcount`. Commits unless a transaction is open. With `_test=True`
    the prepared query is returned without being executed.

        >>> query("SELECT * FROM foo", _test=True)
        <sql: 'SELECT * FROM foo'>
        >>> query("SELECT * FROM foo WHERE x = $x", vars=dict(x='f'), _test=True)
        <sql: "SELECT * FROM foo WHERE x = 'f'">
        >>> query("SELECT * FROM foo WHERE x = " + sqlquote('f'), _test=True)
        <sql: "SELECT * FROM foo WHERE x = 'f'">
    """
    if vars is None:
        vars = {}

    if not processed and not isinstance(sql_query, SQLQuery):
        sql_query = reparam(sql_query, vars)

    if _test:
        return sql_query

    db_cursor = web.ctx.db_cursor()
    web.ctx.db_execute(db_cursor, sql_query)

    if db_cursor.description:
        names = [x[0] for x in db_cursor.description]

        def iterwrapper():
            # Lazily pull rows one at a time from the cursor.
            row = db_cursor.fetchone()
            while row:
                yield storage(dict(zip(names, row)))
                row = db_cursor.fetchone()

        out = iterbetter(iterwrapper())
        if web.ctx.db_name != "sqlite":
            out.__len__ = lambda: int(db_cursor.rowcount)
        out.list = lambda: [storage(dict(zip(names, x)))
                            for x in db_cursor.fetchall()]
    else:
        out = db_cursor.rowcount

    if not web.ctx.db_transaction:
        web.ctx.db.commit()
    return out


def sqllist(lst):
    """
    Converts the arguments for use in something like a WHERE clause.

        >>> sqllist(['a', 'b'])
        'a, b'
        >>> sqllist('a')
        'a'

    """
    if isinstance(lst, str):
        return lst
    else:
        return ', '.join(lst)


def sqlors(left, lst):
    """
    `left` is a SQL clause like `tablename.arg = `
    and `lst` is a list of values. Returns a reparam-style
    pair featuring the SQL that ORs together the clause
    for each item in the lst.

        >>> sqlors('foo = ', [])
        <sql: '2+2=5'>
        >>> sqlors('foo = ', [1])
        <sql: 'foo = 1'>
        >>> sqlors('foo = ', 1)
        <sql: 'foo = 1'>
        >>> sqlors('foo = ', [1,2,3])
        <sql: '(foo = 1 OR foo = 2 OR foo = 3)'>
    """
    if isinstance(lst, iters):
        lst = list(lst)
        ln = len(lst)
        if ln == 0:
            # Nothing to match: emit a condition that is always false.
            return SQLQuery("2+2=5", [])
        if ln == 1:
            # Single value: fall through to the scalar form below.
            lst = lst[0]

    if isinstance(lst, iters):
        return SQLQuery(
            '(' + left +
            (' OR ' + left).join([aparam() for param in lst]) + ")", lst)
    else:
        return SQLQuery(left + aparam(), [lst])


def sqlwhere(dictionary, grouping=' AND '):
    """
    Converts a `dictionary` to an SQL WHERE clause `SQLQuery`.

        >>> sqlwhere({'cust_id': 2, 'order_id':3})
        <sql: 'order_id = 3 AND cust_id = 2'>
        >>> sqlwhere({'cust_id': 2, 'order_id':3}, grouping=', ')
        <sql: 'order_id = 3, cust_id = 2'>
    """
    # keys() and values() of an unmodified dict iterate in the same order,
    # so each placeholder lines up with its value.
    return SQLQuery(grouping.join([
        '%s = %s' % (k, aparam()) for k in dictionary.keys()
    ]), list(dictionary.values()))


def select(tables, vars=None, what='*', where=None, order=None, group=None,
           limit=None, offset=None, _test=False):
    """
    Selects `what` from `tables` with clauses `where`, `order`,
    `group`, `limit`, and `offset`. Uses vars to interpolate.
    Otherwise, each clause can be a SQLQuery.

    An integer `where` is treated as a match on the `id` column. With
    `_test=True` the built query is returned without being executed.

        >>> select('foo', _test=True)
        <sql: 'SELECT * FROM foo'>
        >>> select(['foo', 'bar'], where="foo.bar_id = bar.id", limit=5, _test=True)
        <sql: 'SELECT * FROM foo, bar WHERE foo.bar_id = bar.id LIMIT 5'>
    """
    if vars is None:
        vars = {}
    qout = ""

    def gen_clause(sql, val):
        # Build one "<KEYWORD> <value>" fragment, or "" when val is empty.
        if isinstance(val, (int, long)):
            if sql == 'WHERE':
                nout = 'id = ' + sqlquote(val)
            else:
                nout = SQLQuery(val)
        elif isinstance(val, (list, tuple)) and len(val) == 2:
            nout = SQLQuery(val[0], val[1])  # backwards-compatibility
        elif isinstance(val, SQLQuery):
            nout = val
        elif val:
            nout = reparam(val, vars)
        else:
            return ""

        # Separate from whatever has been accumulated so far in `qout`.
        out = ""
        if qout:
            out += " "
        out += sql + " " + nout
        return out

    if web.ctx.get('db_name') == "firebird":
        # Firebird takes FIRST/SKIP right after SELECT rather than
        # trailing LIMIT/OFFSET clauses.
        for (sql, val) in (
            ('FIRST', limit),
            ('SKIP', offset),
        ):
            qout += gen_clause(sql, val)
        if qout:
            SELECT = 'SELECT ' + qout
        else:
            SELECT = 'SELECT'
        qout = ""
        sql_clauses = (
            (SELECT, what),
            ('FROM', sqllist(tables)),
            ('WHERE', where),
            ('GROUP BY', group),
            ('ORDER BY', order),
        )
    else:
        sql_clauses = (
            ('SELECT', what),
            ('FROM', sqllist(tables)),
            ('WHERE', where),
            ('GROUP BY', group),
            ('ORDER BY', order),
            ('LIMIT', limit),
            ('OFFSET', offset),
        )

    for (sql, val) in sql_clauses:
        qout += gen_clause(sql, val)

    if _test:
        return qout
    return query(qout, processed=True)


def insert(tablename, seqname=None, _test=False, **values):
    """
    Inserts `values` into `tablename`. Returns current sequence ID.
    Set `seqname` to the ID if it's not the default, or to `False`
    if there isn't one.

    The returned value is the first column of the row fetched after the
    statement runs, or `None` when no such row can be fetched. Commits
    unless a transaction is open. With `_test=True` the INSERT query is
    returned without being executed.

        >>> insert('foo', joe='bob', a=2, _test=True)
        <sql: "INSERT INTO foo (a, joe) VALUES (2, 'bob')">
    """
    if values:
        sql_query = SQLQuery("INSERT INTO %s (%s) VALUES (%s)" % (
            tablename,
            ", ".join(values.keys()),
            ', '.join([aparam() for x in values])
        ), list(values.values()))
    else:
        sql_query = SQLQuery("INSERT INTO %s DEFAULT VALUES" % tablename)

    if _test:
        return sql_query

    db_cursor = web.ctx.db_cursor()
    if seqname is False:
        pass
    elif web.ctx.db_name == "postgres":
        if seqname is None:
            seqname = tablename + "_id_seq"
        # Piggy-back the sequence lookup on the same statement.
        sql_query += "; SELECT currval('%s')" % seqname
    elif web.ctx.db_name == "mysql":
        web.ctx.db_execute(db_cursor, sql_query)
        sql_query = SQLQuery("SELECT last_insert_id()")
    elif web.ctx.db_name == "sqlite":
        web.ctx.db_execute(db_cursor, sql_query)
        # not really the same...
        sql_query = SQLQuery("SELECT last_insert_rowid()")

    web.ctx.db_execute(db_cursor, sql_query)
    try:
        out = db_cursor.fetchone()[0]
    except Exception:
        # Best effort: no ID row available (e.g. nothing to fetch).
        out = None

    if not web.ctx.db_transaction:
        web.ctx.db.commit()
    return out


def update(tables, where, vars=None, _test=False, **values):
    """
    Update `tables` with clause `where` (interpolated using `vars`)
    and setting `values`.

    An integer `where` is treated as a match on the `id` column; a
    two-item list/tuple is passed to `SQLQuery` as (query, params).
    Returns the cursor's `rowcount`. Commits unless a transaction is
    open. With `_test=True` the query is returned without being executed.

        >>> joe = 'Joseph'
        >>> update('foo', where='name = $joe', name='bob', age=5,
        ...   vars=locals(), _test=True)
        <sql: "UPDATE foo SET age = 5, name = 'bob' WHERE name = 'Joseph'">
    """
    if vars is None:
        vars = {}

    if isinstance(where, (int, long)):
        where = "id = " + sqlquote(where)
    elif isinstance(where, (list, tuple)) and len(where) == 2:
        where = SQLQuery(where[0], where[1])
    elif isinstance(where, SQLQuery):
        pass
    else:
        where = reparam(where, vars)

    # Named sql_query (not `query`) so the module-level query() function
    # is not shadowed inside this body.
    sql_query = (
        "UPDATE " + sqllist(tables) +
        " SET " + sqlwhere(values, ', ') +
        " WHERE " + where)

    if _test:
        return sql_query

    db_cursor = web.ctx.db_cursor()
    web.ctx.db_execute(db_cursor, sql_query)

    if not web.ctx.db_transaction:
        web.ctx.db.commit()
    return db_cursor.rowcount


def delete(table, where=None, using=None, vars=None, _test=False):
    """
    Deletes from `table` with clauses `where` and `using`.

    An integer `where` is treated as a match on the `id` column; a
    two-item list/tuple is passed to `SQLQuery` as (query, params).
    `using` is ignored on firebird. Returns the cursor's `rowcount`.
    Commits unless a transaction is open. With `_test=True` the query
    is returned without being executed.

        >>> name = 'Joe'
        >>> delete('foo', where='name = $name', vars=locals(), _test=True)
        <sql: "DELETE FROM foo WHERE name = 'Joe'">
    """
    if vars is None:
        vars = {}

    if isinstance(where, (int, long)):
        where = "id = " + sqlquote(where)
    elif isinstance(where, (list, tuple)) and len(where) == 2:
        where = SQLQuery(where[0], where[1])
    elif isinstance(where, SQLQuery):
        pass
    elif where is None:
        pass
    else:
        where = reparam(where, vars)

    q = 'DELETE FROM ' + table
    # USING must precede WHERE ("DELETE FROM t USING u WHERE ...");
    # emitting it after WHERE produces a statement the database rejects.
    if using and web.ctx.get('db_name') != "firebird":
        q += ' USING ' + sqllist(using)
    if where:
        q += ' WHERE ' + where

    if _test:
        return q

    db_cursor = web.ctx.db_cursor()
    web.ctx.db_execute(db_cursor, q)

    if not web.ctx.db_transaction:
        web.ctx.db.commit()
    return db_cursor.rowcount


if __name__ == "__main__":
    import doctest
    doctest.testmod()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -