📄 odb.py
字号:
#!/usr/bin/env python
#
# odb.py
#
# Object Database Api
#
# Written by David Jeske <jeske@neotonic.com>, 2001/07.
# Inspired by eGroups' sqldb.py originally written by Scott Hassan circa 1998.
#
# Copyright (C) 2001, by David Jeske and Neotonic
#
# Goals:
#   - a simple object-like interface to database data
#   - database independent (someday)
#   - relational-style "rigid schema definition"
#   - object style easy-access
#
# Example:
#
#  import odb
#
#  # define table
#  class AgentsTable(odb.Table):
#    def _defineRows(self):
#      self.d_addColumn("agent_id",kInteger,None,primarykey = 1,autoincrement = 1)
#      self.d_addColumn("login",kVarString,200,notnull=1)
#      self.d_addColumn("ticket_count",kIncInteger,None)
#
#  if __name__ == "__main__":
#    # open database
#    ndb = MySQLdb.connect(host = 'localhost',
#                          user='username',
#                          passwd = 'password',
#                          db='testdb')
#    db = Database(ndb)
#    tbl = AgentsTable(db,"agents")
#
#    # create row
#    agent_row = tbl.newRow()
#    agent_row.login = "foo"
#    agent_row.save()
#
#    # fetch row (must use primary key)
#    try:
#      get_row = tbl.fetchRow( ('agent_id', agent_row.agent_id) )
#    except odb.eNoMatchingRows:
#      print "this is bad, we should have found the row"
#
#    # fetch rows (can return empty list)
#    list_rows = tbl.fetchRows( ('login', "foo") )
#

import string
import sys
import zlib

from log import *

import handle_error

# Exception identifiers used by this module (plain strings).
eNoSuchColumn = "odb.eNoSuchColumn"
eNonUniqueMatchSpec = "odb.eNonUniqueMatchSpec"
eNoMatchingRows = "odb.eNoMatchingRows"
eInternalError = "odb.eInternalError"
eInvalidMatchSpec = "odb.eInvalidMatchSpec"
eInvalidData = "odb.eInvalidData"
eUnsavedObjectLost = "odb.eUnsavedObjectLost"
eDuplicateKey = "odb.eDuplicateKey"

#####################################
# COLUMN TYPES
#####################################
# typename                      size data means:
#
kInteger = "kInteger"           # -
kFixedString = "kFixedString"   # size
kVarString = "kVarString"       # maxsize
kBigString = "kBigString"       # -
kIncInteger = "kIncInteger"     # -
kDateTime = "kDateTime"
kTimeStamp = "kTimeStamp"
kReal = "kReal"

DEBUG = 0


##############
# Database
#
# this will ultimately turn into a mostly abstract base class for
# the DB adaptors for different database types....
#

class Database:
    """Registry of Table objects on top of one database connection.

    Tables registered with addTable() become reachable as attributes of the
    Database instance (see __getattr__).  Several methods (escape,
    listTables, listFieldsDict) are unimplemented here and raise
    NotImplementedError; they are meant to be provided by a
    database-specific subclass.
    """

    def __init__(self, db, debug=0):
        """Wrap the connection object *db*.

        *db* must offer cursor() and close(), which are the only calls this
        class makes on it.  *debug* is stored unchanged on self.debug.
        """
        self._tables = {}
        self.db = db
        self._cursor = None
        self.compression_enabled = 0
        self.debug = debug
        # Exception class caught by Table.dropTable()/renameTable().
        # NOTE(review): presumably overridden by each DB adaptor -- confirm.
        self.SQLError = None

        self.__defaultRowClass = self.defaultRowClass()
        self.__defaultRowListClass = self.defaultRowListClass()

    def defaultCursor(self):
        """Return the shared cursor, creating it on first use."""
        if self._cursor is None:
            self._cursor = self.db.cursor()
        return self._cursor

    def escape(self, str):
        """Escape a value for SQL; unimplemented in this base class.

        Raises NotImplementedError.  (The original raised a bare string,
        which is itself a TypeError on Python >= 2.6.)
        """
        raise NotImplementedError("Unimplemented Error")

    def getDefaultRowClass(self):
        """Return the row class handed to tables that do not specify one."""
        return self.__defaultRowClass

    def setDefaultRowClass(self, clss):
        """Replace the default row class with *clss*."""
        self.__defaultRowClass = clss

    def getDefaultRowListClass(self):
        """Return the row-list class handed to tables that do not specify one."""
        return self.__defaultRowListClass

    def setDefaultRowListClass(self, clss):
        """Replace the default row-list class with *clss*."""
        self.__defaultRowListClass = clss

    def defaultRowClass(self):
        """Return the initial default row class (Row).

        NOTE(review): Row is not defined in the visible part of this file.
        """
        return Row

    def defaultRowListClass(self):
        """Return the initial default row-list class."""
        # base type is list...
        return list

    def addTable(self, attrname, tblname, tblclass,
                 rowClass=None, check=0, create=0, rowListClass=None):
        """Instantiate *tblclass* for *tblname* and register it as *attrname*.

        The remaining keyword arguments are passed straight through to the
        table constructor.  Returns the new table object.
        """
        tbl = tblclass(self, tblname, rowClass=rowClass, check=check,
                       create=create, rowListClass=rowListClass)
        self._tables[attrname] = tbl
        return tbl

    def close(self):
        """Detach all registered tables and close the connection, if any."""
        for name, tbl in self._tables.items():
            tbl.db = None
        self._tables = {}

        if self.db is not None:
            self.db.close()
            self.db = None

    def __getattr__(self, key):
        """Resolve unknown attributes as registered table names.

        Raises AttributeError when *key* is not a registered table, or when
        self._tables itself is missing (i.e. __init__ was never run).
        """
        # Guard against infinite recursion when _tables does not exist yet.
        if key == "_tables":
            raise AttributeError("odb.Database: not initialized properly, self._tables does not exist")

        try:
            table_dict = getattr(self, "_tables")
            return table_dict[key]
        except KeyError:
            raise AttributeError("odb.Database: unknown attribute %s" % (key))

    def beginTransaction(self, cursor=None):
        """Log and execute "begin" on *cursor* (default: the shared cursor)."""
        if cursor is None:
            cursor = self.defaultCursor()
        dlog(DEV_UPDATE, "begin")
        cursor.execute("begin")

    def commitTransaction(self, cursor=None):
        """Log and execute "commit" on *cursor* (default: the shared cursor)."""
        if cursor is None:
            cursor = self.defaultCursor()
        dlog(DEV_UPDATE, "commit")
        cursor.execute("commit")

    def rollbackTransaction(self, cursor=None):
        """Log and execute "rollback" on *cursor* (default: the shared cursor)."""
        if cursor is None:
            cursor = self.defaultCursor()
        dlog(DEV_UPDATE, "rollback")
        cursor.execute("rollback")

    ##
    ## schema creation code
    ##

    def createTables(self):
        """Create every registered table that listTables() does not report.

        Tables that already exist are only checked via Table.checkTable().
        """
        tables = self.listTables()

        for attrname, tbl in self._tables.items():
            tblname = tbl.getTableName()

            if tblname not in tables:
                print("table %s does not exist" % tblname)
                tbl.createTable()
            else:
                invalidAppCols, invalidDBCols = tbl.checkTable()
##                self.alterTableToMatch(tbl)

    def createIndices(self):
        """Create each table index that listIndices() does not report.

        NOTE(review): listIndices() and Table.createIndex() are not defined
        in the visible part of this file.
        """
        indices = self.listIndices()

        for attrname, tbl in self._tables.items():
            for indexName, (columns, unique) in tbl.getIndices().items():
                if indexName in indices:
                    continue
                tbl.createIndex(columns, indexName=indexName, unique=unique)

    def synchronizeSchema(self):
        """Call alterTableToMatch() for every registered table.

        NOTE(review): alterTableToMatch() is not defined in the visible part
        of this file.  The listTables()/getTableName() results are unused;
        the calls are kept so existing behaviour is unchanged.
        """
        tables = self.listTables()

        for attrname, tbl in self._tables.items():
            tblname = tbl.getTableName()
            self.alterTableToMatch(tbl)

    def listTables(self, cursor=None):
        """Return the table names in the database; unimplemented here.

        Raises NotImplementedError.
        """
        raise NotImplementedError("Unimplemented Error")

    def listFieldsDict(self, table_name, cursor=None):
        """Return a mapping keyed by column name; unimplemented here.

        Raises NotImplementedError.
        """
        raise NotImplementedError("Unimplemented Error")

    def listFields(self, table_name, cursor=None):
        """Return the column names of *table_name* (keys of listFieldsDict)."""
        columns = self.listFieldsDict(table_name, cursor=cursor)
        return columns.keys()


##########################################
# Table
#

class Table:
    """Schema definition and SQL helpers for a single database table.

    Subclasses describe their columns in _defineRows(), which __init__
    calls (see the example in the file header).

    NOTE(review): _defineRows(), __lockColumnsAndInit() and
    getPrimaryKeyList() are used below but are not defined in the visible
    part of this file.
    """

    def subclassinit(self):
        """Hook called at the end of __init__; does nothing by default."""
        pass

    def __init__(self, database, table_name,
                 rowClass=None, check=0, create=0, rowListClass=None):
        """Bind the table *table_name* to *database* and set up its schema.

        *rowClass* / *rowListClass* fall back to the database defaults when
        false.  If *create* is true createTable() is run, and if *check* is
        true checkTable() is run, in that order.
        """
        self.db = database
        self.__table_name = table_name

        if rowClass:
            self.__defaultRowClass = rowClass
        else:
            self.__defaultRowClass = database.getDefaultRowClass()

        if rowListClass:
            self.__defaultRowListClass = rowListClass
        else:
            self.__defaultRowListClass = database.getDefaultRowListClass()

        # get this stuff ready!
        self.__column_list = []
        self.__vcolumn_list = []
        self.__columns_locked = 0
        self.__has_value_column = 0

        self.__indices = {}

        # this will be used during init...
        self.__col_def_hash = None
        self.__vcol_def_hash = None
        self.__primary_key_list = None
        self.__relations_by_table = {}

        # ask the subclass to def his rows
        self._defineRows()

        # get ready to run!
        self.__lockColumnsAndInit()

        self.subclassinit()

        if create:
            self.createTable()

        if check:
            self.checkTable()

    def _colTypeToSQLType(self, colname, coltype, options):
        """Return the SQL column definition for one column.

        *coltype* is one of the k* constants; an unrecognised value is used
        verbatim as the SQL type.  *options* is a dict; the keys read here
        are 'size', 'notnull', 'autoincrement', 'unique' and 'default'.
        """
        if coltype == kInteger:
            coltype = "integer"
        elif coltype == kFixedString:
            sz = options.get('size', None)
            if sz is None:
                coltype = 'char'
            else:
                coltype = "char(%s)" % sz
        elif coltype == kVarString:
            sz = options.get('size', None)
            if sz is None:
                coltype = 'varchar'
            else:
                coltype = "varchar(%s)" % sz
        elif coltype == kBigString:
            coltype = "text"
        elif coltype == kIncInteger:
            coltype = "integer"
        elif coltype == kDateTime:
            coltype = "datetime"
        elif coltype == kTimeStamp:
            coltype = "timestamp"
        elif coltype == kReal:
            coltype = "real"

        coldef = "%s %s" % (colname, coltype)

        if options.get('notnull', 0):
            coldef = coldef + " NOT NULL"
        if options.get('autoincrement', 0):
            coldef = coldef + " AUTO_INCREMENT"
        if options.get('unique', 0):
            coldef = coldef + " UNIQUE"
#        if options.get('primarykey', 0): coldef = coldef + " primary key"
        if options.get('default', None) is not None:
            coldef = coldef + " DEFAULT %s" % options.get('default')

        return coldef

    def getTableName(self):
        """Return the current table name."""
        return self.__table_name

    def setTableName(self, tablename):
        """Change the table name used by this object (no SQL is issued)."""
        self.__table_name = tablename

    def getIndices(self):
        """Return the index dictionary.

        createIndices() unpacks each value as (columns, unique).
        """
        return self.__indices

    def _createTableSQL(self):
        """Build the "create table" statement from the column list."""
        defs = []
        for colname, coltype, options in self.__column_list:
            defs.append(self._colTypeToSQLType(colname, coltype, options))
        defs = ", ".join(defs)

        primarykeys = self.getPrimaryKeyList()
        primarykey_str = ""
        if primarykeys:
            primarykey_str = ", PRIMARY KEY (" + ",".join(primarykeys) + ")"

        sql = "create table %s (%s %s)" % (self.__table_name, defs, primarykey_str)
        return sql

    def createTable(self, cursor=None):
        """Print and execute the "create table" statement for this table."""
        if cursor is None:
            cursor = self.db.defaultCursor()
        sql = self._createTableSQL()
        print("CREATING TABLE: %s" % sql)
        cursor.execute(sql)

    def dropTable(self, cursor=None):
        """Drop this table, ignoring the database's SQLError.

        The exception class comes from self.db.SQLError: the visible Table
        code never defines a SQLError attribute of its own, so the original
        "self.SQLError" would have raised AttributeError instead of
        swallowing the failure.
        """
        if cursor is None:
            cursor = self.db.defaultCursor()
        try:
            cursor.execute("drop table %s" % self.__table_name)  # clean out the table
        except self.db.SQLError:
            pass

    def renameTable(self, newTableName, cursor=None):
        """Rename the table in the database and on this object.

        SQL errors of type self.db.SQLError are ignored (the original
        referenced the misspelt name "sel", which raised NameError whenever
        the rename failed).
        """
        if cursor is None:
            cursor = self.db.defaultCursor()
        try:
            cursor.execute("rename table %s to %s" % (self.__table_name, newTableName))
        except self.db.SQLError:
            pass

        # NOTE(review): the name is updated even if the rename failed; this
        # matches the original behaviour -- confirm it is intended.
        self.setTableName(newTableName)

    def getTableColumnsFromDB(self):
        """Return the database's column dictionary for this table."""
        return self.db.listFieldsDict(self.__table_name)

    def checkTable(self, warnflag=1):
        """Compare the application's column list with the database's.

        Returns (invalidAppCols, invalidDBCols): dicts keyed by column name
        (value 1) for columns defined only in the application and only in
        the database, respectively.  With warnflag == 1 a warning banner is
        printed for each non-empty dict.
        """
        invalidDBCols = {}
        invalidAppCols = {}

        dbcolumns = self.getTableColumnsFromDB()

        # Columns the application defines but the database lacks.
        for coldef in self.__column_list:
            colname = coldef[0]
            dbcoldef = dbcolumns.get(colname, None)
            if dbcoldef is None:
                invalidAppCols[colname] = 1

        # Columns the database has but the application does not define.
        for colname, row in dbcolumns.items():
            coldef = self.__col_def_hash.get(colname, None)
            if coldef is None:
                invalidDBCols[colname] = 1

        if warnflag == 1:
            if invalidDBCols:
                print("----- WARNING ------------------------------------------")
                print(" There are columns defined in the database schema that do")
                print(" not match the application's schema.")
                print(" columns: %s" % (list(invalidDBCols.keys()),))
                print("--------------------------------------------------------")

            if invalidAppCols:
                print("----- WARNING ------------------------------------------")
                print(" There are new columns defined in the application schema")
                print(" that do not match the database's schema.")
                print(" columns: %s" % (list(invalidAppCols.keys()),))
                print("--------------------------------------------------------")

        return invalidAppCols, invalidDBCols
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -