📄 xmlutils.py
字号:
"""
Some common declarations for the xmlproc system gathered in one file.
"""
# $Id$
import string,re,urlparse,os,sys
import xmlapp,charconv,errors
# Standard exceptions
class OutOfDataException(Exception):
    """Signals that the parser expected more data but the current buffer
    has been used up."""
# ==============================
# The general entity parser
# ==============================
class EntityParser:
"""A generalized parser for XML entities, whether DTD, documents or even
catalog files."""
def __init__(self):
    """Create the default support objects and put the parser in its
    initial state by calling reset()."""
    # --- Support objects; each can be swapped out through a set_* method
    handler = xmlapp.ErrorHandler(self)
    self.err = handler
    self.ent = xmlapp.EntityHandler(handler)
    self.isf = xmlapp.InputSourceFactory()
    self.pubres = xmlapp.PubIdResolver()

    # --- Character handling
    self.data_charset = "iso-8859-1"
    self.charset_converter = charconv.id_conv  # the identity transform

    # --- Error reporting language and its error list
    default_language = "en"
    self.err_lang = default_language
    self.errors = errors.get_error_list(default_language)

    self.reset()
def set_error_language(self, language):
    """Select the language used for error reports.

    The code is lower-cased before it is passed to errors.get_error_list.
    The original documentation states that a KeyError is thrown for an
    unsupported language; in that case err_lang keeps its previous value
    because it is only assigned after the lookup has succeeded."""
    code = language.lower()
    self.errors = errors.get_error_list(code)
    self.err_lang = code  # only reached if the lookup did not raise
def set_error_handler(self, err):
    """Install err as the object that receives error events."""
    self.err = err
def set_pubid_resolver(self, pubres):
    """Install pubres as the parser's public identifier resolver (stored
    in self.pubres)."""
    self.pubres = pubres
def set_entity_handler(self, ent):
    """Install ent as the object used to resolve entity references."""
    self.ent = ent
def set_inputsource_factory(self, isf):
    """Install isf as the factory that builds input sources from system
    identifiers (used through isf.create_input_source)."""
    self.isf = isf
def set_data_charset(self, charset):
    """Record the character encoding to use when reporting data to
    applications.

    The value is only stored in self.data_charset; the original
    documentation marks the setting as currently not in use."""
    self.data_charset = charset
def parse_resource(self, sysID, bufsize=16384):
    """Parse the XML entity with the given system identifier from start
    to finish.

    Only meant for the document entity; subentities are handled by
    open_entity.

    sysID   -- system identifier handed to the input source factory.
    bufsize -- chunk size passed on to read_from.

    If the input source cannot be created (IOError), error 3000 is
    reported with the system identifier and nothing is parsed. Otherwise
    the source is read with read_from, closed, and the parse is finished
    with flush() and parseEnd(). The input source is closed even if
    read_from raises, so it is never leaked."""
    self.current_sysID = sysID

    try:
        infile = self.isf.create_input_source(sysID)
    except IOError:
        self.report_error(3000, sysID)
        return

    try:
        self.read_from(infile, bufsize)
    finally:
        # read_from never closes its file object; do it here on every path.
        infile.close()

    self.flush()
    self.parseEnd()
def open_entity(self, sysID, name="None"):
    """Parse a new external entity, pushing the current one onto the
    entity stack first.

    Must not be used to start a parse; use parse_resource for that.

    sysID -- system identifier of the entity; it is combined with the
             current system identifier through join_sysids.
    name  -- entity name handed to _push_ent_stack (the default is the
             string "None", not the None object).

    If the input source cannot be created (IOError), error 3000 is
    reported with the combined system identifier and the entity stack is
    left untouched. Otherwise the entity is read to the end, flushed, and
    popped again with pop_entity. The input source is always closed once
    reading is over, including when read_from raises."""
    sysID = join_sysids(self.get_current_sysid(), sysID)

    try:
        inf = self.isf.create_input_source(sysID)
    except IOError:
        self.report_error(3000, sysID)
        return

    try:
        self._push_ent_stack(name)
        # Start with an empty buffer and fresh position data for the
        # new entity.
        self.current_sysID = sysID
        self.pos = 0
        self.line = 1
        self.last_break = 0
        self.data = ""

        self.read_from(inf)
    finally:
        # read_from does not close the source, so release it here.
        inf.close()

    self.flush()
    self.pop_entity()
def push_entity(self, sysID, contents, name="None"):
    """Make the string contents the current entity so that it is parsed
    next, while allowing a later return to the entity being parsed now.

    The current entity is pushed with _push_ent_stack(name) before any
    state is replaced. The new entity is marked final since all of its
    data is already present."""
    self._push_ent_stack(name)

    # Buffer contents
    self.current_sysID = sysID
    self.data = contents
    self.datasize = len(contents)
    self.final = 1

    # Location tracking starts over at the top of the new text
    self.line = 1
    self.pos = 0
    self.last_break = 0
    self.last_upd_pos = 0
def pop_entity(self):
    """Leave the current entity and go back to the previous one.

    Error 4000 is reported when the entity stack is empty; the pop is
    attempted afterwards in either case, and final is cleared."""
    stack_is_empty = (self.ent_stack == [])
    if stack_is_empty:
        self.report_error(4000)

    self._pop_ent_stack()
    self.final = 0
def read_from(self, fileobj, bufsize=16384):
    """Feed the parser from a file-like object until EOF, reading bufsize
    units at a time. The file object is not closed.

    **WARNING**: parseStart/parseEnd are not called from here, since this
    method cannot know whether it will be called several times. Use
    parse_resource to simply read one file.

    Reading also stops early if feed raises OutOfDataException."""
    # iter() with a sentinel keeps calling read() until it returns "".
    for chunk in iter(lambda: fileobj.read(bufsize), ""):
        try:
            self.feed(chunk)
        except OutOfDataException:
            break
def reset(self):
    """Return the parser to its initial state. Any data that has not
    been processed yet is lost."""
    # Entity bookkeeping
    self.current_sysID = "Unknown"
    self.ent_stack = []
    self.open_ents = []  # Used to test for entity recursion
    self.first_feed = 1

    # Block information
    self.data = ""
    self.datasize = 0
    self.final = 0
    self.start_point = -1

    # Location tracking
    self.line = 1
    self.pos = 0
    self.last_break = 0
    self.last_upd_pos = 0
    self.block_offset = 0  # Offset from start of stream to start of cur block
def feed(self, new_data):
    """Accept another chunk of data from the data source and parse it.

    Responsible for keeping self.data, self.pos and self.datasize
    correct. The chunk is run through self.charset_converter before it
    is appended. parseStart() is called on the very first feed."""
    if self.first_feed:
        self.first_feed = 0
        self.parseStart()

    self.update_pos()  # Bring the line/col count up to date

    converted = self.charset_converter(new_data)  # Character enc conversion

    if self.start_point == -1:
        # No region is being held, so everything before pos can be dropped.
        consumed = self.pos
        self.block_offset += self.datasize
        self.data = self.data[consumed:]
        self.last_break -= consumed  # Keep track of column
        self.pos = 0
        self.last_upd_pos = 0

    # Append the new data and normalize CR LF line ends to LF
    combined = self.data + converted
    self.data = combined.replace("\015\012", "\012")
    self.datasize = len(self.data)

    self.do_parse()
def close(self):
    """Finish the parse: process whatever data is left with flush() and
    then call parseEnd()."""
    self.flush()
    self.parseEnd()
def parseStart(self):
    """Hook called before parsing starts so subclasses can be notified.
    Does nothing here."""
    return None
def parseEnd(self):
    """Hook called when there is no more data so subclasses can be
    notified. Does nothing here."""
    return None
def flush(self):
    """Parse whatever is left over in the last block.

    Nothing happens when pos+1 equals datasize. Otherwise the parser is
    marked final and do_parse() is run once more; if that raises
    OutOfDataException after having moved pos, error 3001 is reported."""
    if self.pos + 1 == self.datasize:
        return

    self.final = 1
    pos_before = self.pos

    try:
        self.do_parse()
    except OutOfDataException:
        if self.pos != pos_before:
            self.report_error(3001)
# --- GENERAL UTILITY
# --- LOW-LEVEL SCANNING METHODS
def set_start_point(self):
    """Remember the current position as the start of a region. The
    parser must keep all data from here on until get_region is called."""
    self.start_point = self.pos
def store_state(self):
    """Save the current location so restore_state can return to it.

    The position is saved through set_start_point(); the line tracking
    values are kept in self.old_state."""
    self.set_start_point()
    snapshot = (self.last_upd_pos, self.line, self.last_break)
    self.old_state = snapshot
def restore_state(self):
    """Return to the location saved by the last store_state call and
    drop the start point."""
    self.pos = self.start_point
    self.start_point = -1

    upd_pos, line_no, break_pos = self.old_state
    self.last_upd_pos = upd_pos
    self.line = line_no
    self.last_break = break_pos
def get_region(self):
    """Return the text between start_point and the current position, and
    clear start_point."""
    region_start = self.start_point
    self.start_point = -1
    return self.data[region_start:self.pos]
def find_reg(self, regexp, required=1):
    """Advance self.pos to the first place where regexp matches and
    return the text that was skipped (the match itself is not included).

    Without a match: if the data is final and the match is not required,
    pos moves to the end of the data and the remaining text is returned;
    in every other case OutOfDataException is raised."""
    begin = self.pos
    hit = regexp.search(self.data, begin)

    if hit is not None:
        self.pos = hit.start(0)
        return self.data[begin:self.pos]

    if self.final and not required:
        self.pos = len(self.data)  # Just moved to the end
        return self.data[begin:]

    raise OutOfDataException()
def scan_to(self, target):
    """Move self.pos to just past the next occurrence of target and
    return the text skipped before it. Raises OutOfDataException when
    target does not occur at or after the current position."""
    begin = self.pos
    hit = self.data.find(target, begin)
    if hit == -1:
        raise OutOfDataException()

    self.pos = hit + len(target)
    return self.data[begin:hit]
def get_index(self, target):
    """Return the index where target next starts, searching from the
    current position, without moving. Raises OutOfDataException when
    target is not found."""
    hit = self.data.find(target, self.pos)
    if hit != -1:
        return hit
    raise OutOfDataException()
def test_str(self,test_str):
"See if text at current position matches test_str, without moving."
if self.datasize-self.pos<len(test_str) and not self.final:
raise OutOfDataException()
return self.data[self.pos:self.pos+len(test_str)]==test_str
def now_at(self, test_str):
    """If the text at the current position equals test_str, skip over it
    and return 1; otherwise leave the position alone and return 0.

    Raises OutOfDataException when fewer characters than len(test_str)
    remain and the data is not final."""
    here = self.pos
    size = len(test_str)
    if self.datasize - here < size and not self.final:
        raise OutOfDataException()

    if self.data[here:here + size] != test_str:
        return 0

    self.pos = self.pos + size
    return 1
def skip_ws(self, necessary=0):
    """Skip over any whitespace at the current position.

    necessary -- when true, at least one whitespace character must be
                 skipped.

    If whitespace is necessary and none was skipped, error 3002 is
    reported, unless the scan ran off the end of a non-final buffer; in
    that case OutOfDataException is raised so the check can be repeated
    once more data has arrived.

    Previously the necessary flag was only honoured when the scan hit
    the end of the buffer, so missing whitespace in the middle of the
    data went unreported."""
    start = self.pos
    ran_out = 0

    try:
        while self.data[self.pos] in whitespace:
            self.pos = self.pos + 1
    except IndexError:
        # Reached the end of the buffered data while scanning.
        ran_out = 1

    if necessary and start == self.pos:
        if ran_out and not self.final:
            raise OutOfDataException()
        self.report_error(3002)
def test_reg(self,regexp):
"Checks if we match the regexp."
if self.pos>self.datasize-5 and not self.final:
raise OutOfDataException()
return regexp.match(self.data,self.pos)!=None
def get_match(self, regexp):
    """Match regexp at the current position, move self.pos past the
    match and return the matched text.

    If there is no match, the error code looked up in reg2code under the
    pattern is reported and "" is returned. OutOfDataException is raised
    when pos is within the last five positions of a non-final buffer, or
    when the match runs up to the very end of the buffer."""
    if self.pos > self.datasize - 5 and not self.final:
        raise OutOfDataException()

    found = regexp.match(self.data, self.pos)
    if found is None:
        self.report_error(reg2code[regexp.pattern])
        return ""

    stop = found.end(0)  # Speeds us up slightly
    if stop == self.datasize:
        raise OutOfDataException()

    self.pos = stop
    return found.group(0)
def update_pos(self):
    """Bring the (line,col) bookkeeping up to date by counting the line
    breaks in the data processed since the last update."""
    new_breaks = self.data.count("\n", self.last_upd_pos, self.pos)
    self.last_upd_pos = self.pos

    if new_breaks > 0:
        self.line += new_breaks
        # Index of the last line break before the current position
        self.last_break = self.data.rfind("\n", 0, self.pos)
def get_wrapped_match(self,wraps):
"Returns a contained match. Useful for regexps inside quotes."
found=0
for (wrap,regexp) in wraps:
if self.test_str(wrap):
found=1
self.pos=self.pos+len(wrap)
break
if not found:
msg=""
for (wrap,regexp) in wraps[:-1]:
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -