⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcpconnection.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* *  $Id: TcpConnection.java,v 1.6 2006/05/09 01:32:16 bondolo Exp $ * *  Copyright (c) 2001 Sun Microsystems, Inc.  All rights reserved. * *  Redistribution and use in source and binary forms, with or without *  modification, are permitted provided that the following conditions *  are met: * *  1. Redistributions of source code must retain the above copyright *  notice, this list of conditions and the following disclaimer. * *  2. Redistributions in binary form must reproduce the above copyright *  notice, this list of conditions and the following disclaimer in *  the documentation and/or other materials provided with the *  distribution. * *  3. The end-user documentation included with the redistribution, *  if any, must include the following acknowledgment: *  "This product includes software developed by the *  Sun Microsystems, Inc. for Project JXTA." *  Alternately, this acknowledgment may appear in the software itself, *  if and wherever such third-party acknowledgments normally appear. * *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" *  must not be used to endorse or promote products derived from this *  software without prior written permission. For written *  permission, please contact Project JXTA at http://www.jxta.org. * *  5. Products derived from this software may not be called "JXTA", *  nor may "JXTA" appear in their name, without prior written *  permission of Sun. * *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED *  WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES *  OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE *  DISCLAIMED.  IN NO EVENT SHALL SUN MICROSYSTEMS OR *  ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF *  USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND *  ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, *  OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT *  OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF *  SUCH DAMAGE. * *  ==================================================================== * *  This software consists of voluntary contributions made by many *  individuals on behalf of Project JXTA.  For more *  information on Project JXTA, please see *  <http://www.jxta.org/>. * *  This license is based on the BSD license adopted by the Apache Foundation. */package net.jxta.test.util;import java.io.InputStream;import java.net.InetAddress;import java.net.Socket;import java.util.Collections;import java.util.List;import java.util.ArrayList;import java.io.EOFException;import java.io.IOException;import java.io.InterruptedIOException;import org.apache.log4j.Logger;import org.apache.log4j.Level;import net.jxta.document.MimeMediaType;import net.jxta.endpoint.EndpointAddress;import net.jxta.endpoint.Message;import net.jxta.endpoint.WireFormatMessage;import net.jxta.endpoint.WireFormatMessageFactory;import net.jxta.id.ID;import net.jxta.peer.PeerID;import net.jxta.util.LimitInputStream;import net.jxta.util.WatchedInputStream;import net.jxta.util.WatchedOutputStream;import net.jxta.impl.endpoint.msgframing.MessagePackageHeader;import net.jxta.impl.endpoint.msgframing.WelcomeMessage;import net.jxta.impl.endpoint.IPUtils;/** *  Low-level TcpMessenger * */public class TcpConnection implements Runnable {        private transient volatile boolean closed = false;    private volatile boolean closingDueToFailure = false;        private EndpointAddress dstAddress = null;    private EndpointAddress fullDstAddress = null;    private transient InetAddress inetAddress = null;    private boolean initiator;    private transient WatchedInputStream inputStream = null;    private transient WelcomeMessage itsWelcome = null;        private transient long lastUsed = System.currentTimeMillis();        private transient WelcomeMessage myWelcome = null;    private transient WatchedOutputStream outputStream = null;    private transient int port = 0;    static final int SendBufferSize = 64 * 1024; // 64 KBytes    static final int RecvBufferSize = 64 * 1024; // 64 KBytes    static final int LingerDelay = 2 * 60 * 1000;    static final int LongTimeout = 30 * 60 * 1000;    static final int ShortTimeout = 10 * 1000;        private transient Thread recvThread = null;    private transient Socket sharedSocket = null;    private int connectionTimeOut = 10 * 1000;        // Connections that are watched often - io in progress    List ShortCycle = Collections.synchronizedList(new ArrayList());        // Connections that are watched rarely - idle or waiting for input    List LongCycle = Collections.synchronizedList(new ArrayList());        /**     *  only one outgoing message per connection.     */    private transient Object writeLock = new String( "Write Lock" );    private final static Logger LOG = Logger.getLogger(TcpConnection.class.getName());    private MessageListener listener = null;    private final static MimeMediaType appMsg = new MimeMediaType("application/x-jxta-msg");            /**     *  Creates a new TcpConnection for the specified destination address.     *     *@param  destaddr         the destination address of this connection.     *@param  p                the transport which this connection is part of.     *@exception  IOException  Description of the Exception     *@throws  IOException     for failures in creating the connection.     */    public TcpConnection(EndpointAddress destaddr, InetAddress from, PeerID id, MessageListener listener) throws IOException {        initiator = true;        this.listener = listener;        this.fullDstAddress = destaddr;        this.dstAddress = new EndpointAddress(destaddr, null, null);                if (LOG.isEnabledFor(Level.INFO)) {            LOG.info("New TCP Connection to : " + dstAddress);        }                String tmp = destaddr.getProtocolAddress();        int portIndex = tmp.lastIndexOf(":");        if (portIndex == -1) {            throw new IllegalArgumentException("Invalid EndpointAddress (port # missing) ");        }                try {            port = Integer.valueOf(tmp.substring(portIndex + 1)).intValue();        } catch (NumberFormatException caught) {            throw new IllegalArgumentException("Invalid EndpointAddress (port # invalid) ");        }                // Check for bad port number.        if ((port <= 0) || (port > 65535)) {            throw new IllegalArgumentException("Invalid port number in EndpointAddress: " + port);        }        inetAddress = InetAddress.getByName(tmp.substring(0, portIndex));        try {            sharedSocket = IPUtils.connectToFrom(inetAddress, port, from, 0, connectionTimeOut);            startSocket(id);        } catch (IOException e) {            // If we failed for any reason, make sure the socket is closed.            // We're the only one to know about it.            if (sharedSocket != null) {                sharedSocket.close();            }            throw e;        }    }            /**     *  Creates a new connection from an incoming socket     *     *@param  incSocket        the incoming socket.     *@param  p                Description of the Parameter     *@exception  IOException  Description of the Exception     *@throws  IOException     for failures in creating the connection.     */    public TcpConnection(Socket incSocket, PeerID id, MessageListener listener) throws IOException {        try {            if (LOG.isEnabledFor(Level.INFO)) {                LOG.info("Connection from " + incSocket.getInetAddress().getHostAddress() + ":" + incSocket.getPort());            }                        initiator = false;            this.listener = listener;            inetAddress = incSocket.getInetAddress();            port = incSocket.getPort();                        // Temporarily, our address for inclusion in the welcome message            // response.            dstAddress =            new EndpointAddress("tcp", inetAddress.getHostAddress() + ":"+ port, null, null);            fullDstAddress = dstAddress;                        sharedSocket = incSocket;            startSocket(id);                        // The correct value for dstAddr: that of the other party.            dstAddress = itsWelcome.getPublicAddress();            fullDstAddress = dstAddress;                        // Reset the thread name now that we have a meaningfull            // destination address and remote welcome msg.            setThreadName();        } catch (IOException e) {            throw e;        }    }            /**     *  Set the last used time for this connection in absolute milliseconds.     *     *@param  time  absolute time in milliseconds.     */    private void setLastUsed(long time) {        lastUsed = time;    }    public WelcomeMessage getWM() {                return myWelcome;    }        /**     *  Sets the threadName attribute of the TcpConnection object     */    private synchronized void setThreadName() {                if (recvThread != null) {            try {                recvThread.setName("TCP receive : " + itsWelcome.getPeerID() + " on address " + dstAddress);            } catch (Exception ez1) {                if (LOG.isEnabledFor(Level.WARN)) {                    LOG.warn("Cannot change thread name", ez1);                }            }        }    }            /**     *  Gets the connectionAddress attribute of the TcpConnection object     *     *@return    The connectionAddress value     */    public EndpointAddress getConnectionAddress() {        // Somewhat confusing but destinationAddress is the name of that thing        // for the welcome message.        return itsWelcome.getDestinationAddress();    }            /**     *  Gets the destinationAddress attribute of the TcpConnection object     *     *@return    The destinationAddress value     */    public EndpointAddress getDestinationAddress() {        return (EndpointAddress) dstAddress.clone();    }            /**     *  Gets the destinationPeerID attribute of the TcpConnection object     *     *@return    The destinationPeerID value     */    public ID getDestinationPeerID() {        return itsWelcome.getPeerID();    }            /**     *  Return the absolute time in milliseconds at which this Connection was     *  last used.     *     *@return    absolute time in milliseconds.     */    public long getLastUsed() {        return lastUsed;    }            /**     *  return the current connection status.     *     *@return       The connected value     */    public boolean isConnected() {        return ((recvThread != null) && (!closed));    }            /**     *  Soft close of the connection. Messages can no longer be sent, but any in     *  the queue will be flushed.     */    public synchronized void close() {        if (LOG.isEnabledFor(Level.INFO)) {            LOG.info((closingDueToFailure ? "Failure" : "Normal") + " close of socket to : " + dstAddress + " / " + inetAddress.getHostAddress() + ":" + port);            if (closingDueToFailure) {                LOG.info( "Failure stack trace", new Throwable("stack trace"));            }        }                if (!closed) {            setLastUsed(0);            // we idle now. Way idle.            closeIOs();            closed = true;            if (recvThread != null) {                recvThread.interrupt();            }        }    }            /**

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -