⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dbtest.py

📁 python web programming 部分
💻 PY
📖 第 1 页 / 共 2 页
字号:
"""test script for database connectivity

usage dbtest.py <type> <dsn>

"""
def cpp(cursor, t = None):
    """Format a cursor's result set as a right-aligned plain-text table.

    Parameters:
        cursor -- object exposing ``description`` (a sequence of per-column
                  sequences whose first item is the column name) and
                  ``fetchall()``.
        t      -- optional pre-fetched rows; when falsy (``None`` or empty)
                  the rows are obtained from ``cursor.fetchall()``.

    Returns:
        A string holding a header line, a dashed rule line and one line per
        row, each terminated by a newline, or the literal
        "#### NO RESULTS ###\\n" when the cursor has no description.

    Rows may be tuples or any other sequence (e.g. lists); each row is
    converted to a tuple before formatting so list rows are not mistaken
    for a single format argument.
    """
    # Python 2 print statement: emits a blank line before the table.
    print
    description = cursor.description
    if not description:
        return "#### NO RESULTS ###\n"
    names = []
    specs = []
    rules = []
    for column in description:
        # NOTE(review): item 1 is used as the column width here; in the
        # DB-API description layout item 1 is the type code and item 2 the
        # display size -- confirm which one the target drivers provide.
        width = column[1]
        if not width:
            width = 12  # default width when the driver supplies none
        # Never make a column narrower than its own heading.
        width = max(width, len(column[0]))
        names.append(column[0])
        specs.append("%%%ss" % width)
        rules.append("-" * width)
    row_format = " ".join(specs) + "\n"
    lines = [row_format % tuple(names), row_format % tuple(rules)]
    if not t:
        t = cursor.fetchall()
    for row in t:
        # tuple() makes list rows format correctly with the % operator.
        lines.append(row_format % tuple(row))
    return "".join(lines)

def test():
    import sys
    print "testing"
    connect = dbconnect()
    curs = connect.cursor()
    for q in cleanup_queries:
        try:
            print "Cleanups:", q
            sys.stdout.flush()
            curs.execute(q)
            sys.stdout.flush()
        except: # this is a bit dodgy: error classes might vary
            pass
    print
    print "TABLE CREATES"
    for x in table_creates:
        print x
        curs.execute(x)
    curs.execute("create table empty (nothing varchar(1))")
    C = """
    CREATE TABLE workt (
       name VARCHAR(10),
       hours INTEGER,
       rate FLOAT)
       """
    print C
    curs.execute(C)
    print
    C = """
    CREATE TABLE pagerefs (
       page VARCHAR(20),
       hits INTEGER,
       mnth INTEGER)
       """
    print C
    curs.execute(C)
    print
    print "INSERTS"
    C = """
    INSERT INTO workt(name, hours, rate) VALUES (?, ?, ?)
    """
    D = [
         ("sam", 30, 40.2),
         ("norm", 45, 10.2),
         ("woody", 80, 5.4),
         ("diane", 3, 4.4),
         ("rebecca", 120, 12.9),
         ("cliff", 26, 200.00),
         ("carla", 9, 3.5),
         ]
    for x in D:
        print x
        curs.execute(C, x)
    C = "create unique index wname on workt(name)"
    print "Unique index:", C
    curs.execute(C)
    print "trying bad insert into unique field"
    C = "insert into workt(name, hours, rate) values ('sam', 0, 0)"
    try:
        curs.execute(C)
    except:
        print "exception as expected %s(%s)" %(sys.exc_type, sys.exc_value)
    else:
        raise "stop!", "unique index permits nonunique field"
    C = """
    INSERT INTO pagerefs(page, mnth, hits) VALUES (?, ?, ?)
    """
    D = [
         ("index.html", 1, 2100),
         ("index.html", 2, 3300),
         ("index.html", 3, 1950),    
         ("products.html", 1, 15),   
         ("products.html", 2, 650),   
         ("products.html", 3, 98),   
         ("people.html", 1, 439),
         ("people.html", 2, 12),
         ("people.html", 3, 665),
         ]
    for x in D: print x
    curs.execute(C, D)
    for (table, stuff) in dpairs:
        ins = "insert into %s values (?, ?, ?)" % table
        if table!="frequents":
           for parameters in dataseq(stuff):
               print "singleinsert", table, parameters
               curs.execute(ins, parameters)
        else:
           print
           print "multiinsert", table
           parameters = dataseq(stuff)
           for p in parameters:
               print p
           print "multiinsert..."
           curs.execute(ins, parameters)
           print;print
    print
    print "INDICES"
    for ci in indices:
        print ci
        curs.execute(ci)
    print
    print "QUERIES"
    for x in workqueries:
        print;print
        print x
        t = curs.execute(x)
        print cpp(curs, t)
        
    statement = """select name, hours
                   from workt"""
    curs.execute(statement)
    print "Hours worked this week"
    print
    for (name,hours) in curs.fetchall():
        print "worker", name, "worked", hours, "hours"
    print
    print "end of work report"

    #return
    for x in queries:
        print; print
        print x
        curs.execute(x)
        #for x in curs.commands:
        #    print x
        all = curs.fetchall()
        if not all:
           print "empty!"
        else:
           print cpp(curs, all)
    #return
    print
    print "DYNAMIC QUERIES"
    for (x,y) in dynamic_queries:
        print; print
        print x
        print "dynamic=", y
        curs.execute(x, y)
        #for x in curs.commands:
        #    print x
        all = curs.fetchall()
        if not all:
           print "empty!"
        else:
           for t in all:
               print t
    print "repeat test"
    from time import time
    for x in repeats:
        print "repeating", x
        now = time()
        curs.execute(x)
        print time()-now, "first time"
        now = time()
        curs.execute(x)
        print time()-now, "second time"
        now = time()
        curs.execute(x)
        print time()-now, "third time"
    print "*** committing work"
    connect.commit()
    connect.close()
    print; print
    print "*" * 30
    print "*** reconnect test"
    connect = reconnect()
    cursor = connect.cursor()
    for s in updates + cleanup_queries:
        print; print
        print s
        cursor.execute(s)
        print cpp(cursor)
    connect.commit()
    connect.close()
    return connect

if __name__=="__main__":
    import sys
    argv = sys.argv
    if len(argv)<2:
        sys.exit("""
USAGE: python %s <dbtype> <db_parameters>
  dbtype: gadfly | mx | odbc | ocelot
""" % argv[0])
    else:
        dbtype = argv[1].lower()
        if dbtype == "gadfly":
            DBcatop = "+"
            if len(argv) < 3:
                sys.exit("""
USAGE: python %s gadfly <directory>
""" % argv[0])
            directory = argv[2]
            def dbconnect():
                from gadfly import gadfly
                conn = gadfly()
                conn.startup("test", directory)
                return conn
            def reconnect():
                from gadfly import gadfly
                return gadfly("test", directory)
        elif dbtype == "mx":
            DBcatop = "+"
            if len(argv) < 4:
                sys.exit("""
USAGE: python %s mx DSN user [password]
""" % argv[0])
            connectstring = argv[2]
            user=argv[3]
            if len(argv) == 5:
                password = argv[4]
                def dbconnect():
                    import mx.ODBC.Windows as odbc
                    return odbc.connect(connectstring, user=user, password=password)
            else:
                def dbconnect():
                    import mx.ODBC.Windows as odbc
                    return odbc.connect(connectstring, user=user)
            reconnect = dbconnect
        elif dbtype == "odbc":
            DBcatop = "+"
            if len(argv) < 3:
                sys.exit("""
USAGE: python %s odbc DSN
""" % argv[0])
            connectstring = argv[2]
            def dbconnect():
                import odbc
                return odbc.odbc(connectstring)
            reconnect = dbconnect
        else:
            sys.exit("""
USAGE: python %s <dbtype> <db_parameters>
  dbtype: gadfly | mx | odbc
""" % argv[0])
#
# Now we should be in a position to test any of the three
#

# DDL statements that test() prints and executes one by one, in list order.
# The three tables are created first because the nondrinkers view selects
# from frequents and likes.
table_creates = [
 "create table frequents (drinker varchar(10), bar varchar(10), perweek integer)",
 "create table likes (drinker varchar(10), beer varchar(12), perday integer)",
 "create table serves (bar varchar(10), beer varchar(12), quantity integer)",
 # View of (drinker, bar) pairs whose drinker has no row in likes.
 """Create view nondrinkers(d, b)
    as select drinker, bar
       from frequents
       where drinker not in
         (select drinker from likes)""",
 ]
 
fdata = """\
  adam	lolas		1
  woody	cheers	5
  sam		cheers	5
  norm	cheers	3
  wilt	joes		2
  norm	joes		1
  lola	lolas		6
  norm	lolas		2
  woody	lolas		1
  pierre	frankies	0"""
  
sdata = """\
  cheers	bud		500
  cheers	samaddams	255
  joes	bud		217
  joes	samaddams	13
  joes	mickies	2222
  lolas	mickies	1515
  lolas	pabst		333
  winkos	rollingrock	432
  frankies	snafu       5"""
  
ldata = """\
  adam	bud			2
  wilt	rollingrock		1
  sam		bud			2
  norm	rollingrock		3
  norm	bud			2
  nan		sierranevada	1	
  woody	pabst			2
  lola	mickies		5"""
  
dpairs = [
   ("frequents", fdata),
   ("serves", sdata),
   ("likes", ldata),
  ]
  
def dataseq(s):
    """Parse a block of whitespace-separated text into a list of row tuples.

    Each non-blank line of *s* is split on runs of whitespace and its third
    field is converted to an integer; all other fields stay strings.

    Parameters:
        s -- text with one record per line.

    Returns:
        A list of tuples, one per non-blank line, in input order.

    Raises:
        IndexError -- if a non-blank line has fewer than three fields.
        ValueError -- if a third field is not a valid integer.

    Blank or whitespace-only lines (for example one left by a trailing
    newline) are skipped instead of raising IndexError.
    """
    result = []
    for line in s.split("\n"):
        fields = line.split()
        if not fields:
            # Nothing on this line; ignore it.
            continue
        fields[2] = int(fields[2])
        result.append(tuple(fields))
    return result
    
indices = [
"""create index fd on frequents (drinker)""",

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -