⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 odb.py

📁 在 linux平台上的网页编程的模板
💻 PY
📖 第 1 页 / 共 4 页
字号:
#!/usr/bin/env python## odb.py## Object Database Api## Written by David Jeske <jeske@neotonic.com>, 2001/07. # Inspired by eGroups' sqldb.py originally written by Scott Hassan circa 1998.## Copyright (C) 2001, by David Jeske and Neotonic## Goals:#       - a simple object-like interface to database data#       - database independent (someday)#       - relational-style "rigid schema definition"#       - object style easy-access## Example:##  import odb##  # define table#  class AgentsTable(odb.Table):#    def _defineRows(self):#      self.d_addColumn("agent_id",kInteger,None,primarykey = 1,autoincrement = 1)#      self.d_addColumn("login",kVarString,200,notnull=1)#      self.d_addColumn("ticket_count",kIncInteger,None)##  if __name__ == "__main__":#    # open database#    ndb = MySQLdb.connect(host = 'localhost',#                          user='username', #                          passwd = 'password', #                          db='testdb')#    db = Database(ndb)#    tbl = AgentsTable(db,"agents")##    # create row#    agent_row = tbl.newRow()#    agent_row.login = "foo"#    agent_row.save()##    # fetch row (must use primary key)#    try:#      get_row = tbl.fetchRow( ('agent_id', agent_row.agent_id) )#    except odb.eNoMatchingRows:#      print "this is bad, we should have found the row"##    # fetch rows (can return empty list)#    list_rows = tbl.fetchRows( ('login', "foo") )#import stringimport sys, zlibfrom log import *import handle_erroreNoSuchColumn         = "odb.eNoSuchColumn"eNonUniqueMatchSpec   = "odb.eNonUniqueMatchSpec"eNoMatchingRows       = "odb.eNoMatchingRows"eInternalError        = "odb.eInternalError"eInvalidMatchSpec     = "odb.eInvalidMatchSpec"eInvalidData          = "odb.eInvalidData"eUnsavedObjectLost    = "odb.eUnsavedObjectLost"eDuplicateKey         = "odb.eDuplicateKey"###################################### COLUMN TYPES                       ################                     ####################### typename     ####################### size data means:#              #                     # kInteger       = "kInteger"          # -kFixedString   = "kFixedString"      # sizekVarString     = "kVarString"        # maxsizekBigString     = "kBigString"        # -kIncInteger    = "kIncInteger"       # -kDateTime      = "kDateTime"kTimeStamp     = "kTimeStamp"kReal          = "kReal"DEBUG = 0############### Database## this will ultimately turn into a mostly abstract base class for# the DB adaptors for different database types....#class Database:    def __init__(self, db, debug=0):        self._tables = {}        self.db = db        self._cursor = None        self.compression_enabled = 0        self.debug = debug        self.SQLError = None	self.__defaultRowClass = self.defaultRowClass()	self.__defaultRowListClass = self.defaultRowListClass()    def defaultCursor(self):        if self._cursor is None:            self._cursor = self.db.cursor()        return self._cursor    def escape(self,str):	raise "Unimplemented Error"    def getDefaultRowClass(self): return self.__defaultRowClass    def setDefaultRowClass(self, clss): self.__defaultRowClass = clss    def getDefaultRowListClass(self): return self.__defaultRowListClass    def setDefaultRowListClass(self, clss): self.__defaultRowListClass = clss    def defaultRowClass(self):        return Row    def defaultRowListClass(self):        # base type is list...        return list    def addTable(self, attrname, tblname, tblclass,                  rowClass = None, check = 0, create = 0, rowListClass = None):        tbl = tblclass(self, tblname, rowClass=rowClass, check=check,                        create=create, rowListClass=rowListClass)        self._tables[attrname] = tbl        return tbl    def close(self):        for name, tbl in self._tables.items():            tbl.db = None        self._tables = {}        if self.db is not None:            self.db.close()            self.db = None    def __getattr__(self, key):        if key == "_tables":            raise AttributeError, "odb.Database: not initialized properly, self._tables does not exist"        try:            table_dict = getattr(self,"_tables")            return table_dict[key]        except KeyError:            raise AttributeError, "odb.Database: unknown attribute %s" % (key)            def beginTransaction(self, cursor=None):        if cursor is None:            cursor = self.defaultCursor()        dlog(DEV_UPDATE,"begin")        cursor.execute("begin")    def commitTransaction(self, cursor=None):        if cursor is None:            cursor = self.defaultCursor()        dlog(DEV_UPDATE,"commit")        cursor.execute("commit")    def rollbackTransaction(self, cursor=None):        if cursor is None:            cursor = self.defaultCursor()        dlog(DEV_UPDATE,"rollback")        cursor.execute("rollback")    ##     ## schema creation code    ##    def createTables(self):      tables = self.listTables()      for attrname, tbl in self._tables.items():        tblname = tbl.getTableName()        if tblname not in tables:          print "table %s does not exist" % tblname          tbl.createTable()        else:          invalidAppCols, invalidDBCols = tbl.checkTable()##          self.alterTableToMatch(tbl)    def createIndices(self):      indices = self.listIndices()      for attrname, tbl in self._tables.items():        for indexName, (columns, unique) in tbl.getIndices().items():          if indexName in indices: continue          tbl.createIndex(columns, indexName=indexName, unique=unique)    def synchronizeSchema(self):      tables = self.listTables()      for attrname, tbl in self._tables.items():        tblname = tbl.getTableName()        self.alterTableToMatch(tbl)            def listTables(self, cursor=None):      raise "Unimplemented Error"    def listFieldsDict(self, table_name, cursor=None):      raise "Unimplemented Error"    def listFields(self, table_name, cursor=None):      columns = self.listFieldsDict(table_name, cursor=cursor)      return columns.keys()########################################### Table#class Table:    def subclassinit(self):        pass    def __init__(self,database,table_name,                 rowClass = None, check = 0, create = 0, rowListClass = None):        self.db = database        self.__table_name = table_name        if rowClass:            self.__defaultRowClass = rowClass        else:            self.__defaultRowClass = database.getDefaultRowClass()        if rowListClass:            self.__defaultRowListClass = rowListClass        else:            self.__defaultRowListClass = database.getDefaultRowListClass()        # get this stuff ready!                self.__column_list = []        self.__vcolumn_list = []        self.__columns_locked = 0        self.__has_value_column = 0        self.__indices = {}        # this will be used during init...        self.__col_def_hash = None        self.__vcol_def_hash = None        self.__primary_key_list = None        self.__relations_by_table = {}        # ask the subclass to def his rows        self._defineRows()        # get ready to run!        self.__lockColumnsAndInit()        self.subclassinit()                if create:            self.createTable()        if check:            self.checkTable()    def _colTypeToSQLType(self, colname, coltype, options):            if coltype == kInteger:        coltype = "integer"      elif coltype == kFixedString:        sz = options.get('size', None)        if sz is None: coltype = 'char'        else:  coltype = "char(%s)" % sz      elif coltype == kVarString:        sz = options.get('size', None)        if sz is None: coltype = 'varchar'        else:  coltype = "varchar(%s)" % sz      elif coltype == kBigString:        coltype = "text"      elif coltype == kIncInteger:        coltype = "integer"      elif coltype == kDateTime:        coltype = "datetime"      elif coltype == kTimeStamp:        coltype = "timestamp"      elif coltype == kReal:        coltype = "real"      coldef = "%s %s" % (colname, coltype)      if options.get('notnull', 0): coldef = coldef + " NOT NULL"      if options.get('autoincrement', 0): coldef = coldef + " AUTO_INCREMENT"      if options.get('unique', 0): coldef = coldef + " UNIQUE"#      if options.get('primarykey', 0): coldef = coldef + " primary key"      if options.get('default', None) is not None: coldef = coldef + " DEFAULT %s" % options.get('default')      return coldef    def getTableName(self):  return self.__table_name    def setTableName(self, tablename):  self.__table_name = tablename    def getIndices(self): return self.__indices    def _createTableSQL(self):      defs = []      for colname, coltype, options in self.__column_list:        defs.append(self._colTypeToSQLType(colname, coltype, options))      defs = string.join(defs, ", ")      primarykeys = self.getPrimaryKeyList()      primarykey_str = ""      if primarykeys:	primarykey_str = ", PRIMARY KEY (" + string.join(primarykeys, ",") + ")"      sql = "create table %s (%s %s)" % (self.__table_name, defs, primarykey_str)      return sql    def createTable(self, cursor=None):      if cursor is None: cursor = self.db.defaultCursor()      sql = self._createTableSQL()      print "CREATING TABLE:", sql      cursor.execute(sql)    def dropTable(self, cursor=None):      if cursor is None: cursor = self.db.defaultCursor()      try:        cursor.execute("drop table %s" % self.__table_name)   # clean out the table      except self.SQLError, reason:        pass    def renameTable(self, newTableName, cursor=None):      if cursor is None: cursor = self.db.defaultCursor()      try:        cursor.execute("rename table %s to %s" % (self.__table_name, newTableName))      except sel.SQLError, reason:        pass      self.setTableName(newTableName)          def getTableColumnsFromDB(self):      return self.db.listFieldsDict(self.__table_name)          def checkTable(self, warnflag=1):      invalidDBCols = {}      invalidAppCols = {}      dbcolumns = self.getTableColumnsFromDB()      for coldef in self.__column_list:        colname = coldef[0]        dbcoldef = dbcolumns.get(colname, None)        if dbcoldef is None:          invalidAppCols[colname] = 1            for colname, row in dbcolumns.items():        coldef = self.__col_def_hash.get(colname, None)        if coldef is None:          invalidDBCols[colname] = 1      if warnflag == 1:        if invalidDBCols:          print "----- WARNING ------------------------------------------"          print "  There are columns defined in the database schema that do"          print "  not match the application's schema."          print "  columns:", invalidDBCols.keys()          print "--------------------------------------------------------"        if invalidAppCols:           print "----- WARNING ------------------------------------------"          print "  There are new columns defined in the application schema"          print "  that do not match the database's schema."          print "  columns:", invalidAppCols.keys()          print "--------------------------------------------------------"      return invalidAppCols, invalidDBCols

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -