📄 tcpconnection.java
字号:
/* * $Id: TcpConnection.java,v 1.6 2006/05/09 01:32:16 bondolo Exp $ * * Copyright (c) 2001 Sun Microsystems, Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Sun Microsystems, Inc. for Project JXTA." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" * must not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", * nor may "JXTA" appear in their name, without prior written * permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL SUN MICROSYSTEMS OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of Project JXTA. For more * information on Project JXTA, please see * <http://www.jxta.org/>. * * This license is based on the BSD license adopted by the Apache Foundation. */package net.jxta.test.util;import java.io.InputStream;import java.net.InetAddress;import java.net.Socket;import java.util.Collections;import java.util.List;import java.util.ArrayList;import java.io.EOFException;import java.io.IOException;import java.io.InterruptedIOException;import org.apache.log4j.Logger;import org.apache.log4j.Level;import net.jxta.document.MimeMediaType;import net.jxta.endpoint.EndpointAddress;import net.jxta.endpoint.Message;import net.jxta.endpoint.WireFormatMessage;import net.jxta.endpoint.WireFormatMessageFactory;import net.jxta.id.ID;import net.jxta.peer.PeerID;import net.jxta.util.LimitInputStream;import net.jxta.util.WatchedInputStream;import net.jxta.util.WatchedOutputStream;import net.jxta.impl.endpoint.msgframing.MessagePackageHeader;import net.jxta.impl.endpoint.msgframing.WelcomeMessage;import net.jxta.impl.endpoint.IPUtils;/** * Low-level TcpMessenger * */public class TcpConnection implements Runnable { private transient volatile boolean closed = false; private volatile boolean closingDueToFailure = false; private EndpointAddress dstAddress = null; private EndpointAddress fullDstAddress = null; private transient InetAddress inetAddress = null; private boolean initiator; private transient WatchedInputStream inputStream = null; private transient WelcomeMessage itsWelcome = null; private transient long lastUsed = System.currentTimeMillis(); private transient WelcomeMessage myWelcome = null; private transient WatchedOutputStream outputStream = null; private transient int port = 0; static final int SendBufferSize = 64 * 1024; // 64 KBytes static final int RecvBufferSize = 64 * 1024; // 64 KBytes static final int LingerDelay = 2 * 60 * 1000; static final int LongTimeout = 30 * 60 * 1000; static final int ShortTimeout = 10 * 1000; private transient Thread recvThread = null; private transient Socket sharedSocket = null; private int connectionTimeOut = 10 * 1000; // Connections that are watched often - io in progress List ShortCycle = Collections.synchronizedList(new ArrayList()); // Connections that are watched rarely - idle or waiting for input List LongCycle = Collections.synchronizedList(new ArrayList()); /** * only one outgoing message per connection. */ private transient Object writeLock = new String( "Write Lock" ); private final static Logger LOG = Logger.getLogger(TcpConnection.class.getName()); private MessageListener listener = null; private final static MimeMediaType appMsg = new MimeMediaType("application/x-jxta-msg"); /** * Creates a new TcpConnection for the specified destination address. * *@param destaddr the destination address of this connection. *@param p the transport which this connection is part of. *@exception IOException Description of the Exception *@throws IOException for failures in creating the connection. */ public TcpConnection(EndpointAddress destaddr, InetAddress from, PeerID id, MessageListener listener) throws IOException { initiator = true; this.listener = listener; this.fullDstAddress = destaddr; this.dstAddress = new EndpointAddress(destaddr, null, null); if (LOG.isEnabledFor(Level.INFO)) { LOG.info("New TCP Connection to : " + dstAddress); } String tmp = destaddr.getProtocolAddress(); int portIndex = tmp.lastIndexOf(":"); if (portIndex == -1) { throw new IllegalArgumentException("Invalid EndpointAddress (port # missing) "); } try { port = Integer.valueOf(tmp.substring(portIndex + 1)).intValue(); } catch (NumberFormatException caught) { throw new IllegalArgumentException("Invalid EndpointAddress (port # invalid) "); } // Check for bad port number. if ((port <= 0) || (port > 65535)) { throw new IllegalArgumentException("Invalid port number in EndpointAddress: " + port); } inetAddress = InetAddress.getByName(tmp.substring(0, portIndex)); try { sharedSocket = IPUtils.connectToFrom(inetAddress, port, from, 0, connectionTimeOut); startSocket(id); } catch (IOException e) { // If we failed for any reason, make sure the socket is closed. // We're the only one to know about it. if (sharedSocket != null) { sharedSocket.close(); } throw e; } } /** * Creates a new connection from an incoming socket * *@param incSocket the incoming socket. *@param p Description of the Parameter *@exception IOException Description of the Exception *@throws IOException for failures in creating the connection. */ public TcpConnection(Socket incSocket, PeerID id, MessageListener listener) throws IOException { try { if (LOG.isEnabledFor(Level.INFO)) { LOG.info("Connection from " + incSocket.getInetAddress().getHostAddress() + ":" + incSocket.getPort()); } initiator = false; this.listener = listener; inetAddress = incSocket.getInetAddress(); port = incSocket.getPort(); // Temporarily, our address for inclusion in the welcome message // response. dstAddress = new EndpointAddress("tcp", inetAddress.getHostAddress() + ":"+ port, null, null); fullDstAddress = dstAddress; sharedSocket = incSocket; startSocket(id); // The correct value for dstAddr: that of the other party. dstAddress = itsWelcome.getPublicAddress(); fullDstAddress = dstAddress; // Reset the thread name now that we have a meaningfull // destination address and remote welcome msg. setThreadName(); } catch (IOException e) { throw e; } } /** * Set the last used time for this connection in absolute milliseconds. * *@param time absolute time in milliseconds. */ private void setLastUsed(long time) { lastUsed = time; } public WelcomeMessage getWM() { return myWelcome; } /** * Sets the threadName attribute of the TcpConnection object */ private synchronized void setThreadName() { if (recvThread != null) { try { recvThread.setName("TCP receive : " + itsWelcome.getPeerID() + " on address " + dstAddress); } catch (Exception ez1) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Cannot change thread name", ez1); } } } } /** * Gets the connectionAddress attribute of the TcpConnection object * *@return The connectionAddress value */ public EndpointAddress getConnectionAddress() { // Somewhat confusing but destinationAddress is the name of that thing // for the welcome message. return itsWelcome.getDestinationAddress(); } /** * Gets the destinationAddress attribute of the TcpConnection object * *@return The destinationAddress value */ public EndpointAddress getDestinationAddress() { return (EndpointAddress) dstAddress.clone(); } /** * Gets the destinationPeerID attribute of the TcpConnection object * *@return The destinationPeerID value */ public ID getDestinationPeerID() { return itsWelcome.getPeerID(); } /** * Return the absolute time in milliseconds at which this Connection was * last used. * *@return absolute time in milliseconds. */ public long getLastUsed() { return lastUsed; } /** * return the current connection status. * *@return The connected value */ public boolean isConnected() { return ((recvThread != null) && (!closed)); } /** * Soft close of the connection. Messages can no longer be sent, but any in * the queue will be flushed. */ public synchronized void close() { if (LOG.isEnabledFor(Level.INFO)) { LOG.info((closingDueToFailure ? "Failure" : "Normal") + " close of socket to : " + dstAddress + " / " + inetAddress.getHostAddress() + ":" + port); if (closingDueToFailure) { LOG.info( "Failure stack trace", new Throwable("stack trace")); } } if (!closed) { setLastUsed(0); // we idle now. Way idle. closeIOs(); closed = true; if (recvThread != null) { recvThread.interrupt(); } } } /**
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -