⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 odb.py

📁 在 linux平台上的网页编程的模板
💻 PY
📖 第 1 页 / 共 4 页
字号:
        row = self.__defaultRowClass(self,None,create=1,replace=replace)        for (cname, ctype, opts) in self.__column_list:            if opts['default'] is not None and ctype is not kIncInteger:                row[cname] = opts['default']        return rowclass Row:    __instance_data_locked  = 0    def subclassinit(self):        pass    def __init__(self,_table,data_dict,create=0,joined_cols = None,replace=0):        self._inside_getattr = 0  # stop recursive __getattr__        self._table = _table        self._should_insert = create or replace        self._should_replace = replace        self._rowInactive = None        self._joinedRows = []                self.__pk_match_spec = None        self.__vcoldata = {}        self.__inc_coldata = {}        self.__joined_cols_dict = {}        for a_col in joined_cols or []:            self.__joined_cols_dict[a_col] = 1                if create:            self.__coldata = {}        else:            if type(data_dict) != type({}):                raise eInternalError, "rowdict instantiate with bad data_dict"            self.__coldata = data_dict            self.__unpackVColumn()        self.markClean()        self.subclassinit()        self.__instance_data_locked = 1    def joinRowData(self,another_row):        self._joinedRows.append(another_row)    def getPKMatchSpec(self):        return self.__pk_match_spec    def markClean(self):        self.__vcolchanged = 0        self.__colchanged_dict = {}        for key in self.__inc_coldata.keys():            self.__coldata[key] = self.__coldata.get(key, 0) + self.__inc_coldata[key]        self.__inc_coldata = {}        if not self._should_insert:            # rebuild primary column match spec            new_match_spec = []            for col_name in self._table.getPrimaryKeyList():                try:                    rdata = self[col_name]                except KeyError:                    raise eInternalError, "must have primary key data filled in to save %s:Row(col:%s)" % (self._table.getTableName(),col_name)                                    new_match_spec.append( (col_name, rdata) )            self.__pk_match_spec = new_match_spec    def __unpackVColumn(self):        if self._table.hasValueColumn():            pass            def __packVColumn(self):        if self._table.hasValueColumn():            pass    ## ----- utility stuff ----------------------------------    def __del__(self):        # check for unsaved changes        changed_list = self.changedList()        if len(changed_list):            info = "unsaved Row for table (%s) lost, call discard() to avoid this error. Lost changes: %s\n" % (self._table.getTableName(), repr(changed_list)[:256])            if 0:                raise eUnsavedObjectLost, info            else:                sys.stderr.write(info)                    def __repr__(self):        return "Row from (%s): %s" % (self._table.getTableName(),repr(self.__coldata) + repr(self.__vcoldata))    ## ---- class emulation --------------------------------    def __getattr__(self,key):        if self._inside_getattr:          raise AttributeError, "recursively called __getattr__ (%s,%s)" % (key,self._table.getTableName())        try:            self._inside_getattr = 1            try:                return self[key]            except KeyError:                if self._table.hasColumn(key) or self._table.hasVColumn(key):                    return None                else:                    raise AttributeError, "unknown field '%s' in Row(%s)" % (key,self._table.getTableName())        finally:            self._inside_getattr = 0    def __setattr__(self,key,val):        if not self.__instance_data_locked:            self.__dict__[key] = val        else:            my_dict = self.__dict__            if my_dict.has_key(key):                my_dict[key] = val            else:                # try and put it into the rowdata                try:                    self[key] = val                except KeyError, reason:                    raise AttributeError, reason    ## ---- dict emulation ---------------------------------        def __getitem__(self,key):        self.checkRowActive()        try:            c_type = self._table.columnType(key)        except eNoSuchColumn, reason:            # Ugh, this sucks, we can't determine the type for a joined            # row, so we just default to kVarString and let the code below            # determine if this is a joined column or not            c_type = kVarString        if c_type == kIncInteger:            c_data = self.__coldata.get(key, 0)             if c_data is None: c_data = 0            i_data = self.__inc_coldata.get(key, 0)            if i_data is None: i_data = 0            return c_data + i_data                try:            return self.__coldata[key]        except KeyError:            try:                return self.__vcoldata[key]            except KeyError:                for a_joined_row in self._joinedRows:                    try:                        return a_joined_row[key]                    except KeyError:                        pass                raise KeyError, "unknown column %s in %s" % (key,self)    def __setitem__(self,key,data):        self.checkRowActive()                try:            newdata = self._table.convertDataForColumn(data,key)        except eNoSuchColumn, reason:            raise KeyError, reason        if self._table.hasColumn(key):            self.__coldata[key] = newdata            self.__colchanged_dict[key] = 1        elif self._table.hasVColumn(key):            self.__vcoldata[key] = newdata            self.__vcolchanged = 1        else:            for a_joined_row in self._joinedRows:                try:                    a_joined_row[key] = data                    return                except KeyError:                    pass            raise KeyError, "unknown column name %s" % key    def __delitem__(self,key,data):        self.checkRowActive()                if self.table.hasVColumn(key):            del self.__vcoldata[key]        else:            for a_joined_row in self._joinedRows:                try:                    del a_joined_row[key]                    return                except KeyError:                    pass            raise KeyError, "unknown column name %s" % key    def copyFrom(self,source):        for name,t,options in self._table.getColumnList():            if not options.has_key("autoincrement"):                self[name] = source[name]    # make sure that .keys(), and .items() come out in a nice order!    def keys(self):        self.checkRowActive()                key_list = []        for name,t,options in self._table.getColumnList():            key_list.append(name)        for name in self.__joined_cols_dict.keys():            key_list.append(name)        for a_joined_row in self._joinedRows:            key_list = key_list + a_joined_row.keys()                    return key_list    def items(self):        self.checkRowActive()                item_list = []        for name,t,options in self._table.getColumnList():            item_list.append( (name,self[name]) )        for name in self.__joined_cols_dict.keys():            item_list.append( (name,self[name]) )        for a_joined_row in self._joinedRows:            item_list = item_list + a_joined_row.items()        return item_list    def values(elf):        self.checkRowActive()        value_list = self.__coldata.values() + self.__vcoldata.values()        for a_joined_row in self._joinedRows:            value_list = value_list + a_joined_row.values()        return value_list            def __len__(self):        self.checkRowActive()                my_len = len(self.__coldata) + len(self.__vcoldata)        for a_joined_row in self._joinedRows:            my_len = my_len + len(a_joined_row)        return my_len    def has_key(self,key):        self.checkRowActive()                if self.__coldata.has_key(key) or self.__vcoldata.has_key(key):            return 1        else:            for a_joined_row in self._joinedRows:                if a_joined_row.has_key(key):                    return 1            return 0            def get(self,key,default = None):        self.checkRowActive()                        if self.__coldata.has_key(key):            return self.__coldata[key]        elif self.__vcoldata.has_key(key):            return self.__vcoldata[key]        else:            for a_joined_row in self._joinedRows:                try:                    return a_joined_row.get(key,default)                except eNoSuchColumn:                    pass            if self._table.hasColumn(key):                return default                        raise eNoSuchColumn, "no such column %s" % key    def inc(self,key,count=1):        self.checkRowActive()        if self._table.hasColumn(key):            try:                self.__inc_coldata[key] = self.__inc_coldata[key] + count            except KeyError:                self.__inc_coldata[key] = count            self.__colchanged_dict[key] = 1        else:            raise AttributeError, "unknown field '%s' in Row(%s)" % (key,self._table.getTableName())        ## ----------------------------------    ## real interface    def fillDefaults(self):        for field_def in self._table.fieldList():            name,type,size,options = field_def            if options.has_key("default"):                self[name] = options["default"]    ###############    # changedList()    #    # returns a list of tuples for the columns which have changed    #    #   changedList() -> [ ('name', 'fred'), ('age', 20) ]    def changedList(self):        if self.__vcolchanged:            self.__packVColumn()        changed_list = []        for a_col in self.__colchanged_dict.keys():            changed_list.append( (a_col,self.get(a_col,None),self.__inc_coldata.get(a_col,None)) )        return changed_list    def discard(self):        self.__coldata = None        self.__vcoldata = None        self.__colchanged_dict = {}        self.__vcolchanged = 0    def delete(self,cursor = None):        self.checkRowActive()                fromTable = self._table        curs = cursor        fromTable.r_deleteRow(self,cursor=curs)        self._rowInactive = "deleted"    def save(self,cursor = None):        toTable = self._table        self.checkRowActive()        if self._should_insert:            toTable.r_insertRow(self,replace=self._should_replace)            self._should_insert = 0            self._should_replace = 0            self.markClean()  # rebuild the primary key list        else:            curs = cursor            toTable.r_updateRow(self,cursor = curs)        # the table will mark us clean!        # self.markClean()    def checkRowActive(self):        if self._rowInactive:            raise eInvalidData, "row is inactive: %s" % self._rowInactive    def databaseSizeForColumn(self,key):        return self._table.databaseSizeForData_ColumnName_(self[key],key)if __name__ == "__main__":    print "run odb_test.py"

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -