xmlrpclib.py

来自「mallet是自然语言处理、机器学习领域的一个开源项目。」· Python 代码 · 共 1,017 行 · 第 1/3 页

PY
1,017
字号
            # activate parser            self.parser = sgmlop.XMLParser()            self.parser.register(self)            self.feed = self.parser.feed            self.entity = {                "amp": "&", "gt": ">", "lt": "<",                "apos": "'", "quot": '"'                }        def close(self):            try:                self.parser.close()            finally:                self.parser = self.feed = None # nuke circular reference        def handle_proc(self, tag, attr):            import re            m = re.search("encoding\s*=\s*['\"]([^\"']+)[\"']", attr)            if m:                self.handle_xml(m.group(1), 1)        def handle_entityref(self, entity):            # <string> entity            try:                self.handle_data(self.entity[entity])            except KeyError:                self.handle_data("&%s;" % entity)try:    from xml.parsers import expat    if not hasattr(expat, "ParserCreate"):        raise ImportError, "ParserCreate"except ImportError:    ExpatParser = Noneelse:    class ExpatParser:        # fast expat parser for Python 2.0.  this is about 50%        # slower than sgmlop, on roundtrip testing        def __init__(self, target):            self._parser = parser = expat.ParserCreate(None, None)            self._target = target            parser.StartElementHandler = target.start            parser.EndElementHandler = target.end            parser.CharacterDataHandler = target.data            encoding = None            if not parser.returns_unicode:                encoding = "utf-8"            target.xml(encoding, None)        def feed(self, data):            self._parser.Parse(data, 0)        def close(self):            self._parser.Parse("", 1) # end of data            del self._target, self._parser # get rid of circular referencesclass SlowParser:    """Default XML parser (based on xmllib.XMLParser)."""    # this is about 10 times slower than sgmlop, on roundtrip    # testing.    def __init__(self, target):        import xmllib # lazy subclassing (!)        if xmllib.XMLParser not in SlowParser.__bases__:            SlowParser.__bases__ = (xmllib.XMLParser,)        self.handle_xml = target.xml        self.unknown_starttag = target.start        self.handle_data = target.data        self.unknown_endtag = target.end        try:            xmllib.XMLParser.__init__(self, accept_utf8=1)        except TypeError:            xmllib.XMLParser.__init__(self) # pre-2.0# --------------------------------------------------------------------# XML-RPC marshalling and unmarshalling codeclass Marshaller:    """Generate an XML-RPC params chunk from a Python data structure.    Create a Marshaller instance for each set of parameters, and use    the "dumps" method to convert your data (represented as a tuple)    to an XML-RPC params chunk.  To write a fault response, pass a    Fault instance instead.  You may prefer to use the "dumps" module    function for this purpose.    """    # by the way, if you don't understand what's going on in here,    # that's perfectly ok.    def __init__(self, encoding=None):        self.memo = {}        self.data = None        self.encoding = encoding    dispatch = {}    def dumps(self, values):        self.__out = []        self.write = write = self.__out.append        if isinstance(values, Fault):            # fault instance            write("<fault>\n")            self.__dump(vars(values))            write("</fault>\n")        else:            # parameter block            # FIXME: the xml-rpc specification allows us to leave out            # the entire <params> block if there are no parameters.            # however, changing this may break older code (including            # old versions of xmlrpclib.py), so this is better left as            # is for now.  See @XMLRPC3 for more information. /F            write("<params>\n")            for v in values:                write("<param>\n")                self.__dump(v)                write("</param>\n")            write("</params>\n")        result = string.join(self.__out, "")        del self.__out, self.write # don't need this any more        return result    def __dump(self, value):        try:            f = self.dispatch[type(value)]        except KeyError:            raise TypeError, "cannot marshal %s objects" % type(value)        else:            f(self, value)    def dump_int(self, value):        # in case ints are > 32 bits        if value > MAXINT or value < MININT:            raise OverflowError, "int exceeds XML-RPC limits"        self.write("<value><int>%s</int></value>\n" % value)    dispatch[IntType] = dump_int    def dump_long(self, value):        # in case ints are > 32 bits        if value > MAXINT or value < MININT:            raise OverflowError, "long int exceeds XML-RPC limits"        self.write("<value><int>%s</int></value>\n" % int(value))    dispatch[LongType] = dump_long    def dump_double(self, value):        self.write("<value><double>%s</double></value>\n" % repr(value))    dispatch[FloatType] = dump_double    def dump_string(self, value, escape=escape):        self.write("<value><string>%s</string></value>\n" % escape(value))    dispatch[StringType] = dump_string    if unicode:        def dump_unicode(self, value, escape=escape):            value = value.encode(self.encoding)            self.write("<value><string>%s</string></value>\n" % escape(value))        dispatch[UnicodeType] = dump_unicode    def opencontainer(self, value):        if value:            i = id(value)            if self.memo.has_key(i):                raise TypeError, "cannot marshal recursive data structures"            self.memo[i] = None    def closecontainer(self, value):        if value:            del self.memo[id(value)]    def dump_array(self, value):        self.opencontainer(value)        write = self.write        dump = self.__dump        write("<value><array><data>\n")        for v in value:            dump(v)        write("</data></array></value>\n")        self.closecontainer(value)    dispatch[TupleType] = dump_array    dispatch[ListType] = dump_array    def dump_struct(self, value, escape=escape):        self.opencontainer(value)        write = self.write        dump = self.__dump        write("<value><struct>\n")        for k, v in value.items():            write("<member>\n")            if type(k) is not StringType:                raise TypeError, "dictionary key must be string"            write("<name>%s</name>\n" % escape(k))            dump(v)            write("</member>\n")        write("</struct></value>\n")        self.closecontainer(value)    dispatch[DictType] = dump_struct    def dump_instance(self, value):        # check for special wrappers        if value.__class__ in WRAPPERS:            value.encode(self)        else:            # store instance attributes as a struct (really?)            self.dump_struct(value.__dict__)    dispatch[InstanceType] = dump_instanceclass Unmarshaller:    """Unmarshal an XML-RPC response, based on incoming XML event    messages (start, data, end).  Call close() to get the resulting    data structure.    Note that this reader is fairly tolerant, and gladly accepts bogus    XML-RPC data without complaining (but not bogus XML).    """    # and again, if you don't understand what's going on in here,    # that's perfectly ok.    def __init__(self):        self._type = None        self._stack = []        self._marks = []        self._data = []        self._methodname = None        self._encoding = "utf-8"        self.append = self._stack.append    def close(self):        # return response tuple and target method        if self._type is None or self._marks:            raise ResponseError()        if self._type == "fault":            raise apply(Fault, (), self._stack[0])        return tuple(self._stack)    def getmethodname(self):        return self._methodname    #    # event handlers    def xml(self, encoding, standalone):        self._encoding = encoding        # FIXME: assert standalone == 1 ???    def start(self, tag, attrs):        # prepare to handle this element        if tag == "array" or tag == "struct":            self._marks.append(len(self._stack))        self._data = []        self._value = (tag == "value")    def data(self, text):        self._data.append(text)    def end(self, tag, join=string.join):        # call the appropriate end tag handler        try:            f = self.dispatch[tag]        except KeyError:            pass # unknown tag ?        else:            return f(self, join(self._data, ""))    #    # accelerator support    def end_dispatch(self, tag, data):        # dispatch data        try:            f = self.dispatch[tag]        except KeyError:            pass # unknown tag ?        else:            return f(self, data)    #    # element decoders    dispatch = {}    def end_boolean(self, data):        if data == "0":            self.append(False)        elif data == "1":            self.append(True)        else:            raise TypeError, "bad boolean value"        self._value = 0    dispatch["boolean"] = end_boolean    def end_int(self, data):        self.append(int(data))        self._value = 0    dispatch["i4"] = end_int    dispatch["int"] = end_int    def end_double(self, data):        self.append(float(data))        self._value = 0    dispatch["double"] = end_double    def end_string(self, data):        if self._encoding:            data = _decode(data, self._encoding)        self.append(_stringify(data))        self._value = 0    dispatch["string"] = end_string    dispatch["name"] = end_string # struct keys are always strings    def end_array(self, data):        mark = self._marks[-1]        del self._marks[-1]        # map arrays to Python lists        self._stack[mark:] = [self._stack[mark:]]        self._value = 0    dispatch["array"] = end_array    def end_struct(self, data):        mark = self._marks[-1]        del self._marks[-1]        # map structs to Python dictionaries        dict = {}        items = self._stack[mark:]        for i in range(0, len(items), 2):            dict[_stringify(items[i])] = items[i+1]        self._stack[mark:] = [dict]        self._value = 0    dispatch["struct"] = end_struct    def end_base64(self, data):        value = Binary()        value.decode(data)        self.append(value)        self._value = 0    dispatch["base64"] = end_base64    def end_dateTime(self, data):        value = DateTime()        value.decode(data)

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?