📄 dbtest.py
字号:
"""test script for database connectivity
usage dbtest.py <type> <dsn>
"""
def cpp(cursor, t = None):
print
d = cursor.description
if not d:
return "#### NO RESULTS ###\n"
names = []
lengths = []
rules = []
for dd in d:
l = dd[1]
if not l:
l = 12
l = max(l, len(dd[0]))
names.append(dd[0])
lengths.append("%%%ss" % l)
rules.append("-"*l)
format = " ".join(lengths)+"\n"
result = format % tuple(names)
result += format % tuple(rules)
if not t:
t = cursor.fetchall()
for row in t:
result += format % row
return result
def test():
import sys
print "testing"
connect = dbconnect()
curs = connect.cursor()
for q in cleanup_queries:
try:
print "Cleanups:", q
sys.stdout.flush()
curs.execute(q)
sys.stdout.flush()
except: # this is a bit dodgy: error classes might vary
pass
print
print "TABLE CREATES"
for x in table_creates:
print x
curs.execute(x)
curs.execute("create table empty (nothing varchar(1))")
C = """
CREATE TABLE workt (
name VARCHAR(10),
hours INTEGER,
rate FLOAT)
"""
print C
curs.execute(C)
print
C = """
CREATE TABLE pagerefs (
page VARCHAR(20),
hits INTEGER,
mnth INTEGER)
"""
print C
curs.execute(C)
print
print "INSERTS"
C = """
INSERT INTO workt(name, hours, rate) VALUES (?, ?, ?)
"""
D = [
("sam", 30, 40.2),
("norm", 45, 10.2),
("woody", 80, 5.4),
("diane", 3, 4.4),
("rebecca", 120, 12.9),
("cliff", 26, 200.00),
("carla", 9, 3.5),
]
for x in D:
print x
curs.execute(C, x)
C = "create unique index wname on workt(name)"
print "Unique index:", C
curs.execute(C)
print "trying bad insert into unique field"
C = "insert into workt(name, hours, rate) values ('sam', 0, 0)"
try:
curs.execute(C)
except:
print "exception as expected %s(%s)" %(sys.exc_type, sys.exc_value)
else:
raise "stop!", "unique index permits nonunique field"
C = """
INSERT INTO pagerefs(page, mnth, hits) VALUES (?, ?, ?)
"""
D = [
("index.html", 1, 2100),
("index.html", 2, 3300),
("index.html", 3, 1950),
("products.html", 1, 15),
("products.html", 2, 650),
("products.html", 3, 98),
("people.html", 1, 439),
("people.html", 2, 12),
("people.html", 3, 665),
]
for x in D: print x
curs.execute(C, D)
for (table, stuff) in dpairs:
ins = "insert into %s values (?, ?, ?)" % table
if table!="frequents":
for parameters in dataseq(stuff):
print "singleinsert", table, parameters
curs.execute(ins, parameters)
else:
print
print "multiinsert", table
parameters = dataseq(stuff)
for p in parameters:
print p
print "multiinsert..."
curs.execute(ins, parameters)
print;print
print
print "INDICES"
for ci in indices:
print ci
curs.execute(ci)
print
print "QUERIES"
for x in workqueries:
print;print
print x
t = curs.execute(x)
print cpp(curs, t)
statement = """select name, hours
from workt"""
curs.execute(statement)
print "Hours worked this week"
print
for (name,hours) in curs.fetchall():
print "worker", name, "worked", hours, "hours"
print
print "end of work report"
#return
for x in queries:
print; print
print x
curs.execute(x)
#for x in curs.commands:
# print x
all = curs.fetchall()
if not all:
print "empty!"
else:
print cpp(curs, all)
#return
print
print "DYNAMIC QUERIES"
for (x,y) in dynamic_queries:
print; print
print x
print "dynamic=", y
curs.execute(x, y)
#for x in curs.commands:
# print x
all = curs.fetchall()
if not all:
print "empty!"
else:
for t in all:
print t
print "repeat test"
from time import time
for x in repeats:
print "repeating", x
now = time()
curs.execute(x)
print time()-now, "first time"
now = time()
curs.execute(x)
print time()-now, "second time"
now = time()
curs.execute(x)
print time()-now, "third time"
print "*** committing work"
connect.commit()
connect.close()
print; print
print "*" * 30
print "*** reconnect test"
connect = reconnect()
cursor = connect.cursor()
for s in updates + cleanup_queries:
print; print
print s
cursor.execute(s)
print cpp(cursor)
connect.commit()
connect.close()
return connect
if __name__=="__main__":
import sys
argv = sys.argv
if len(argv)<2:
sys.exit("""
USAGE: python %s <dbtype> <db_parameters>
dbtype: gadfly | mx | odbc | ocelot
""" % argv[0])
else:
dbtype = argv[1].lower()
if dbtype == "gadfly":
DBcatop = "+"
if len(argv) < 3:
sys.exit("""
USAGE: python %s gadfly <directory>
""" % argv[0])
directory = argv[2]
def dbconnect():
from gadfly import gadfly
conn = gadfly()
conn.startup("test", directory)
return conn
def reconnect():
from gadfly import gadfly
return gadfly("test", directory)
elif dbtype == "mx":
DBcatop = "+"
if len(argv) < 4:
sys.exit("""
USAGE: python %s mx DSN user [password]
""" % argv[0])
connectstring = argv[2]
user=argv[3]
if len(argv) == 5:
password = argv[4]
def dbconnect():
import mx.ODBC.Windows as odbc
return odbc.connect(connectstring, user=user, password=password)
else:
def dbconnect():
import mx.ODBC.Windows as odbc
return odbc.connect(connectstring, user=user)
reconnect = dbconnect
elif dbtype == "odbc":
DBcatop = "+"
if len(argv) < 3:
sys.exit("""
USAGE: python %s odbc DSN
""" % argv[0])
connectstring = argv[2]
def dbconnect():
import odbc
return odbc.odbc(connectstring)
reconnect = dbconnect
else:
sys.exit("""
USAGE: python %s <dbtype> <db_parameters>
dbtype: gadfly | mx | odbc
""" % argv[0])
#
# Now we should be in a position to test any of the three
#
table_creates = [
"create table frequents (drinker varchar(10), bar varchar(10), perweek integer)",
"create table likes (drinker varchar(10), beer varchar(12), perday integer)",
"create table serves (bar varchar(10), beer varchar(12), quantity integer)",
"""Create view nondrinkers(d, b)
as select drinker, bar
from frequents
where drinker not in
(select drinker from likes)""",
]
fdata = """\
adam lolas 1
woody cheers 5
sam cheers 5
norm cheers 3
wilt joes 2
norm joes 1
lola lolas 6
norm lolas 2
woody lolas 1
pierre frankies 0"""
sdata = """\
cheers bud 500
cheers samaddams 255
joes bud 217
joes samaddams 13
joes mickies 2222
lolas mickies 1515
lolas pabst 333
winkos rollingrock 432
frankies snafu 5"""
ldata = """\
adam bud 2
wilt rollingrock 1
sam bud 2
norm rollingrock 3
norm bud 2
nan sierranevada 1
woody pabst 2
lola mickies 5"""
dpairs = [
("frequents", fdata),
("serves", sdata),
("likes", ldata),
]
def dataseq(s):
from string import split
l = split(s, "\n")
result = map(split, l)
from string import atoi
for l in result:
l[2] = atoi(l[2])
result = map(tuple, result)
return result
indices = [
"""create index fd on frequents (drinker)""",
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -