📄 xmlutils.py
字号:
#----------------------------------------------------------------------------# Name: xmlutils.py# Purpose: XML and Marshaller Utilities## Author: Jeff Norton## Created: 6/2/05# CVS-ID: $Id: xmlutils.py,v 1.3 2006/04/20 06:25:50 RD Exp $# Copyright: (c) 2004-2005 ActiveGrid, Inc.# License: wxWindows License#----------------------------------------------------------------------------from activegrid.util.lang import *import osimport timeimport urllibimport loggingfrom activegrid.util.lang import *import activegrid.util.objutils as objutilsimport activegrid.util.xmlmarshaller as xmlmarshallerimport activegrid.util.aglogging as agloggingxmlLogger = logging.getLogger("activegrid.util.xml") def load(fileName, knownTypes=None, knownNamespaces=None, createGenerics=False): loadedObject = None fileObject = file(fileName) timeStart = time.time() xml = "" try: xml = fileObject.read() loadedObject = unmarshal(xml, knownTypes=knownTypes, knownNamespaces=knownNamespaces, xmlSource=fileName, createGenerics=createGenerics) loadedObject.fileName = os.path.abspath(fileName) if hasattr(loadedObject, 'initialize'): loadedObject.initialize() finally: fileObject.close() if xmlLogger.isEnabledFor(aglogging.LEVEL_INFO): timeDone = time.time() aglogging.info(xmlLogger, ('Load statistics for file %s (%d bytes): elapsed time = %f secs' % (fileName, len(xml), timeDone-timeStart))) return loadedObjectdef loadURI(uri, knownTypes=None, knownNamespaces=None, xmlSource=None, createGenerics=False): loadedObject = None timeStart = time.time() xml = "" try: xml = urllib.urlopen(uri).read() loadedObject = unmarshal(xml, knownTypes=knownTypes, knownNamespaces=knownNamespaces, xmlSource=xmlSource, createGenerics=createGenerics) loadedObject.fileName = uri if hasattr(loadedObject, 'initialize'): loadedObject.initialize() finally: if xmlLogger.isEnabledFor(aglogging.LEVEL_INFO): timeDone = time.time() aglogging.info(xmlLogger, ('Load statistics for URI %s (%d bytes): elapsed time = %f secs' % (uri, len(xml), timeDone-timeStart))) return loadedObjectdef unmarshal(xml, knownTypes=None, knownNamespaces=None, xmlSource=None, createGenerics=False): if (knownTypes == None): knownTypes, knownNamespaces = getAgKnownTypes() return xmlmarshaller.unmarshal(xml, knownTypes=knownTypes, knownNamespaces=knownNamespaces, xmlSource=xmlSource, createGenerics=createGenerics) def save(fileName, objectToSave, prettyPrint=True, marshalType=True, knownTypes=None, knownNamespaces=None, encoding='utf-8'): if hasattr(objectToSave, '_xmlReadOnly') and objectToSave._xmlReadOnly == True: raise xmlmarshaller.MarshallerException('Error marshalling object to file "%s": object is marked "readOnly" and cannot be written' % (fileName)) timeStart = time.time() xml = marshal(objectToSave, prettyPrint=prettyPrint, marshalType=marshalType, knownTypes=knownTypes, knownNamespaces=knownNamespaces, encoding=encoding) fileObject = file(fileName, 'w') try: fileObject.write(xml) fileObject.flush() except Exception, errorData: fileObject.close() raise xmlmarshaller.MarshallerException('Error marshalling object to file "%s": %s' % (fileName, str(errorData))) fileObject.close() timeDone = time.time() aglogging.info(xmlLogger, ('Save statistics for file %s: elapsed time = %f secs' % (fileName, timeDone-timeStart))) def marshal(objectToSave, prettyPrint=True, marshalType=True, knownTypes=None, knownNamespaces=None, encoding='utf-8'): if (knownTypes == None): knownTypes, knownNamespaces = getAgKnownTypes() return xmlmarshaller.marshal(objectToSave, prettyPrint=prettyPrint, marshalType=marshalType, knownTypes=knownTypes, knownNamespaces=knownNamespaces, encoding=encoding) def addNSAttribute(xmlDoc, shortNamespace, longNamespace): if not hasattr(xmlDoc, "__xmlnamespaces__"): xmlDoc.__xmlnamespaces__ = {shortNamespace:longNamespace} elif shortNamespace not in xmlDoc.__xmlnamespaces__: if (hasattr(xmlDoc.__class__, "__xmlnamespaces__") and (xmlDoc.__xmlnamespaces__ is xmlDoc.__class__.__xmlnamespaces__)): xmlDoc.__xmlnamespaces__ = dict(xmlDoc.__xmlnamespaces__) xmlDoc.__xmlnamespaces__[shortNamespace] = longNamespacedef genShortNS(xmlDoc, longNamespace=None): if not hasattr(xmlDoc, "__xmlnamespaces__"): return "ns1" elif longNamespace != None and longNamespace in xmlDoc.__xmlnamespaces__.items(): for key, value in xmlDoc.__xmlnamespaces__.iteritems(): if value == longNamespace: return key i = 1 while ("ns%d" % i) in xmlDoc.__xmlnamespaces__: i += 1 return ("ns%d" % i) def genTargetNS(fileName, applicationName=None, type=None): if (applicationName != None): if (type != None): tns = "urn:%s:%s:%s" % (applicationName, type, fileName) else: tns = "urn:%s:%s" % (applicationName, fileName) else: tns = "urn:%s" % fileName return tns def splitType(typeName): index = typeName.rfind(':') if index != -1: ns = typeName[:index] complexTypeName = typeName[index+1:] else: ns = None complexTypeName = typeName return (ns, complexTypeName) def cloneObject(objectToClone, knownTypes=None, marshalType=True, knownNamespaces=None, encoding='utf-8'): if (knownTypes == None): knownTypes, knownNamespaces = getAgKnownTypes() xml = xmlmarshaller.marshal(objectToClone, prettyPrint=True, marshalType=marshalType, knownTypes=knownTypes, knownNamespaces=knownNamespaces, encoding=encoding) clonedObject = xmlmarshaller.unmarshal(xml, knownTypes=knownTypes, knownNamespaces=knownNamespaces) if hasattr(objectToClone, 'fileName'): clonedObject.fileName = objectToClone.fileName if hasattr(objectToClone, "_parentDoc"): clonedObject._parentDoc = objectToClone._parentDoc try: clonedObject.initialize() except AttributeError: pass return clonedObjectdef getAgVersion(fileName): fileObject = file(fileName) try: xml = fileObject.read() finally: fileObject.close() i = xml.find(' ag:version=') if i >= 0: i += 12 else: i2 = xml.find('<ag:') if i2 >= 0: i = xml.find(' version=', i2) if i > 0: i += 9 elif xml.find('<project version="10"') >= 0: return "10" else: return None version = None if xml[i:i+1] == '"': j = xml.find('"', i+1) if (j > i+1): version = xml[i+1:j] return version AG_NS_URL = "http://www.activegrid.com/ag.xsd"BPEL_NS_URL = "http://schemas.xmlsoap.org/ws/2003/03/business-process"HTTP_WSDL_NS_URL = "http://schemas.xmlsoap.org/wsdl/http/"MIME_WSDL_NS_URL = "http://schemas.xmlsoap.org/wsdl/mime/"SOAP_NS_URL = "http://schemas.xmlsoap.org/wsdl/soap/"SOAP12_NS_URL = "http://schemas.xmlsoap.org/wsdl/soap12/"SOAP_NS_ENCODING = "http://schemas.xmlsoap.org/soap/encoding/"WSDL_NS_URL = "http://schemas.xmlsoap.org/wsdl/"WSSE_NS_URL = "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd"XFORMS_NS_URL = "http://www.w3c.org/xform.xsd"XMLSCHEMA_NS_URL = "http://www.w3.org/2001/XMLSchema"XSI_NS_URL = "http://www.w3.org/2001/XMLSchema-instance"XACML_NS_URL = "urn:oasis:names:tc:xacml:2.0:policy:schema:os"KNOWN_NAMESPACES = { AG_NS_URL : "ag", BPEL_NS_URL : "bpws", HTTP_WSDL_NS_URL : "http", MIME_WSDL_NS_URL : "mime", SOAP_NS_URL : "soap", SOAP12_NS_URL : "soap12", WSDL_NS_URL : "wsdl", WSSE_NS_URL : "wsse", XFORMS_NS_URL : "xforms", XMLSCHEMA_NS_URL : "xs", XACML_NS_URL : "xacml", } global agXsdToClassNameagXsdToClassName = Nonedef getAgXsdToClassName(): global agXsdToClassName if (agXsdToClassName == None): agXsdToClassName = { "ag:append" : "activegrid.model.processmodel.AppendOperation", "ag:attribute" : "activegrid.model.identitymodel.Attribute",
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -