⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 alltest.py

📁 python web programming 部分
💻 PY
📖 第 1 页 / 共 2 页
字号:
"""test script for database adaptation layers

usage test.py <driver> <database> [params...]

Creates and deletes several tables

Developed from Aaron Watters' gadfly test program.
"""

def cpp(cursor, t=None):
    """Format the result of the last operation on a cursor as a text table.

    Arguments:
      cursor -- object exposing a ``description`` attribute and, when ``t``
                is not supplied, a ``fetchall()`` method.
      t      -- optional sequence of already-fetched rows.  When omitted
                (None) the rows are retrieved with ``cursor.fetchall()``.
                An explicitly passed empty sequence is used as-is.

    Returns a string, one of:
      * a diagnostic message if ``fetchall()`` raised an exception,
      * "\\n#### NO RESULTS ###\\n" if ``cursor.description`` is empty,
      * otherwise a right-aligned table: a header line, a rule of dashes,
        then one line per row.
    """
    # Imported locally so the function does not depend on a module-level
    # "import sys" being present elsewhere in the file.
    import sys

    if t is None:
        try:
            t = cursor.fetchall()
        except Exception:  # driver error classes vary, so stay broad
            return """\n###EXCEPTION ON FETCHING RESULTS###\nException data:\n%s\n(%s)"""\
                    % sys.exc_info()[:2]
    # Python 2 print statement with no arguments: emits a blank line
    # before the table is returned to the caller.
    print
    d = cursor.description
    if not d:
        return "\n#### NO RESULTS ###\n"
    names = []
    specs = []
    rules = []
    for col, dd in enumerate(d):  # scan column descriptions
        name = dd[0]
        # NOTE(review): PEP 249 puts type_code at index 1 and display_size
        # at index 2; index 1 is kept here as the declared width because the
        # cursor wrapper this script targets is not visible -- confirm.
        width = dd[1] or 12
        # Widen the column to fit the header and every value in it.
        width = max([width, len(name or "")] +
                    [len(str(row[col])) for row in t])
        if not name:
            name = '?' * width  # placeholder header for unnamed columns
        names.append(str(name))
        specs.append("%%%ss" % width)
        rules.append("-" * width)
    row_format = " ".join(specs) + "\n"
    lines = [row_format % tuple(names), row_format % tuple(rules)]
    # tuple(row) lets drivers that return rows as lists work with "%".
    lines.extend([row_format % tuple(row) for row in t])
    return "\n" + "".join(lines)

def test(c, conn):
    print "\n+++ STARTING TESTS+++\n"

    print "\n+++PREPARING CLEANUPS+++\n"
    cleanup_queries = [
    c.DROP_INDEX("wname", "workt"),
    c.DROP_VIEW("nondrinkers"),
    c.DROP_TABLE("workt"),
    c.DROP_TABLE("accesses"),
    c.DROP_TABLE("empty"),
    c.DROP_TABLE("frequents"),
    c.DROP_TABLE("likes"),
    c.DROP_TABLE("serves"),
    ]

    print "\n+++RUNNING PRE-TEST CLEANUPS+++\n"
    print "(expect failures if last test completed)"
    for q in cleanup_queries:
        try:
            print q
            c.execute(q)
        except: # this is a bit dodgy: error classes might vary
            pass

    print "\n+++ PREPARING TABLE CREATES+++\n"
    table_creates = [
    c.CREATE_TABLE("frequents",
            ["drinker", c.VARCHAR(10)],
            ["bar", c.VARCHAR(10)],
            ["perweek", c.INTEGER]),
    c.CREATE_TABLE("likes",
            ["drinker", c.VARCHAR(10)],
            ["beer", c.VARCHAR(12)],
            ["perday", c.INTEGER]),
    c.CREATE_TABLE("serves",
            ["bar", c.VARCHAR(10)],
            ["beer", c.VARCHAR(12)],
            ["quantity", c.INTEGER]),
    c.CREATE_VIEW("nondrinkers", ("d", "b"),
            c.SELECT(["drinker", "bar"],
                FROM="frequents",
                WHERE="drinker not in (%s)" %
                    c.SELECT("drinker", FROM="likes"))),
     ]

    print "\n+++RUNNING TABLE CREATES\n"
    for x in table_creates:
        print x
        c.execute(x)
    C = c.CREATE_TABLE("empty", ["nothing", c.VARCHAR(10)])
    print C
    c.execute(C)
    C = c.CREATE_TABLE("workt",
       ["name", c.VARCHAR(10)],
       ["hours", c.INTEGER],
       ["rate", c.FLOAT])
    print "\nad hoc:\n", C
    c.execute(C)
    C = c.CREATE_TABLE("accesses",
       ['page', c.VARCHAR(14)],
       ['hits', c.INTEGER],
       ['mnth', c.INTEGER])

    print "\nad hoc:\n", C
    c.execute(C)

    print "\n+++INSERTS+++\n"
    C = c.INSERT("workt", ("name", "hours", "rate"), VALUES=("?", "?", "?"))
    print "\nad hoc:\n", C
    D = [
         ("sam", 30, 40.2),
         ("norm", 45, 10.2),
         ("woody", 80, 5.4),
         ("diane", 3, 4.4),
         ("rebecca", 120, 12.9),
         ("cliff", 26, 200.00),
         ("carla", 9, 3.5),
         ]
    for x in D: print x
    # XXX should use executemany if possible here
    # and report back to user which worked...
    c.execute(C, D)

    C = c.CREATE_INDEX("wname", "workt", "name", UNIQUE=1)
    print "\nad hoc:\n", C
    c.execute(C)
    print "Trying bad insert into unique field"
    C = c.INSERT("workt", ("name", "hours", "rate"), VALUES=("'sam'", '0', '0'))
    print "\nad hoc:\n", C
    import sys
    try:
        c.execute(C)
    except:
        print "OK. Exception as expected %s(%s)" %(sys.exc_type, sys.exc_value)
    else:
        print "Error! unique index permits nonunique field"

    C = c.SELECT("*", FROM="workt")
    print "\nad hoc:\n", C
    c.execute(C)
    print cpp(c)
    print "deleting jo"; print
    C = c.DELETE("workt", WHERE="name='jo'")
    print "\nad hoc:\n", C
    c.execute(C)

    print "\n+++DYNAMIC INSERTS+++\n"
    fdata = """\
      adam	lolas		1
      woody	cheers	5
      sam		cheers	5
      norm	cheers	3
      wilt	joes		2
      norm	joes		1
      lola	lolas		6
      norm	lolas		2
      woody	lolas		1
      pierre	frankies	0"""
  
    sdata = """\
      cheers	bud		500
      cheers	samaddams	255
      joes	bud		217
      joes	samaddams	13
      joes	mickies	2222
      lolas	mickies	1515
      lolas	pabst		333
      winkos	rollingrock	432
      frankies	snafu       5"""
  
    ldata = """\
      adam	bud			2
      wilt	rollingrock		1
      sam		bud			2
      norm	rollingrock		3
      norm	bud			2
      nan		sierranevada	1	
      woody	pabst			2
      lola	mickies		5"""
  
    dpairs = [
       ("frequents", fdata),
       ("serves", sdata),
       ("likes", ldata),
      ]
  
    C = c.INSERT("accesses", ("page", "mnth", "hits"), VALUES=("?", "?", "?"))
    print "\nad hoc:\n", C
    D = [
         ("index.html", 1, 2100),
         ("index.html", 2, 3300),
         ("index.html", 3, 1950),    
         ("products.html", 1, 15),   
         ("products.html", 2, 650),   
         ("products.html", 3, 98),   
         ("people.html", 1, 439),
         ("people.html", 2, 12),
         ("people.html", 3, 665),
         ]
    for x in D: print x
    # XXX executemany() should be an option here too
    c.execute(C, D)
    for (table, stuff) in dpairs:
        ins = c.INSERT(table, VALUES=("?", "?", "?"))
        print "\nad hoc:\n", ins
        if table!="frequents":
           for parameters in dataseq(stuff):
               print "singleinsert", table, parameters
               c.execute(ins, parameters)
        else:
           print
           print "multiinsert", table
           parameters = dataseq(stuff)
           for p in parameters:
               print p
           print "multiinsert..."
           # XXX this should be executemany as well ...
           c.execute(ins, parameters)

    print "\n+++PREPARING INDICES+++\n"
    indices = [
    c.CREATE_INDEX("fd", "frequents", "drinker"),
    c.CREATE_INDEX("sbb", "serves", ("beer", "bar")),
    c.CREATE_INDEX("lb", "likes", "beer"),
    c.CREATE_INDEX("fb", "frequents", "bar"),
    ]

    print "\n+++INDICES+++\n"
    for ci in indices:
        print ci
        c.execute(ci)

    print "\n+++PREPARING WORK QUERIES+++\n"
    workqueries = [
    c.SELECT("name", FROM="workt"),
    c.SELECT("*", FROM="workt"),
    c.SELECT("*", FROM="workt",
            WHERE="name='carla'"),
    c.SELECT(["name", "' ain''t worth '", "rate"],
            FROM="workt",
            WHERE="name='carla'"),
    c.SELECT(["name", "hours"], FROM="workt"),
    c.SELECT(["name", ["hours*rate", "pay"]],
            FROM="workt",
            REST="order by name"),
    c.SELECT(["name", "rate"],
            FROM="workt",
            WHERE="rate>=20 and rate<=100"),
    c.SELECT(["name", "rate"],
            FROM="workt",
            WHERE="rate between 20 and 100"),
    c.SELECT(["name", "rate"],
            FROM="workt",
            WHERE="rate not between 20 and 100"),
    c.SELECT(["name", "rate", "hours", ["hours*rate", "pay"]],
            FROM="workt"),
    c.SELECT(["name", "rate", "hours", ["hours*rate", "pay"]],
            FROM="workt",
            WHERE="hours*rate>500 and (rate<100 or hours>5)"),
    c.SELECT(["name", "rate", "hours", ["hours*rate", "pay"]],
            FROM="workt",
            WHERE="hours*rate>500 and rate<100 or hours>5"),
    c.SELECT(["avg(rate)", "min(hours)", "max(hours)", ["sum(hours*rate)", "expenses"]],
            FROM="workt"),
    c.SELECT("*", FROM="accesses"),
    c.SELECT(["mnth", ["sum(hits)", "totalhits"]],
            FROM="accesses",
            WHERE="mnth<>1",
            REST="""group by mnth
               order by 2"""),
    c.SELECT(["mnth", ["sum(hits)", "totalhits"]],
            FROM="accesses",
            REST="""group by mnth
                order by 2 desc"""),
    c.SELECT(["mnth", ["sum(hits)", "totalhits"]],
            FROM="accesses",
            REST="""group by mnth
                having sum(hits)<3000
                order by 2 desc"""),
    c.SELECT(["count(distinct mnth)", "count(distinct page)"],
            FROM="accesses"),
    c.SELECT(["mnth", "hits", "page"],
            FROM="accesses",
            REST="order by mnth, hits desc"),
    ]
    
    print "\n+++WORK QUERIES+++\n"
    for x in workqueries:
        print x
        c.execute(x)
        print cpp(c)
        
    print "\n+++WORK REPORT+++\n"
    statement = c.SELECT(["name", "hours"],
                   FROM="workt")
    print "\nad hoc:\n", statement
    c.execute(statement)
    print "Hours worked this week"
    print
    for (name, hours) in c.fetchall():
        print "worker", name, "worked", hours, "hours"
    print
    print "end of work report"

    print "\n+++PREPARING QUERIES+++\n"
    queries = [
    c.SELECT("*", FROM="nondrinkers"),
    c.UNION(c.SELECT([["drinker", "x"]], FROM="likes"),
            c.SELECT([["beer", "x"]], FROM="serves"),
            c.SELECT([["drinker","x"]], FROM="frequents")),
    c.SELECT(("f.drinker", "s.bar", "l.beer"),
            FROM=(["frequents", "f"], ["serves", "s"], ["likes", "l"]),
            WHERE="f.drinker=l.drinker and s.beer=l.beer and s.bar=f.bar"),
    c.SELECT("*", FROM="likes", WHERE="beer in ('bud', 'pabst')"),
    c.SELECT(["l.beer", "l.drinker", "count(distinct s.bar)"],
            FROM=(["likes", "l"], ["serves", "s"]),
            WHERE="l.beer=s.beer",
            REST="""group by l.beer, l.drinker
            order by 3 desc"""),
    c.UNION(c.SELECT(["l.beer", "l.drinker", ["count(distinct s.bar)", "nbars"]],
            FROM=[["likes", "l"], ["serves", "s"]],
            WHERE="l.beer=s.beer",
            REST="group by l.beer, l.drinker"),
            c.SELECT("distinct beer, drinker, 0 as nbars",
            FROM="likes",
            WHERE="beer not in (%s)" %
                c.SELECT("beer",
                    FROM="serves")) #,
                    #REST="order by 3 desc"),
                    ),
    c.SELECT("avg(perweek)", FROM="frequents"),
    c.SELECT("*",
            FROM="frequents",
            WHERE="perweek <= (%s)" %
                c.SELECT("avg(perweek)",
                    FROM="frequents")),
    c.SELECT("*", FROM="serves"),
    c.SELECT("bar, avg(quantity)",
        FROM="serves",
        REST="group by bar"),
    c.SELECT("*", FROM="serves s1", WHERE="quantity <= (%s)" %
                c.SELECT("avg(quantity)", FROM="serves s2", WHERE="s1.bar=s2.bar")),
    c.SELECT("*", FROM="frequents", WHERE="perweek > (%s)" %
                c.SELECT("avg(perweek)", FROM="frequents")),
    c.SELECT("*", FROM=["frequents", "f1"],
            WHERE="perweek > (%s)" %
            c.SELECT("avg(perweek)", FROM=["frequents", "f2"],
                    WHERE="f1.drinker = f2.drinker")),
    c.SELECT("*", FROM="frequents", WHERE="perweek < any (%s)" %
                c.SELECT("perweek", FROM="frequents")),
    c.SELECT("*", FROM="frequents", WHERE="perweek >= all (%s)" %
                c.SELECT("perweek", FROM="frequents")),
    c.SELECT("*", FROM="frequents", WHERE="perweek <= all (%s)" %
                c.SELECT("perweek", FROM="frequents")),
    c.SELECT("*", FROM=["frequents", "f1"],
            WHERE="perweek < any (%s)" %
            c.SELECT("perweek", FROM=["frequents ", "f2"],
                    WHERE="f1.drinker = f2.drinker")),
    c.SELECT("*", FROM=["frequents", "f1"],
            WHERE="perweek = all (%s)" %
                c.SELECT("perweek",
                    FROM=["frequents", "f2"],
                    WHERE="f1.drinker = f2.drinker")),
    c.SELECT("*", FROM=["frequents", "f1"],
            WHERE="""perweek <> all
                (%s)""" %
                c.SELECT("perweek", FROM=["frequents", "f2"],
                    WHERE="f1.drinker <> f2.drinker")),
    c.SELECT("beer", FROM="serves", WHERE="beer = any (%s)" %
        c.SELECT("beer", FROM="likes")),
    c.SELECT("beer",
        FROM="serves",
        WHERE="beer <> all (%s)" %
            c.SELECT("beer", FROM="likes")),
    c.SELECT("beer", FROM="serves", WHERE="beer in (%s)" %
        c.SELECT("beer", FROM="likes")),
    c.SELECT("beer", FROM="serves", WHERE="beer not in (%s)" %
        c.SELECT("beer", FROM="likes")),

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -