📄 odb.py
字号:
def alterTableToMatch(self): raise "Unimplemented Error!" def addIndex(self, columns, indexName=None, unique=0): if indexName is None: indexName = self.getTableName() + "_index_" + string.join(columns, "_") self.__indices[indexName] = (columns, unique) def createIndex(self, columns, indexName=None, unique=0, cursor=None): if cursor is None: cursor = self.db.defaultCursor() cols = string.join(columns, ",") if indexName is None: indexName = self.getTableName() + "_index_" + string.join(columns, "_") uniquesql = "" if unique: uniquesql = " unique" sql = "create %s index %s on %s (%s)" % (uniquesql, indexName, self.getTableName(), cols) warn("creating index", sql) cursor.execute(sql) ## Column Definition def getColumnDef(self,column_name): try: return self.__col_def_hash[column_name] except KeyError: try: return self.__vcol_def_hash[column_name] except KeyError: raise eNoSuchColumn, "no column (%s) on table %s" % (column_name,self.__table_name) def getColumnList(self): return self.__column_list + self.__vcolumn_list def getAppColumnList(self): return self.__column_list def databaseSizeForData_ColumnName_(self,data,col_name): try: col_def = self.__col_def_hash[col_name] except KeyError: try: col_def = self.__vcol_def_hash[col_name] except KeyError: raise eNoSuchColumn, "no column (%s) on table %s" % (col_name,self.__table_name) c_name,c_type,c_options = col_def if c_type == kBigString: if c_options.get("compress_ok",0) and self.db.compression_enabled: z_size = len(zlib.compress(data,9)) r_size = len(data) if z_size < r_size: return z_size else: return r_size else: return len(data) else: # really simplistic database size computation: try: a = data[0] return len(data) except: return 4 def columnType(self, col_name): try: col_def = self.__col_def_hash[col_name] except KeyError: try: col_def = self.__vcol_def_hash[col_name] except KeyError: raise eNoSuchColumn, "no column (%s) on table %s" % (col_name,self.__table_name) c_name,c_type,c_options = col_def return c_type def convertDataForColumn(self,data,col_name): try: col_def = self.__col_def_hash[col_name] except KeyError: try: col_def = self.__vcol_def_hash[col_name] except KeyError: raise eNoSuchColumn, "no column (%s) on table %s" % (col_name,self.__table_name) c_name,c_type,c_options = col_def if c_type == kIncInteger: raise eInvalidData, "invalid operation for column (%s:%s) on table (%s)" % (col_name,c_type,self.__table_name) if c_type == kInteger: try: if data is None: data = 0 else: return long(data) except (ValueError,TypeError): raise eInvalidData, "invalid data (%s) for col (%s:%s) on table (%s)" % (repr(data),col_name,c_type,self.__table_name) elif c_type == kReal: try: if data is None: data = 0.0 else: return float(data) except (ValueError,TypeError): raise eInvalidData, "invalid data (%s) for col (%s:%s) on table (%s)" % (repr(data), col_name,c_type,self.__table_name) else: if type(data) == type(long(0)): return "%d" % data else: return str(data) def getPrimaryKeyList(self): return self.__primary_key_list def hasValueColumn(self): return self.__has_value_column def hasColumn(self,name): return self.__col_def_hash.has_key(name) def hasVColumn(self,name): return self.__vcol_def_hash.has_key(name) def _defineRows(self): raise "can't instantiate base odb.Table type, make a subclass and override _defineRows()" def __lockColumnsAndInit(self): # add a 'odb_value column' before we lockdown the table def if self.__has_value_column: self.d_addColumn("odb_value",kBigText,default='') self.__columns_locked = 1 # walk column list and make lookup hashes, primary_key_list, etc.. primary_key_list = [] col_def_hash = {} for a_col in self.__column_list: name,type,options = a_col col_def_hash[name] = a_col if options.has_key('primarykey'): primary_key_list.append(name) self.__col_def_hash = col_def_hash self.__primary_key_list = primary_key_list # setup the value columns! if (not self.__has_value_column) and (len(self.__vcolumn_list) > 0): raise "can't define vcolumns on table without ValueColumn, call d_addValueColumn() in your _defineRows()" vcol_def_hash = {} for a_col in self.__vcolumn_list: name,type,size_data,options = a_col vcol_def_hash[name] = a_col self.__vcol_def_hash = vcol_def_hash def __checkColumnLock(self): if self.__columns_locked: raise "can't change column definitions outside of subclass' _defineRows() method!" # table definition methods, these are only available while inside the # subclass's _defineRows method # # Ex: # # import odb # class MyTable(odb.Table): # def _defineRows(self): # self.d_addColumn("id",kInteger,primarykey = 1,autoincrement = 1) # self.d_addColumn("name",kVarString,120) # self.d_addColumn("type",kInteger, # enum_values = { 0 : "alive", 1 : "dead" } def d_addColumn(self,col_name,ctype,size=None,primarykey = 0, notnull = 0,indexed=0, default=None,unique=0,autoincrement=0,safeupdate=0, enum_values = None, no_export = 0, relations=None,compress_ok=0,int_date=0): self.__checkColumnLock() options = {} options['default'] = default if primarykey: options['primarykey'] = primarykey if unique: options['unique'] = unique if indexed: options['indexed'] = indexed self.addIndex((col_name,)) if safeupdate: options['safeupdate'] = safeupdate if autoincrement: options['autoincrement'] = autoincrement if notnull: options['notnull'] = notnull if size: options['size'] = size if no_export: options['no_export'] = no_export if int_date: if ctype != kInteger: raise eInvalidData, "can't flag columns int_date unless they are kInteger" else: options['int_date'] = int_date if enum_values: options['enum_values'] = enum_values inv_enum_values = {} for k,v in enum_values.items(): if inv_enum_values.has_key(v): raise eInvalidData, "enum_values paramater must be a 1 to 1 mapping for Table(%s)" % self.__table_name else: inv_enum_values[v] = k options['inv_enum_values'] = inv_enum_values if relations: options['relations'] = relations for a_relation in relations: table, foreign_column_name = a_relation if self.__relations_by_table.has_key(table): raise eInvalidData, "multiple relations for the same foreign table are not yet supported" self.__relations_by_table[table] = (col_name,foreign_column_name) if compress_ok: if ctype == kBigString: options['compress_ok'] = 1 else: raise eInvalidData, "only kBigString fields can be compress_ok=1" self.__column_list.append( (col_name,ctype,options) ) def d_addValueColumn(self): self.__checkColumnLock() self.__has_value_column = 1 def d_addVColumn(self,col_name,type,size=None,default=None): self.__checkColumnLock() if (not self.__has_value_column): raise "can't define VColumns on table without ValueColumn, call d_addValueColumn() first" options = {} if default: options['default'] = default if size: options['size'] = size self.__vcolumn_list.append( (col_name,type,options) ) ##################### # _checkColMatchSpec(col_match_spec,should_match_unique_row = 0) # # raise an error if the col_match_spec contains invalid columns, or # (in the case of should_match_unique_row) if it does not fully specify # a unique row. # # NOTE: we don't currently support where clauses with value column fields! # def _fixColMatchSpec(self,col_match_spec, should_match_unique_row = 0): if type(col_match_spec) == type([]): if type(col_match_spec[0]) != type((0,)): raise eInvalidMatchSpec, "invalid types in match spec, use [(,)..] or (,)" elif type(col_match_spec) == type((0,)): col_match_spec = [ col_match_spec ] elif type(col_match_spec) == type(None): if should_match_unique_row: raise eNonUniqueMatchSpec, "can't use a non-unique match spec (%s) here" % col_match_spec else: return None else: raise eInvalidMatchSpec, "invalid types in match spec, use [(,)..] or (,)" if should_match_unique_row: unique_column_lists = [] # first the primary key list my_primary_key_list = [] for a_key in self.__primary_key_list: my_primary_key_list.append(a_key) # then other unique keys for a_col in self.__column_list: col_name,a_type,options = a_col if options.has_key('unique'): unique_column_lists.append( (col_name, [col_name]) ) unique_column_lists.append( ('primary_key', my_primary_key_list) ) new_col_match_spec = [] for a_col in col_match_spec: name,val = a_col # newname = string.lower(name) # what is this doing?? - jeske newname = name if not self.__col_def_hash.has_key(newname): raise eNoSuchColumn, "no such column in match spec: '%s'" % newname new_col_match_spec.append( (newname,val) ) if should_match_unique_row: for name,a_list in unique_column_lists: try: a_list.remove(newname) except ValueError: # it's okay if they specify too many columns! pass if should_match_unique_row: for name,a_list in unique_column_lists: if len(a_list) == 0: # we matched at least one unique colum spec! # log("using unique column (%s) for query %s" % (name,col_match_spec)) return new_col_match_spec raise eNonUniqueMatchSpec, "can't use a non-unique match spec (%s) here" % col_match_spec return new_col_match_spec def __buildWhereClause (self, col_match_spec,other_clauses = None): sql_where_list = [] if not col_match_spec is None: for m_col in col_match_spec: m_col_name,m_col_val = m_col c_name,c_type,c_options = self.__col_def_hash[m_col_name] if c_type in (kIncInteger, kInteger): try: m_col_val_long = long(m_col_val) except ValueError: raise ValueError, "invalid literal for long(%s) in table %s" % (repr(m_col_val),self.__table_name) sql_where_list.append("%s = %d" % (c_name, m_col_val_long)) elif c_type == kReal: try: m_col_val_float = float(m_col_val) except ValueError: raise ValueError, "invalid literal for float(%s) is table %s" % (repr(m_col_val), self.__table_name) sql_where_list.append("%s = %s" % (c_name, m_col_val_float)) else: sql_where_list.append("%s = '%s'" % (c_name, self.db.escape(m_col_val))) if other_clauses is None: pass elif type(other_clauses) == type(""): sql_where_list = sql_where_list + [other_clauses] elif type(other_clauses) == type([]): sql_where_list = sql_where_list + other_clauses else: raise eInvalidData, "unknown type of extra where clause: %s" % repr(other_clauses) return sql_where_list def __fetchRows(self,col_match_spec,cursor = None, where = None, order_by = None, limit_to = None, skip_to = None, join = None): if cursor is None: cursor = self.db.defaultCursor() # build column list sql_columns = [] for name,t,options in self.__column_list: sql_columns.append(name) # build join information joined_cols = [] joined_cols_hash = {} join_clauses = []
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -