📄 xmlmarshaller.py
字号:
elif (not isinstance(xmlnametuple, (tuple, list))): flattenDict[str(xmlnametuple)] = sequencename else: for xmlname in xmlnametuple: flattenDict[xmlname] = sequencename else: raise Exception("Invalid type for __xmlflattensequence___ : it must be a dict") # reattach an object"s attributes to it for childname, child in element.children: if (childname in flattenDict): sequencename = _toAttrName(obj, flattenDict[childname]) if (not hasattr(obj, sequencename)): obj.__dict__[sequencename] = [] sequencevalue = getattr(obj, sequencename) if (sequencevalue == None): obj.__dict__[sequencename] = [] sequencevalue = getattr(obj, sequencename) sequencevalue.append(child) elif (objtype == "list"): obj.append(child) elif isinstance(obj, dict): if (childname == DICT_ITEM_NAME): obj[child[DICT_ITEM_KEY_NAME]] = child[DICT_ITEM_VALUE_NAME] else: obj[childname] = child else: # don't replace a good attribute value with a bad one childAttrName = _toAttrName(obj, childname) if (not hasattr(obj, childAttrName)) or (getattr(obj, childAttrName) == None) or (getattr(obj, childAttrName) == []) or (not isinstance(child, GenericXMLObject)): try: setattrignorecase(obj, childAttrName, child) except AttributeError: raise MarshallerException("Error unmarshalling child element \"%s\" of XML element \"%s\": object type not specified or known" % (childname, name)) if (complexType != None): for element in complexType.elements: if element.default: elementName = _toAttrName(obj, element.name) if ((elementName not in obj.__dict__) or (obj.__dict__[elementName] == None)): langType = xsdToLangType(element.type) defaultValue = _objectfactory(langType, element.default) obj.__dict__[elementName] = defaultValue ifDefPy() if (isinstance(obj, list)): if ((element.attrs.has_key("mutable")) and (element.attrs.getValue("mutable") == "false")): obj = tuple(obj) endIfDef() if (len(self.elementstack) > 0):## print "[endElement] appending child with name: ", name, "; objtype: ", objtype parentElement.children.append((name, obj)) else: self.rootelement = obj def getRootObject(self): return self.rootelementdef _toAttrName(obj, name): if (hasattr(obj, "__xmlrename__")): for key, val in obj.__xmlrename__.iteritems(): if (name == val): name = key break## if (name.startswith("__") and not name.endswith("__")):## name = "_%s%s" % (obj.__class__.__name__, name) return str(name) def printKnownTypes(kt, where): print 'KnownTypes from %s' % (where) for tag, cls in kt.iteritems(): print '%s => %s' % (tag, str(cls))__typeMappingXsdToLang = { "string": "str", "char": "str", "varchar": "str", "date": "str", # ToDO Need to work out how to create lang date types "boolean": "bool", "decimal": "float", # ToDO Does python have a better fixed point type? "int": "int", "integer":"int", "long": "long", "float": "float", "bool": "bool", "str": "str", "unicode": "unicode", "short": "int", "duration": "str", # see above (date) "datetime": "str", # see above (date) "time": "str", # see above (date) "double": "float", "QName" : "str", "blob" : "str", # ag:blob "currency" : "str", # ag:currency } def xsdToLangType(xsdType): if xsdType.startswith(XMLSCHEMA_XSD_URL): xsdType = xsdType[len(XMLSCHEMA_XSD_URL)+1:] elif xsdType.startswith(AG_URL): xsdType = xsdType[len(AG_URL)+1:] langType = __typeMappingXsdToLang.get(xsdType) if (langType == None): raise Exception("Unknown xsd type %s" % xsdType) return langType def langToXsdType(langType): if langType in asDict(__typeMappingXsdToLang): return '%s:%s' % (XMLSCHEMA_XSD_URL, langType) return langTypedef _getXmlValue(langValue): if (isinstance(langValue, bool)): return str(langValue).lower() elif (isinstance(langValue, unicode)): return langValue.encode() else: return str(langValue)def unmarshal(xmlstr, knownTypes=None, knownNamespaces=None, xmlSource=None, createGenerics=False): objectfactory = XMLObjectFactory(knownTypes, knownNamespaces, xmlSource, createGenerics) # on Linux, pyXML's sax.parseString fails when passed unicode if (not sysutils.isWindows()): xmlstr = str(xmlstr) try: xml.sax.parseString(xmlstr, objectfactory) except xml.sax.SAXParseException, errorData: if xmlSource == None: xmlSource = 'unknown' errorString = 'SAXParseException ("%s") detected at line %d, column %d in XML document from source "%s" ' % (errorData.getMessage(), errorData.getLineNumber(), errorData.getColumnNumber(), xmlSource) raise UnmarshallerException(errorString) return objectfactory.getRootObject()def marshal(obj, elementName=None, prettyPrint=False, marshalType=True, indent=0, knownTypes=None, knownNamespaces=None, encoding=-1): worker = XMLMarshalWorker(prettyPrint=prettyPrint, marshalType=marshalType, knownTypes=knownTypes, knownNamespaces=knownNamespaces) if obj != None and hasattr(obj, '__xmldeepexclude__'): worker.xmldeepexclude = obj.__xmldeepexclude__ xmlstr = "".join(worker._marshal(obj, elementName, indent=indent)) aglogging.info(xmlMarshallerLogger, "marshal produced string of type %s", type(xmlstr)) if (encoding == None): return xmlstr if (not isinstance(encoding, basestring)): encoding = sys.getdefaultencoding() if (not isinstance(xmlstr, unicode)): xmlstr = xmlstr.decode() xmlstr = u'<?xml version="1.0" encoding="%s"?>\n%s' % (encoding, xmlstr) return xmlstr.encode(encoding)class XMLMarshalWorker(object): def __init__(self, marshalType=True, prettyPrint=False, knownTypes=None, knownNamespaces=None): if knownTypes == None: self.knownTypes = {} else: self.knownTypes = knownTypes if knownNamespaces == None: self.knownNamespaces = {} else: self.knownNamespaces = knownNamespaces self.prettyPrint = prettyPrint self.marshalType = marshalType self.xmldeepexclude = [] self.nsstack = [] def getNSPrefix(self): if len(self.nsstack) > 0: return self.nsstack[-1].prefix return '' def isKnownType(self, elementName): tagLongNs = None nse = self.nsstack[-1] i = elementName.rfind(':') if i > 0: prefix = elementName[:i] name = elementName[i+1:] else: prefix = DEFAULT_NAMESPACE_KEY name = elementName for shortNs, longNs in nse.nameSpaces.iteritems(): if shortNs == prefix: tagLongNs = longNs break if tagLongNs == None: knownTagName = elementName else: knownShortNs = self.knownNamespaces[tagLongNs] knownTagName = knownShortNs + ':' + name if (knownTagName in asDict(self.knownTypes)): knownClass = self.knownTypes[knownTagName] return True return False def popNSStack(self): self.nsstack.pop() def appendNSStack(self, obj): nameSpaces = {} defaultLongNS = None for nse in self.nsstack: for k, v in nse.nsMap.iteritems(): nameSpaces[k] = v if k == DEFAULT_NAMESPACE_KEY: defaultLongNS = v newNS = NsElement() nameSpaceAttrs = "" if hasattr(obj, "__xmlnamespaces__"): ns = getattr(obj, "__xmlnamespaces__") keys = ns.keys() keys.sort() for nameSpaceKey in keys: nameSpaceUrl = ns[nameSpaceKey] if nameSpaceUrl in nameSpaces.values(): for k, v in nameSpaces.iteritems(): if v == nameSpaceUrl: nameSpaceKey = k break else: if nameSpaceKey == "": defaultLongNS = nameSpaceUrl nameSpaces[DEFAULT_NAMESPACE_KEY] = nameSpaceUrl newNS.nsMap[DEFAULT_NAMESPACE_KEY] = nameSpaceUrl nameSpaceAttrs += ' xmlns="%s" ' % (nameSpaceUrl) else: nameSpaces[nameSpaceKey] = nameSpaceUrl newNS.nsMap[nameSpaceKey] = nameSpaceUrl nameSpaceAttrs += ' xmlns:%s="%s" ' % (nameSpaceKey, nameSpaceUrl) nameSpaceAttrs = nameSpaceAttrs.rstrip() if len(self.nsstack) > 0: newNS.prefix = self.nsstack[-1].prefix else: newNS.prefix = '' if obj != None and hasattr(obj, "__xmldefaultnamespace__"): longPrefixNS = getattr(obj, "__xmldefaultnamespace__") if longPrefixNS == defaultLongNS: newNS.prefix = '' else: try: for k, v in nameSpaces.iteritems(): if v == longPrefixNS: newNS.prefix = k + ':' break; except: if (longPrefixNS in asDict(self.knownNamespaces)): newNS.prefix = self.knownNamespaces[longPrefixNS] + ':' else: raise MarshallerException('Error marshalling __xmldefaultnamespace__ ("%s") not defined in namespace stack' % (longPrefixNS)) if obj != None and hasattr(obj, "targetNamespace"): newNS.targetNS = obj.targetNamespace elif len(self.nsstack) > 0: newNS.targetNS = self.nsstack[-1].targetNS newNS.nameSpaces = nameSpaces self.nsstack.append(newNS) return nameSpaceAttrs def contractQName(self, value, obj, attr): value = langToXsdType(value) i = value.rfind(':') if i >= 0: longNS = value[:i] else: # the value doesn't have a namespace and we couldn't map it to an XSD type...what to do? # (a) just write it, as is, and hope it's in the default namespace (for now) # (b) throw an exception so we can track down the bad code (later) return value if (longNS in self.nsstack[-1].nameSpaces.values()): for kShort, vLong in self.nsstack[-1].nameSpaces.iteritems(): if vLong == longNS: shortNS = kShort break else: shortNS = longNS # if we can't find the long->short mappping, just use longNS if shortNS == DEFAULT_NAMESPACE_KEY: value = value[i+1:] else: value = shortNS + ':' + value[i+1:] return value def _genObjTypeStr(self, typeString): if self.marshalType: return ' objtype="%s"' % typeString return "" def _marshal(self, obj, elementName=None, nameSpacePrefix="", indent=0): if (obj != None): aglogging.debug(xmlMarshallerLogger, "--> _marshal: elementName=%s%s, type=%s, obj=%s, indent=%d", nameSpacePrefix, elementName, type(obj), str(obj), indent) else: aglogging.debug(xmlMarshallerLogger, "--> _marshal: elementName=%s%s, obj is None, indent=%d", nameSpacePrefix, elementName, indent) if ((obj != None) and (hasattr(obj, 'preMarshal'))): obj.preMarshal() excludeAttrs = [] excludeAttrs.extend(self.xmldeepexclude) if hasattr(obj, "__xmlexclude__"): excludeAttrs.extend(obj.__xmlexclude__) prettyPrint = self.prettyPrint knownTypes = self.knownTypes xmlString = None if self.prettyPrint or indent: prefix = " "*indent newline = "\n" increment = 2 else: prefix = "" newline = "" increment = 0 ## Determine the XML element name. If it isn"t specified in the
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -