📄 lztest.py
字号:
import mx.DateTime
import dtuple
#################################################################
class CachedQuery:
"""Defines a database query that caches database row sets.
This object is initialized with
tbl table name in the database
colnames list of field names retrieved
keynames list of keys used for retrieval
conn database connection
refresh caching refresh interval
Individual results are read by calling the object with a
tuple of key values as an argument. If the row set associated
with this particular set of keys is not present, or was read
longer than the refresh interval ago, it is re-read from the
database and stored in the content table as a list of database
tuples, which allow columns to be accessed by name.
Otherwise the already-cached database tuple set is returned.
Refinements might be added, such as registering with an
observer that might clear down all cache entries periodically
to force a global database refresh, and using a second SQL query
on record modified timestamps to determine whether a refresh is
really required (which may or may not be a win for a given set
of columns).
"""
def __init__(self, tbl, colnames, keynames, conn, refresh=600):
"""Create a caching data set for the given table, columns and keys."""
self._flush()
self.tbl = tbl
self.refresh = refresh
self.cursor = conn.cursor()
condition = " AND ".join(["%s=?" % f for f in keynames])
self.sql = "SELECT %s FROM %s WHERE %s" % \
(", ".join(colnames), tbl, condition)
self.desc = dtuple.TupleDescriptor([[n, ] for n in colnames])
print "Descriptor:", self.desc
print "SQL:", self.sql
def _flush(self):
"""Remove all trace of previous caching."""
self.recs = {}
self.when = {}
def __call__(self, keyvals, debug=0):
"""Return the data set associated with given key values."""
now = mx.DateTime.now()
if self.recs.has_key(keyvals) and self.refresh and (now - self.when[keyvals] < self.refresh):
if debug: print "Interval:", now - self.when[keyvals]
return self.recs[keyvals]
else:
self.cursor.execute(self.sql, keyvals)
rows = self.cursor.fetchall()
if self.refresh and len(rows):
if debug: print "Caching", self.tbl, keyvals, " at", now
self.recs[keyvals] = [dtuple.DatabaseTuple(self.desc, row) for row in rows]
self.when[keyvals] = now
return self.recs[keyvals]
else:
return ()
def close(self):
self.recs = None
self.when = None
self.cursor.close()
if __name__ == "__main__":
import mx.ODBC.Windows as odbc
conn = odbc.connect("prom2000")
s1 = CachedQuery("department", # table
"DptName DptWelcome DptLnksTxt".split(), # columns
("DptCode",), # key columns
conn, refresh = 15) # other stuff
while 1:
dc = raw_input("Department: ")
if not dc:
break
s = s1((dc, ), debug=1)
if len(s) == 0:
print "No such department"
else:
print s[0].DptName
s1.close()
conn.close()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -