⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcpconnection.java

📁 JXTA™ is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi…
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* * Copyright (c) 2001-2007 Sun Microsystems, Inc.  All rights reserved. *   *  The Sun Project JXTA(TM) Software License *   *  Redistribution and use in source and binary forms, with or without  *  modification, are permitted provided that the following conditions are met: *   *  1. Redistributions of source code must retain the above copyright notice, *     this list of conditions and the following disclaimer. *   *  2. Redistributions in binary form must reproduce the above copyright notice,  *     this list of conditions and the following disclaimer in the documentation  *     and/or other materials provided with the distribution. *   *  3. The end-user documentation included with the redistribution, if any, must  *     include the following acknowledgment: "This product includes software  *     developed by Sun Microsystems, Inc. for JXTA(TM) technology."  *     Alternately, this acknowledgment may appear in the software itself, if  *     and wherever such third-party acknowledgments normally appear. *   *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must  *     not be used to endorse or promote products derived from this software  *     without prior written permission. For written permission, please contact  *     Project JXTA at http://www.jxta.org. *   *  5. Products derived from this software may not be called "JXTA", nor may  *     "JXTA" appear in their name, without prior written permission of Sun. *   *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, *  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND  *  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL SUN  *  MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,  *  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF  *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING  *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,  *  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *   *  JXTA is a registered trademark of Sun Microsystems, Inc. in the United  *  States and other countries. *   *  Please see the license information page at : *  <http://www.jxta.org/project/www/license.html> for instructions on use of  *  the license in source files. *   *  ==================================================================== *   *  This software consists of voluntary contributions made by many individuals  *  on behalf of Project JXTA. For more information on Project JXTA, please see  *  http://www.jxta.org. *   *  This license is based on the BSD license adopted by the Apache Foundation.  
*/package net.jxta.test.util;import java.io.InputStream;import java.net.InetAddress;import java.net.Socket;import java.util.Collections;import java.util.List;import java.util.ArrayList;import java.io.EOFException;import java.io.IOException;import java.io.InterruptedIOException;import java.util.logging.Logger;import java.util.logging.Level;import net.jxta.logging.Logging;import net.jxta.document.MimeMediaType;import net.jxta.endpoint.EndpointAddress;import net.jxta.endpoint.Message;import net.jxta.endpoint.WireFormatMessage;import net.jxta.endpoint.WireFormatMessageFactory;import net.jxta.id.ID;import net.jxta.peer.PeerID;import net.jxta.util.LimitInputStream;import net.jxta.util.WatchedInputStream;import net.jxta.util.WatchedOutputStream;import net.jxta.impl.endpoint.msgframing.MessagePackageHeader;import net.jxta.impl.endpoint.msgframing.WelcomeMessage;import net.jxta.impl.endpoint.IPUtils;/** *  Low-level TcpMessenger * */public class TcpConnection implements Runnable {        private transient volatile boolean closed = false;    private volatile boolean closingDueToFailure = false;        private EndpointAddress dstAddress = null;    private EndpointAddress fullDstAddress = null;    private transient InetAddress inetAddress = null;    private boolean initiator;    private transient WatchedInputStream inputStream = null;    private transient WelcomeMessage itsWelcome = null;        private transient long lastUsed = System.currentTimeMillis();        private transient WelcomeMessage myWelcome = null;    private transient WatchedOutputStream outputStream = null;    private transient int port = 0;    static final int SendBufferSize = 64 * 1024; // 64 KBytes    static final int RecvBufferSize = 64 * 1024; // 64 KBytes    static final int LingerDelay = 2 * 60 * 1000;    static final int LongTimeout = 30 * 60 * 1000;    static final int ShortTimeout = 10 * 1000;        private transient Thread recvThread = null;    private transient Socket sharedSocket = null;    
private int connectionTimeOut = 10 * 1000;        // Connections that are watched often - io in progress    List ShortCycle = Collections.synchronizedList(new ArrayList());        // Connections that are watched rarely - idle or waiting for input    List LongCycle = Collections.synchronizedList(new ArrayList());        /**     *  only one outgoing message per connection.     */    private transient Object writeLock = new String("Write Lock");    private final static Logger LOG = Logger.getLogger(TcpConnection.class.getName());    private MessageListener listener = null;    private final static MimeMediaType appMsg = new MimeMediaType("application/x-jxta-msg");        /**     *  Creates a new TcpConnection for the specified destination address.     *     *@param  destaddr         the destination address of this connection.     *@param  p                the transport which this connection is part of.     *@exception  IOException  Description of the Exception     *@throws  IOException     for failures in creating the connection.     
*/    public TcpConnection(EndpointAddress destaddr, InetAddress from, PeerID id, MessageListener listener) throws IOException {        initiator = true;        this.listener = listener;        this.fullDstAddress = destaddr;        this.dstAddress = new EndpointAddress(destaddr, null, null);                if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {            LOG.info("New TCP Connection to : " + dstAddress);        }                String tmp = destaddr.getProtocolAddress();        int portIndex = tmp.lastIndexOf(":");        if (portIndex == -1) {            throw new IllegalArgumentException("Invalid EndpointAddress (port # missing) ");        }                try {            port = Integer.valueOf(tmp.substring(portIndex + 1)).intValue();        } catch (NumberFormatException caught) {            throw new IllegalArgumentException("Invalid EndpointAddress (port # invalid) ");        }                // Check for bad port number.        if ((port <= 0) || (port > 65535)) {            throw new IllegalArgumentException("Invalid port number in EndpointAddress: " + port);        }        inetAddress = InetAddress.getByName(tmp.substring(0, portIndex));        try {            sharedSocket = IPUtils.connectToFrom(inetAddress, port, from, 0, connectionTimeOut);            startSocket(id);        } catch (IOException e) {            // If we failed for any reason, make sure the socket is closed.            // We're the only one to know about it.            if (sharedSocket != null) {                sharedSocket.close();            }            throw e;        }    }        /**     *  Creates a new connection from an incoming socket     *     *@param  incSocket        the incoming socket.     *@param  p                Description of the Parameter     *@exception  IOException  Description of the Exception     *@throws  IOException     for failures in creating the connection.     
*/    public TcpConnection(Socket incSocket, PeerID id, MessageListener listener) throws IOException {        try {            if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {                LOG.info("Connection from " + incSocket.getInetAddress().getHostAddress() + ":" + incSocket.getPort());            }                        initiator = false;            this.listener = listener;            inetAddress = incSocket.getInetAddress();            port = incSocket.getPort();                        // Temporarily, our address for inclusion in the welcome message            // response.            dstAddress = new EndpointAddress("tcp", inetAddress.getHostAddress() + ":" + port, null, null);            fullDstAddress = dstAddress;                        sharedSocket = incSocket;            startSocket(id);                        // The correct value for dstAddr: that of the other party.            dstAddress = itsWelcome.getPublicAddress();            fullDstAddress = dstAddress;                        // Reset the thread name now that we have a meaningfull            // destination address and remote welcome msg.            setThreadName();        } catch (IOException e) {            throw e;        }    }        /**     *  Set the last used time for this connection in absolute milliseconds.     *     *@param  time  absolute time in milliseconds.     
*/    private void setLastUsed(long time) {        lastUsed = time;    }    public WelcomeMessage getWM() {                return myWelcome;    }        /**     *  Sets the threadName attribute of the TcpConnection object     */    private synchronized void setThreadName() {                if (recvThread != null) {            try {                recvThread.setName("TCP receive : " + itsWelcome.getPeerID() + " on address " + dstAddress);            } catch (Exception ez1) {                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                    LOG.log(Level.WARNING, "Cannot change thread name", ez1);                }            }        }    }        /**     *  Gets the connectionAddress attribute of the TcpConnection object     *     *@return    The connectionAddress value     */    public EndpointAddress getConnectionAddress() {        // Somewhat confusing but destinationAddress is the name of that thing        // for the welcome message.        return itsWelcome.getDestinationAddress();    }        /**     *  Gets the destinationAddress attribute of the TcpConnection object     *     *@return    The destinationAddress value     */    public EndpointAddress getDestinationAddress() {        return dstAddress.clone();    }        /**     *  Gets the destinationPeerID attribute of the TcpConnection object     *     *@return    The destinationPeerID value     */    public ID getDestinationPeerID() {        return itsWelcome.getPeerID();    }        /**     *  Return the absolute time in milliseconds at which this Connection was     *  last used.     *     *@return    absolute time in milliseconds.     */    public long getLastUsed() {        return lastUsed;    }        /**     *  return the current connection status.     *     *@return       The connected value     */    public boolean isConnected() {        return ((recvThread != null) && (!closed));    }        /**     *  Soft close of the connection. 
Messages can no longer be sent, but any in
     *  the queue will be flushed.
     *  <p>
     *  The first call resets the last-used time to 0, calls closeIOs(), marks
     *  the connection closed and interrupts the receive thread if there is
     *  one. Calls made after that only produce the log output.
     */
    public synchronized void close() {
        if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
            final StringBuilder notice = new StringBuilder();
            if (closingDueToFailure) {
                notice.append("Failure");
            } else {
                notice.append("Normal");
            }
            notice.append(" close of socket to : ").append(dstAddress);
            notice.append(" / ").append(inetAddress.getHostAddress());
            notice.append(":").append(port);
            LOG.info(notice.toString());

            if (closingDueToFailure) {
                // The Throwable is created only to capture the caller's stack.
                LOG.log(Level.INFO, "Failure stack trace", new Throwable("stack trace"));
            }
        }

        if (closed) {
            return;
        }

        // we idle now. Way idle.
        setLastUsed(0);
        closeIOs();
        closed = true;

        if (recvThread != null) {
            recvThread.interrupt();
        }
    }

    /**
     *  Description of the Method
     */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -