📄 xmlmarshaller.py
字号:
elif (not isinstance(xmlnametuple, (tuple, list))):
flattenDict[str(xmlnametuple)] = sequencename
else:
for xmlname in xmlnametuple:
flattenDict[xmlname] = sequencename
else:
raise Exception("Invalid type for __xmlflattensequence___ : it must be a dict")
# reattach an object"s attributes to it
for childname, child in element.children:
if (childname in flattenDict):
sequencename = _toAttrName(obj, flattenDict[childname])
if (not hasattr(obj, sequencename)):
obj.__dict__[sequencename] = []
sequencevalue = getattr(obj, sequencename)
if (sequencevalue == None):
obj.__dict__[sequencename] = []
sequencevalue = getattr(obj, sequencename)
sequencevalue.append(child)
elif (objtype == "list"):
obj.append(child)
elif isinstance(obj, dict):
if (childname == DICT_ITEM_NAME):
obj[child[DICT_ITEM_KEY_NAME]] = child[DICT_ITEM_VALUE_NAME]
else:
obj[childname] = child
else:
# don't replace a good attribute value with a bad one
childAttrName = _toAttrName(obj, childname)
if (not hasattr(obj, childAttrName)) or (getattr(obj, childAttrName) == None) or (getattr(obj, childAttrName) == []) or (not isinstance(child, GenericXMLObject)):
try:
setattrignorecase(obj, childAttrName, child)
except AttributeError:
raise MarshallerException("Error unmarshalling child element \"%s\" of XML element \"%s\": object type not specified or known" % (childname, name))
if (complexType != None):
for element in complexType.elements:
if element.default:
elementName = _toAttrName(obj, element.name)
if ((elementName not in obj.__dict__) or (obj.__dict__[elementName] == None)):
langType = xsdToLangType(element.type)
defaultValue = _objectfactory(langType, element.default)
obj.__dict__[elementName] = defaultValue
ifDefPy()
if (isinstance(obj, list)):
if ((element.attrs.has_key("mutable")) and (element.attrs.getValue("mutable") == "false")):
obj = tuple(obj)
endIfDef()
if (len(self.elementstack) > 0):
## print "[endElement] appending child with name: ", name, "; objtype: ", objtype
parentElement.children.append((name, obj))
else:
self.rootelement = obj
def getRootObject(self):
return self.rootelement
def _toAttrName(obj, name):
if (hasattr(obj, "__xmlrename__")):
for key, val in obj.__xmlrename__.iteritems():
if (name == val):
name = key
break
## if (name.startswith("__") and not name.endswith("__")):
## name = "_%s%s" % (obj.__class__.__name__, name)
return str(name)
def printKnownTypes(kt, where):
print 'KnownTypes from %s' % (where)
for tag, cls in kt.iteritems():
print '%s => %s' % (tag, str(cls))
__typeMappingXsdToLang = {
"string": "str",
"char": "str",
"varchar": "str",
"date": "str", # ToDO Need to work out how to create lang date types
"boolean": "bool",
"decimal": "float", # ToDO Does python have a better fixed point type?
"int": "int",
"integer":"int",
"long": "long",
"float": "float",
"bool": "bool",
"str": "str",
"unicode": "unicode",
"short": "int",
"duration": "str", # see above (date)
"datetime": "str", # see above (date)
"time": "str", # see above (date)
"double": "float",
"QName" : "str",
"blob" : "str", # ag:blob
"currency" : "str", # ag:currency
}
def xsdToLangType(xsdType):
if xsdType.startswith(XMLSCHEMA_XSD_URL):
xsdType = xsdType[len(XMLSCHEMA_XSD_URL)+1:]
elif xsdType.startswith(AG_URL):
xsdType = xsdType[len(AG_URL)+1:]
langType = __typeMappingXsdToLang.get(xsdType)
if (langType == None):
raise Exception("Unknown xsd type %s" % xsdType)
return langType
def langToXsdType(langType):
if langType in asDict(__typeMappingXsdToLang):
return '%s:%s' % (XMLSCHEMA_XSD_URL, langType)
return langType
def _getXmlValue(langValue):
if (isinstance(langValue, bool)):
return str(langValue).lower()
elif (isinstance(langValue, unicode)):
return langValue.encode()
else:
return str(langValue)
def unmarshal(xmlstr, knownTypes=None, knownNamespaces=None, xmlSource=None, createGenerics=False):
objectfactory = XMLObjectFactory(knownTypes, knownNamespaces, xmlSource, createGenerics)
# on Linux, pyXML's sax.parseString fails when passed unicode
if (not sysutils.isWindows()):
xmlstr = str(xmlstr)
try:
xml.sax.parseString(xmlstr, objectfactory)
except xml.sax.SAXParseException, errorData:
if xmlSource == None:
xmlSource = 'unknown'
errorString = 'SAXParseException ("%s") detected at line %d, column %d in XML document from source "%s" ' % (errorData.getMessage(), errorData.getLineNumber(), errorData.getColumnNumber(), xmlSource)
raise UnmarshallerException(errorString)
return objectfactory.getRootObject()
def marshal(obj, elementName=None, prettyPrint=False, marshalType=True, indent=0, knownTypes=None, knownNamespaces=None, encoding=-1):
worker = XMLMarshalWorker(prettyPrint=prettyPrint, marshalType=marshalType, knownTypes=knownTypes, knownNamespaces=knownNamespaces)
if obj != None and hasattr(obj, '__xmldeepexclude__'):
worker.xmldeepexclude = obj.__xmldeepexclude__
xmlstr = "".join(worker._marshal(obj, elementName, indent=indent))
aglogging.info(xmlMarshallerLogger, "marshal produced string of type %s", type(xmlstr))
if (encoding == None):
return xmlstr
if (not isinstance(encoding, basestring)):
encoding = sys.getdefaultencoding()
if (not isinstance(xmlstr, unicode)):
xmlstr = xmlstr.decode()
xmlstr = u'<?xml version="1.0" encoding="%s"?>\n%s' % (encoding, xmlstr)
return xmlstr.encode(encoding)
class XMLMarshalWorker(object):
def __init__(self, marshalType=True, prettyPrint=False, knownTypes=None, knownNamespaces=None):
if knownTypes == None:
self.knownTypes = {}
else:
self.knownTypes = knownTypes
if knownNamespaces == None:
self.knownNamespaces = {}
else:
self.knownNamespaces = knownNamespaces
self.prettyPrint = prettyPrint
self.marshalType = marshalType
self.xmldeepexclude = []
self.nsstack = []
def getNSPrefix(self):
if len(self.nsstack) > 0:
return self.nsstack[-1].prefix
return ''
def isKnownType(self, elementName):
tagLongNs = None
nse = self.nsstack[-1]
i = elementName.rfind(':')
if i > 0:
prefix = elementName[:i]
name = elementName[i+1:]
else:
prefix = DEFAULT_NAMESPACE_KEY
name = elementName
for shortNs, longNs in nse.nameSpaces.iteritems():
if shortNs == prefix:
tagLongNs = longNs
break
if tagLongNs == None:
knownTagName = elementName
else:
knownShortNs = self.knownNamespaces[tagLongNs]
knownTagName = knownShortNs + ':' + name
if (knownTagName in asDict(self.knownTypes)):
knownClass = self.knownTypes[knownTagName]
return True
return False
def popNSStack(self):
self.nsstack.pop()
def appendNSStack(self, obj):
nameSpaces = {}
defaultLongNS = None
for nse in self.nsstack:
for k, v in nse.nsMap.iteritems():
nameSpaces[k] = v
if k == DEFAULT_NAMESPACE_KEY:
defaultLongNS = v
newNS = NsElement()
nameSpaceAttrs = ""
if hasattr(obj, "__xmlnamespaces__"):
ns = getattr(obj, "__xmlnamespaces__")
keys = ns.keys()
keys.sort()
for nameSpaceKey in keys:
nameSpaceUrl = ns[nameSpaceKey]
if nameSpaceUrl in nameSpaces.values():
for k, v in nameSpaces.iteritems():
if v == nameSpaceUrl:
nameSpaceKey = k
break
else:
if nameSpaceKey == "":
defaultLongNS = nameSpaceUrl
nameSpaces[DEFAULT_NAMESPACE_KEY] = nameSpaceUrl
newNS.nsMap[DEFAULT_NAMESPACE_KEY] = nameSpaceUrl
nameSpaceAttrs += ' xmlns="%s" ' % (nameSpaceUrl)
else:
nameSpaces[nameSpaceKey] = nameSpaceUrl
newNS.nsMap[nameSpaceKey] = nameSpaceUrl
nameSpaceAttrs += ' xmlns:%s="%s" ' % (nameSpaceKey, nameSpaceUrl)
nameSpaceAttrs = nameSpaceAttrs.rstrip()
if len(self.nsstack) > 0:
newNS.prefix = self.nsstack[-1].prefix
else:
newNS.prefix = ''
if obj != None and hasattr(obj, "__xmldefaultnamespace__"):
longPrefixNS = getattr(obj, "__xmldefaultnamespace__")
if longPrefixNS == defaultLongNS:
newNS.prefix = ''
else:
try:
for k, v in nameSpaces.iteritems():
if v == longPrefixNS:
newNS.prefix = k + ':'
break;
except:
if (longPrefixNS in asDict(self.knownNamespaces)):
newNS.prefix = self.knownNamespaces[longPrefixNS] + ':'
else:
raise MarshallerException('Error marshalling __xmldefaultnamespace__ ("%s") not defined in namespace stack' % (longPrefixNS))
if obj != None and hasattr(obj, "targetNamespace"):
newNS.targetNS = obj.targetNamespace
elif len(self.nsstack) > 0:
newNS.targetNS = self.nsstack[-1].targetNS
newNS.nameSpaces = nameSpaces
self.nsstack.append(newNS)
return nameSpaceAttrs
def contractQName(self, value, obj, attr):
value = langToXsdType(value)
i = value.rfind(':')
if i >= 0:
longNS = value[:i]
else:
# the value doesn't have a namespace and we couldn't map it to an XSD type...what to do?
# (a) just write it, as is, and hope it's in the default namespace (for now)
# (b) throw an exception so we can track down the bad code (later)
return value
if (longNS in self.nsstack[-1].nameSpaces.values()):
for kShort, vLong in self.nsstack[-1].nameSpaces.iteritems():
if vLong == longNS:
shortNS = kShort
break
else:
shortNS = longNS # if we can't find the long->short mappping, just use longNS
if shortNS == DEFAULT_NAMESPACE_KEY:
value = value[i+1:]
else:
value = shortNS + ':' + value[i+1:]
return value
def _genObjTypeStr(self, typeString):
if self.marshalType:
return ' objtype="%s"' % typeString
return ""
def _marshal(self, obj, elementName=None, nameSpacePrefix="", indent=0):
if (obj != None):
aglogging.debug(xmlMarshallerLogger, "--> _marshal: elementName=%s%s, type=%s, obj=%s, indent=%d", nameSpacePrefix, elementName, type(obj), str(obj), indent)
else:
aglogging.debug(xmlMarshallerLogger, "--> _marshal: elementName=%s%s, obj is None, indent=%d", nameSpacePrefix, elementName, indent)
if ((obj != None) and (hasattr(obj, 'preMarshal'))):
obj.preMarshal()
excludeAttrs = []
excludeAttrs.extend(self.xmldeepexclude)
if hasattr(obj, "__xmlexclude__"):
excludeAttrs.extend(obj.__xmlexclude__)
prettyPrint = self.prettyPrint
knownTypes = self.knownTypes
xmlString = None
if self.prettyPrint or indent:
prefix = " "*indent
newline = "\n"
increment = 2
else:
prefix = ""
newline = ""
increment = 0
## Determine the XML element name. If it isn"t specified in the
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -