tcpmessagereceiver.java
来自「提供ESB 应用mule源代码 提供ESB 应用mule源代码」· Java 代码 · 共 424 行
JAVA
424 行
/* * $Id: TcpMessageReceiver.java 12763 2008-09-26 22:35:25Z aperepel $ * -------------------------------------------------------------------------------------- * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com * * The software in this package is published under the terms of the CPAL v1.0 * license, a copy of which has been included with this distribution in the * LICENSE.txt file. */package org.mule.transport.tcp;import org.mule.DefaultMuleMessage;import org.mule.api.MuleException;import org.mule.api.config.MuleProperties;import org.mule.api.endpoint.InboundEndpoint;import org.mule.api.lifecycle.CreateException;import org.mule.api.lifecycle.Disposable;import org.mule.api.lifecycle.DisposeException;import org.mule.api.retry.RetryCallback;import org.mule.api.retry.RetryContext;import org.mule.api.service.Service;import org.mule.api.transaction.Transaction;import org.mule.api.transaction.TransactionException;import org.mule.api.transport.Connector;import org.mule.config.i18n.CoreMessages;import org.mule.transport.AbstractMessageReceiver;import org.mule.transport.AbstractReceiverResourceWorker;import org.mule.transport.ConnectException;import org.mule.transport.tcp.i18n.TcpMessages;import org.mule.util.monitor.Expirable;import java.io.BufferedInputStream;import java.io.BufferedOutputStream;import java.io.IOException;import java.io.InputStream;import java.io.OutputStream;import java.net.ServerSocket;import java.net.Socket;import java.net.SocketAddress;import java.net.SocketTimeoutException;import java.net.URI;import java.util.Iterator;import java.util.List;import javax.resource.spi.work.Work;import javax.resource.spi.work.WorkException;import javax.resource.spi.work.WorkManager;import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;/** * <code>TcpMessageReceiver</code> acts like a TCP server to receive socket * requests. */public class TcpMessageReceiver extends AbstractMessageReceiver implements Work{ private ServerSocket serverSocket = null; public TcpMessageReceiver(Connector connector, Service service, InboundEndpoint endpoint) throws CreateException { super(connector, service, endpoint); } protected void doConnect() throws ConnectException { disposing.set(false); URI uri = endpoint.getEndpointURI().getUri(); try { serverSocket = ((TcpConnector) connector).getServerSocket(uri); } catch (Exception e) { throw new org.mule.transport.ConnectException(TcpMessages.failedToBindToUri(uri), e, this); } try { getWorkManager().scheduleWork(this, WorkManager.INDEFINITE, null, connector); } catch (WorkException e) { throw new ConnectException(CoreMessages.failedToScheduleWork(), e, this); } } protected void doDisconnect() throws ConnectException { // this will cause the server thread to quit disposing.set(true); try { if (serverSocket != null) { if (logger.isDebugEnabled()) { logger.debug("Closing: " + serverSocket); } serverSocket.close(); } } catch (IOException e) { logger.warn("Failed to close server socket: " + e.getMessage(), e); } } protected void doStart() throws MuleException { // nothing to do } protected void doStop() throws MuleException { // nothing to do } /** * Obtain the serverSocket * @return the server socket for this server */ public ServerSocket getServerSocket() { return serverSocket; } public void run() { while (!disposing.get()) { if (connector.isStarted() && !disposing.get()) { try { retryTemplate.execute(new RetryCallback() { public void doWork(RetryContext context) throws Exception { Socket socket = null; try { socket = serverSocket.accept(); } catch (Exception e) { if (!connector.isDisposed() && !disposing.get()) { throw new ConnectException(e, null); } } if (socket != null) { Work work = createWork(socket); getWorkManager().scheduleWork(work, WorkManager.INDEFINITE, null, connector); } } public String getWorkDescription() { return getConnectionDescription(); } }); } catch (Exception e) { handleException(e); } } } } public void release() { // template method } protected void doDispose() { try { if (serverSocket != null && !serverSocket.isClosed()) { if (logger.isDebugEnabled()) { logger.debug("Closing: " + serverSocket); } serverSocket.close(); } serverSocket = null; } catch (Exception e) { logger.error(new DisposeException(TcpMessages.failedToCloseSocket(), e, this)); } logger.info("Closed Tcp port"); } protected Work createWork(Socket socket) throws IOException { return new TcpWorker(socket, this); } protected class TcpWorker extends AbstractReceiverResourceWorker implements Disposable, Expirable { protected Socket socket = null; protected TcpInputStream dataIn; protected InputStream underlyingIn; protected OutputStream dataOut; protected TcpProtocol protocol; protected boolean dataInWorkFinished = false; protected Object notify = new Object(); private boolean moreMessages = true; public TcpWorker(Socket socket, AbstractMessageReceiver receiver) throws IOException { super(socket, receiver, ((TcpConnector) connector).getTcpProtocol().createResponse(socket)); this.socket = socket; final TcpConnector tcpConnector = ((TcpConnector) connector); protocol = tcpConnector.getTcpProtocol(); try { tcpConnector.configureSocket(TcpConnector.SERVER, socket); underlyingIn = new BufferedInputStream(socket.getInputStream()); dataIn = new TcpInputStream(underlyingIn) { public void close() throws IOException { // Don't actually close the stream, we just want to know if the // we want to stop receiving messages on this sockete. // The Protocol is responsible for closing this. dataInWorkFinished = true; moreMessages = false; synchronized (notify) { notify.notifyAll(); } } }; dataOut = new BufferedOutputStream(socket.getOutputStream()); } catch (IOException e) { logger.error("Failed to set Socket properties: " + e.getMessage(), e); } } public void expired() { dispose(); } public void dispose() { releaseSocket(); } public void release() { waitForStreams(); releaseSocket(); } private void waitForStreams() { // The Message with the InputStream as a payload can be dispatched // into a different thread, in which case we need to wait for it to // finish streaming if (!dataInWorkFinished) { synchronized (notify) { if (!dataInWorkFinished) { try { notify.wait(); } catch (InterruptedException e) { } } } } } /** * Releases the socket when the input stream is closed. */ private void releaseSocket() { try { if (socket != null && !socket.isClosed()) { if (logger.isDebugEnabled()) { // some dirty workaround for IBM JSSE's SSL implementation, // which closes sockets asynchronously by that point. final SocketAddress socketAddress = socket.getLocalSocketAddress(); if (socketAddress == null) { logger.debug("Listener has already been closed by other process."); } else { logger.debug("Closing listener: " + socketAddress); } } shutdownSocket(); socket.close(); } } catch (IOException e) { logger.warn("Socket close failed with: " + e); } } protected void shutdownSocket() throws IOException { try { socket.shutdownOutput(); } catch (UnsupportedOperationException e) { //Ignore, not supported by ssl sockets } } protected void bindTransaction(Transaction tx) throws TransactionException { //nothing to do } protected Object getNextMessage(Object resource) throws Exception { long keepAliveTimeout = ((TcpConnector)connector).getKeepAliveTimeout(); Object readMsg = null; try { // Create a monitor if expiry was set if(keepAliveTimeout > 0) { ((TcpConnector) connector).getKeepAliveMonitor().addExpirable(keepAliveTimeout, TimeUnit.MILLISECONDS, this); } readMsg = protocol.read(dataIn); // There was some action so we can clear the monitor ((TcpConnector) connector).getKeepAliveMonitor().removeExpirable(this); if (dataIn.isStreaming()) { moreMessages = false; } return readMsg; } catch (SocketTimeoutException e) { ((TcpConnector) connector).getKeepAliveMonitor().removeExpirable(this); } finally { if (readMsg == null) { // Protocols can return a null object, which means we're done // reading messages for now and can mark the stream for closing later. // Also, exceptions can be thrown, in which case we're done reading. dataIn.close(); } } return null; } protected boolean hasMoreMessages(Object message) { return !socket.isClosed() && !dataInWorkFinished && !disposing.get() && moreMessages; } //@Override protected void handleResults(List messages) throws Exception { //should send back only if remote synch is set or no outbound endpoints if (endpoint.isRemoteSync() || !service.getOutboundRouter().hasEndpoints()) { for (Iterator iterator = messages.iterator(); iterator.hasNext();) { Object o = iterator.next(); protocol.write(dataOut, o); dataOut.flush(); } } } protected void preRouteMuleMessage(final DefaultMuleMessage message) throws Exception { super.preRouteMuleMessage(message); final SocketAddress clientAddress = socket.getRemoteSocketAddress(); if (clientAddress != null) { message.setProperty(MuleProperties.MULE_REMOTE_CLIENT_ADDRESS, clientAddress.toString()); } } }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?