cxfservicecomponent.java
来自「提供ESB 应用mule源代码 提供ESB 应用mule源代码」· Java 代码 · 共 440 行
JAVA
440 行
/* * $Id: CxfServiceComponent.java 12786 2008-09-28 20:34:12Z dandiep $ * -------------------------------------------------------------------------------------- * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com * * The software in this package is published under the terms of the CPAL v1.0 * license, a copy of which has been included with this distribution in the * LICENSE.txt file. */package org.mule.transport.cxf;import org.mule.DefaultMuleMessage;import org.mule.RequestContext;import org.mule.api.ExceptionPayload;import org.mule.api.MuleEvent;import org.mule.api.MuleEventContext;import org.mule.api.MuleException;import org.mule.api.MuleMessage;import org.mule.api.config.ConfigurationException;import org.mule.api.config.MuleProperties;import org.mule.api.endpoint.EndpointNotFoundException;import org.mule.api.endpoint.EndpointURI;import org.mule.api.lifecycle.Callable;import org.mule.api.lifecycle.InitialisationException;import org.mule.api.lifecycle.Lifecycle;import org.mule.api.transformer.TransformerException;import org.mule.api.transport.OutputHandler;import org.mule.config.i18n.MessageFactory;import org.mule.message.DefaultExceptionPayload;import org.mule.module.xml.stax.StaxSource;import org.mule.transport.cxf.support.DelegatingOutputStream;import org.mule.transport.http.HttpConnector;import org.mule.transport.http.HttpConstants;import org.mule.transport.soap.SoapConstants;import org.mule.util.StringUtils;import java.io.ByteArrayInputStream;import java.io.ByteArrayOutputStream;import java.io.IOException;import java.io.InputStream;import java.io.OutputStream;import java.io.Reader;import javax.xml.stream.XMLStreamReader;import javax.xml.transform.Source;import javax.xml.transform.dom.DOMSource;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.cxf.Bus;import org.apache.cxf.endpoint.Server;import org.apache.cxf.io.CachedOutputStream;import org.apache.cxf.message.ExchangeImpl;import org.apache.cxf.message.Message;import org.apache.cxf.message.MessageImpl;import org.apache.cxf.service.model.EndpointInfo;import org.apache.cxf.staxutils.StaxUtils;import org.apache.cxf.transport.MessageObserver;import org.apache.cxf.transport.local.LocalConduit;import org.apache.cxf.transports.http.QueryHandler;import org.apache.cxf.transports.http.QueryHandlerRegistry;import org.apache.cxf.wsdl.http.AddressType;import org.w3c.dom.Document;import org.w3c.dom.Node;/** * The CXF receives messages from Mule, converts them into CXF messages and dispatches * them into the receiving CXF destination. */public class CxfServiceComponent implements Callable, Lifecycle{ /** * logger used by this class */ protected transient Log logger = LogFactory.getLog(getClass()); protected Bus bus; // manager to the component protected String transportClass; private CxfMessageReceiver receiver; public CxfServiceComponent(CxfConnector connector, CxfMessageReceiver receiver) throws ConfigurationException { super(); this.receiver = receiver; this.bus = receiver.connector.getCxfBus(); } public Object onCall(MuleEventContext eventContext) throws Exception { if (logger.isDebugEnabled()) { logger.debug(eventContext); } // if http request String requestPath = parseHttpRequestProperty( eventContext.getMessage().getStringProperty(HttpConnector.HTTP_REQUEST_PROPERTY, StringUtils.EMPTY)); if (requestPath.indexOf('?') > -1) { return generateWSDLOrXSD(eventContext, requestPath); } else { return sendToDestination(eventContext); } } private String parseHttpRequestProperty(String request) { String uriBase = ""; if (!(request.contains("?wsdl")) && (!(request.contains("?xsd")))) { int qIdx = request.indexOf('?'); if (qIdx > -1) { uriBase = request.substring(0, qIdx); } } else { uriBase = request; } return uriBase; } protected Object generateWSDLOrXSD(MuleEventContext eventContext, String req) throws EndpointNotFoundException, IOException { // TODO: Is there a way to make this not so ugly? String ctxUri = eventContext.getEndpointURI().getPath(); String wsdlUri = getWsdlUri(eventContext, req); String serviceUri = wsdlUri.substring(0, wsdlUri.indexOf('?')); EndpointInfo ei = receiver.getServer().getEndpoint().getEndpointInfo(); if (serviceUri != null) { ei.setAddress(serviceUri); if (ei.getExtensor(AddressType.class) != null) { ei.getExtensor(AddressType.class).setLocation(serviceUri); } } ByteArrayOutputStream out = new ByteArrayOutputStream(); String ct = null; for (QueryHandler qh : bus.getExtension(QueryHandlerRegistry.class).getHandlers()) { if (qh.isRecognizedQuery(wsdlUri, ctxUri, ei)) { ct = qh.getResponseContentType(wsdlUri, ctxUri); qh.writeResponse(wsdlUri, ctxUri, ei, out); out.flush(); } } String msg; if (ct == null) { ct = "text/plain"; msg = "No query handler found for URL."; } else { msg = out.toString(); } MuleMessage result = new DefaultMuleMessage(msg); result.setProperty(HttpConstants.HEADER_CONTENT_TYPE, ct); return result; } private String getWsdlUri(MuleEventContext eventContext, String reqPath) { EndpointURI epUri = eventContext.getEndpointURI(); String host = (String) eventContext.getMessage().getProperty("Host", epUri.getHost()); return epUri.getScheme() + "://" + host + reqPath; } protected Object sendToDestination(MuleEventContext ctx) throws MuleException, IOException { try { final MessageImpl m = new MessageImpl(); final MuleMessage muleReqMsg = ctx.getMessage(); String method = (String) muleReqMsg.getProperty(HttpConnector.HTTP_METHOD_PROPERTY); String ct = (String) muleReqMsg.getProperty(HttpConstants.HEADER_CONTENT_TYPE); if (ct != null) { m.put(Message.CONTENT_TYPE, ct); } String path = (String) muleReqMsg.getProperty(HttpConnector.HTTP_REQUEST_PROPERTY); if (path == null) { path = ""; } if (method != null) { m.put(Message.HTTP_REQUEST_METHOD, method); m.put(Message.PATH_INFO, path); m.put(Message.BASE_PATH, ctx.getEndpointURI().getPath()); method = method.toUpperCase(); } if (!"GET".equals(method)) { Object payload = ctx.transformMessage(); setPayload(ctx, m, payload); } // TODO: Not sure if this is 100% correct - DBD String soapAction = getSoapAction(ctx.getMessage()); m.put(org.mule.transport.soap.SoapConstants.SOAP_ACTION_PROPERTY_CAPS, soapAction); Server server = receiver.getServer(); org.apache.cxf.transport.Destination d = server.getDestination(); // Set up a listener for the response m.put(LocalConduit.DIRECT_DISPATCH, Boolean.TRUE); m.put(MuleProperties.MULE_EVENT_PROPERTY, RequestContext.getEvent()); m.setDestination(d); OutputHandler outputHandler = new OutputHandler() { public void write(MuleEvent event, OutputStream out) throws IOException { Message outFaultMessage = m.getExchange().getOutFaultMessage(); Message outMessage = m.getExchange().getOutMessage(); Message contentMsg = null; if (outFaultMessage != null) { contentMsg = outFaultMessage; } else if (outMessage != null) { contentMsg = outMessage; } if (contentMsg == null) { return; } DelegatingOutputStream delegate = (DelegatingOutputStream) contentMsg.getContent(OutputStream.class); out.write(((ByteArrayOutputStream) delegate.getOutputStream()).toByteArray()); delegate.setOutputStream(out); out.flush(); contentMsg.getInterceptorChain().resume(); } }; DefaultMuleMessage muleResMsg = new DefaultMuleMessage(outputHandler); ExchangeImpl exchange = new ExchangeImpl(); exchange.setInMessage(m); m.put(CxfConstants.MULE_MESSAGE, muleReqMsg); exchange.put(CxfConstants.MULE_MESSAGE, muleResMsg); // invoke the actual web service up until right before we serialize the response d.getMessageObserver().onMessage(m); // Handle a fault if there is one. Message faultMsg = m.getExchange().getOutFaultMessage(); if (faultMsg != null) { Exception ex = faultMsg.getContent(Exception.class); if (ex != null) { ExceptionPayload exceptionPayload = new DefaultExceptionPayload(new Exception("")); ctx.getMessage().setExceptionPayload(exceptionPayload); } } return muleResMsg; } catch (MuleException e) { logger.warn("Could not dispatch message to CXF!", e); throw e; } } private void setPayload(MuleEventContext ctx, final MessageImpl m, Object payload) throws TransformerException { if (payload instanceof InputStream) { m.put(Message.ENCODING, ctx.getEncoding()); m.setContent(InputStream.class, payload); } else if (payload instanceof Reader) { m.setContent(XMLStreamReader.class, StaxUtils.createXMLStreamReader((Reader) payload)); } else if (payload instanceof byte[]) { m.setContent(InputStream.class, new ByteArrayInputStream((byte[]) payload)); } else if (payload instanceof StaxSource) { m.setContent(XMLStreamReader.class, ((StaxSource) payload).getXMLStreamReader()); } else if (payload instanceof Source) { m.setContent(XMLStreamReader.class, StaxUtils.createXMLStreamReader((Source) payload)); } else if (payload instanceof XMLStreamReader) { m.setContent(XMLStreamReader.class, (XMLStreamReader) payload); } else if (payload instanceof Document) { DOMSource source = new DOMSource((Node) payload); m.setContent(XMLStreamReader.class, StaxUtils.createXMLStreamReader(source)); } else { InputStream is = (InputStream) ctx.transformMessage(InputStream.class); m.put(Message.ENCODING, ctx.getEncoding()); m.setContent(InputStream.class, is); } } /** * Gets the stream representation of the current message. If the message is set * for streaming the input stream on the UMOStreamMEssageAdapter will be used, * otherwise a byteArrayInputStream will be used to hold the byte[] * representation of the current message. * * @param context the event context * @return The inputstream for the current message * @throws MuleException */ protected InputStream getMessageStream(MuleEventContext context) throws MuleException { InputStream is; Object eventMsgPayload = context.transformMessage(); if (eventMsgPayload instanceof InputStream) { is = (InputStream) eventMsgPayload; } else { is = (InputStream) context.transformMessage(InputStream.class); } return is; } protected String getSoapAction(MuleMessage message) { String action = (String) message.getProperty(SoapConstants.SOAP_ACTION_PROPERTY); if (action != null && action.startsWith("\"") && action.endsWith("\"") && action.length() >= 2) { action = action.substring(1, action.length() - 1); } return action; } public Bus getBus() { return bus; } public void setBus(Bus bus) { this.bus = bus; } class ResponseListener implements MessageObserver { private Message message; public CachedOutputStream getCachedStream() { return message.getContent(CachedOutputStream.class); } public Message getMessage() { return message; } public synchronized void onMessage(Message message) { this.message = message; } } public void initialise() throws InitialisationException { if (bus == null) { throw new InitialisationException(MessageFactory.createStaticMessage("No Cxf bus instance, this component has not been initialized properly."), this); } } public void start() throws MuleException { // nothing to do } public void stop() throws MuleException { // nothing to do } public void dispose() { // template method }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?