📄 jxtabidipipe.java
字号:
/* * Copyright (c) 2006-2007 Sun Microsystems, Inc. All rights reserved. * * The Sun Project JXTA(TM) Software License * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * 3. The end-user documentation included with the redistribution, if any, must * include the following acknowledgment: "This product includes software * developed by Sun Microsystems, Inc. for JXTA(TM) technology." * Alternately, this acknowledgment may appear in the software itself, if * and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must * not be used to endorse or promote products derived from this software * without prior written permission. For written permission, please contact * Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", nor may * "JXTA" appear in their name, without prior written permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * JXTA is a registered trademark of Sun Microsystems, Inc. in the United * States and other countries. * * Please see the license information page at : * <http://www.jxta.org/project/www/license.html> for instructions on use of * the license in source files. * * ==================================================================== * * This software consists of voluntary contributions made by many individuals * on behalf of Project JXTA. For more information on Project JXTA, please see * http://www.jxta.org. * * This license is based on the BSD license adopted by the Apache Foundation. */package net.jxta.util;import net.jxta.credential.Credential;import net.jxta.document.AdvertisementFactory;import net.jxta.document.MimeMediaType;import net.jxta.document.StructuredDocument;import net.jxta.document.StructuredDocumentFactory;import net.jxta.document.XMLDocument;import net.jxta.endpoint.EndpointAddress;import net.jxta.endpoint.EndpointService;import net.jxta.endpoint.Message;import net.jxta.endpoint.MessageElement;import net.jxta.endpoint.Messenger;import net.jxta.endpoint.StringMessageElement;import net.jxta.endpoint.TextDocumentMessageElement;import net.jxta.id.ID;import net.jxta.impl.endpoint.tcp.TcpMessenger;import net.jxta.impl.util.pipe.reliable.Defs;import net.jxta.impl.util.pipe.reliable.FixedFlowControl;import net.jxta.impl.util.pipe.reliable.OutgoingMsgrAdaptor;import net.jxta.impl.util.pipe.reliable.ReliableInputStream;import net.jxta.impl.util.pipe.reliable.ReliableOutputStream;import net.jxta.logging.Logging;import net.jxta.membership.MembershipService;import net.jxta.peer.PeerID;import net.jxta.peergroup.PeerGroup;import net.jxta.pipe.InputPipe;import net.jxta.pipe.OutputPipe;import net.jxta.pipe.OutputPipeEvent;import net.jxta.pipe.OutputPipeListener;import net.jxta.pipe.PipeID;import net.jxta.pipe.PipeMsgEvent;import net.jxta.pipe.PipeMsgListener;import net.jxta.pipe.PipeService;import net.jxta.protocol.PeerAdvertisement;import net.jxta.protocol.PipeAdvertisement;import java.io.IOException;import java.io.InputStream;import java.net.SocketTimeoutException;import java.util.Collections;import java.util.Iterator;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.TimeUnit;import java.util.logging.Level;import java.util.logging.Logger;/** * JxtaBiDiPipe is a pair of UnicastPipe channels that implements a bidirectional pipe. * By default, JxtaBiDiPipe operates in reliable mode, unless otherwise specified, * in addition, messages must not exceed the Endpoint MTU size of 64K, exceed the * MTU will lead to unexpected behavior. * <p/> * It highly recommended that an application message listener is specified, not doing so, may * lead to message loss in the event the internal queue is overflowed. * <p/> * Sending messages vis {@link #sendMessage(Message)} from within a * {@code PipeMsgListener} may result in a deadlock due to contention * between the sending and receiving portions of BiDi pipes. * <p/> * JxtaBiDiPipe, whenever possible, will attempt to utilize direct tcp messengers, * which leads to improved performance. */public class JxtaBiDiPipe implements PipeMsgListener, OutputPipeListener, ReliableInputStream.MsgListener { /** * Logger */ private final static transient Logger LOG = Logger.getLogger(JxtaBiDiPipe.class.getName()); private final static int MAXRETRYTIMEOUT = 120 * 1000; private PipeAdvertisement remotePipeAdv; private PeerAdvertisement remotePeerAdv; protected int timeout = 15 * 1000; protected int retryTimeout = 60 * 1000; protected int maxRetryTimeout = MAXRETRYTIMEOUT; protected int windowSize = 50; private ArrayBlockingQueue<PipeMsgEvent> queue = new ArrayBlockingQueue<PipeMsgEvent>(windowSize); protected PeerGroup group; protected PipeAdvertisement pipeAdv; protected PipeAdvertisement myPipeAdv; protected PipeService pipeSvc; protected InputPipe inputPipe; protected OutputPipe connectOutpipe; protected Messenger msgr; protected InputStream stream; protected final Object closeLock = new Object(); protected final Object acceptLock = new Object(); protected final Object finalLock = new Object(); protected boolean closed = false; protected boolean bound = false; protected boolean dequeued = false; protected PipeMsgListener msgListener; protected PipeEventListener eventListener; protected PipeStateListener stateListener; protected Credential credential = null; protected boolean waiting; /** * If {@code true} then we are using the underlying end-to-end ACK reliable * layer to ensure that messages are received by the remote peer. */ protected boolean isReliable = false; protected ReliableInputStream ris = null; protected ReliableOutputStream ros = null; /** * If {@code true} then we are using a reliable direct messenger to the * remote peer. We will assume that messages which are sent successfully * will be received successfully. */ protected volatile boolean direct = false; protected OutgoingMsgrAdaptor outgoing = null; protected StructuredDocument credentialDoc = null; /** * Pipe close Event */ public static final int PIPE_CLOSED_EVENT = 1; /** * Creates a bidirectional pipe * * @param group group context * @param msgr lightweight output pipe * @param pipe PipeAdvertisement * @param isReliable Whether the connection is reliable or not * @param credDoc Credential StructuredDocument * @param direct indicates a direct messenger pipe * @throws IOException if an io error occurs */ protected JxtaBiDiPipe(PeerGroup group, Messenger msgr, PipeAdvertisement pipe, StructuredDocument credDoc, boolean isReliable, boolean direct) throws IOException { if (msgr == null) { throw new IOException("Null Messenger"); } this.direct = direct; this.group = group; this.pipeAdv = pipe; this.credentialDoc = credDoc != null ? credDoc : getCredDoc(group); this.pipeSvc = group.getPipeService(); this.inputPipe = pipeSvc.createInputPipe(pipe, this); this.msgr = msgr; this.isReliable = isReliable; if (!direct) { createRLib(); } setBound(); } /** * Creates a new object with a default timeout of #timeout, and no reliability. * */ public JxtaBiDiPipe() { } /** * Creates a bidirectional pipe. * * Attempts to create a bidirectional connection to remote peer within default * timeout of #timeout. * * @param group group context * @param pipeAd PipeAdvertisement * @param msgListener application PipeMsgListener * @throws IOException if an io error occurs */ public JxtaBiDiPipe(PeerGroup group, PipeAdvertisement pipeAd, PipeMsgListener msgListener) throws IOException { connect(group, null, pipeAd, timeout, msgListener); } /** * Creates a bidirectional pipe. * * Attempts to create a bidirectional connection to remote peer within specified * timeout of #timeout. * * @param group group context * @param timeout The number of milliseconds within which the JxtaBiDiPipe must * be successfully created. An exception will be thrown if the pipe * cannot be created in the alotted time. A timeout value of {@code 0} * (zero) specifies an infinite timeout. * @param pipeAd PipeAdvertisement * @param msgListener application PipeMsgListener * @throws IOException if an io error occurs */ public JxtaBiDiPipe(PeerGroup group, PipeAdvertisement pipeAd, int timeout, PipeMsgListener msgListener) throws IOException { connect(group, null, pipeAd, timeout, msgListener); } /** * attempts to create a bidirectional connection to remote peer * * @param group group context * @param pipeAd PipeAdvertisement * @param timeout The number of milliseconds within which the JxtaBiDiPipe must * be successfully created. An exception will be thrown if the pipe * cannot be created in the allotted time. A timeout value of {@code 0} * (zero) specifies an infinite timeout. * @param msgListener application PipeMsgListener * @param reliable if true, the reliability is assumed * @throws IOException if an io error occurs */ public JxtaBiDiPipe(PeerGroup group, PipeAdvertisement pipeAd, int timeout, PipeMsgListener msgListener, boolean reliable) throws IOException { connect(group, null, pipeAd, timeout, msgListener, reliable); } /** * Connect to a JxtaServerPipe with default timeout * * @param group group context * @param pipeAd PipeAdvertisement * @throws IOException if an io error occurs */ public void connect(PeerGroup group, PipeAdvertisement pipeAd) throws IOException { connect(group, pipeAd, timeout); } /** * Connects to a remote JxtaBiDiPipe * * @param group group context * @param pipeAd PipeAdvertisement * @param timeout timeout in ms, also reset object default timeout * to that of timeout * @throws IOException if an io error occurs */ public void connect(PeerGroup group, PipeAdvertisement pipeAd, int timeout) throws IOException { connect(group, null, pipeAd, timeout, null); } /** * Connects to a remote JxtaServerPipe * * @param group group context * @param peerid peer to connect to * @param pipeAd PipeAdvertisement * @param timeout timeout in ms, also reset object default timeout to that of timeout * @param msgListener application PipeMsgListener * @throws IOException if an io error occurs */ public void connect(PeerGroup group, PeerID peerid, PipeAdvertisement pipeAd, int timeout, PipeMsgListener msgListener) throws IOException { connect(group, peerid, pipeAd, timeout, msgListener, isReliable); } /** * Connects to a remote JxtaServerPipe * * @param group group context * @param peerid peer to connect to * @param pipeAd PipeAdvertisement * @param timeout timeout in ms, also reset object default timeout to that of timeout * @param msgListener application PipeMsgListener * @param reliable Reliable connection * @throws IOException if an io error occurs */ public void connect(PeerGroup group, PeerID peerid, PipeAdvertisement pipeAd, int timeout, PipeMsgListener msgListener, boolean reliable) throws IOException { if (isBound()) { throw new IOException("Pipe already bound"); } if (timeout <= 0) { throw new IllegalArgumentException("Invalid timeout :" + timeout); } this.pipeAdv = pipeAd;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -