📄 cachequery.py
字号:
import mx.DateTime
import dtuple
#################################################################
# $Revision: 7 $
# $Date: 10/19/01 1:37p $
#################################################################
class CacheQuery:
"""Defines a database query that caches database row sets.
This object is initialized with
tbl table name in the database
colnames list of field names retrieved
keynames list of keys used for retrieval
conn database connection
refresh caching refresh interval
Individual results are read by calling the object with a
tuple of key values as an argument. If the row set associated
with this particular set of keys is not present, or was read
longer than the refresh interval ago, it is re-read from the
database and stored in the content table as a list of database
tuples, which allow columns to be accessed by name.
Otherwise the already-cached database tuple set is returned.
Refinements might be added, such as registering with an
observer that might clear down all cache entries periodically
to force a global database refresh, and using a second SQL query
on record modified timestamps to determine whether a refresh is
really required (which may or may not be a win for a given set
of columns).
"""
def __init__(self, tbl, colnames, keynames, conn, refresh=0, ORDER=None):
"""Create a caching data set for the given table, columns and keys."""
self._flush()
self.tbl = tbl
self.keynames = keynames
self.refresh = refresh
self.cursor = conn.cursor()
self.sql = "SELECT %s FROM %s" % (",".join(colnames), tbl)
if keynames:
condition = " AND ".join(["%s=?" % f for f in keynames])
self.sql += " WHERE %s" % condition
if ORDER:
self.sql += " ORDER BY " + ", ".join(ORDER)
self.desc = dtuple.TupleDescriptor([[n, ] for n in colnames])
print "Descriptor:", self.desc
print "SQL:", self.sql
def _flush(self):
"""Remove all trace of previous caching."""
self.recs = {}
self.when = {}
def __call__(self, keyvals=(), debug=0):
"""Return the data set associated with given key values."""
assert len(keyvals) == len(self.keynames)
now = mx.DateTime.now()
if self.recs.has_key(keyvals) and self.refresh and (now - self.when[keyvals] < self.refresh):
if debug: print "Interval:", now - self.when[keyvals]
return self.recs[keyvals]
else:
self.cursor.execute(self.sql, keyvals)
rows = self.cursor.fetchall()
result = [dtuple.DatabaseTuple(self.desc, row) for row in rows]
if self.refresh:
if debug: print "Caching", self.tbl, keyvals, " at", now
self.recs[keyvals] = result
self.when[keyvals] = now
return result
def close(self):
self.recs = None
self.when = None
self.cursor.close()
if __name__ == "__main__":
#
# Sorry, you'll need you own database details in here
#
import mx.ODBC.Windows as odbc
conn = odbc.connect("prom2000")
s1 = CacheQuery("department", # table
"DptName DptWelcome DptLnksTxt".split(), # columns
("DptCode",), # key columns
conn, refresh=0) # other stuff
while 1:
dc = raw_input("Department Code: ")
if not dc:
break
rows = s1((dc, ), debug=1)
if len(rows) == 0:
print "No such department"
else:
for row in rows:
print """
Department: %s Full Name: %s
Welcome Text:
%s
Links Text:
%s
""" % (dc, row.DptName, row.DptWelcome, row.DptLnksTxt)
s1.close()
conn.close()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -