📄 httpmessagereceiver.java
字号:
/* * $Id: HttpMessageReceiver.java 12918 2008-10-06 13:54:19Z dirk.olmes $ * -------------------------------------------------------------------------------------- * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com * * The software in this package is published under the terms of the CPAL v1.0 * license, a copy of which has been included with this distribution in the * LICENSE.txt file. */package org.mule.transport.http;import org.mule.DefaultMuleEvent;import org.mule.DefaultMuleMessage;import org.mule.DefaultMuleSession;import org.mule.MuleServer;import org.mule.NullSessionHandler;import org.mule.OptimizedRequestContext;import org.mule.RequestContext;import org.mule.api.MessagingException;import org.mule.api.MuleEvent;import org.mule.api.MuleException;import org.mule.api.MuleMessage;import org.mule.api.config.MuleProperties;import org.mule.api.endpoint.EndpointURI;import org.mule.api.endpoint.ImmutableEndpoint;import org.mule.api.endpoint.InboundEndpoint;import org.mule.api.lifecycle.CreateException;import org.mule.api.service.Service;import org.mule.api.transformer.TransformerException;import org.mule.api.transport.Connector;import org.mule.api.transport.MessageAdapter;import org.mule.api.transport.MessageReceiver;import org.mule.transport.ConnectException;import org.mule.transport.NullPayload;import org.mule.transport.http.i18n.HttpMessages;import org.mule.transport.tcp.TcpConnector;import org.mule.transport.tcp.TcpMessageReceiver;import org.mule.util.MapUtils;import org.mule.util.ObjectUtils;import org.mule.util.monitor.Expirable;import java.io.IOException;import java.net.Socket;import java.net.SocketAddress;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import javax.resource.spi.work.Work;import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;import org.apache.commons.httpclient.Cookie;import org.apache.commons.httpclient.Header;import org.apache.commons.httpclient.HttpVersion;import org.apache.commons.httpclient.cookie.MalformedCookieException;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;/** * <code>HttpMessageReceiver</code> is a simple http server that can be used to * listen for HTTP requests on a particular port. */public class HttpMessageReceiver extends TcpMessageReceiver{ protected final Log logger = LogFactory.getLog(getClass()); public HttpMessageReceiver(Connector connector, Service service, InboundEndpoint endpoint) throws CreateException { super(connector, service, endpoint); } // @Override protected Work createWork(Socket socket) throws IOException { return new HttpWorker(socket); } // @Override protected void doConnect() throws ConnectException { // If we already have an endpoint listening on this socket don't try and // start another serversocket if (this.shouldConnect()) { super.doConnect(); } } protected boolean shouldConnect() { StringBuffer requestUri = new StringBuffer(80); requestUri.append(endpoint.getProtocol()).append("://"); requestUri.append(endpoint.getEndpointURI().getHost()); requestUri.append(':').append(endpoint.getEndpointURI().getPort()); requestUri.append('*'); MessageReceiver[] receivers = connector.getReceivers(requestUri.toString()); for (int i = 0; i < receivers.length; i++) { if (receivers[i].isConnected()) { return false; } } return true; } // @Override protected MuleMessage handleUnacceptedFilter(MuleMessage message) { if (logger.isDebugEnabled()) { logger.debug("Message request '" + message.getProperty(HttpConnector.HTTP_REQUEST_PROPERTY) + "' is being rejected since it does not match the filter on this endpoint: " + endpoint); } message.setProperty(HttpConnector.HTTP_STATUS_PROPERTY, String.valueOf(HttpConstants.SC_NOT_ACCEPTABLE)); return message; } protected class HttpWorker implements Work, Expirable { private HttpServerConnection conn; private String cookieSpec; private boolean enableCookies; private String remoteClientAddress; public HttpWorker(Socket socket) throws IOException { String encoding = endpoint.getEncoding(); if (encoding == null) { encoding = MuleServer.getMuleContext().getConfiguration().getDefaultEncoding(); } conn = new HttpServerConnection(socket, encoding, (HttpConnector) connector); cookieSpec = MapUtils.getString(endpoint.getProperties(), HttpConnector.HTTP_COOKIE_SPEC_PROPERTY, ((HttpConnector) connector).getCookieSpec()); enableCookies = MapUtils.getBooleanValue(endpoint.getProperties(), HttpConnector.HTTP_ENABLE_COOKIES_PROPERTY, ((HttpConnector) connector).isEnableCookies()); final SocketAddress clientAddress = socket.getRemoteSocketAddress(); if (clientAddress != null) { remoteClientAddress = clientAddress.toString(); } } public void expired() { if (conn.isOpen()) { conn.close(); } } public void run() { long keepAliveTimeout = ((TcpConnector) connector).getKeepAliveTimeout(); try { do { conn.setKeepAlive(false); // Only add a monitor if the timeout has been set if (keepAliveTimeout > 0) { ((HttpConnector) connector).getKeepAliveMonitor().addExpirable( keepAliveTimeout, TimeUnit.MILLISECONDS, this); } HttpRequest request = conn.readRequest(); if (request == null) { break; } // Ensure that we drop any monitors, we'll add again for the next request ((HttpConnector) connector).getKeepAliveMonitor().removeExpirable(this); conn.writeResponse(processRequest(request)); } while (conn.isKeepAlive()); } catch (Exception e) { handleException(e); } finally { logger.info("Closing HTTP connection."); if (conn.isOpen()) { conn.close(); conn = null; // Ensure that we drop any monitors ((HttpConnector) connector).getKeepAliveMonitor().removeExpirable(this); } } } protected HttpResponse processRequest(HttpRequest request) throws MuleException, IOException { RequestLine requestLine = request.getRequestLine(); String method = requestLine.getMethod(); if (method.equals(HttpConstants.METHOD_HEAD)) { return doHead(requestLine); } else if (method.equals(HttpConstants.METHOD_GET) || method.equals(HttpConstants.METHOD_POST) || method.equals(HttpConstants.METHOD_OPTIONS) || method.equals(HttpConstants.METHOD_PUT) || method.equals(HttpConstants.METHOD_DELETE) || method.equals(HttpConstants.METHOD_TRACE) || method.equals(HttpConstants.METHOD_CONNECT)) { return doRequest(request, requestLine); } else { return doBad(requestLine); } } protected HttpResponse doHead(RequestLine requestLine) throws MuleException { MuleMessage message = new DefaultMuleMessage(NullPayload.getInstance()); MuleEvent event = new DefaultMuleEvent(message, endpoint, new DefaultMuleSession(message, new NullSessionHandler(), connector.getMuleContext()), true); OptimizedRequestContext.unsafeSetEvent(event); HttpResponse response = new HttpResponse(); response.setStatusLine(requestLine.getHttpVersion(), HttpConstants.SC_OK); return transformResponse(response); } protected HttpResponse doRequest(HttpRequest request, RequestLine requestLine) throws IOException, MuleException { Map headers = parseHeaders(request); // TODO Mule 2.0 generic way to set stream message adapter MessageAdapter adapter = buildStandardAdapter(request, headers); MuleMessage message = new DefaultMuleMessage(adapter); if (logger.isDebugEnabled()) { logger.debug(message.getProperty(HttpConnector.HTTP_REQUEST_PROPERTY)); } // determine if the request path on this request denotes a different receiver MessageReceiver receiver = getTargetReceiver(message, endpoint); HttpResponse response; // the response only needs to be transformed explicitly if // A) the request was not served or B) a null result was returned if (receiver != null) { preRouteMessage(message); MuleMessage returnMessage = receiver.routeMessage(message, endpoint.isSynchronous(), null); Object tempResponse; if (returnMessage != null) { tempResponse = returnMessage.getPayload(); } else { tempResponse = NullPayload.getInstance(); } // This removes the need for users to explicitly adding the response transformer // ObjectToHttpResponse in their config if (tempResponse instanceof HttpResponse) { response = (HttpResponse) tempResponse; } else { response = transformResponse(returnMessage); } response.disableKeepAlive(!((HttpConnector) connector).isKeepAlive()); // Check if endpoint has a keep-alive property configured. Note the translation from // keep-alive in the schema to keepAlive here.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -