📄 odb.py
字号:
row = self.__defaultRowClass(self,None,create=1,replace=replace) for (cname, ctype, opts) in self.__column_list: if opts['default'] is not None and ctype is not kIncInteger: row[cname] = opts['default'] return rowclass Row: __instance_data_locked = 0 def subclassinit(self): pass def __init__(self,_table,data_dict,create=0,joined_cols = None,replace=0): self._inside_getattr = 0 # stop recursive __getattr__ self._table = _table self._should_insert = create or replace self._should_replace = replace self._rowInactive = None self._joinedRows = [] self.__pk_match_spec = None self.__vcoldata = {} self.__inc_coldata = {} self.__joined_cols_dict = {} for a_col in joined_cols or []: self.__joined_cols_dict[a_col] = 1 if create: self.__coldata = {} else: if type(data_dict) != type({}): raise eInternalError, "rowdict instantiate with bad data_dict" self.__coldata = data_dict self.__unpackVColumn() self.markClean() self.subclassinit() self.__instance_data_locked = 1 def joinRowData(self,another_row): self._joinedRows.append(another_row) def getPKMatchSpec(self): return self.__pk_match_spec def markClean(self): self.__vcolchanged = 0 self.__colchanged_dict = {} for key in self.__inc_coldata.keys(): self.__coldata[key] = self.__coldata.get(key, 0) + self.__inc_coldata[key] self.__inc_coldata = {} if not self._should_insert: # rebuild primary column match spec new_match_spec = [] for col_name in self._table.getPrimaryKeyList(): try: rdata = self[col_name] except KeyError: raise eInternalError, "must have primary key data filled in to save %s:Row(col:%s)" % (self._table.getTableName(),col_name) new_match_spec.append( (col_name, rdata) ) self.__pk_match_spec = new_match_spec def __unpackVColumn(self): if self._table.hasValueColumn(): pass def __packVColumn(self): if self._table.hasValueColumn(): pass ## ----- utility stuff ---------------------------------- def __del__(self): # check for unsaved changes changed_list = self.changedList() if len(changed_list): info = "unsaved Row for table (%s) lost, call discard() to avoid this error. Lost changes: %s\n" % (self._table.getTableName(), repr(changed_list)[:256]) if 0: raise eUnsavedObjectLost, info else: sys.stderr.write(info) def __repr__(self): return "Row from (%s): %s" % (self._table.getTableName(),repr(self.__coldata) + repr(self.__vcoldata)) ## ---- class emulation -------------------------------- def __getattr__(self,key): if self._inside_getattr: raise AttributeError, "recursively called __getattr__ (%s,%s)" % (key,self._table.getTableName()) try: self._inside_getattr = 1 try: return self[key] except KeyError: if self._table.hasColumn(key) or self._table.hasVColumn(key): return None else: raise AttributeError, "unknown field '%s' in Row(%s)" % (key,self._table.getTableName()) finally: self._inside_getattr = 0 def __setattr__(self,key,val): if not self.__instance_data_locked: self.__dict__[key] = val else: my_dict = self.__dict__ if my_dict.has_key(key): my_dict[key] = val else: # try and put it into the rowdata try: self[key] = val except KeyError, reason: raise AttributeError, reason ## ---- dict emulation --------------------------------- def __getitem__(self,key): self.checkRowActive() try: c_type = self._table.columnType(key) except eNoSuchColumn, reason: # Ugh, this sucks, we can't determine the type for a joined # row, so we just default to kVarString and let the code below # determine if this is a joined column or not c_type = kVarString if c_type == kIncInteger: c_data = self.__coldata.get(key, 0) if c_data is None: c_data = 0 i_data = self.__inc_coldata.get(key, 0) if i_data is None: i_data = 0 return c_data + i_data try: return self.__coldata[key] except KeyError: try: return self.__vcoldata[key] except KeyError: for a_joined_row in self._joinedRows: try: return a_joined_row[key] except KeyError: pass raise KeyError, "unknown column %s in %s" % (key,self) def __setitem__(self,key,data): self.checkRowActive() try: newdata = self._table.convertDataForColumn(data,key) except eNoSuchColumn, reason: raise KeyError, reason if self._table.hasColumn(key): self.__coldata[key] = newdata self.__colchanged_dict[key] = 1 elif self._table.hasVColumn(key): self.__vcoldata[key] = newdata self.__vcolchanged = 1 else: for a_joined_row in self._joinedRows: try: a_joined_row[key] = data return except KeyError: pass raise KeyError, "unknown column name %s" % key def __delitem__(self,key,data): self.checkRowActive() if self.table.hasVColumn(key): del self.__vcoldata[key] else: for a_joined_row in self._joinedRows: try: del a_joined_row[key] return except KeyError: pass raise KeyError, "unknown column name %s" % key def copyFrom(self,source): for name,t,options in self._table.getColumnList(): if not options.has_key("autoincrement"): self[name] = source[name] # make sure that .keys(), and .items() come out in a nice order! def keys(self): self.checkRowActive() key_list = [] for name,t,options in self._table.getColumnList(): key_list.append(name) for name in self.__joined_cols_dict.keys(): key_list.append(name) for a_joined_row in self._joinedRows: key_list = key_list + a_joined_row.keys() return key_list def items(self): self.checkRowActive() item_list = [] for name,t,options in self._table.getColumnList(): item_list.append( (name,self[name]) ) for name in self.__joined_cols_dict.keys(): item_list.append( (name,self[name]) ) for a_joined_row in self._joinedRows: item_list = item_list + a_joined_row.items() return item_list def values(elf): self.checkRowActive() value_list = self.__coldata.values() + self.__vcoldata.values() for a_joined_row in self._joinedRows: value_list = value_list + a_joined_row.values() return value_list def __len__(self): self.checkRowActive() my_len = len(self.__coldata) + len(self.__vcoldata) for a_joined_row in self._joinedRows: my_len = my_len + len(a_joined_row) return my_len def has_key(self,key): self.checkRowActive() if self.__coldata.has_key(key) or self.__vcoldata.has_key(key): return 1 else: for a_joined_row in self._joinedRows: if a_joined_row.has_key(key): return 1 return 0 def get(self,key,default = None): self.checkRowActive() if self.__coldata.has_key(key): return self.__coldata[key] elif self.__vcoldata.has_key(key): return self.__vcoldata[key] else: for a_joined_row in self._joinedRows: try: return a_joined_row.get(key,default) except eNoSuchColumn: pass if self._table.hasColumn(key): return default raise eNoSuchColumn, "no such column %s" % key def inc(self,key,count=1): self.checkRowActive() if self._table.hasColumn(key): try: self.__inc_coldata[key] = self.__inc_coldata[key] + count except KeyError: self.__inc_coldata[key] = count self.__colchanged_dict[key] = 1 else: raise AttributeError, "unknown field '%s' in Row(%s)" % (key,self._table.getTableName()) ## ---------------------------------- ## real interface def fillDefaults(self): for field_def in self._table.fieldList(): name,type,size,options = field_def if options.has_key("default"): self[name] = options["default"] ############### # changedList() # # returns a list of tuples for the columns which have changed # # changedList() -> [ ('name', 'fred'), ('age', 20) ] def changedList(self): if self.__vcolchanged: self.__packVColumn() changed_list = [] for a_col in self.__colchanged_dict.keys(): changed_list.append( (a_col,self.get(a_col,None),self.__inc_coldata.get(a_col,None)) ) return changed_list def discard(self): self.__coldata = None self.__vcoldata = None self.__colchanged_dict = {} self.__vcolchanged = 0 def delete(self,cursor = None): self.checkRowActive() fromTable = self._table curs = cursor fromTable.r_deleteRow(self,cursor=curs) self._rowInactive = "deleted" def save(self,cursor = None): toTable = self._table self.checkRowActive() if self._should_insert: toTable.r_insertRow(self,replace=self._should_replace) self._should_insert = 0 self._should_replace = 0 self.markClean() # rebuild the primary key list else: curs = cursor toTable.r_updateRow(self,cursor = curs) # the table will mark us clean! # self.markClean() def checkRowActive(self): if self._rowInactive: raise eInvalidData, "row is inactive: %s" % self._rowInactive def databaseSizeForColumn(self,key): return self._table.databaseSizeForData_ColumnName_(self[key],key)if __name__ == "__main__": print "run odb_test.py"
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -