📄 drv_xmlproc.py
字号:
"""
A SAX 2.0 driver for xmlproc.
$Id$
"""
import types, string
from xml.parsers.xmlproc import xmlproc, xmlval, xmlapp
from xml.sax import saxlib
from xml.sax.xmlreader import AttributesImpl, AttributesNSImpl
from xml.sax.saxutils import ContentGenerator, prepare_input_source
# Todo
# - EntityResolver
# - as much as possible of LexicalHandler
# - entity expansion features
# - core properties
# - extra properties/features
# - element stack
# - entity stack
# - current error code
# - byte offset
# - DTD object
# - catalog path
# - use catalogs
# - regression test
# - methods from Python SAX extensions?
# - remove FIXMEs
class XmlprocDriver(saxlib.XMLReader):
# ===== SAX 2.0 INTERFACES
# --- XMLReader methods
def __init__(self):
saxlib.XMLReader.__init__(self)
self.__parsing = 0
self.__validate = 0
self.__namespaces = 1
self.__locator = 0
self._lex_handler = saxlib.LexicalHandler()
self._decl_handler = saxlib.DeclHandler()
def parse(self, source):
try:
self.__parsing = 1
# interpret source
source = prepare_input_source(source)
# create parser
if self.__validate:
parser = xmlval.XMLValidator()
else:
parser = xmlproc.XMLProcessor()
# set handlers
if self._cont_handler != None or self._lex_handler != None:
if self._cont_handler == None:
self._cont_handler = saxlib.ContentHandler()
if self._lex_handler == None:
self._lex_handler = saxlib.LexicalHandler()
if self.__namespaces:
filter = NamespaceFilter(parser, self._cont_handler,
self._lex_handler, self)
parser.set_application(filter)
else:
parser.set_application(self)
if self._err_handler != None:
parser.set_error_handler(self)
if self._decl_handler != None or self._dtd_handler != None:
parser.set_dtd_listener(self)
# FIXME: set other handlers
bufsize=16384
self._parser = parser # make it available for callbacks
#parser.parse_resource(source.getSystemId()) # FIXME: rest!
parser.set_sysid(source.getSystemId())
parser.read_from(source.getByteStream(), bufsize)
source.getByteStream().close()
parser.flush()
parser.parseEnd()
finally:
self._parser = None
self.__parsing = 0
def setLocale(self, locale):
pass
def getFeature(self, name):
if name == saxlib.feature_string_interning or \
name == saxlib.feature_external_ges or \
name == saxlib.feature_external_pes:
return 1
elif name == saxlib.feature_validation:
return self.__validate
elif name == saxlib.feature_namespaces:
return self.__namespaces
else:
raise saxlib.SAXNotRecognizedException("Feature '%s' not recognized" %
name)
def setFeature(self, name, state):
if self.__parsing:
raise saxlib.SAXNotSupportedException("Cannot set feature '%s' during parsing" % name)
if name == saxlib.feature_string_interning:
pass
elif name == saxlib.feature_validation:
self.__validate = state
elif name == saxlib.feature_namespaces:
self.__namespaces = state
elif name == saxlib.feature_external_ges or \
name == saxlib.feature_external_pes:
if not state:
raise saxlib.SAXNotSupportedException("This feature cannot be turned off with xmlproc.")
else:
raise saxlib.SAXNotRecognizedException("Feature '%s' not recognized" %
name)
def getProperty(self, name):
if name == saxlib.property_lexical_handler:
return self._lex_handler
elif name == saxlib.property_declaration_handler:
return self._decl_handler
raise saxlib.SAXNotRecognizedException("Property '%s' not recognized" % name)
def setProperty(self, name, value):
if name == saxlib.property_lexical_handler:
self._lex_handler = value
elif name == saxlib.property_declaration_handler:
self._decl_handler = value
else:
raise saxlib.SAXNotRecognizedException("Property '%s' not recognized" % name)
# --- Locator methods
def getColumnNumber(self):
return self._parser.get_column()
def getLineNumber(self):
return self._parser.get_line()
def getPublicId(self):
return None # FIXME: Try to find this. Perhaps from InputSource?
def getSystemId(self):
return self._parser.get_current_sysid() # FIXME?
# ===== XMLPROC INTERFACES
# --- Application methods
def set_locator(self, locator):
self._locator = locator
def doc_start(self):
self._cont_handler.startDocument()
def doc_end(self):
self._cont_handler.endDocument()
def handle_comment(self, data):
self._lex_handler.comment(data)
def handle_start_tag(self, name, attrs):
self._cont_handler.startElement(name, AttributesImpl(attrs))
def handle_end_tag(self,name):
self._cont_handler.endElement(name)
def handle_data(self, data, start, end):
self._cont_handler.characters(data[start:end])
def handle_ignorable_data(self, data, start, end):
self._cont_handler.ignorableWhitespace(data[start:end])
def handle_pi(self, target, data):
self._cont_handler.processingInstruction(target, data)
def handle_doctype(self, root, pubId, sysId):
self._lex_handler.startDTD(root, pubId, sysId)
def set_entity_info(self, xmlver, enc, sddecl):
pass
# --- ErrorHandler methods
# set_locator implemented as Application method above
def get_locator(self):
return self._locator
def warning(self, msg):
self._err_handler.warning(saxlib.SAXParseException(msg, None, self))
def error(self, msg):
self._err_handler.error(saxlib.SAXParseException(msg, None, self))
def fatal(self, msg):
self._err_handler.fatalError(saxlib.SAXParseException(msg, None, self))
# --- DTDConsumer methods
def dtd_start(self):
pass # this is done by handle_doctype
def dtd_end(self):
self._lex_handler.endDTD()
def handle_comment(self, contents):
self._lex_handler.comment(contents)
def handle_pi(self, target, rem):
self._cont_handler.processingInstruction(target, rem)
def new_general_entity(self, name, val):
self._decl_handler.internalEntityDecl(name, val)
def new_external_entity(self, ent_name, pub_id, sys_id, ndata):
if not ndata:
self._decl_handler.externalEntityDecl(ent_name, pub_id, sys_id)
else:
self._dtd_handler.unparsedEntityDecl(ent_name, pub_id, sys_id,
ndata)
def new_parameter_entity(self, name, val):
self._decl_handler.internalEntityDecl("%" + name, val)
def new_external_pe(self, name, pubid, sysid):
self._decl_handler.externalEntityDecl("%" + name, pubid, sysid)
def new_notation(self, name, pubid, sysid):
self._dtd_handler.notationDecl(name, pubid, sysid)
def new_element_type(self, elem_name, elem_cont):
if elem_cont == None:
elem_cont = "ANY"
elif elem_cont == ("", [], ""):
elem_cont = "EMPTY"
self._decl_handler.elementDecl(elem_name, elem_cont)
def new_attribute(self, elem, attr, type, a_decl, a_def):
self._decl_handler.attributeDecl(elem, attr, type, a_decl, a_def)
# --- NamespaceFilter
class NamespaceFilter:
"""An xmlproc application that processes qualified names and reports them
as (URI, local-part). It reports errors through the error reporting
mechanisms of the parser."""
def __init__(self, parser, content, lexical, driver):
self._cont_handler = content
self._lex_handler = lexical
self.driver = driver
self.ns_map = {} # Current prefix -> URI map
self.ns_map["xml"] = "http://www.w3.org/XML/1998/namespace"
self.ns_stack = [] # Pushed for each element, used to maint ns_map
self.rep_ns_attrs = 0 # Report xmlns-attributes?
self.parser = parser
def set_locator(self, locator):
self.driver.set_locator(locator)
def doc_start(self):
self._cont_handler.startDocument()
def doc_end(self):
self._cont_handler.endDocument()
def handle_comment(self, data):
self._lex_handler.comment(data)
def handle_start_tag(self,name,attrs):
old_ns={} # Reset ns_map to these values when we leave this element
del_ns=[] # Delete these prefixes from ns_map when we leave element
# attrs=attrs.copy() Will have to do this if more filters are made
# Find declarations, update self.ns_map and self.ns_stack
for (a,v) in attrs.items():
if a[:6]=="xmlns:":
prefix=a[6:]
if string.find(prefix,":")!=-1:
self.parser.report_error(1900)
#if v=="":
# self.parser.report_error(1901)
elif a=="xmlns":
prefix=""
else:
continue
if self.ns_map.has_key(prefix):
old_ns[prefix]=self.ns_map[prefix]
if v:
self.ns_map[prefix]=v
else:
del self.ns_map[prefix]
if not self.rep_ns_attrs:
del attrs[a]
self.ns_stack.append((old_ns,del_ns))
# Process elem and attr names
cooked_name = self.__process_name(name)
ns = cooked_name[0]
rawnames = {}
for (a,v) in attrs.items():
del attrs[a]
aname = self.__process_name(a, is_attr=1)
if attrs.has_key(aname):
self.parser.report_error(1903)
attrs[aname] = v
rawnames[aname] = a
# Report event
self._cont_handler.startElementNS(cooked_name, name,
AttributesNSImpl(attrs, rawnames))
def handle_end_tag(self, rawname):
name = self.__process_name(rawname)
# Clean up self.ns_map and self.ns_stack
(old_ns,del_ns)=self.ns_stack[-1]
del self.ns_stack[-1]
self.ns_map.update(old_ns)
for prefix in del_ns:
del self.ns_map[prefix]
self._cont_handler.endElementNS(name, rawname)
def handle_data(self, data, start, end):
self._cont_handler.characters(data[start:end])
def handle_ignorable_data(self, data, start, end):
self._cont_handler.ignorableWhitespace(data[start:end])
def handle_pi(self, target, data):
self._cont_handler.processingInstruction(target, data)
def handle_doctype(self, root, pubId, sysId):
self._lex_handler.startDTD(root, pubId, sysId)
def set_entity_info(self, xmlver, enc, sddecl):
pass
# --- Internal methods
def __process_name(self, name, default_to=None, is_attr=0):
n=string.split(name,":")
if len(n)>2:
self.parser.report_error(1900)
return (None, name)
elif len(n)==2:
if n[0]=="xmlns":
return (None, name)
try:
return (self.ns_map[n[0]], n[1])
except KeyError:
self.parser.report_error(1902)
return (None, name)
elif is_attr:
return (None, name)
elif default_to != None:
return (default_to, name)
elif self.ns_map.has_key("") and name != "xmlns":
return (self.ns_map[""],name)
else:
return (None, name)
def create_parser():
return XmlprocDriver()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -