cxfconnector.java

来自「提供ESB 应用mule源代码 提供ESB 应用mule源代码」· Java 代码 · 共 380 行

JAVA
380
字号
/* * $Id: CxfConnector.java 12918 2008-10-06 13:54:19Z dirk.olmes $ * -------------------------------------------------------------------------------------- * Copyright (c) MuleSource, Inc.  All rights reserved.  http://www.mulesource.com * * The software in this package is published under the terms of the CPAL v1.0 * license, a copy of which has been included with this distribution in the * LICENSE.txt file. */package org.mule.transport.cxf;import org.mule.api.MuleEvent;import org.mule.api.MuleException;import org.mule.api.MuleMessage;import org.mule.api.context.notification.MuleContextNotificationListener;import org.mule.api.context.notification.ServerNotification;import org.mule.api.endpoint.EndpointBuilder;import org.mule.api.endpoint.EndpointURI;import org.mule.api.endpoint.InboundEndpoint;import org.mule.api.endpoint.OutboundEndpoint;import org.mule.api.lifecycle.InitialisationException;import org.mule.api.service.Service;import org.mule.api.transport.DispatchException;import org.mule.api.transport.MessageReceiver;import org.mule.component.DefaultJavaComponent;import org.mule.config.spring.SpringRegistry;import org.mule.context.notification.MuleContextNotification;import org.mule.endpoint.EndpointURIEndpointBuilder;import org.mule.model.seda.SedaService;import org.mule.object.SingletonObjectFactory;import org.mule.routing.inbound.DefaultInboundRouterCollection;import org.mule.transport.AbstractConnector;import org.mule.transport.cxf.transport.MuleUniversalTransport;import org.mule.transport.http.HttpConnector;import org.mule.transport.http.HttpConstants;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;import javax.xml.namespace.QName;import edu.emory.mathcs.backport.java.util.Collections;import org.apache.cxf.Bus;import org.apache.cxf.BusFactory;import org.apache.cxf.bus.spring.SpringBusFactory;import org.apache.cxf.endpoint.Server;import org.apache.cxf.transport.ConduitInitiatorManager;import org.apache.cxf.transport.DestinationFactoryManager;import org.springframework.context.ApplicationContext;/** * Connects Mule to a CXF bus instance. */public class CxfConnector extends AbstractConnector implements MuleContextNotificationListener{    public static final String CXF = "cxf";    public static final String CXF_SERVICE_COMPONENT_NAME = "_cxfServiceComponent";    public static final String CONFIGURATION_LOCATION = "configurationLocation";    public static final String DEFAULT_MULE_NAMESPACE_URI = "http://www.muleumo.org";    public static final String BUS_PROPERTY = CXF;    // The CXF Bus object    private Bus bus;    private String configurationLocation;    private String defaultFrontend = CxfConstants.JAX_WS_FRONTEND;    private List<SedaService> services = Collections.synchronizedList(new ArrayList<SedaService>());    private Map<String, Server> uriToServer = new HashMap<String, Server>();    private boolean initializeStaticBusInstance = true;        public CxfConnector()    {        super();        registerProtocols();    }    protected void registerProtocols()    {        registerSupportedProtocol("http");        registerSupportedProtocol("https");        registerSupportedProtocol("jms");        registerSupportedProtocol("vm");        registerSupportedProtocol("servlet");        registerSupportedProtocol("jetty");    }    public String getProtocol()    {        return CXF;    }    protected void doInitialise() throws InitialisationException    {        ApplicationContext context = (ApplicationContext) muleContext.getRegistry().lookupObject(SpringRegistry.SPRING_APPLICATION_CONTEXT);                if (configurationLocation != null)        {            bus = new SpringBusFactory(context).createBus(configurationLocation, true);        }        else        {            bus = new SpringBusFactory(context).createBus((String)null, true);        }                if (!initializeStaticBusInstance)        {            BusFactory.setDefaultBus(null);        }                MuleUniversalTransport transport = new MuleUniversalTransport(this);        DestinationFactoryManager dfm = bus.getExtension(DestinationFactoryManager.class);        dfm.registerDestinationFactory("http://schemas.xmlsoap.org/soap/http", transport);        dfm.registerDestinationFactory("http://schemas.xmlsoap.org/wsdl/soap/http", transport);        dfm.registerDestinationFactory("http://cxf.apache.org/transports/http/configuration", transport);        dfm.registerDestinationFactory("http://schemas.xmlsoap.org/wsdl/http/", transport);        dfm.registerDestinationFactory("http://www.w3.org/2003/05/soap/bindings/HTTP/", transport);        dfm.registerDestinationFactory(MuleUniversalTransport.TRANSPORT_ID, transport);        ConduitInitiatorManager extension = bus.getExtension(ConduitInitiatorManager.class);        extension.registerConduitInitiator("http://schemas.xmlsoap.org/wsdl/soap/", transport);        extension.registerConduitInitiator("http://schemas.xmlsoap.org/soap/http", transport);        extension.registerConduitInitiator(MuleUniversalTransport.TRANSPORT_ID, transport);                // Registers the listener        try        {            muleContext.registerListener(this);        }        catch (Exception e)        {            throw new InitialisationException(e, this);        }    }    protected void doDispose()    {        // template method    }    protected void doConnect() throws Exception    {        // template method    }    protected void doDisconnect() throws Exception    {        // template method    }    protected void doStart() throws MuleException    {    }    protected void doStop() throws MuleException    {        bus.shutdown(true);    }    public Bus getCxfBus()    {        return bus;    }    public void setCxfBus(Bus bus)    {        this.bus = bus;    }    public String getConfigurationLocation()    {        return configurationLocation;    }    public void setConfigurationLocation(String configurationLocation)    {        this.configurationLocation = configurationLocation;    }    public String getDefaultFrontend()    {        return defaultFrontend;    }    public void setDefaultFrontend(String defaultFrontend)    {        this.defaultFrontend = defaultFrontend;    }    @SuppressWarnings("unchecked")    protected void registerReceiverWithMuleService(MessageReceiver receiver, EndpointURI ep) throws MuleException    {        CxfMessageReceiver cxfReceiver = (CxfMessageReceiver) receiver;        Server server = cxfReceiver.getServer();        uriToServer.put(server.getEndpoint().getEndpointInfo().getAddress(), server);                // TODO MULE-2228 Simplify this API        SedaService c = new SedaService();        c.setName(CXF_SERVICE_COMPONENT_NAME + server.getEndpoint().getService().getName() + c.hashCode());        c.setModel(muleContext.getRegistry().lookupSystemModel());        CxfServiceComponent svcComponent = new CxfServiceComponent(this, (CxfMessageReceiver) receiver);        svcComponent.setBus(bus);        c.setComponent(new DefaultJavaComponent(new SingletonObjectFactory(svcComponent)));        // No determine if the endpointUri requires a new connector to be        // registed in the case of http we only need to register the new        // endpointUri if the port is different        String endpoint = receiver.getEndpointURI().getAddress();        String scheme = ep.getScheme().toLowerCase();        InboundEndpoint originalEndpoint = receiver.getEndpoint();        boolean sync = originalEndpoint.isSynchronous();        // If we are using sockets then we need to set the endpoint name appropiately        // and if using http/https        // we need to default to POST and set the Content-Type        if (scheme.equals("http") || scheme.equals("https") || scheme.equals("ssl") || scheme.equals("tcp")            || scheme.equals("servlet"))        {            originalEndpoint.getProperties().put(HttpConnector.HTTP_METHOD_PROPERTY, "POST");            originalEndpoint.getProperties().put(HttpConstants.HEADER_CONTENT_TYPE, "text/xml");        }        QName serviceName = server.getEndpoint().getEndpointInfo().getName();                EndpointBuilder protocolEndpointBuilder = new EndpointURIEndpointBuilder(endpoint, muleContext);        protocolEndpointBuilder.setSynchronous(sync);        protocolEndpointBuilder.setName(ep.getScheme() + ":" + serviceName.getLocalPart());                EndpointBuilder receiverEndpointBuilder = new EndpointURIEndpointBuilder(originalEndpoint,            muleContext);                // Apply the transformers to the correct endpoint        EndpointBuilder transformerEndpoint;        if (cxfReceiver.isApplyTransformersToProtocol())        {            transformerEndpoint = protocolEndpointBuilder;             receiverEndpointBuilder.setTransformers(null);            receiverEndpointBuilder.setResponseTransformers(null);        }        else        {              transformerEndpoint = receiverEndpointBuilder;        }        transformerEndpoint.setTransformers(originalEndpoint.getTransformers());        transformerEndpoint.setResponseTransformers(originalEndpoint.getResponseTransformers());                // apply the filters to the correct endpoint        EndpointBuilder filterEndpoint;        if (cxfReceiver.isApplyFiltersToProtocol())        {            filterEndpoint = protocolEndpointBuilder;               receiverEndpointBuilder.setFilter(null);                                                                                                        }        else        {              filterEndpoint = receiverEndpointBuilder;        }        filterEndpoint.setFilter(originalEndpoint.getFilter());                // apply the security filter to the correct endpoint        EndpointBuilder secFilterEndpoint;        if (cxfReceiver.isApplySecurityToProtocol())        {            secFilterEndpoint = protocolEndpointBuilder;               receiverEndpointBuilder.setSecurityFilter(null);                                                                                                       }        else        {              secFilterEndpoint = receiverEndpointBuilder;        }                     secFilterEndpoint.setSecurityFilter(originalEndpoint.getSecurityFilter());        String connectorName = (String) originalEndpoint.getProperty(CxfConstants.PROTOCOL_CONNECTOR);        if (connectorName != null)         {            protocolEndpointBuilder.setConnector(muleContext.getRegistry().lookupConnector(connectorName));        }                InboundEndpoint protocolEndpoint = muleContext.getRegistry()            .lookupEndpointFactory()            .getInboundEndpoint(protocolEndpointBuilder);        InboundEndpoint receiverEndpoint = muleContext.getRegistry()            .lookupEndpointFactory()            .getInboundEndpoint(receiverEndpointBuilder);        receiver.setEndpoint(receiverEndpoint);                c.setInboundRouter(new DefaultInboundRouterCollection());        c.getInboundRouter().addEndpoint(protocolEndpoint);        services.add(c);    }    /**     * The method determines the key used to store the receiver against.     *      * @param service the service for which the endpoint is being registered     * @param endpoint the endpoint being registered for the service     * @return the key to store the newly created receiver against. In this case it     *         is the service name, which is equivilent to the Axis service name.     */    @Override    protected Object getReceiverKey(Service service, InboundEndpoint endpoint)    {        if (service.getName().startsWith("_cxfServiceComponent"))        {            return service.getName();        }        else        {            return endpoint.getEndpointURI().getAddress();        }    }    public void onNotification(ServerNotification event)    {        // We need to register the CXF service service once the model        // starts because        // when the model starts listeners on components are started, thus        // all listener        // need to be registered for this connector before the CXF service        // service is registered. The implication of this is that to add a        // new service and a        // different http port the model needs to be restarted before the        // listener is available        if (event.getAction() == MuleContextNotification.CONTEXT_STARTED)        {            for (Service c : services)            {                try                {                    muleContext.getRegistry().registerService(c);                }                catch (MuleException e)                {                    handleException(e);                }            }        }    }        public boolean isSyncEnabled(String protocol)    {        protocol = protocol.toLowerCase();        if (protocol.equals("http") || protocol.equals("https") || protocol.equals("ssl") || protocol.equals("tcp")            || protocol.equals("servlet"))        {            return true;        }        else        {            return super.isSyncEnabled(protocol);        }    }    public Server getServer(String uri)    {        return uriToServer.get(uri);    }    public boolean isInitializeStaticBusInstance()    {        return initializeStaticBusInstance;    }    public void setInitializeStaticBusInstance(boolean initializeStaticBusInstance)    {        this.initializeStaticBusInstance = initializeStaticBusInstance;    }    }

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?