📄 jxtasocket.java
字号:
/* * Copyright (c) 2006-2007 Sun Microsystems, Inc. All rights reserved. * * The Sun Project JXTA(TM) Software License * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * 3. The end-user documentation included with the redistribution, if any, must * include the following acknowledgment: "This product includes software * developed by Sun Microsystems, Inc. for JXTA(TM) technology." * Alternately, this acknowledgment may appear in the software itself, if * and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must * not be used to endorse or promote products derived from this software * without prior written permission. For written permission, please contact * Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", nor may * "JXTA" appear in their name, without prior written permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL SUN * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * JXTA is a registered trademark of Sun Microsystems, Inc. in the United * States and other countries. * * Please see the license information page at : * <http://www.jxta.org/project/www/license.html> for instructions on use of * the license in source files. * * ==================================================================== * * This software consists of voluntary contributions made by many individuals * on behalf of Project JXTA. For more information on Project JXTA, please see * http://www.jxta.org. * * This license is based on the BSD license adopted by the Apache Foundation. 
*/package net.jxta.socket;import net.jxta.credential.Credential;import net.jxta.document.*;import net.jxta.endpoint.*;import net.jxta.id.ID;import net.jxta.id.IDFactory;import net.jxta.impl.util.pipe.reliable.FixedFlowControl;import net.jxta.impl.util.pipe.reliable.Outgoing;import net.jxta.impl.util.pipe.reliable.OutgoingMsgrAdaptor;import net.jxta.impl.util.pipe.reliable.ReliableInputStream;import net.jxta.impl.util.pipe.reliable.ReliableOutputStream;import net.jxta.logging.Logging;import net.jxta.membership.MembershipService;import net.jxta.peer.PeerID;import net.jxta.peergroup.PeerGroup;import net.jxta.peergroup.PeerGroupID;import net.jxta.pipe.*;import net.jxta.protocol.PeerAdvertisement;import net.jxta.protocol.PipeAdvertisement;import net.jxta.protocol.RouteAdvertisement;import java.io.IOException;import java.io.InputStream;import java.io.OutputStream;import java.net.Socket;import java.net.SocketAddress;import java.net.SocketException;import java.net.SocketTimeoutException;import java.util.Collections;import java.util.Iterator;import java.util.logging.Level;import java.util.logging.Logger;/** * JxtaSocket is a sub-class of java.net.Socket, and should be used like a java.net.Socket. * Key differences to keep in mind are the following: * <p/> * - JxtaSocket does not implement Nagle's algorithm, therefore at the end of a data frame a flush must be invoked to ensure all * buffered data is packaged and transmitted. * - JxtaSocket does not implement keep-alive, therefore it is possible for the underlying messengers to be closed due to * inactivity, which manifests as a short latency while the messengers are recreated. This limitation should cease * to exist as soon as the inactivity logic is removed. 
* */
public class JxtaSocket extends Socket implements PipeMsgListener, OutputPipeListener {

    /**
     * Logger
     */
    private final static Logger LOG = Logger.getLogger(JxtaSocket.class.getName());

    /**
     * Initial value of {@link #maxRetryTimeout}, i.e. the default maximum
     * retry timeout. Presumably milliseconds (120 seconds) -- TODO confirm units.
     */
    private final static int MAXRETRYTIMEOUT = 120 * 1000;

    /**
     * Timeout passed to {@code connect(address, DEFAULT_TIMEOUT)} by the
     * {@code JxtaSocket(SocketAddress)} constructor. Presumably milliseconds
     * (15 seconds) -- TODO confirm units.
     */
    private final static int DEFAULT_TIMEOUT = 15 * 1000;

    /**
     * Default size for output buffers. Only used when we do not know the MTU
     * size for messengers sending to the remote peer, and as an upper bound
     * should the MTU size be really huge.
     */
    private final static int DEFAULT_OUTPUT_BUFFER_SIZE = 256 * 1024;

    /**
     * If {@code true} then this peer initiated the connection.
     */
    private boolean initiator = false;

    /**
     * The peer group context of this socket.
     */
    protected PeerGroup group;

    /**
     * Pipe advertisement of the well known pipe.
     */
    protected PipeAdvertisement pipeAdv;

    /**
     * Pipe advertisement of the local ephemeral pipe.
     */
    protected PipeAdvertisement localEphemeralPipeAdv;

    /**
     * The input pipe for our ephemeral pipe. We will receive all messages on this pipe.
     */
    protected InputPipe localEphemeralPipeIn;

    /**
     * Pipe advertisement of the remote peer's ephemeral pipe.
     */
    protected PipeAdvertisement remoteEphemeralPipeAdv;

    /**
     * The Messenger we use to send messages to the remote peer.
     * NOTE(review): the original comment was truncated ("The Messenger we use to");
     * this wording is inferred from the field name and from the constructor that
     * sends the connect response through it -- confirm.
     */
    protected Messenger remoteEphemeralPipeMsgr;

    /**
     * The pipe service, obtained from the peer group via {@code group.getPipeService()}.
     */
    protected PipeService pipeSvc;

    /**
     * The peer id of the peer we are connecting to, or {@code null} if we are
     * willing to connect to any peer.
     */
    protected PeerID remotePeerID;

    /**
     * Used to negotiate connection parameters.
     */
    protected OutputPipe connectOutpipe;

    /**
     * The timeout of the read() of this socket's input stream.
     */
    private int soTimeout = 0;

    /**
     * Timeout for connect and close. Presumably milliseconds (the no-argument
     * constructor documents the default timeout as 60 seconds) -- TODO confirm.
     */
    protected long timeout = 60 * 1000;

    /**
     * Retry timeout in milliseconds.
     */
    protected int retryTimeout = 60 * 1000;

    /**
     * Maximum retry timeout allowed.
     */
    protected int maxRetryTimeout = MAXRETRYTIMEOUT;

    /**
     * Retry window size.
     */
    protected int windowSize = 20;

    /**
     * Lock for output pipe resolution.
     */
    protected final Object pipeResolveLock = new Object();

    /**
     * Lock for ephemeral pipe connect states.
     */
    protected final Object socketConnectLock = new Object();

    /**
     * Lock for closing states.
     */
    protected final Object closeLock = new Object();

    /**
     * Used to determine whether to wait for an ack.
     */
    private boolean closeAckReceived = false;

    /**
     * If {@code true} then this socket has been closed and can no longer be used.
     */
    protected volatile boolean closed = false;

    /**
     * If {@code true} then we believe our end of the connection is open.
     */
    protected boolean bound = false;

    /**
     * If {@code true} then we believe the remote peer currently has this socket open.
     */
    protected boolean connected = false;

    /**
     * Credential of the remote peer.
     */
    protected Credential remoteCredential = null;

    /**
     * Our credential that we provide to the remote peer.
     */
    protected Credential localCredential = null;

    /**
     * The remote peer advertisement.
     */
    private PeerAdvertisement remotePeerAdv = null;

    /**
     * If {@code true} then the socket is a stream socket, otherwise it is a datagram socket.
     */
    protected boolean isReliable = true;

    /**
     * If {@code true} then the output stream has been shutdown. All attempts
     * to write to the socket will fail. This socket can no longer be used to
     * send data though it may remain capable of receiving data.
     */
    private boolean outputShutdown = false;

    /**
     * If {@code true} then the input stream has been shutdown. All attempts
     * to read from the socket will fail. This socket can no longer be used to
     * receive data though it may remain capable of sending data.
     */
    private boolean inputShutdown = false;

    /**
     * Used for sending all messages by the reliable output and input streams.
     */
    protected Outgoing outgoing = null;

    /**
     * The reliable input stream we use for receiving data if
     * {@link #isReliable} is {@code true}.
     */
    protected ReliableInputStream ris = null;

    /**
     * The reliable output stream we use for sending data if
     * {@link #isReliable} is {@code true}.
*/
    protected ReliableOutputStream ros = null;

    /**
     * The unreliable input stream we use for receiving data if
     * {@link #isReliable} is {@code false}.
     */
    protected JxtaSocketInputStream nonReliableInputStream = null;

    /**
     * The unreliable output stream we use for sending data if
     * {@link #isReliable} is {@code false}.
     */
    protected JxtaSocketOutputStream nonReliableOutputStream = null;

    /**
     * The size of the output buffers to use. If not set (the initial value is
     * {@code -1}) this defaults to the MTU size of the messenger to the remote peer.
     */
    private int outputBufferSize = -1;

    /**
     * This constructor does not establish a connection. Use this constructor
     * when altering the default parameters and options of the socket.
     * <p/>
     * By default connections are reliable, and the default timeout is 60
     * seconds. To alter a connection a call to create(false) changes the
     * connection to an unreliable one.
     */
    public JxtaSocket() {}

    /**
     * This constructor is used by JxtaServer socket for creating JxtaSocket
     * instances in response to incoming connections.
     * <p/>
     * It records the supplied arguments, creates a new local ephemeral pipe
     * advertisement from {@code pipeAdv}, obtains the pipe service from
     * {@code group}, calls {@code bind()} and {@code connect()}, and finally
     * sends a connect message built by {@code createConnectMessage(...)} through
     * {@link #remoteEphemeralPipeMsgr}.
     *
     * @param group                  group context
     * @param pipeAdv                The original PipeAdvertisement
     * @param remoteEphemeralPipeAdv the remote ephemeral pipe advertisement
     * @param remotePeerAdv          remote peer advertisement; its peer ID is read, so it
     *                               must not be {@code null}
     * @param localCredential        Our credential.
     * @param remoteCredential       The remote peer's credential.
     * @param isReliable             {@code true} for reliable stream connection or
     *                               {@code false} for unreliable stream connection.
     * @throws IOException if an io error occurs */
    protected JxtaSocket(PeerGroup group, PipeAdvertisement pipeAdv, PipeAdvertisement remoteEphemeralPipeAdv, PeerAdvertisement remotePeerAdv, Credential localCredential, Credential remoteCredential, boolean isReliable) throws IOException {
        // This peer did not initiate the connection.
        this.initiator = false;
        this.group = group;
        this.pipeAdv = pipeAdv;
        this.remoteEphemeralPipeAdv = remoteEphemeralPipeAdv;
        // A fresh ephemeral pipe advertisement for our side, derived from the original pipe advertisement.
        this.localEphemeralPipeAdv = newEphemeralPipeAdv(pipeAdv);
        this.remotePeerAdv = remotePeerAdv;
        this.remotePeerID = remotePeerAdv.getPeerID();
        this.localCredential = localCredential;
        this.remoteCredential = remoteCredential;
        this.isReliable = isReliable;
        pipeSvc = group.getPipeService();
        // bind() and connect() are defined elsewhere in this class and run after all
        // fields above are set. remoteEphemeralPipeMsgr is not assigned in this
        // constructor, so it is presumably set up by one of these calls -- TODO confirm.
        bind();
        connect();
        // Tell the remote peer about our ephemeral pipe, credential and reliability mode.
        Message connectResponse = createConnectMessage(group, localEphemeralPipeAdv, localCredential, isReliable, initiator);
        // NOTE(review): any result returned by sendMessage(...) is not checked here --
        // confirm whether a failed send needs to be handled.
        remoteEphemeralPipeMsgr.sendMessage(connectResponse);
        if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
            LOG.info("New socket : " + this);
        }
    }

    /**
     * Create a JxtaSocket connected to the given JxtaSocketAddress.
     * Delegates to {@code connect(address, DEFAULT_TIMEOUT)}.
     *
     * @param address JxtaSocketAddress to connect to
     * @throws IOException if an io error occurs
     */
    public JxtaSocket(SocketAddress address) throws IOException {
        connect(address, DEFAULT_TIMEOUT);
    }

    /**
     * Create a JxtaSocket to any node listening on pipeAdv.
     *
     * @param group   group context
     * @param pipeAdv PipeAdvertisement
     * @throws IOException if an io error occurs
     */
    public JxtaSocket(PeerGroup group, PipeAdvertisement pipeAdv) throws IOException {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -