⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 drv_xmlproc.py

📁 Python Development Environment (Python IDE plugin for Eclipse). Features editor, code completion, re
💻 PY
字号:
"""
A SAX 2.0 driver for xmlproc.

$Id$
"""

import types, string

from xml.parsers.xmlproc import xmlproc, xmlval, xmlapp
from xml.sax import saxlib
from xml.sax.xmlreader import AttributesImpl, AttributesNSImpl
from xml.sax.saxutils import ContentGenerator, prepare_input_source

# Todo
# - EntityResolver
# - as much as possible of LexicalHandler
# - entity expansion features
# - core properties
# - extra properties/features
#   - element stack
#   - entity stack
#   - current error code
#   - byte offset
#   - DTD object
#   - catalog path
#   - use catalogs
# - regression test
# - methods from Python SAX extensions?
# - remove FIXMEs

class XmlprocDriver(saxlib.XMLReader):
    """SAX 2.0 XMLReader built on top of the xmlproc parser.

    During parse() the driver registers itself with the xmlproc parser as
    error handler and DTD listener, and as application when namespace
    processing is switched off (a NamespaceFilter takes that role
    otherwise).  The callbacks it receives are forwarded to the SAX handlers
    configured on this reader.  The driver also implements the Locator
    methods and passes itself as locator to every SAXParseException it
    creates.
    """

    # ===== SAX 2.0 INTERFACES

    # --- XMLReader methods

    def __init__(self):
        """Create a non-validating, namespace-aware reader."""
        saxlib.XMLReader.__init__(self)
        self.__parsing = 0     # non-zero while parse() is running
        self.__validate = 0    # state of saxlib.feature_validation
        self.__namespaces = 1  # state of saxlib.feature_namespaces

        # Stored by set_locator(); initialised here so get_locator() can be
        # called before any locator has been supplied.
        self._locator = None

        # The xmlproc parser of the parse() call in progress, None otherwise.
        self._parser = None

        self._lex_handler = saxlib.LexicalHandler()
        self._decl_handler = saxlib.DeclHandler()

    def parse(self, source):
        """Parse `source` and report the document to the registered handlers.

        `source` is normalised with prepare_input_source().  An
        xmlval.XMLValidator is used when validation is enabled, an
        xmlproc.XMLProcessor otherwise.  The byte stream of the input source
        is closed once it has been read, also when reading fails.
        """
        try:
            self.__parsing = 1

            # interpret source

            source = prepare_input_source(source)

            # create parser

            if self.__validate:
                parser = xmlval.XMLValidator()
            else:
                parser = xmlproc.XMLProcessor()

            # set handlers

            if self._cont_handler is not None or \
               self._lex_handler is not None:
                # Replace a missing handler with a default instance so the
                # application callbacks never have to test for None.
                if self._cont_handler is None:
                    self._cont_handler = saxlib.ContentHandler()
                if self._lex_handler is None:
                    self._lex_handler = saxlib.LexicalHandler()

                if self.__namespaces:
                    ns_filter = NamespaceFilter(parser, self._cont_handler,
                                                self._lex_handler, self)
                    parser.set_application(ns_filter)
                else:
                    parser.set_application(self)

            if self._err_handler is not None:
                parser.set_error_handler(self)

            if self._decl_handler is not None or \
               self._dtd_handler is not None:
                parser.set_dtd_listener(self)

            # FIXME: set other handlers

            bufsize = 16384
            self._parser = parser  # make it available for callbacks
            # FIXME: parser.parse_resource(source.getSystemId()) would be
            # simpler, but the rest of the input source must be honoured.
            parser.set_sysid(source.getSystemId())

            stream = source.getByteStream()
            try:
                parser.read_from(stream, bufsize)
            finally:
                # Do not leak the stream when the parser raises.
                stream.close()

            parser.flush()
            parser.parseEnd()

        finally:
            self._parser = None
            self.__parsing = 0

    def setLocale(self, locale):
        """Accept and ignore `locale`."""
        pass

    def getFeature(self, name):
        """Return the state of the feature `name`.

        String interning and both external-entity features always report 1.
        Raises SAXNotRecognizedException for any other unknown name.
        """
        if name in (saxlib.feature_string_interning,
                    saxlib.feature_external_ges,
                    saxlib.feature_external_pes):
            return 1
        elif name == saxlib.feature_validation:
            return self.__validate
        elif name == saxlib.feature_namespaces:
            return self.__namespaces
        else:
            raise saxlib.SAXNotRecognizedException("Feature '%s' not recognized" %
                                            name)

    def setFeature(self, name, state):
        """Set the feature `name` to `state`.

        Raises SAXNotSupportedException while a parse is in progress, or on
        an attempt to turn off one of the external-entity features, and
        SAXNotRecognizedException for unknown names.  A request concerning
        string interning is accepted and ignored.
        """
        if self.__parsing:
            raise saxlib.SAXNotSupportedException("Cannot set feature '%s' during parsing" % name)

        if name == saxlib.feature_string_interning:
            pass
        elif name == saxlib.feature_validation:
            self.__validate = state
        elif name == saxlib.feature_namespaces:
            self.__namespaces = state
        elif name == saxlib.feature_external_ges or \
             name == saxlib.feature_external_pes:
            if not state:
                raise saxlib.SAXNotSupportedException("This feature cannot be turned off with xmlproc.")
        else:
            raise saxlib.SAXNotRecognizedException("Feature '%s' not recognized" %
                                            name)

    def getProperty(self, name):
        """Return the lexical or declaration handler selected by `name`.

        Raises SAXNotRecognizedException for any other property name.
        """
        if name == saxlib.property_lexical_handler:
            return self._lex_handler
        elif name == saxlib.property_declaration_handler:
            return self._decl_handler

        raise saxlib.SAXNotRecognizedException("Property '%s' not recognized" % name)

    def setProperty(self, name, value):
        """Install `value` as the lexical or declaration handler.

        Raises SAXNotRecognizedException for any other property name.
        """
        if name == saxlib.property_lexical_handler:
            self._lex_handler = value
        elif name == saxlib.property_declaration_handler:
            self._decl_handler = value
        else:
            raise saxlib.SAXNotRecognizedException("Property '%s' not recognized" % name)

    # --- Locator methods
    # These delegate to the parser of the running parse() call and are
    # therefore only usable while parse() is executing.

    def getColumnNumber(self):
        """Return the column reported by the current parser."""
        return self._parser.get_column()

    def getLineNumber(self):
        """Return the line reported by the current parser."""
        return self._parser.get_line()

    def getPublicId(self):
        """Return None; the public identifier is not tracked."""
        return None  # FIXME: Try to find this. Perhaps from InputSource?

    def getSystemId(self):
        """Return the current system identifier reported by the parser."""
        return self._parser.get_current_sysid() # FIXME?

    # ===== XMLPROC INTERFACES

    # --- Application methods

    def set_locator(self, locator):
        """Remember the locator handed over by xmlproc."""
        self._locator = locator

    def doc_start(self):
        """Forward the start of the document to the content handler."""
        self._cont_handler.startDocument()

    def doc_end(self):
        """Forward the end of the document to the content handler."""
        self._cont_handler.endDocument()

    # handle_comment and handle_pi belong to both the Application and the
    # DTDConsumer interface.  The class used to define each of them twice
    # with identical behaviour; the single definitions below keep the
    # parameter names of the definitions that were in effect.

    def handle_comment(self, contents):
        """Forward a comment to the lexical handler."""
        self._lex_handler.comment(contents)

    def handle_start_tag(self, name, attrs):
        """Report an element start using the raw name and attributes."""
        self._cont_handler.startElement(name, AttributesImpl(attrs))

    def handle_end_tag(self,name):
        """Report an element end using the raw name."""
        self._cont_handler.endElement(name)

    def handle_data(self, data, start, end):
        """Report data[start:end] as character data."""
        self._cont_handler.characters(data[start:end])

    def handle_ignorable_data(self, data, start, end):
        """Report data[start:end] as ignorable whitespace."""
        self._cont_handler.ignorableWhitespace(data[start:end])

    def handle_pi(self, target, rem):
        """Forward a processing instruction to the content handler."""
        self._cont_handler.processingInstruction(target, rem)

    def handle_doctype(self, root, pubId, sysId):
        """Report the document type declaration as the start of the DTD."""
        self._lex_handler.startDTD(root, pubId, sysId)

    def set_entity_info(self, xmlver, enc, sddecl):
        """Accept and ignore the entity information."""
        pass

    # --- ErrorHandler methods

    # set_locator implemented as Application method above

    def get_locator(self):
        """Return the stored locator, or None if none has been set."""
        return self._locator

    def warning(self, msg):
        """Pass `msg` to the SAX error handler as a warning."""
        self._err_handler.warning(saxlib.SAXParseException(msg, None, self))

    def error(self, msg):
        """Pass `msg` to the SAX error handler as an error."""
        self._err_handler.error(saxlib.SAXParseException(msg, None, self))

    def fatal(self, msg):
        """Pass `msg` to the SAX error handler as a fatal error."""
        self._err_handler.fatalError(saxlib.SAXParseException(msg, None, self))

    # --- DTDConsumer methods

    # handle_comment and handle_pi implemented as Application methods above

    def dtd_start(self):
        """Do nothing; startDTD is reported by handle_doctype."""
        pass # this is done by handle_doctype

    def dtd_end(self):
        """Report the end of the DTD to the lexical handler."""
        self._lex_handler.endDTD()

    def new_general_entity(self, name, val):
        """Report an internal general entity declaration."""
        self._decl_handler.internalEntityDecl(name, val)

    def new_external_entity(self, ent_name, pub_id, sys_id, ndata):
        """Report an external entity declaration.

        A declaration with a notation name (`ndata`) goes to the DTD handler
        as an unparsed entity, any other to the declaration handler.
        """
        if not ndata:
            self._decl_handler.externalEntityDecl(ent_name, pub_id, sys_id)
        else:
            self._dtd_handler.unparsedEntityDecl(ent_name, pub_id, sys_id,
                                                 ndata)

    def new_parameter_entity(self, name, val):
        """Report an internal parameter entity, its name prefixed with '%'."""
        self._decl_handler.internalEntityDecl("%" + name, val)

    def new_external_pe(self, name, pubid, sysid):
        """Report an external parameter entity, its name prefixed with '%'."""
        self._decl_handler.externalEntityDecl("%" + name, pubid, sysid)

    def new_notation(self, name, pubid, sysid):
        """Report a notation declaration to the DTD handler."""
        self._dtd_handler.notationDecl(name, pubid, sysid)

    def new_element_type(self, elem_name, elem_cont):
        """Report an element type declaration.

        A content model of None is reported as "ANY" and ("", [], "") as
        "EMPTY"; any other value is passed on unchanged.
        """
        if elem_cont is None:
            elem_cont = "ANY"
        elif elem_cont == ("", [], ""):
            elem_cont = "EMPTY"
        self._decl_handler.elementDecl(elem_name, elem_cont)

    def new_attribute(self, elem, attr, type, a_decl, a_def):
        """Report an attribute declaration to the declaration handler."""
        self._decl_handler.attributeDecl(elem, attr, type, a_decl, a_def)

# --- NamespaceFilter
        
class NamespaceFilter:
    """An xmlproc application that processes qualified names and reports them
    as (URI, local-part). It reports errors through the error reporting
    mechanisms of the parser.

    Namespace declarations are scoped to the element that carries them: the
    prefix map is updated when an element starts and restored when the
    element ends."""

    def __init__(self, parser, content, lexical, driver):
        """Set up the filter.

        parser  -- object whose report_error(code) receives namespace errors
        content -- handler receiving the *NS content events
        lexical -- handler receiving comment and startDTD events
        driver  -- object whose set_locator() receives the locator
        """
        self._cont_handler = content
        self._lex_handler = lexical
        self.driver = driver
        self.ns_map = {}       # Current prefix -> URI map
        self.ns_map["xml"] = "http://www.w3.org/XML/1998/namespace"
        self.ns_stack = []     # One (old_ns, del_ns) pair per open element,
                               # used to restore ns_map when it ends
        self.rep_ns_attrs = 0  # Report xmlns-attributes?
        self.parser = parser

    def set_locator(self, locator):
        """Hand the locator on to the driver."""
        self.driver.set_locator(locator)

    def doc_start(self):
        """Forward the start of the document to the content handler."""
        self._cont_handler.startDocument()

    def doc_end(self):
        """Forward the end of the document to the content handler."""
        self._cont_handler.endDocument()

    def handle_comment(self, data):
        """Forward a comment to the lexical handler."""
        self._lex_handler.comment(data)

    def handle_start_tag(self,name,attrs):
        """Apply the namespace declarations in `attrs` and report the element.

        `attrs` is modified in place: declaration attributes are removed
        (unless rep_ns_attrs is set) and the remaining keys are replaced by
        (URI, local-part) pairs.
        """
        old_ns={} # Reset ns_map to these values when we leave this element
        del_ns=[] # Delete these prefixes from ns_map when we leave element

        # attrs=attrs.copy()   Will have to do this if more filters are made

        # Find declarations, update self.ns_map and self.ns_stack.
        # Iterate over a snapshot because attrs is modified in the loop.
        for (a,v) in list(attrs.items()):
            if a[:6]=="xmlns:":
                prefix=a[6:]
                if ":" in prefix:
                    self.parser.report_error(1900)

                #if v=="":
                #    self.parser.report_error(1901)
            elif a=="xmlns":
                prefix=""
            else:
                continue

            if prefix in self.ns_map:
                # Shadowing or undeclaring an outer binding: remember it so
                # that it can be restored when the element ends.
                old_ns[prefix]=self.ns_map[prefix]
                if not v:
                    del self.ns_map[prefix]
            elif v:
                # A binding introduced by this element must disappear again
                # when the element ends.
                del_ns.append(prefix)
            # An empty value for a prefix that is not bound needs no action.

            if v:
                self.ns_map[prefix]=v

            if not self.rep_ns_attrs:
                del attrs[a]

        self.ns_stack.append((old_ns,del_ns))

        # Process elem and attr names
        cooked_name = self.__process_name(name)

        rawnames = {}
        for (a,v) in list(attrs.items()):
            del attrs[a]
            aname = self.__process_name(a, is_attr=1)
            if aname in attrs:
                # Two attributes resolve to the same (URI, local-part).
                self.parser.report_error(1903)
            attrs[aname] = v
            rawnames[aname] = a

        # Report event
        self._cont_handler.startElementNS(cooked_name, name,
                                          AttributesNSImpl(attrs, rawnames))

    def handle_end_tag(self, rawname):
        """Report the element end and leave the element's namespace scope."""
        # Resolve the name first: the element's own declarations still apply.
        name = self.__process_name(rawname)

        # Clean up self.ns_map and self.ns_stack
        (old_ns,del_ns)=self.ns_stack[-1]
        del self.ns_stack[-1]

        self.ns_map.update(old_ns)
        for prefix in del_ns:
            if prefix in self.ns_map:
                del self.ns_map[prefix]

        self._cont_handler.endElementNS(name, rawname)

    def handle_data(self, data, start, end):
        """Report data[start:end] as character data."""
        self._cont_handler.characters(data[start:end])

    def handle_ignorable_data(self, data, start, end):
        """Report data[start:end] as ignorable whitespace."""
        self._cont_handler.ignorableWhitespace(data[start:end])

    def handle_pi(self, target, data):
        """Forward a processing instruction to the content handler."""
        self._cont_handler.processingInstruction(target, data)

    def handle_doctype(self, root, pubId, sysId):
        """Report the document type declaration as the start of the DTD."""
        self._lex_handler.startDTD(root, pubId, sysId)

    def set_entity_info(self, xmlver, enc, sddecl):
        """Accept and ignore the entity information."""
        pass

    # --- Internal methods

    def __process_name(self, name, default_to=None, is_attr=0):
        """Return `name` as a (URI, local-part) pair.

        A name that cannot be resolved is returned as (None, name): one with
        more than one colon (error 1900 is reported), one whose prefix is
        not in ns_map (error 1902 is reported), or one with the prefix
        "xmlns".  An unprefixed attribute name (`is_attr` set) gets no
        namespace; any other unprefixed name gets `default_to` if given,
        else the default namespace if one is declared.
        """
        n=name.split(":")
        if len(n)>2:
            self.parser.report_error(1900)
            return (None, name)
        elif len(n)==2:
            if n[0]=="xmlns":
                return (None, name)

            try:
                return (self.ns_map[n[0]], n[1])
            except KeyError:
                self.parser.report_error(1902)
                return (None, name)
        elif is_attr:
            return (None, name)
        elif default_to is not None:
            return (default_to, name)
        elif "" in self.ns_map and name != "xmlns":
            return (self.ns_map[""],name)
        else:
            return (None, name)

def create_parser():
    """Return a newly constructed XmlprocDriver."""
    driver = XmlprocDriver()
    return driver

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -