📄 util.py
字号:
line = nextline match = boundary.match(line) if (not line) or match: # we stop if we reached the end of the stream or a # stop boundary (which means '--' after the # boundary) we continue to the next part if we # reached a simple boundary in either case this # would mean the entity is malformed, but we're # tolerating it anyway. skip_this_part = True end_of_stream = (not line) or (match.group(1) is not None) break if skip_this_part: continue if disp_options.has_key("name"): name = disp_options["name"] else: name = None # create a file object # is this a file? if disp_options.has_key("filename"): if file_callback and callable(file_callback): file = file_callback(disp_options["filename"]) else: file = tempfile.TemporaryFile("w+b") else: if field_callback and callable(field_callback): file = field_callback() else: file = cStringIO.StringIO() # read it in self.read_to_boundary(req, boundary, file) file.seek(0) # make a Field if disp_options.has_key("filename"): field = Field(name) field.filename = disp_options["filename"] else: field = StringField(file.read()) field.name = name field.file = file field.type = ctype field.type_options = type_options field.disposition = disp field.disposition_options = disp_options field.headers = headers self.list.append(field) def add_field(self, key, value): """Insert a field as key/value pair""" item = StringField(value) item.name = key self.list.append(item) def __setitem__(self, key, value): table = self.list.table() if table.has_key(key): items = table[key] for item in items: self.list.remove(item) item = StringField(value) item.name = key self.list.append(item) def read_to_boundary(self, req, boundary, file): previous_delimiter = None while True: line = req.readline(readBlockSize) if not line: # end of stream if file is not None and previous_delimiter is not None: file.write(previous_delimiter) return True match = boundary.match(line) if match: # the line is the boundary, so we bail out # if the two last chars are '--' it is the end of the entity return match.group(1) is not None if line[-2:] == '\r\n': # the line ends with a \r\n, which COULD be part # of the next boundary. We write the previous line delimiter # then we write the line without \r\n and save it for the next # iteration if it was not part of the boundary if file is not None: if previous_delimiter is not None: file.write(previous_delimiter) file.write(line[:-2]) previous_delimiter = '\r\n' elif line[-1:] == '\r': # the line ends with \r, which is only possible if # readBlockSize bytes have been read. In that case the # \r COULD be part of the next boundary, so we save it # for the next iteration assert len(line) == readBlockSize if file is not None: if previous_delimiter is not None: file.write(previous_delimiter) file.write(line[:-1]) previous_delimiter = '\r' elif line == '\n' and previous_delimiter == '\r': # the line us a single \n and we were in the middle of a \r\n, # so we complete the delimiter previous_delimiter = '\r\n' else: if file is not None: if previous_delimiter is not None: file.write(previous_delimiter) file.write(line) previous_delimiter = None def __getitem__(self, key): """Dictionary style indexing.""" found = self.list.table()[key] if len(found) == 1: return found[0] else: return found def get(self, key, default): try: return self.__getitem__(key) except (TypeError, KeyError): return default def keys(self): """Dictionary style keys() method.""" return self.list.table().keys() def __iter__(self): return iter(self.keys()) def __repr__(self): return repr(self.list.table()) def has_key(self, key): """Dictionary style has_key() method.""" return (key in self.list.table()) __contains__ = has_key def __len__(self): """Dictionary style len(x) support.""" return len(self.list.table()) def getfirst(self, key, default=None): """ return the first value received """ try: return self.list.table()[key][0] except KeyError: return default def getlist(self, key): """ return a list of received values """ try: return self.list.table()[key] except KeyError: return [] def items(self): """Dictionary-style items(), except that items are returned in the same order as they were supplied in the form.""" return [(item.name, item) for item in self.list] def __delitem__(self, key): table = self.list.table() values = table[key] for value in values: self.list.remove(value) def clear(self): self.list = FieldList()def parse_header(line): """Parse a Content-type like header. Return the main content-type and a dictionary of options. """ plist = map(lambda a: a.strip(), line.split(';')) key = plist[0].lower() del plist[0] pdict = {} for p in plist: i = p.find('=') if i >= 0: name = p[:i].strip().lower() value = p[i+1:].strip() if len(value) >= 2 and value[0] == value[-1] == '"': value = value[1:-1] pdict[name] = value return key, pdictdef apply_fs_data(object, fs, **args): """ Apply FieldStorage data to an object - the object must be callable. Examine the args, and match then with fs data, then call the object, return the result. """ # we need to weed out unexpected keyword arguments # and for that we need to get a list of them. There # are a few options for callable objects here: fc = None expected = [] if hasattr(object, "func_code"): # function fc = object.func_code expected = fc.co_varnames[0:fc.co_argcount] elif hasattr(object, 'im_func'): # method fc = object.im_func.func_code expected = fc.co_varnames[1:fc.co_argcount] elif type(object) in (TypeType,ClassType): # class fc = object.__init__.im_func.func_code expected = fc.co_varnames[1:fc.co_argcount] elif type(object) is BuiltinFunctionType: # builtin fc = None expected = [] elif hasattr(object, '__call__'): # callable object if type(object.__call__) is MethodType: fc = object.__call__.im_func.func_code expected = fc.co_varnames[1:fc.co_argcount] else: # abuse of objects to create hierarchy return apply_fs_data(object.__call__, fs, **args) # add form data to args for field in fs.list: if field.filename: val = field else: val = field.value args.setdefault(field.name, []).append(val) # replace lists with single values for arg in args: if ((type(args[arg]) is ListType) and (len(args[arg]) == 1)): args[arg] = args[arg][0] # remove unexpected args unless co_flags & 0x08, # meaning function accepts **kw syntax if fc is None: args = {} elif not (fc.co_flags & 0x08): for name in args.keys(): if name not in expected: del args[name] return object(**args)def redirect(req, location, permanent=0, text=None): """ A convenience function to provide redirection """ if req.sent_bodyct: raise IOError, "Cannot redirect after headers have already been sent." req.err_headers_out["Location"] = location if permanent: req.status = apache.HTTP_MOVED_PERMANENTLY else: req.status = apache.HTTP_MOVED_TEMPORARILY if text is None: req.write('<p>The document has moved' ' <a href="%s">here</a></p>\n' % location) else: req.write(text) raise apache.SERVER_RETURN, apache.DONE
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -