xmlrpclib.py
来自「mallet是自然语言处理、机器学习领域的一个开源项目。」· Python 代码 · 共 1,017 行 · 第 1/3 页
PY
1,017 行
# activate parser self.parser = sgmlop.XMLParser() self.parser.register(self) self.feed = self.parser.feed self.entity = { "amp": "&", "gt": ">", "lt": "<", "apos": "'", "quot": '"' } def close(self): try: self.parser.close() finally: self.parser = self.feed = None # nuke circular reference def handle_proc(self, tag, attr): import re m = re.search("encoding\s*=\s*['\"]([^\"']+)[\"']", attr) if m: self.handle_xml(m.group(1), 1) def handle_entityref(self, entity): # <string> entity try: self.handle_data(self.entity[entity]) except KeyError: self.handle_data("&%s;" % entity)try: from xml.parsers import expat if not hasattr(expat, "ParserCreate"): raise ImportError, "ParserCreate"except ImportError: ExpatParser = Noneelse: class ExpatParser: # fast expat parser for Python 2.0. this is about 50% # slower than sgmlop, on roundtrip testing def __init__(self, target): self._parser = parser = expat.ParserCreate(None, None) self._target = target parser.StartElementHandler = target.start parser.EndElementHandler = target.end parser.CharacterDataHandler = target.data encoding = None if not parser.returns_unicode: encoding = "utf-8" target.xml(encoding, None) def feed(self, data): self._parser.Parse(data, 0) def close(self): self._parser.Parse("", 1) # end of data del self._target, self._parser # get rid of circular referencesclass SlowParser: """Default XML parser (based on xmllib.XMLParser).""" # this is about 10 times slower than sgmlop, on roundtrip # testing. def __init__(self, target): import xmllib # lazy subclassing (!) if xmllib.XMLParser not in SlowParser.__bases__: SlowParser.__bases__ = (xmllib.XMLParser,) self.handle_xml = target.xml self.unknown_starttag = target.start self.handle_data = target.data self.unknown_endtag = target.end try: xmllib.XMLParser.__init__(self, accept_utf8=1) except TypeError: xmllib.XMLParser.__init__(self) # pre-2.0# --------------------------------------------------------------------# XML-RPC marshalling and unmarshalling codeclass Marshaller: """Generate an XML-RPC params chunk from a Python data structure. Create a Marshaller instance for each set of parameters, and use the "dumps" method to convert your data (represented as a tuple) to an XML-RPC params chunk. To write a fault response, pass a Fault instance instead. You may prefer to use the "dumps" module function for this purpose. """ # by the way, if you don't understand what's going on in here, # that's perfectly ok. def __init__(self, encoding=None): self.memo = {} self.data = None self.encoding = encoding dispatch = {} def dumps(self, values): self.__out = [] self.write = write = self.__out.append if isinstance(values, Fault): # fault instance write("<fault>\n") self.__dump(vars(values)) write("</fault>\n") else: # parameter block # FIXME: the xml-rpc specification allows us to leave out # the entire <params> block if there are no parameters. # however, changing this may break older code (including # old versions of xmlrpclib.py), so this is better left as # is for now. See @XMLRPC3 for more information. /F write("<params>\n") for v in values: write("<param>\n") self.__dump(v) write("</param>\n") write("</params>\n") result = string.join(self.__out, "") del self.__out, self.write # don't need this any more return result def __dump(self, value): try: f = self.dispatch[type(value)] except KeyError: raise TypeError, "cannot marshal %s objects" % type(value) else: f(self, value) def dump_int(self, value): # in case ints are > 32 bits if value > MAXINT or value < MININT: raise OverflowError, "int exceeds XML-RPC limits" self.write("<value><int>%s</int></value>\n" % value) dispatch[IntType] = dump_int def dump_long(self, value): # in case ints are > 32 bits if value > MAXINT or value < MININT: raise OverflowError, "long int exceeds XML-RPC limits" self.write("<value><int>%s</int></value>\n" % int(value)) dispatch[LongType] = dump_long def dump_double(self, value): self.write("<value><double>%s</double></value>\n" % repr(value)) dispatch[FloatType] = dump_double def dump_string(self, value, escape=escape): self.write("<value><string>%s</string></value>\n" % escape(value)) dispatch[StringType] = dump_string if unicode: def dump_unicode(self, value, escape=escape): value = value.encode(self.encoding) self.write("<value><string>%s</string></value>\n" % escape(value)) dispatch[UnicodeType] = dump_unicode def opencontainer(self, value): if value: i = id(value) if self.memo.has_key(i): raise TypeError, "cannot marshal recursive data structures" self.memo[i] = None def closecontainer(self, value): if value: del self.memo[id(value)] def dump_array(self, value): self.opencontainer(value) write = self.write dump = self.__dump write("<value><array><data>\n") for v in value: dump(v) write("</data></array></value>\n") self.closecontainer(value) dispatch[TupleType] = dump_array dispatch[ListType] = dump_array def dump_struct(self, value, escape=escape): self.opencontainer(value) write = self.write dump = self.__dump write("<value><struct>\n") for k, v in value.items(): write("<member>\n") if type(k) is not StringType: raise TypeError, "dictionary key must be string" write("<name>%s</name>\n" % escape(k)) dump(v) write("</member>\n") write("</struct></value>\n") self.closecontainer(value) dispatch[DictType] = dump_struct def dump_instance(self, value): # check for special wrappers if value.__class__ in WRAPPERS: value.encode(self) else: # store instance attributes as a struct (really?) self.dump_struct(value.__dict__) dispatch[InstanceType] = dump_instanceclass Unmarshaller: """Unmarshal an XML-RPC response, based on incoming XML event messages (start, data, end). Call close() to get the resulting data structure. Note that this reader is fairly tolerant, and gladly accepts bogus XML-RPC data without complaining (but not bogus XML). """ # and again, if you don't understand what's going on in here, # that's perfectly ok. def __init__(self): self._type = None self._stack = [] self._marks = [] self._data = [] self._methodname = None self._encoding = "utf-8" self.append = self._stack.append def close(self): # return response tuple and target method if self._type is None or self._marks: raise ResponseError() if self._type == "fault": raise apply(Fault, (), self._stack[0]) return tuple(self._stack) def getmethodname(self): return self._methodname # # event handlers def xml(self, encoding, standalone): self._encoding = encoding # FIXME: assert standalone == 1 ??? def start(self, tag, attrs): # prepare to handle this element if tag == "array" or tag == "struct": self._marks.append(len(self._stack)) self._data = [] self._value = (tag == "value") def data(self, text): self._data.append(text) def end(self, tag, join=string.join): # call the appropriate end tag handler try: f = self.dispatch[tag] except KeyError: pass # unknown tag ? else: return f(self, join(self._data, "")) # # accelerator support def end_dispatch(self, tag, data): # dispatch data try: f = self.dispatch[tag] except KeyError: pass # unknown tag ? else: return f(self, data) # # element decoders dispatch = {} def end_boolean(self, data): if data == "0": self.append(False) elif data == "1": self.append(True) else: raise TypeError, "bad boolean value" self._value = 0 dispatch["boolean"] = end_boolean def end_int(self, data): self.append(int(data)) self._value = 0 dispatch["i4"] = end_int dispatch["int"] = end_int def end_double(self, data): self.append(float(data)) self._value = 0 dispatch["double"] = end_double def end_string(self, data): if self._encoding: data = _decode(data, self._encoding) self.append(_stringify(data)) self._value = 0 dispatch["string"] = end_string dispatch["name"] = end_string # struct keys are always strings def end_array(self, data): mark = self._marks[-1] del self._marks[-1] # map arrays to Python lists self._stack[mark:] = [self._stack[mark:]] self._value = 0 dispatch["array"] = end_array def end_struct(self, data): mark = self._marks[-1] del self._marks[-1] # map structs to Python dictionaries dict = {} items = self._stack[mark:] for i in range(0, len(items), 2): dict[_stringify(items[i])] = items[i+1] self._stack[mark:] = [dict] self._value = 0 dispatch["struct"] = end_struct def end_base64(self, data): value = Binary() value.decode(data) self.append(value) self._value = 0 dispatch["base64"] = end_base64 def end_dateTime(self, data): value = DateTime() value.decode(data)
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?