⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcpconnection.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* * * $Id: TcpConnection.java,v 1.72 2006/06/02 18:25:31 bondolo Exp $ * * Copyright (c) 2001 Sun Microsystems, Inc.  All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright *    notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright *    notice, this list of conditions and the following disclaimer in *    the documentation and/or other materials provided with the *    distribution. * * 3. The end-user documentation included with the redistribution, *    if any, must include the following acknowledgment: *       "This product includes software developed by the *       Sun Microsystems, Inc. for Project JXTA." *    Alternately, this acknowledgment may appear in the software itself, *    if and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" *    must not be used to endorse or promote products derived from this *    software without prior written permission. For written *    permission, please contact Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", *    nor may "JXTA" appear in their name, without prior written *    permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED.  IN NO EVENT SHALL SUN MICROSYSTEMS OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of Project JXTA.  For more * information on Project JXTA, please see * <http://www.jxta.org/>. * * This license is based on the BSD license adopted by the Apache Foundation. */package net.jxta.impl.endpoint.tcp;import java.io.BufferedOutputStream;import java.io.EOFException;import java.io.IOException;import java.io.InputStream;import java.io.InterruptedIOException;import java.io.OutputStream;import java.net.InetAddress;import java.net.Socket;import java.net.SocketException;import net.jxta.document.MimeMediaType;import net.jxta.endpoint.EndpointAddress;import net.jxta.endpoint.Message;import net.jxta.endpoint.WireFormatMessage;import net.jxta.endpoint.WireFormatMessageFactory;import net.jxta.id.ID;import net.jxta.impl.endpoint.IPUtils;import net.jxta.impl.endpoint.msgframing.MessagePackageHeader;import net.jxta.impl.endpoint.msgframing.WelcomeMessage;import net.jxta.impl.endpoint.transportMeter.TransportBindingMeter;import net.jxta.impl.endpoint.transportMeter.TransportMeterBuildSettings;import net.jxta.impl.util.TimeUtils;import net.jxta.peer.PeerID;import net.jxta.util.LimitInputStream;import net.jxta.util.WatchedInputStream;import net.jxta.util.WatchedOutputStream;import org.apache.log4j.Level;import org.apache.log4j.Logger;/** * Low-level TcpMessenger * */class TcpConnection implements Runnable {    /**     *  Log4J Logger     */    private final static transient Logger LOG = Logger.getLogger(TcpConnection.class.getName());    private static final MimeMediaType appMsg = new MimeMediaType("application/x-jxta-msg").intern();    private final TcpTransport proto;    private EndpointAddress dstAddress = null;    private EndpointAddress fullDstAddress = null;    private transient InetAddress inetAddress = null;    private transient int port = 0;    private transient volatile boolean closed = false;    private transient Thread recvThread = null;    private transient WelcomeMessage myWelcome = null;    private transient WelcomeMessage itsWelcome = null;    private final transient long firstUsed = TimeUtils.timeNow();    private transient long lastUsed = TimeUtils.timeNow();    private transient Socket sharedSocket = null;    private transient WatchedOutputStream woutputStream = null;    private transient WatchedInputStream winputStream = null;    private transient OutputStream outputStream = null;    private transient InputStream inputStream = null;    private TransportBindingMeter transportBindingMeter;    private boolean initiator;    private long connectionBegunTime;    private boolean closingDueToFailure = false;    /**     *  only one outgoing message at a time per connection.     */    private final transient Object writeLock;    /**     *  Creates a new TcpConnection for the specified destination address.     *     *  @param destaddr the destination address of this connection.     *  @param p    the transport which this connection is part of.     *  @throws IOException for failures in creating the connection.     */    TcpConnection(EndpointAddress destaddr, TcpTransport p) throws IOException {        initiator = true;        proto = p;        this.fullDstAddress = destaddr;        this.dstAddress = new EndpointAddress(destaddr, null, null);        String protoAddr = destaddr.getProtocolAddress();        int portIndex = protoAddr.lastIndexOf(":");        if (portIndex == -1) {            throw new IllegalArgumentException("Invalid Protocol Address (port # missing) ");        }        String portString = protoAddr.substring(portIndex + 1);        try {            port = Integer.valueOf(portString).intValue();        } catch (NumberFormatException caught) {            throw new IllegalArgumentException("Invalid Protocol Address (port # invalid): " + portString);        }        // Check for bad port number.        if ((port <= 0) || (port > 65535)) {            throw new IllegalArgumentException("Invalid port number in Protocol Address : " + port);        }        String hostString = protoAddr.substring(0, portIndex);        inetAddress = InetAddress.getByName(hostString);        if (LOG.isEnabledFor(Level.INFO)) {            LOG.info("New TCP Connection to : " + dstAddress + " / " + inetAddress.getHostAddress() + ":" + port);        }        writeLock = new String("TCP write lock for " + inetAddress.getHostAddress() + ":" + port);        // See if we're attempting to use the loopback address.        // And if so, is the peer configured for the loopback network only?        // (otherwise the connection is not permitted). Btw, the otherway around        // is just as wrong, so we check both at once and pretend it cannot work,        // even if it might have.        // FIXME 20041130 This is not an appropriate check if the other peer is        // running on the same machine and the InetAddress.getByName returns the        // loopback address.        if (inetAddress.isLoopbackAddress() != proto.usingInterface.isLoopbackAddress()) {            throw new IOException("Network unreachable--connect to loopback attempted.");        }        try {            if (TransportMeterBuildSettings.TRANSPORT_METERING) {                connectionBegunTime = System.currentTimeMillis();            }            /*             * DVT KLUDGE TO MAKE THE ROUTER'S LIFE MISERABLE.             * to be removed after new ad-hoc routing dvt.             */            int rp = proto.getRestrictionPort();            if (rp != -1 && (port < rp - 1 || port > rp + 1)) {                throw new IOException("Simulated separate networks killed outgoing cnx.");            }            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("Connecting to " + inetAddress.getHostAddress() + ":" + port + " via " + proto.usingInterface.getHostAddress() + ":0");            }            sharedSocket = IPUtils.connectToFrom(inetAddress, port, proto.usingInterface, 0, proto.connectionTimeOut);            startSocket();            if (TransportMeterBuildSettings.TRANSPORT_METERING) {                transportBindingMeter = proto.getUnicastTransportBindingMeter((PeerID) getDestinationPeerID(), dstAddress);                if (transportBindingMeter != null) {                    transportBindingMeter.connectionEstablished(initiator, System.currentTimeMillis() - connectionBegunTime);                }                // Fix-Me: We need to add the bytes from the Welcome Messages to the transportBindingMeter, iam@jxta.org            }        } catch (IOException e) {            if (TransportMeterBuildSettings.TRANSPORT_METERING) {                transportBindingMeter = proto.getUnicastTransportBindingMeter(null, dstAddress);                if (transportBindingMeter != null) {                    transportBindingMeter.connectionFailed(initiator, System.currentTimeMillis() - connectionBegunTime);                }                // Fix-Me: We need to add the bytes from the Welcome Messages to the transportBindingMeter, iam@jxta.org            }            // If we failed for any reason, make sure the socket is closed.            // We're the only one to know about it.            if (sharedSocket != null) {                sharedSocket.close();            }            throw e;        }    }    /**     *  Creates a new connection from an incoming socket     *     *  @param incSocket    the incoming socket.     *  @param TcpTransport the transport we are working for.     *  @throws IOException for failures in creating the connection.     */    TcpConnection(Socket incSocket, TcpTransport p) throws IOException {        proto = p;        try {            if (LOG.isEnabledFor(Level.INFO)) {                LOG.info("Connection from " + incSocket.getInetAddress().getHostAddress() + ":" + incSocket.getPort());            }            initiator = false;            if (TransportMeterBuildSettings.TRANSPORT_METERING) {                connectionBegunTime = System.currentTimeMillis();            }            inetAddress = incSocket.getInetAddress();            port = incSocket.getPort();            writeLock = new String("TCP write lock for " + inetAddress.getHostAddress() + ":" + port);            // Temporarily, our address for inclusion in the welcome message response.            dstAddress = new EndpointAddress(proto.getProtocolName(), inetAddress.getHostAddress() + ":" + port, null, null);            fullDstAddress = dstAddress;            sharedSocket = incSocket;            startSocket();            // The correct value for dstAddr: that of the other party.            dstAddress = itsWelcome.getPublicAddress();            fullDstAddress = dstAddress;            // Reset the thread name now that we have a meaningfull            // destination address and remote welcome msg.            setThreadName();            if (TransportMeterBuildSettings.TRANSPORT_METERING) {                transportBindingMeter = proto.getUnicastTransportBindingMeter((PeerID) getDestinationPeerID(), dstAddress);                if (transportBindingMeter != null) {                    transportBindingMeter.connectionEstablished(initiator, System.currentTimeMillis() - connectionBegunTime);                }            }        } catch (IOException e) {            if (TransportMeterBuildSettings.TRANSPORT_METERING) {                transportBindingMeter = proto.getUnicastTransportBindingMeter(null, dstAddress);                if (transportBindingMeter != null) {                    transportBindingMeter.connectionFailed(initiator, System.currentTimeMillis() - connectionBegunTime);                }            }            throw e;        }    }    /**     * {@inheritDoc}     */    public boolean equals(Object target) {        if (this == target) {            return true;        }        if (null == target) {            return false;        }        if (target instanceof TcpConnection) {            TcpConnection likeMe = (TcpConnection) target;            return getDestinationAddress().equals(likeMe.getDestinationAddress()) && getDestinationPeerID().equals(likeMe.getDestinationPeerID());        }        return false;    }    /**     * {@inheritDoc}          This is pointless and expensive since the socket already defines finalize    protected void finalize() {        closingDueToFailure = false;        close();}    */    /**     * {@inheritDoc}     */    public int hashCode() {        return  getDestinationPeerID().hashCode() + getDestinationAddress().hashCode();    }    /**     *  {@inheritDoc}     *     *  <p/>Implementation for debugging.     */    public String toString() {        return super.toString() + ":" + ((null != itsWelcome) ? itsWelcome.getPeerID().toString() : "unknown") + " on address "               + ((null != dstAddress) ? dstAddress.toString() : "unknown");    }    private synchronized void setThreadName() {        Thread temp = recvThread;        if (temp != null) {            try {                temp.setName("TCP receive : " + itsWelcome.getPeerID() + " on address " + dstAddress);            } catch (Exception ez1) {                if (LOG.isEnabledFor(Level.ERROR)) {                    LOG.error("Cannot change thread name", ez1);                }            }        }    }    public EndpointAddress getDestinationAddress() {        return (EndpointAddress) dstAddress.clone();    }    public EndpointAddress getConnectionAddress() {        // Somewhat confusing but destinationAddress is the name of that thing        // for the welcome message.        return itsWelcome.getDestinationAddress();    }    public ID getDestinationPeerID() {        return itsWelcome.getPeerID();    }    private void startSocket() throws IOException {        sharedSocket.setKeepAlive(true);        int useBufferSize = Math.max(TcpTransport.ChunkSize, sharedSocket.getSendBufferSize());        sharedSocket.setSendBufferSize(useBufferSize);        useBufferSize = Math.max(TcpTransport.RecvBufferSize, sharedSocket.getReceiveBufferSize());        sharedSocket.setReceiveBufferSize(useBufferSize);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -