📄 alltest.py
字号:
"""test script for database adaptation layers
usage test.py <driver> <database> [params...]
Creates and deletes several tables
Developed from Aaron Watters' gadfly test program.
"""
def cpp(cursor, t = None):
"""Return the result of the last operation on a cursor,
retrieving unless it is provided as an argument."""
if not t:
try:
t = cursor.fetchall()
except: # XXX should specify exception type
return """\n###EXCEPTION ON FETCHING RESULTS###\nException data:\n%s\n(%s)"""\
% (sys.exc_type, sys.exc_value)
print
d = cursor.description
if not d:
return "\n#### NO RESULTS ###\n"
names = []
lengths = []
rules = []
col = 0
for dd in d: # scan column descriptions
name = dd[0]
l = dd[1]
if not l:
l = 12
l = max([l, len(dd[0])] + [len(str(t[i][col])) for i in range(len(t))])
if not name:
name = '?'*l
names.append(str(dd[0]))
lengths.append("%%%ss" % l)
rules.append("-"*l)
col += 1
format = " ".join(lengths)+"\n"
result = "\n" + format % tuple(names)
result += format % tuple(rules)
for row in t:
result += format % row
return result
def test(c, conn):
print "\n+++ STARTING TESTS+++\n"
print "\n+++PREPARING CLEANUPS+++\n"
cleanup_queries = [
c.DROP_INDEX("wname", "workt"),
c.DROP_VIEW("nondrinkers"),
c.DROP_TABLE("workt"),
c.DROP_TABLE("accesses"),
c.DROP_TABLE("empty"),
c.DROP_TABLE("frequents"),
c.DROP_TABLE("likes"),
c.DROP_TABLE("serves"),
]
print "\n+++RUNNING PRE-TEST CLEANUPS+++\n"
print "(expect failures if last test completed)"
for q in cleanup_queries:
try:
print q
c.execute(q)
except: # this is a bit dodgy: error classes might vary
pass
print "\n+++ PREPARING TABLE CREATES+++\n"
table_creates = [
c.CREATE_TABLE("frequents",
["drinker", c.VARCHAR(10)],
["bar", c.VARCHAR(10)],
["perweek", c.INTEGER]),
c.CREATE_TABLE("likes",
["drinker", c.VARCHAR(10)],
["beer", c.VARCHAR(12)],
["perday", c.INTEGER]),
c.CREATE_TABLE("serves",
["bar", c.VARCHAR(10)],
["beer", c.VARCHAR(12)],
["quantity", c.INTEGER]),
c.CREATE_VIEW("nondrinkers", ("d", "b"),
c.SELECT(["drinker", "bar"],
FROM="frequents",
WHERE="drinker not in (%s)" %
c.SELECT("drinker", FROM="likes"))),
]
print "\n+++RUNNING TABLE CREATES\n"
for x in table_creates:
print x
c.execute(x)
C = c.CREATE_TABLE("empty", ["nothing", c.VARCHAR(10)])
print C
c.execute(C)
C = c.CREATE_TABLE("workt",
["name", c.VARCHAR(10)],
["hours", c.INTEGER],
["rate", c.FLOAT])
print "\nad hoc:\n", C
c.execute(C)
C = c.CREATE_TABLE("accesses",
['page', c.VARCHAR(14)],
['hits', c.INTEGER],
['mnth', c.INTEGER])
print "\nad hoc:\n", C
c.execute(C)
print "\n+++INSERTS+++\n"
C = c.INSERT("workt", ("name", "hours", "rate"), VALUES=("?", "?", "?"))
print "\nad hoc:\n", C
D = [
("sam", 30, 40.2),
("norm", 45, 10.2),
("woody", 80, 5.4),
("diane", 3, 4.4),
("rebecca", 120, 12.9),
("cliff", 26, 200.00),
("carla", 9, 3.5),
]
for x in D: print x
# XXX should use executemany if possible here
# and report back to user which worked...
c.execute(C, D)
C = c.CREATE_INDEX("wname", "workt", "name", UNIQUE=1)
print "\nad hoc:\n", C
c.execute(C)
print "Trying bad insert into unique field"
C = c.INSERT("workt", ("name", "hours", "rate"), VALUES=("'sam'", '0', '0'))
print "\nad hoc:\n", C
import sys
try:
c.execute(C)
except:
print "OK. Exception as expected %s(%s)" %(sys.exc_type, sys.exc_value)
else:
print "Error! unique index permits nonunique field"
C = c.SELECT("*", FROM="workt")
print "\nad hoc:\n", C
c.execute(C)
print cpp(c)
print "deleting jo"; print
C = c.DELETE("workt", WHERE="name='jo'")
print "\nad hoc:\n", C
c.execute(C)
print "\n+++DYNAMIC INSERTS+++\n"
fdata = """\
adam lolas 1
woody cheers 5
sam cheers 5
norm cheers 3
wilt joes 2
norm joes 1
lola lolas 6
norm lolas 2
woody lolas 1
pierre frankies 0"""
sdata = """\
cheers bud 500
cheers samaddams 255
joes bud 217
joes samaddams 13
joes mickies 2222
lolas mickies 1515
lolas pabst 333
winkos rollingrock 432
frankies snafu 5"""
ldata = """\
adam bud 2
wilt rollingrock 1
sam bud 2
norm rollingrock 3
norm bud 2
nan sierranevada 1
woody pabst 2
lola mickies 5"""
dpairs = [
("frequents", fdata),
("serves", sdata),
("likes", ldata),
]
C = c.INSERT("accesses", ("page", "mnth", "hits"), VALUES=("?", "?", "?"))
print "\nad hoc:\n", C
D = [
("index.html", 1, 2100),
("index.html", 2, 3300),
("index.html", 3, 1950),
("products.html", 1, 15),
("products.html", 2, 650),
("products.html", 3, 98),
("people.html", 1, 439),
("people.html", 2, 12),
("people.html", 3, 665),
]
for x in D: print x
# XXX executemany() should be an option here too
c.execute(C, D)
for (table, stuff) in dpairs:
ins = c.INSERT(table, VALUES=("?", "?", "?"))
print "\nad hoc:\n", ins
if table!="frequents":
for parameters in dataseq(stuff):
print "singleinsert", table, parameters
c.execute(ins, parameters)
else:
print
print "multiinsert", table
parameters = dataseq(stuff)
for p in parameters:
print p
print "multiinsert..."
# XXX this should be executemany as well ...
c.execute(ins, parameters)
print "\n+++PREPARING INDICES+++\n"
indices = [
c.CREATE_INDEX("fd", "frequents", "drinker"),
c.CREATE_INDEX("sbb", "serves", ("beer", "bar")),
c.CREATE_INDEX("lb", "likes", "beer"),
c.CREATE_INDEX("fb", "frequents", "bar"),
]
print "\n+++INDICES+++\n"
for ci in indices:
print ci
c.execute(ci)
print "\n+++PREPARING WORK QUERIES+++\n"
workqueries = [
c.SELECT("name", FROM="workt"),
c.SELECT("*", FROM="workt"),
c.SELECT("*", FROM="workt",
WHERE="name='carla'"),
c.SELECT(["name", "' ain''t worth '", "rate"],
FROM="workt",
WHERE="name='carla'"),
c.SELECT(["name", "hours"], FROM="workt"),
c.SELECT(["name", ["hours*rate", "pay"]],
FROM="workt",
REST="order by name"),
c.SELECT(["name", "rate"],
FROM="workt",
WHERE="rate>=20 and rate<=100"),
c.SELECT(["name", "rate"],
FROM="workt",
WHERE="rate between 20 and 100"),
c.SELECT(["name", "rate"],
FROM="workt",
WHERE="rate not between 20 and 100"),
c.SELECT(["name", "rate", "hours", ["hours*rate", "pay"]],
FROM="workt"),
c.SELECT(["name", "rate", "hours", ["hours*rate", "pay"]],
FROM="workt",
WHERE="hours*rate>500 and (rate<100 or hours>5)"),
c.SELECT(["name", "rate", "hours", ["hours*rate", "pay"]],
FROM="workt",
WHERE="hours*rate>500 and rate<100 or hours>5"),
c.SELECT(["avg(rate)", "min(hours)", "max(hours)", ["sum(hours*rate)", "expenses"]],
FROM="workt"),
c.SELECT("*", FROM="accesses"),
c.SELECT(["mnth", ["sum(hits)", "totalhits"]],
FROM="accesses",
WHERE="mnth<>1",
REST="""group by mnth
order by 2"""),
c.SELECT(["mnth", ["sum(hits)", "totalhits"]],
FROM="accesses",
REST="""group by mnth
order by 2 desc"""),
c.SELECT(["mnth", ["sum(hits)", "totalhits"]],
FROM="accesses",
REST="""group by mnth
having sum(hits)<3000
order by 2 desc"""),
c.SELECT(["count(distinct mnth)", "count(distinct page)"],
FROM="accesses"),
c.SELECT(["mnth", "hits", "page"],
FROM="accesses",
REST="order by mnth, hits desc"),
]
print "\n+++WORK QUERIES+++\n"
for x in workqueries:
print x
c.execute(x)
print cpp(c)
print "\n+++WORK REPORT+++\n"
statement = c.SELECT(["name", "hours"],
FROM="workt")
print "\nad hoc:\n", statement
c.execute(statement)
print "Hours worked this week"
print
for (name, hours) in c.fetchall():
print "worker", name, "worked", hours, "hours"
print
print "end of work report"
print "\n+++PREPARING QUERIES+++\n"
queries = [
c.SELECT("*", FROM="nondrinkers"),
c.UNION(c.SELECT([["drinker", "x"]], FROM="likes"),
c.SELECT([["beer", "x"]], FROM="serves"),
c.SELECT([["drinker","x"]], FROM="frequents")),
c.SELECT(("f.drinker", "s.bar", "l.beer"),
FROM=(["frequents", "f"], ["serves", "s"], ["likes", "l"]),
WHERE="f.drinker=l.drinker and s.beer=l.beer and s.bar=f.bar"),
c.SELECT("*", FROM="likes", WHERE="beer in ('bud', 'pabst')"),
c.SELECT(["l.beer", "l.drinker", "count(distinct s.bar)"],
FROM=(["likes", "l"], ["serves", "s"]),
WHERE="l.beer=s.beer",
REST="""group by l.beer, l.drinker
order by 3 desc"""),
c.UNION(c.SELECT(["l.beer", "l.drinker", ["count(distinct s.bar)", "nbars"]],
FROM=[["likes", "l"], ["serves", "s"]],
WHERE="l.beer=s.beer",
REST="group by l.beer, l.drinker"),
c.SELECT("distinct beer, drinker, 0 as nbars",
FROM="likes",
WHERE="beer not in (%s)" %
c.SELECT("beer",
FROM="serves")) #,
#REST="order by 3 desc"),
),
c.SELECT("avg(perweek)", FROM="frequents"),
c.SELECT("*",
FROM="frequents",
WHERE="perweek <= (%s)" %
c.SELECT("avg(perweek)",
FROM="frequents")),
c.SELECT("*", FROM="serves"),
c.SELECT("bar, avg(quantity)",
FROM="serves",
REST="group by bar"),
c.SELECT("*", FROM="serves s1", WHERE="quantity <= (%s)" %
c.SELECT("avg(quantity)", FROM="serves s2", WHERE="s1.bar=s2.bar")),
c.SELECT("*", FROM="frequents", WHERE="perweek > (%s)" %
c.SELECT("avg(perweek)", FROM="frequents")),
c.SELECT("*", FROM=["frequents", "f1"],
WHERE="perweek > (%s)" %
c.SELECT("avg(perweek)", FROM=["frequents", "f2"],
WHERE="f1.drinker = f2.drinker")),
c.SELECT("*", FROM="frequents", WHERE="perweek < any (%s)" %
c.SELECT("perweek", FROM="frequents")),
c.SELECT("*", FROM="frequents", WHERE="perweek >= all (%s)" %
c.SELECT("perweek", FROM="frequents")),
c.SELECT("*", FROM="frequents", WHERE="perweek <= all (%s)" %
c.SELECT("perweek", FROM="frequents")),
c.SELECT("*", FROM=["frequents", "f1"],
WHERE="perweek < any (%s)" %
c.SELECT("perweek", FROM=["frequents ", "f2"],
WHERE="f1.drinker = f2.drinker")),
c.SELECT("*", FROM=["frequents", "f1"],
WHERE="perweek = all (%s)" %
c.SELECT("perweek",
FROM=["frequents", "f2"],
WHERE="f1.drinker = f2.drinker")),
c.SELECT("*", FROM=["frequents", "f1"],
WHERE="""perweek <> all
(%s)""" %
c.SELECT("perweek", FROM=["frequents", "f2"],
WHERE="f1.drinker <> f2.drinker")),
c.SELECT("beer", FROM="serves", WHERE="beer = any (%s)" %
c.SELECT("beer", FROM="likes")),
c.SELECT("beer",
FROM="serves",
WHERE="beer <> all (%s)" %
c.SELECT("beer", FROM="likes")),
c.SELECT("beer", FROM="serves", WHERE="beer in (%s)" %
c.SELECT("beer", FROM="likes")),
c.SELECT("beer", FROM="serves", WHERE="beer not in (%s)" %
c.SELECT("beer", FROM="likes")),
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -