⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jxtabidipipe.java

📁 JXTA&#8482 is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
/* * Copyright (c) 2006-2007 Sun Microsystems, Inc.  All rights reserved. *   *  The Sun Project JXTA(TM) Software License *   *  Redistribution and use in source and binary forms, with or without  *  modification, are permitted provided that the following conditions are met: *   *  1. Redistributions of source code must retain the above copyright notice, *     this list of conditions and the following disclaimer. *   *  2. Redistributions in binary form must reproduce the above copyright notice,  *     this list of conditions and the following disclaimer in the documentation  *     and/or other materials provided with the distribution. *   *  3. The end-user documentation included with the redistribution, if any, must  *     include the following acknowledgment: "This product includes software  *     developed by Sun Microsystems, Inc. for JXTA(TM) technology."  *     Alternately, this acknowledgment may appear in the software itself, if  *     and wherever such third-party acknowledgments normally appear. *   *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must  *     not be used to endorse or promote products derived from this software  *     without prior written permission. For written permission, please contact  *     Project JXTA at http://www.jxta.org. *   *  5. Products derived from this software may not be called "JXTA", nor may  *     "JXTA" appear in their name, without prior written permission of Sun. *   *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, *  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND  *  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN  *  MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,  *  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF  *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING  *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,  *  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *   *  JXTA is a registered trademark of Sun Microsystems, Inc. in the United  *  States and other countries. *   *  Please see the license information page at : *  <http://www.jxta.org/project/www/license.html> for instructions on use of  *  the license in source files. *   *  ==================================================================== *   *  This software consists of voluntary contributions made by many individuals  *  on behalf of Project JXTA. For more information on Project JXTA, please see  *  http://www.jxta.org. *   *  This license is based on the BSD license adopted by the Apache Foundation.  */package net.jxta.util;import net.jxta.credential.Credential;import net.jxta.document.AdvertisementFactory;import net.jxta.document.MimeMediaType;import net.jxta.document.StructuredDocument;import net.jxta.document.StructuredDocumentFactory;import net.jxta.document.XMLDocument;import net.jxta.endpoint.EndpointAddress;import net.jxta.endpoint.EndpointService;import net.jxta.endpoint.Message;import net.jxta.endpoint.MessageElement;import net.jxta.endpoint.Messenger;import net.jxta.endpoint.StringMessageElement;import net.jxta.endpoint.TextDocumentMessageElement;import net.jxta.id.ID;import net.jxta.impl.endpoint.tcp.TcpMessenger;import net.jxta.impl.util.pipe.reliable.Defs;import net.jxta.impl.util.pipe.reliable.FixedFlowControl;import net.jxta.impl.util.pipe.reliable.OutgoingMsgrAdaptor;import net.jxta.impl.util.pipe.reliable.ReliableInputStream;import net.jxta.impl.util.pipe.reliable.ReliableOutputStream;import net.jxta.logging.Logging;import net.jxta.membership.MembershipService;import net.jxta.peer.PeerID;import net.jxta.peergroup.PeerGroup;import net.jxta.pipe.InputPipe;import net.jxta.pipe.OutputPipe;import net.jxta.pipe.OutputPipeEvent;import net.jxta.pipe.OutputPipeListener;import net.jxta.pipe.PipeID;import net.jxta.pipe.PipeMsgEvent;import net.jxta.pipe.PipeMsgListener;import net.jxta.pipe.PipeService;import net.jxta.protocol.PeerAdvertisement;import net.jxta.protocol.PipeAdvertisement;import java.io.IOException;import java.io.InputStream;import java.net.SocketTimeoutException;import java.util.Collections;import java.util.Iterator;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.TimeUnit;import java.util.logging.Level;import java.util.logging.Logger;/** * JxtaBiDiPipe is a pair of UnicastPipe channels that implements a bidirectional pipe. * By default, JxtaBiDiPipe operates in reliable mode, unless otherwise specified, * in addition, messages must not exceed the Endpoint MTU size of 64K, exceed the * MTU will lead to unexpected behavior. * <p/> * It highly recommended that an application message listener is specified, not doing so, may * lead to message loss in the event the internal queue is overflowed. * <p/> * Sending messages vis {@link #sendMessage(Message)} from within a  * {@code PipeMsgListener} may result in a deadlock due to contention * between the sending and receiving portions of BiDi pipes.  * <p/> * JxtaBiDiPipe, whenever possible, will attempt to utilize direct tcp messengers, * which leads to improved performance. */public class JxtaBiDiPipe implements PipeMsgListener, OutputPipeListener, ReliableInputStream.MsgListener {    /**     * Logger     */    private final static transient Logger LOG = Logger.getLogger(JxtaBiDiPipe.class.getName());    private final static int MAXRETRYTIMEOUT = 120 * 1000;    private PipeAdvertisement remotePipeAdv;    private PeerAdvertisement remotePeerAdv;    protected int timeout = 15 * 1000;    protected int retryTimeout = 60 * 1000;    protected int maxRetryTimeout = MAXRETRYTIMEOUT;    protected int windowSize = 50;    private ArrayBlockingQueue<PipeMsgEvent> queue = new ArrayBlockingQueue<PipeMsgEvent>(windowSize);    protected PeerGroup group;    protected PipeAdvertisement pipeAdv;    protected PipeAdvertisement myPipeAdv;    protected PipeService pipeSvc;    protected InputPipe inputPipe;    protected OutputPipe connectOutpipe;    protected Messenger msgr;    protected InputStream stream;    protected final Object closeLock = new Object();    protected final Object acceptLock = new Object();    protected final Object finalLock = new Object();    protected boolean closed = false;    protected boolean bound = false;    protected boolean dequeued = false;    protected PipeMsgListener msgListener;    protected PipeEventListener eventListener;    protected PipeStateListener stateListener;    protected Credential credential = null;    protected boolean waiting;    /**     * If {@code true} then we are using the underlying end-to-end ACK reliable     * layer to ensure that messages are received by the remote peer.     */    protected boolean isReliable = false;    protected ReliableInputStream ris = null;    protected ReliableOutputStream ros = null;    /**     * If {@code true} then we are using a reliable direct messenger to the     * remote peer. We will assume that messages which are sent successfully     * will be received successfully.     */    protected volatile boolean direct = false;    protected OutgoingMsgrAdaptor outgoing = null;    protected StructuredDocument credentialDoc = null;    /**     * Pipe close Event     */    public static final int PIPE_CLOSED_EVENT = 1;    /**     * Creates a bidirectional pipe     *     * @param group      group context     * @param msgr       lightweight output pipe     * @param pipe       PipeAdvertisement     * @param isReliable Whether the connection is reliable or not     * @param credDoc    Credential StructuredDocument     * @param direct     indicates a direct messenger pipe     * @throws IOException if an io error occurs     */    protected JxtaBiDiPipe(PeerGroup group, Messenger msgr, PipeAdvertisement pipe, StructuredDocument credDoc, boolean isReliable, boolean direct) throws IOException {        if (msgr == null) {            throw new IOException("Null Messenger");        }        this.direct = direct;        this.group = group;        this.pipeAdv = pipe;        this.credentialDoc = credDoc != null ? credDoc : getCredDoc(group);        this.pipeSvc = group.getPipeService();        this.inputPipe = pipeSvc.createInputPipe(pipe, this);        this.msgr = msgr;        this.isReliable = isReliable;        if (!direct) {            createRLib();        }        setBound();    }    /**     * Creates a new object with a default timeout of #timeout, and no reliability.     *     */    public JxtaBiDiPipe() {    }    /**     * Creates a bidirectional pipe.     *     * Attempts to create a bidirectional connection to remote peer within default     * timeout of #timeout.     *     * @param group       group context     * @param pipeAd      PipeAdvertisement     * @param msgListener application PipeMsgListener     * @throws IOException if an io error occurs     */    public JxtaBiDiPipe(PeerGroup group, PipeAdvertisement pipeAd, PipeMsgListener msgListener) throws IOException {        connect(group, null, pipeAd, timeout, msgListener);    }    /**     * Creates a bidirectional pipe.     *     * Attempts to create a bidirectional connection to remote peer within specified     * timeout of #timeout.     *     * @param group       group context     * @param timeout     The number of milliseconds within which the JxtaBiDiPipe must     *                    be successfully created. An exception will be thrown if the pipe     *                    cannot be created in the alotted time. A timeout value of {@code 0}     *                    (zero) specifies an infinite timeout.     * @param pipeAd      PipeAdvertisement     * @param msgListener application PipeMsgListener     * @throws IOException if an io error occurs     */    public JxtaBiDiPipe(PeerGroup group, PipeAdvertisement pipeAd, int timeout, PipeMsgListener msgListener) throws IOException {        connect(group, null, pipeAd, timeout, msgListener);    }    /**     * attempts to create a bidirectional connection to remote peer     *     * @param group       group context     * @param pipeAd      PipeAdvertisement     * @param timeout     The number of milliseconds within which the JxtaBiDiPipe must     *                    be successfully created. An exception will be thrown if the pipe     *                    cannot be created in the allotted time. A timeout value of {@code 0}     *                    (zero) specifies an infinite timeout.     * @param msgListener application PipeMsgListener     * @param reliable    if true, the reliability is assumed     * @throws IOException if an io error occurs     */    public JxtaBiDiPipe(PeerGroup group, PipeAdvertisement pipeAd, int timeout, PipeMsgListener msgListener, boolean reliable) throws IOException {        connect(group, null, pipeAd, timeout, msgListener, reliable);    }    /**     * Connect to a JxtaServerPipe with default timeout     *     * @param group  group context     * @param pipeAd PipeAdvertisement     * @throws IOException if an io error occurs     */    public void connect(PeerGroup group, PipeAdvertisement pipeAd) throws IOException {        connect(group, pipeAd, timeout);    }    /**     * Connects to a remote JxtaBiDiPipe     *     * @param group   group context     * @param pipeAd  PipeAdvertisement     * @param timeout timeout in ms, also reset object default timeout     *                to that of timeout     * @throws IOException if an io error occurs     */    public void connect(PeerGroup group, PipeAdvertisement pipeAd, int timeout) throws IOException {        connect(group, null, pipeAd, timeout, null);    }    /**     * Connects to a remote JxtaServerPipe     *     * @param group       group context     * @param peerid      peer to connect to     * @param pipeAd      PipeAdvertisement     * @param timeout     timeout in ms, also reset object default timeout to that of timeout     * @param msgListener application PipeMsgListener     * @throws IOException if an io error occurs     */    public void connect(PeerGroup group, PeerID peerid, PipeAdvertisement pipeAd, int timeout, PipeMsgListener msgListener) throws IOException {        connect(group, peerid, pipeAd, timeout, msgListener, isReliable);    }    /**     * Connects to a remote JxtaServerPipe     *     * @param group       group context     * @param peerid      peer to connect to     * @param pipeAd      PipeAdvertisement     * @param timeout     timeout in ms, also reset object default timeout to that of timeout     * @param msgListener application PipeMsgListener     * @param reliable    Reliable connection     * @throws IOException if an io error occurs     */    public void connect(PeerGroup group, PeerID peerid, PipeAdvertisement pipeAd, int timeout, PipeMsgListener msgListener, boolean reliable) throws IOException {        if (isBound()) {            throw new IOException("Pipe already bound");        }        if (timeout <= 0) {            throw new IllegalArgumentException("Invalid timeout :" + timeout);        }        this.pipeAdv = pipeAd;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -