⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 db.py

📁 LINUX下
💻 PY
📖 第 1 页 / 共 2 页
字号:
"""Database API (part of web.py)."""

# todo:
#  - test with sqlite
#  - a store function?

__all__ = [
    "UnknownParamstyle", "UnknownDB",
    "sqllist", "sqlors", "aparam", "reparam",
    "SQLQuery", "sqlquote",
    "SQLLiteral", "sqlliteral",
    "connect",
    "TransactionError", "transaction", "transact", "commit", "rollback",
    "query",
    "select", "insert", "update", "delete",
]

import time
try:
    import datetime
except ImportError:
    datetime = None

from utils import storage, iters, iterbetter
import webapi as web

# Connection pooling is optional: remember whether DBUtils could be imported.
try:
    from DBUtils import PooledDB
    web.config._hasPooling = True
except ImportError:
    web.config._hasPooling = False


class _ItplError(ValueError):
    """
    Raised by `_interpolate` when a `$` expression cannot be tokenized.

    `text` is the complete format string and `pos` is the character offset
    at which tokenizing failed.
    """

    def __init__(self, text, pos):
        ValueError.__init__(self)
        self.text = text
        self.pos = pos

    def __str__(self):
        return "unfinished expression in %s at char %d" % (
            repr(self.text), self.pos)


def _interpolate(format):
    """
    Takes a format string and returns a list of 2-tuples of the form
    (boolean, string) where boolean says whether string should be evaled
    or not.

    Recognised forms are `$name` (optionally followed by attribute access,
    calls and subscripts), `${expression}`, and `$$` for a literal dollar
    sign. A `$` followed by any other character, or a `$` that ends the
    string, is kept as literal text.

    Raises `_ItplError` when an expression is left unfinished.

    from <http://lfw.org/python/Itpl.py> (public domain, Ka-Ping Yee)
    """
    from tokenize import tokenprog

    def matchorfail(text, pos):
        # Match one Python token at `pos`; return the match and its end offset.
        match = tokenprog.match(text, pos)
        if match is None:
            raise _ItplError(text, pos)
        return match, match.end()

    namechars = "abcdefghijklmnopqrstuvwxyz" \
        "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_"
    chunks = []
    pos = 0

    while 1:
        dollar = format.find("$", pos)
        if dollar < 0:
            break
        if dollar + 1 >= len(format):
            # A "$" at the very end has no following character to inspect
            # (indexing it used to raise IndexError). Leave it for the
            # literal tail that is appended after the loop.
            break
        nextchar = format[dollar + 1]

        if nextchar == "{":
            # ${expression}: consume tokens until the braces balance.
            chunks.append((0, format[pos:dollar]))
            pos, level = dollar + 2, 1
            while level:
                match, pos = matchorfail(format, pos)
                tstart, tend = match.regs[3]
                token = format[tstart:tend]
                if token == "{":
                    level = level + 1
                elif token == "}":
                    level = level - 1
            # Strip the leading "${" and the trailing "}".
            chunks.append((1, format[dollar + 2:pos - 1]))

        elif nextchar in namechars:
            # $name, optionally followed by .attr, (...) or [...] trailers.
            chunks.append((0, format[pos:dollar]))
            match, pos = matchorfail(format, dollar + 1)
            while pos < len(format):
                if format[pos] == "." and \
                    pos + 1 < len(format) and format[pos + 1] in namechars:
                    match, pos = matchorfail(format, pos + 1)
                elif format[pos] in "([":
                    pos, level = pos + 1, 1
                    while level:
                        match, pos = matchorfail(format, pos)
                        tstart, tend = match.regs[3]
                        token = format[tstart:tend]
                        if token[0] in "([":
                            level = level + 1
                        elif token[0] in ")]":
                            level = level - 1
                else:
                    break
            chunks.append((1, format[dollar + 1:pos]))

        else:
            # Not an expression: keep the "$" as literal text. For "$$" the
            # second dollar sign is skipped, so it collapses to a single "$".
            chunks.append((0, format[pos:dollar + 1]))
            pos = dollar + 1 + (nextchar == "$")

    if pos < len(format):
        chunks.append((0, format[pos:]))
    return chunks


class UnknownParamstyle(Exception):
    """
    raised for unsupported db paramstyles

    (currently supported: qmark, numeric, format, pyformat)
    """
    pass


def aparam():
    """
    Returns the appropriate string to be used to interpolate
    a value with the current `web.ctx.db_module` or simply %s
    if there isn't one.

    Raises `UnknownParamstyle` if the module's paramstyle is not supported.

        >>> aparam()
        '%s'
    """
    if hasattr(web.ctx, 'db_module'):
        style = web.ctx.db_module.paramstyle
    else:
        style = 'pyformat'

    if style == 'qmark':
        return '?'
    elif style == 'numeric':
        return ':1'
    elif style in ['format', 'pyformat']:
        return '%s'
    raise UnknownParamstyle(style)


def reparam(string_, dictionary):
    """
    Takes a string and a dictionary and interpolates the string
    using values from the dictionary. Returns an `SQLQuery` for the result.

    Each `$expression` in `string_` is replaced by the placeholder from
    `aparam()`, and the evaluated expression becomes a query value.

        >>> reparam("s = $s", dict(s=True))
        <sql: "s = 't'">
    """
    vals = []
    result = []
    for live, chunk in _interpolate(string_):
        if live:
            result.append(aparam())
            # SECURITY: `chunk` is evaluated as a Python expression with
            # `dictionary` as its globals. `string_` must be a trusted
            # template and never be assembled from user input.
            vals.append(eval(chunk, dictionary))
        else:
            result.append(chunk)
    return SQLQuery(''.join(result), vals)


def sqlify(obj):
    """
    converts `obj` to its proper SQL version

        >>> sqlify(None)
        'NULL'
        >>> sqlify(True)
        "'t'"
        >>> sqlify(3)
        '3'
    """

    # because `1 == True and hash(1) == hash(True)`
    # we have to do this the hard way...

    if obj is None:
        return 'NULL'
    elif obj is True:
        return "'t'"
    elif obj is False:
        return "'f'"
    elif datetime and isinstance(obj, datetime.datetime):
        return repr(obj.isoformat())
    else:
        return repr(obj)


class SQLQuery:
    """
    You can pass this sort of thing as a clause in any db function.
    Otherwise, you can pass a dictionary to the keyword argument `vars`
    and the function will call reparam for you.

    `s` is the query text containing parameter placeholders and `v` is the
    tuple of values for those placeholders.
    """
    # tested in sqlquote's docstring

    def __init__(self, s='', v=()):
        self.s, self.v = str(s), tuple(v)

    def __getitem__(self, key): # for backwards-compatibility
        return [self.s, self.v][key]

    def __add__(self, other):
        # Build a new query instead of modifying `self`: `a + b` must not
        # change `a`, which may still be referenced (and reused) elsewhere.
        if isinstance(other, str):
            return SQLQuery(self.s + other, self.v)
        elif isinstance(other, SQLQuery):
            return SQLQuery(self.s + other.s, self.v + other.v)
        # NOTE(review): other operand types are silently ignored, as before.
        return self

    def __radd__(self, other):
        if isinstance(other, str):
            return SQLQuery(other + self.s, self.v)
        else:
            return NotImplemented

    def __str__(self):
        # Readable rendering with the values substituted into the text. If
        # the %-substitution fails, the raw query text is returned instead.
        try:
            return self.s % tuple([sqlify(x) for x in self.v])
        except (ValueError, TypeError):
            return self.s

    def __repr__(self):
        return '<sql: %s>' % repr(str(self))


class SQLLiteral:
    """
    Protects a string from `sqlquote`.

    The repr of an instance is the wrapped string itself, so `sqlify`
    renders it without quotes.

        >>> insert('foo', time=SQLLiteral('NOW()'), _test=True)
        <sql: 'INSERT INTO foo (time) VALUES (NOW())'>
    """

    def __init__(self, v):
        self.v = v

    def __repr__(self):
        return self.v

sqlliteral = SQLLiteral


def sqlquote(a):
    """
    Ensures `a` is quoted properly for use in a SQL query.

        >>> 'WHERE x = ' + sqlquote(True) + ' AND y = ' + sqlquote(3)
        <sql: "WHERE x = 't' AND y = 3">
    """
    return SQLQuery(aparam(), (a,))


class UnknownDB(Exception):
    """raised for unsupported dbms"""
    pass


def _version_tuple(version):
    """
    Converts a dotted version string into a tuple of ints, e.g.
    '0.9.3' -> (0, 9, 3), so that versions compare numerically.

    Only the leading digits of each component are used; a component
    without leading digits counts as 0.
    """
    parts = []
    for piece in version.split('.'):
        digits = ''
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        parts.append(int(digits or 0))
    return tuple(parts)


def connect(dbn, **keywords):
    """
    Connects to the specified database.

    `dbn` currently must be "postgres", "mysql", "sqlite", or "firebird";
    anything else raises `UnknownDB`.

    `keywords` are handed to the driver. The short names `pw` and `db`
    are renamed to the spelling the selected driver expects.

    If DBUtils is installed, connection pooling will be used (pooling is
    switched off for sqlite).

    The connection itself is opened lazily by `web.ctx.db_cursor()`; until
    then `web.ctx.db` (which is also the return value) holds the keyword
    dictionary.
    """
    if dbn == "postgres":
        try:
            import psycopg2 as db
        except ImportError:
            try:
                import psycopg as db
            except ImportError:
                import pgdb as db
        if 'pw' in keywords:
            keywords['password'] = keywords.pop('pw')
        keywords['database'] = keywords.pop('db')

    elif dbn == "mysql":
        import MySQLdb as db
        if 'pw' in keywords:
            keywords['passwd'] = keywords.pop('pw')
        db.paramstyle = 'pyformat' # it's both, like psycopg

    elif dbn == "sqlite":
        try:
            import sqlite3 as db
            db.paramstyle = 'qmark'
        except ImportError:
            try:
                from pysqlite2 import dbapi2 as db
                db.paramstyle = 'qmark'
            except ImportError:
                import sqlite as db
        web.config._hasPooling = False
        keywords['database'] = keywords.pop('db')

    elif dbn == "firebird":
        import kinterbasdb as db
        if 'pw' in keywords:
            keywords['passwd'] = keywords.pop('pw')
        keywords['database'] = keywords.pop('db')

    else:
        raise UnknownDB(dbn)

    web.ctx.db_name = dbn
    web.ctx.db_module = db
    web.ctx.db_transaction = 0
    web.ctx.db = keywords

    def _PooledDB(db, keywords):
        # In DBUtils 0.9.3, `dbapi` argument is renamed as `creator`
        # see Bug#122112
        # Compare numerically: as strings, '10' would sort before '9'.
        if _version_tuple(PooledDB.__version__) < (0, 9, 3):
            return PooledDB.PooledDB(dbapi=db, **keywords)
        else:
            return PooledDB.PooledDB(creator=db, **keywords)

    def db_cursor():
        """opens the connection on first use and returns a new cursor"""
        if isinstance(web.ctx.db, dict):
            # Still holding the keyword dict: no connection opened yet.
            keywords = web.ctx.db
            if web.config._hasPooling:
                # The pool is cached in this module's globals under 'db'.
                # NOTE(review): it is reused as-is by later connect() calls,
                # whatever keywords they pass -- confirm that is intended.
                if 'db' not in globals():
                    globals()['db'] = _PooledDB(db, keywords)
                web.ctx.db = globals()['db'].connection()
            else:
                web.ctx.db = db.connect(**keywords)
        return web.ctx.db.cursor()
    web.ctx.db_cursor = db_cursor

    web.ctx.dbq_count = 0

    def db_execute(cur, sql_query, dorollback=True):
        """executes an sql query"""
        web.ctx.dbq_count += 1

        try:
            a = time.time()
            out = cur.execute(sql_query.s, sql_query.v)
            b = time.time()
        except:
            # Deliberately bare: whatever went wrong, log and roll back,
            # then re-raise the original exception untouched.
            if web.config.get('db_printing'):
                web.debug.write('ERR: %s\n' % str(sql_query))
            if dorollback:
                rollback(care=False)
            raise

        if web.config.get('db_printing'):
            web.debug.write('%s (%s): %s\n' % (
                round(b - a, 2), web.ctx.dbq_count, str(sql_query)))

        return out
    web.ctx.db_execute = db_execute
    return web.ctx.db

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -