📄 tcpconnection.java
字号:
/* * * $Id: TcpConnection.java,v 1.72 2006/06/02 18:25:31 bondolo Exp $ * * Copyright (c) 2001 Sun Microsystems, Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Sun Microsystems, Inc. for Project JXTA." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" * must not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", * nor may "JXTA" appear in their name, without prior written * permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL SUN MICROSYSTEMS OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of Project JXTA. For more * information on Project JXTA, please see * <http://www.jxta.org/>. * * This license is based on the BSD license adopted by the Apache Foundation. */package net.jxta.impl.endpoint.tcp;import java.io.BufferedOutputStream;import java.io.EOFException;import java.io.IOException;import java.io.InputStream;import java.io.InterruptedIOException;import java.io.OutputStream;import java.net.InetAddress;import java.net.Socket;import java.net.SocketException;import net.jxta.document.MimeMediaType;import net.jxta.endpoint.EndpointAddress;import net.jxta.endpoint.Message;import net.jxta.endpoint.WireFormatMessage;import net.jxta.endpoint.WireFormatMessageFactory;import net.jxta.id.ID;import net.jxta.impl.endpoint.IPUtils;import net.jxta.impl.endpoint.msgframing.MessagePackageHeader;import net.jxta.impl.endpoint.msgframing.WelcomeMessage;import net.jxta.impl.endpoint.transportMeter.TransportBindingMeter;import net.jxta.impl.endpoint.transportMeter.TransportMeterBuildSettings;import net.jxta.impl.util.TimeUtils;import net.jxta.peer.PeerID;import net.jxta.util.LimitInputStream;import net.jxta.util.WatchedInputStream;import net.jxta.util.WatchedOutputStream;import org.apache.log4j.Level;import org.apache.log4j.Logger;/** * Low-level TcpMessenger * */class TcpConnection implements Runnable { /** * Log4J Logger */ private final static transient Logger LOG = Logger.getLogger(TcpConnection.class.getName()); private static final MimeMediaType appMsg = new MimeMediaType("application/x-jxta-msg").intern(); private final TcpTransport proto; private EndpointAddress dstAddress = null; private EndpointAddress fullDstAddress = null; private transient InetAddress inetAddress = null; private transient int port = 0; private transient volatile boolean closed = false; private transient Thread recvThread = null; private transient WelcomeMessage myWelcome = null; private transient WelcomeMessage itsWelcome = null; private final transient long firstUsed = TimeUtils.timeNow(); private transient long lastUsed = TimeUtils.timeNow(); private transient Socket sharedSocket = null; private transient WatchedOutputStream woutputStream = null; private transient WatchedInputStream winputStream = null; private transient OutputStream outputStream = null; private transient InputStream inputStream = null; private TransportBindingMeter transportBindingMeter; private boolean initiator; private long connectionBegunTime; private boolean closingDueToFailure = false; /** * only one outgoing message at a time per connection. */ private final transient Object writeLock; /** * Creates a new TcpConnection for the specified destination address. * * @param destaddr the destination address of this connection. * @param p the transport which this connection is part of. * @throws IOException for failures in creating the connection. */ TcpConnection(EndpointAddress destaddr, TcpTransport p) throws IOException { initiator = true; proto = p; this.fullDstAddress = destaddr; this.dstAddress = new EndpointAddress(destaddr, null, null); String protoAddr = destaddr.getProtocolAddress(); int portIndex = protoAddr.lastIndexOf(":"); if (portIndex == -1) { throw new IllegalArgumentException("Invalid Protocol Address (port # missing) "); } String portString = protoAddr.substring(portIndex + 1); try { port = Integer.valueOf(portString).intValue(); } catch (NumberFormatException caught) { throw new IllegalArgumentException("Invalid Protocol Address (port # invalid): " + portString); } // Check for bad port number. if ((port <= 0) || (port > 65535)) { throw new IllegalArgumentException("Invalid port number in Protocol Address : " + port); } String hostString = protoAddr.substring(0, portIndex); inetAddress = InetAddress.getByName(hostString); if (LOG.isEnabledFor(Level.INFO)) { LOG.info("New TCP Connection to : " + dstAddress + " / " + inetAddress.getHostAddress() + ":" + port); } writeLock = new String("TCP write lock for " + inetAddress.getHostAddress() + ":" + port); // See if we're attempting to use the loopback address. // And if so, is the peer configured for the loopback network only? // (otherwise the connection is not permitted). Btw, the otherway around // is just as wrong, so we check both at once and pretend it cannot work, // even if it might have. // FIXME 20041130 This is not an appropriate check if the other peer is // running on the same machine and the InetAddress.getByName returns the // loopback address. if (inetAddress.isLoopbackAddress() != proto.usingInterface.isLoopbackAddress()) { throw new IOException("Network unreachable--connect to loopback attempted."); } try { if (TransportMeterBuildSettings.TRANSPORT_METERING) { connectionBegunTime = System.currentTimeMillis(); } /* * DVT KLUDGE TO MAKE THE ROUTER'S LIFE MISERABLE. * to be removed after new ad-hoc routing dvt. */ int rp = proto.getRestrictionPort(); if (rp != -1 && (port < rp - 1 || port > rp + 1)) { throw new IOException("Simulated separate networks killed outgoing cnx."); } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Connecting to " + inetAddress.getHostAddress() + ":" + port + " via " + proto.usingInterface.getHostAddress() + ":0"); } sharedSocket = IPUtils.connectToFrom(inetAddress, port, proto.usingInterface, 0, proto.connectionTimeOut); startSocket(); if (TransportMeterBuildSettings.TRANSPORT_METERING) { transportBindingMeter = proto.getUnicastTransportBindingMeter((PeerID) getDestinationPeerID(), dstAddress); if (transportBindingMeter != null) { transportBindingMeter.connectionEstablished(initiator, System.currentTimeMillis() - connectionBegunTime); } // Fix-Me: We need to add the bytes from the Welcome Messages to the transportBindingMeter, iam@jxta.org } } catch (IOException e) { if (TransportMeterBuildSettings.TRANSPORT_METERING) { transportBindingMeter = proto.getUnicastTransportBindingMeter(null, dstAddress); if (transportBindingMeter != null) { transportBindingMeter.connectionFailed(initiator, System.currentTimeMillis() - connectionBegunTime); } // Fix-Me: We need to add the bytes from the Welcome Messages to the transportBindingMeter, iam@jxta.org } // If we failed for any reason, make sure the socket is closed. // We're the only one to know about it. if (sharedSocket != null) { sharedSocket.close(); } throw e; } } /** * Creates a new connection from an incoming socket * * @param incSocket the incoming socket. * @param TcpTransport the transport we are working for. * @throws IOException for failures in creating the connection. */ TcpConnection(Socket incSocket, TcpTransport p) throws IOException { proto = p; try { if (LOG.isEnabledFor(Level.INFO)) { LOG.info("Connection from " + incSocket.getInetAddress().getHostAddress() + ":" + incSocket.getPort()); } initiator = false; if (TransportMeterBuildSettings.TRANSPORT_METERING) { connectionBegunTime = System.currentTimeMillis(); } inetAddress = incSocket.getInetAddress(); port = incSocket.getPort(); writeLock = new String("TCP write lock for " + inetAddress.getHostAddress() + ":" + port); // Temporarily, our address for inclusion in the welcome message response. dstAddress = new EndpointAddress(proto.getProtocolName(), inetAddress.getHostAddress() + ":" + port, null, null); fullDstAddress = dstAddress; sharedSocket = incSocket; startSocket(); // The correct value for dstAddr: that of the other party. dstAddress = itsWelcome.getPublicAddress(); fullDstAddress = dstAddress; // Reset the thread name now that we have a meaningfull // destination address and remote welcome msg. setThreadName(); if (TransportMeterBuildSettings.TRANSPORT_METERING) { transportBindingMeter = proto.getUnicastTransportBindingMeter((PeerID) getDestinationPeerID(), dstAddress); if (transportBindingMeter != null) { transportBindingMeter.connectionEstablished(initiator, System.currentTimeMillis() - connectionBegunTime); } } } catch (IOException e) { if (TransportMeterBuildSettings.TRANSPORT_METERING) { transportBindingMeter = proto.getUnicastTransportBindingMeter(null, dstAddress); if (transportBindingMeter != null) { transportBindingMeter.connectionFailed(initiator, System.currentTimeMillis() - connectionBegunTime); } } throw e; } } /** * {@inheritDoc} */ public boolean equals(Object target) { if (this == target) { return true; } if (null == target) { return false; } if (target instanceof TcpConnection) { TcpConnection likeMe = (TcpConnection) target; return getDestinationAddress().equals(likeMe.getDestinationAddress()) && getDestinationPeerID().equals(likeMe.getDestinationPeerID()); } return false; } /** * {@inheritDoc} This is pointless and expensive since the socket already defines finalize protected void finalize() { closingDueToFailure = false; close();} */ /** * {@inheritDoc} */ public int hashCode() { return getDestinationPeerID().hashCode() + getDestinationAddress().hashCode(); } /** * {@inheritDoc} * * <p/>Implementation for debugging. */ public String toString() { return super.toString() + ":" + ((null != itsWelcome) ? itsWelcome.getPeerID().toString() : "unknown") + " on address " + ((null != dstAddress) ? dstAddress.toString() : "unknown"); } private synchronized void setThreadName() { Thread temp = recvThread; if (temp != null) { try { temp.setName("TCP receive : " + itsWelcome.getPeerID() + " on address " + dstAddress); } catch (Exception ez1) { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Cannot change thread name", ez1); } } } } public EndpointAddress getDestinationAddress() { return (EndpointAddress) dstAddress.clone(); } public EndpointAddress getConnectionAddress() { // Somewhat confusing but destinationAddress is the name of that thing // for the welcome message. return itsWelcome.getDestinationAddress(); } public ID getDestinationPeerID() { return itsWelcome.getPeerID(); } private void startSocket() throws IOException { sharedSocket.setKeepAlive(true); int useBufferSize = Math.max(TcpTransport.ChunkSize, sharedSocket.getSendBufferSize()); sharedSocket.setSendBufferSize(useBufferSize); useBufferSize = Math.max(TcpTransport.RecvBufferSize, sharedSocket.getReceiveBufferSize()); sharedSocket.setReceiveBufferSize(useBufferSize);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -