cxfmessagedispatcher.java

来自「提供ESB 应用mule源代码 提供ESB 应用mule源代码」· Java 代码 · 共 258 行

JAVA
258
字号
/* * $Id: CxfMessageDispatcher.java 13051 2008-10-10 21:10:48Z dfeist $ * -------------------------------------------------------------------------------------- * Copyright (c) MuleSource, Inc.  All rights reserved.  http://www.mulesource.com * * The software in this package is published under the terms of the CPAL v1.0 * license, a copy of which has been included with this distribution in the * LICENSE.txt file. */package org.mule.transport.cxf;import org.mule.DefaultMuleMessage;import org.mule.api.MuleEvent;import org.mule.api.MuleMessage;import org.mule.api.config.MuleProperties;import org.mule.api.endpoint.EndpointURI;import org.mule.api.endpoint.OutboundEndpoint;import org.mule.api.transformer.TransformerException;import org.mule.transport.AbstractMessageDispatcher;import org.mule.transport.soap.SoapConstants;import org.mule.util.TemplateParser;import java.lang.reflect.Method;import java.util.ArrayList;import java.util.Arrays;import java.util.HashMap;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.Set;import java.util.regex.Pattern;import javax.activation.DataHandler;import javax.xml.namespace.QName;import javax.xml.ws.BindingProvider;import org.apache.cxf.endpoint.Client;import org.apache.cxf.endpoint.ClientImpl;import org.apache.cxf.service.model.BindingOperationInfo;/** * The CxfMessageDispatcher is used for making Soap client requests to remote * services. */public class CxfMessageDispatcher extends AbstractMessageDispatcher{    private static final String URI_REGEX = "cxf:\\[(.+?)\\]:(.+?)/\\[(.+?)\\]:(.+?)";    Pattern URI_PATTERN = Pattern.compile(URI_REGEX);    protected final CxfConnector connector;    protected ClientWrapper wrapper;    private final TemplateParser soapActionTemplateParser = TemplateParser.createMuleStyleParser();    public CxfMessageDispatcher(OutboundEndpoint endpoint)    {        super(endpoint);        this.connector = (CxfConnector) endpoint.getConnector();    }    /*    We need a way to associate an endpoint with a specific CXF service and operation, and the most sensible way to    accomplish that is to overload URI syntax:    cxf:[service_URI]:service_localname/[ep_URI]:ep_localname    And the map method to operation     */    protected void doConnect() throws Exception    {        wrapper = new ClientWrapper();        wrapper.setBus(connector.getCxfBus());        wrapper.setEndpoint(endpoint);        wrapper.initialize();    }    protected void doDisconnect() throws Exception    {    }    protected void doDispose()    {        // nothing to do    }    protected Object[] getArgs(MuleEvent event) throws TransformerException    {        Object payload = event.transformMessage();        Object[] args;        if (payload instanceof Object[])        {            args = (Object[])payload;        }        else        {            args = new Object[]{payload};        }        MuleMessage message = event.getMessage();        Set<?> attachmentNames = message.getAttachmentNames();        if (attachmentNames != null && !attachmentNames.isEmpty())        {            List<DataHandler> attachments = new ArrayList<DataHandler>();            for (Iterator<?> i = attachmentNames.iterator(); i.hasNext();)            {                attachments.add(message.getAttachment((String)i.next()));            }            List<Object> temp = new ArrayList<Object>(Arrays.asList(args));            temp.add(attachments.toArray(new DataHandler[0]));            args = temp.toArray();        }        if (args.length == 0)        {            return null;        }        return args;    }    protected MuleMessage doSend(MuleEvent event) throws Exception    {        ((ClientImpl)wrapper.getClient()).setSynchronousTimeout(event.getTimeout());        if (!wrapper.isClientProxyAvailable())        {            return doSendWithClient(event);        }        else        {            return doSendWithProxy(event);        }    }    protected MuleMessage doSendWithProxy(MuleEvent event) throws Exception    {        Method method = wrapper.getMethod(event);        Map<String, Object> props = new HashMap<String, Object>();        props.put(MuleProperties.MULE_EVENT_PROPERTY, event);                 // Set custom soap action if set on the event or endpoint        String soapAction = (String)event.getMessage().getProperty(SoapConstants.SOAP_ACTION_PROPERTY);        if (soapAction != null)        {            soapAction = parseSoapAction(soapAction, new QName(method.getName()), event);            props.put(org.apache.cxf.binding.soap.SoapBindingConstants.SOAP_ACTION, soapAction);        }                BindingProvider bp = wrapper.getClientProxy();        bp.getRequestContext().putAll(props);                Object response = method.invoke(wrapper.getClientProxy(), getArgs(event));                // TODO: handle holders                return buildResponseMessage(event, new Object[] { response });    }    protected MuleMessage doSendWithClient(MuleEvent event) throws Exception    {        BindingOperationInfo bop = wrapper.getOperation(event);                Map<String, Object> props = new HashMap<String, Object>();        props.put(MuleProperties.MULE_EVENT_PROPERTY, event);                 // Set custom soap action if set on the event or endpoint        String soapAction = (String)event.getMessage().getProperty(SoapConstants.SOAP_ACTION_PROPERTY);        if (soapAction != null)        {            soapAction = parseSoapAction(soapAction, bop.getName(), event);            props.put(org.apache.cxf.binding.soap.SoapBindingConstants.SOAP_ACTION, soapAction);            event.getMessage().setProperty(SoapConstants.SOAP_ACTION_PROPERTY, soapAction);        }                Map<String, Object> ctx = new HashMap<String, Object>();        ctx.put(Client.REQUEST_CONTEXT, props);         ctx.put(Client.RESPONSE_CONTEXT, props);                 // Set Custom Headers on the client        Object[] arr = event.getMessage().getPropertyNames().toArray();        String head;        for (int i = 0; i < arr.length; i++)        {            head = (String) arr[i];            if ((head != null) && (!head.startsWith("MULE")))            {                props.put((String) arr[i], event.getMessage().getProperty((String) arr[i]));            }        }                Object[] response = wrapper.getClient().invoke(bop, getArgs(event), ctx);        return buildResponseMessage(event, response);    }    protected MuleMessage buildResponseMessage(MuleEvent event, Object[] response)     {        MuleMessage result = null;        if (response != null && response.length <= 1)        {            if (response.length == 1)            {                result = new DefaultMuleMessage(response[0], event.getMessage());            }        }        else        {            result = new DefaultMuleMessage(response, event.getMessage());        }        return result;    }    protected void doDispatch(MuleEvent event) throws Exception    {        doSend(event);    }    public String parseSoapAction(String soapAction, QName method, MuleEvent event)    {        EndpointURI endpointURI = event.getEndpoint().getEndpointURI();        Map<String, String> properties = new HashMap<String, String>();        MuleMessage msg = event.getMessage();        for (Iterator<?> iterator = msg.getPropertyNames().iterator(); iterator.hasNext();)        {            String propertyKey = (String)iterator.next();            properties.put(propertyKey, msg.getProperty(propertyKey).toString());        }        properties.put(MuleProperties.MULE_METHOD_PROPERTY, method.getLocalPart());        properties.put("methodNamespace", method.getNamespaceURI());        properties.put("address", endpointURI.getAddress());        properties.put("scheme", endpointURI.getScheme());        properties.put("host", endpointURI.getHost());        properties.put("port", String.valueOf(endpointURI.getPort()));        properties.put("path", endpointURI.getPath());        properties.put("hostInfo", endpointURI.getScheme()                                   + "://"                                   + endpointURI.getHost()                                   + (endpointURI.getPort() > -1                                                   ? ":" + String.valueOf(endpointURI.getPort()) : ""));        if (event.getService() != null)        {            properties.put("serviceName", event.getService().getName());        }        soapAction = soapActionTemplateParser.parse(properties, soapAction);        if (logger.isDebugEnabled())        {            logger.debug("SoapAction for this call is: " + soapAction);        }        return soapAction;    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?