⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 xmlutils.py

📁 Wxpython Implemented on Windows CE, Source code
💻 PY
📖 第 1 页 / 共 2 页
字号:
#----------------------------------------------------------------------------
# Name:         xmlutils.py
# Purpose:      XML and Marshaller Utilities
#
# Author:       Jeff Norton
#
# Created:      6/2/05
# CVS-ID:       $Id: xmlutils.py,v 1.3 2006/04/20 06:25:50 RD Exp $
# Copyright:    (c) 2004-2005 ActiveGrid, Inc.
# License:      wxWindows License
#----------------------------------------------------------------------------

from activegrid.util.lang import *
import os
import time
import urllib
import logging
from activegrid.util.lang import *
import activegrid.util.objutils as objutils
import activegrid.util.xmlmarshaller as xmlmarshaller
import activegrid.util.aglogging as aglogging

xmlLogger = logging.getLogger("activegrid.util.xml")
    
def load(fileName, knownTypes=None, knownNamespaces=None, createGenerics=False):
    loadedObject = None
    fileObject = file(fileName)
    timeStart = time.time()
    xml = ""
    try:
        xml = fileObject.read()
        loadedObject = unmarshal(xml, knownTypes=knownTypes, knownNamespaces=knownNamespaces, xmlSource=fileName, createGenerics=createGenerics)
        loadedObject.fileName = os.path.abspath(fileName)
        if hasattr(loadedObject, 'initialize'):
            loadedObject.initialize()
    finally:
        fileObject.close()
        if xmlLogger.isEnabledFor(aglogging.LEVEL_INFO):
            timeDone = time.time()
            aglogging.info(xmlLogger, ('Load statistics for file %s (%d bytes): elapsed time = %f secs' % (fileName, len(xml), timeDone-timeStart)))
    return loadedObject

def loadURI(uri, knownTypes=None, knownNamespaces=None, xmlSource=None, createGenerics=False):
    loadedObject = None
    timeStart = time.time()
    xml = ""
    try:
        xml = urllib.urlopen(uri).read()
        loadedObject = unmarshal(xml, knownTypes=knownTypes, knownNamespaces=knownNamespaces, xmlSource=xmlSource, createGenerics=createGenerics)
        loadedObject.fileName = uri
        if hasattr(loadedObject, 'initialize'):
            loadedObject.initialize()
    finally:
        if xmlLogger.isEnabledFor(aglogging.LEVEL_INFO):
            timeDone = time.time()
            aglogging.info(xmlLogger, ('Load statistics for URI %s (%d bytes): elapsed time = %f secs' % (uri, len(xml), timeDone-timeStart)))
    return loadedObject

def unmarshal(xml, knownTypes=None, knownNamespaces=None, xmlSource=None, createGenerics=False):
    if (knownTypes == None): 
        knownTypes, knownNamespaces = getAgKnownTypes()
    return xmlmarshaller.unmarshal(xml, knownTypes=knownTypes, knownNamespaces=knownNamespaces, xmlSource=xmlSource, createGenerics=createGenerics)    

def save(fileName, objectToSave, prettyPrint=True, marshalType=True, knownTypes=None, knownNamespaces=None, encoding='utf-8'):
    if hasattr(objectToSave, '_xmlReadOnly') and objectToSave._xmlReadOnly == True:
        raise xmlmarshaller.MarshallerException('Error marshalling object to file "%s": object is marked "readOnly" and cannot be written' % (fileName))        
    timeStart = time.time()
    xml = marshal(objectToSave, prettyPrint=prettyPrint, marshalType=marshalType, knownTypes=knownTypes, knownNamespaces=knownNamespaces, encoding=encoding)
    fileObject = file(fileName, 'w')
    try:
        fileObject.write(xml)
        fileObject.flush()
    except Exception, errorData:
        fileObject.close()
        raise xmlmarshaller.MarshallerException('Error marshalling object to file "%s": %s' % (fileName, str(errorData)))
    fileObject.close()
    timeDone = time.time()
    aglogging.info(xmlLogger, ('Save statistics for file %s: elapsed time = %f secs' % (fileName, timeDone-timeStart)))
    
def marshal(objectToSave, prettyPrint=True, marshalType=True, knownTypes=None, knownNamespaces=None, encoding='utf-8'):
    if (knownTypes == None): 
        knownTypes, knownNamespaces = getAgKnownTypes()
    return xmlmarshaller.marshal(objectToSave, prettyPrint=prettyPrint, marshalType=marshalType, knownTypes=knownTypes, knownNamespaces=knownNamespaces, encoding=encoding)
    
def addNSAttribute(xmlDoc, shortNamespace, longNamespace):
    if not hasattr(xmlDoc, "__xmlnamespaces__"):
        xmlDoc.__xmlnamespaces__ = {shortNamespace:longNamespace}
    elif shortNamespace not in xmlDoc.__xmlnamespaces__:
        if (hasattr(xmlDoc.__class__, "__xmlnamespaces__") 
            and (xmlDoc.__xmlnamespaces__ is xmlDoc.__class__.__xmlnamespaces__)):
            xmlDoc.__xmlnamespaces__ = dict(xmlDoc.__xmlnamespaces__)
        xmlDoc.__xmlnamespaces__[shortNamespace] = longNamespace

def genShortNS(xmlDoc, longNamespace=None):
    if not hasattr(xmlDoc, "__xmlnamespaces__"):
        return "ns1"
    elif longNamespace != None and longNamespace in xmlDoc.__xmlnamespaces__.items():
        for key, value in xmlDoc.__xmlnamespaces__.iteritems():
            if value == longNamespace:
                return key
    i = 1
    while ("ns%d" % i) in xmlDoc.__xmlnamespaces__:
        i += 1
    return ("ns%d" % i)
    
def genTargetNS(fileName, applicationName=None, type=None):
    if (applicationName != None):
        if (type != None):
            tns = "urn:%s:%s:%s" % (applicationName, type, fileName)
        else:
            tns = "urn:%s:%s" % (applicationName, fileName)
    else:
        tns = "urn:%s" % fileName
    return tns
    
def splitType(typeName):
    index = typeName.rfind(':')
    if index != -1:
        ns = typeName[:index]
        complexTypeName = typeName[index+1:]
    else:
        ns = None
        complexTypeName = typeName
    return (ns, complexTypeName)
        
def cloneObject(objectToClone, knownTypes=None, marshalType=True, knownNamespaces=None, encoding='utf-8'):
    if (knownTypes == None): 
        knownTypes, knownNamespaces = getAgKnownTypes()
    xml = xmlmarshaller.marshal(objectToClone, prettyPrint=True, marshalType=marshalType, knownTypes=knownTypes, knownNamespaces=knownNamespaces, encoding=encoding)
    clonedObject = xmlmarshaller.unmarshal(xml, knownTypes=knownTypes, knownNamespaces=knownNamespaces)
    if hasattr(objectToClone, 'fileName'):
        clonedObject.fileName = objectToClone.fileName
    if hasattr(objectToClone, "_parentDoc"):
        clonedObject._parentDoc = objectToClone._parentDoc
    try:
        clonedObject.initialize()
    except AttributeError:
        pass
    return clonedObject

def getAgVersion(fileName):
    fileObject = file(fileName)
    try:
        xml = fileObject.read()
    finally:
        fileObject.close()
    i = xml.find(' ag:version=')
    if i >= 0:
        i += 12
    else:
        i2 = xml.find('<ag:')
        if i2 >= 0:
            i = xml.find(' version=', i2)
            if i > 0:
                i += 9
        elif xml.find('<project version="10"') >= 0:
            return "10"
        else:
            return None
    version = None
    if xml[i:i+1] == '"':
        j = xml.find('"', i+1)
        if (j > i+1):
            version = xml[i+1:j]
    return version

    
AG_NS_URL = "http://www.activegrid.com/ag.xsd"
BPEL_NS_URL = "http://schemas.xmlsoap.org/ws/2003/03/business-process"
HTTP_WSDL_NS_URL = "http://schemas.xmlsoap.org/wsdl/http/"
MIME_WSDL_NS_URL = "http://schemas.xmlsoap.org/wsdl/mime/"
SOAP_NS_URL = "http://schemas.xmlsoap.org/wsdl/soap/"
SOAP12_NS_URL = "http://schemas.xmlsoap.org/wsdl/soap12/"
SOAP_NS_ENCODING = "http://schemas.xmlsoap.org/soap/encoding/"
WSDL_NS_URL = "http://schemas.xmlsoap.org/wsdl/"
WSSE_NS_URL = "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd"
XFORMS_NS_URL = "http://www.w3c.org/xform.xsd"
XMLSCHEMA_NS_URL = "http://www.w3.org/2001/XMLSchema"
XSI_NS_URL = "http://www.w3.org/2001/XMLSchema-instance"
XACML_NS_URL = "urn:oasis:names:tc:xacml:2.0:policy:schema:os"

KNOWN_NAMESPACES = { AG_NS_URL          :  "ag",
                     BPEL_NS_URL        :  "bpws",
                     HTTP_WSDL_NS_URL   :  "http",
                     MIME_WSDL_NS_URL   :  "mime",
                     SOAP_NS_URL        :  "soap",
                     SOAP12_NS_URL      :  "soap12",
                     WSDL_NS_URL        :  "wsdl",
                     WSSE_NS_URL        :  "wsse", 
                     XFORMS_NS_URL      :  "xforms",                             
                     XMLSCHEMA_NS_URL   :  "xs",
                     XACML_NS_URL       :  "xacml",
                   }
    
global agXsdToClassName
agXsdToClassName = None
def getAgXsdToClassName():
    global agXsdToClassName
    if (agXsdToClassName == None):
        agXsdToClassName = {
            "ag:append"          : "activegrid.model.processmodel.AppendOperation",
            "ag:attribute"       : "activegrid.model.identitymodel.Attribute",

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -