⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 reliableoutputstream.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/* *  $Id: ReliableOutputStream.java,v 1.26 2006/07/29 00:26:05 hamada Exp $ * *  Copyright (c) 2003-2006 Sun Microsystems, Inc.  All rights reserved. * *  Redistribution and use in source and binary forms, with or without *  modification, are permitted provided that the following conditions *  are met: * *  1. Redistributions of source code must retain the above copyright *  notice, this list of conditions and the following disclaimer. * *  2. Redistributions in binary form must reproduce the above copyright *  notice, this list of conditions and the following disclaimer in *  the documentation and/or other materials provided with the *  distribution. * *  3. The end-user documentation included with the redistribution, *  if any, must include the following acknowledgment: *  "This product includes software developed by the *  Sun Microsystems, Inc. for Project JXTA." *  Alternately, this acknowledgment may appear in the software itself, *  if and wherever such third-party acknowledgments normally appear. * *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" *  must not be used to endorse or promote products derived from this *  software without prior written permission. For written *  permission, please contact Project JXTA at http://www.jxta.org. * *  5. Products derived from this software may not be called "JXTA", *  nor may "JXTA" appear in their name, without prior written *  permission of Sun. * *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED *  WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES *  OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE *  DISCLAIMED.  IN NO EVENT SHALL SUN MICROSYSTEMS OR *  ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF *  USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND *  ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, *  OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT *  OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF *  SUCH DAMAGE. * *  ==================================================================== * *  This software consists of voluntary contributions made by many *  individuals on behalf of Project JXTA.  For more *  information on Project JXTA, please see *  <http://www.jxta.org/>. * *  This license is based on the BSD license adopted by the Apache Foundation. */package net.jxta.impl.util.pipe.reliable;import java.io.ByteArrayOutputStream;import java.io.DataInputStream;import java.io.IOException;import java.io.OutputStream;import java.util.ArrayList;import java.util.Arrays;import java.util.Iterator;import java.util.List;import net.jxta.endpoint.ByteArrayMessageElement;import net.jxta.endpoint.Message;import net.jxta.endpoint.MessageElement;import net.jxta.endpoint.StringMessageElement;import net.jxta.endpoint.WireFormatMessage;import net.jxta.endpoint.WireFormatMessageFactory;import net.jxta.impl.util.TimeUtils;import org.apache.log4j.Level;import org.apache.log4j.Logger;/** * Accepts data and packages it into messages for * sending to the remote. The messages are kept in a * retry queue until the remote peer acknowledges * receipt of the message. */public class ReliableOutputStream extends OutputStream implements Incoming {    /**     *  Log4J Logger     */    private final static Logger LOG =            Logger.getLogger(ReliableOutputStream.class.getName());    /**     * This maximum is only enforced if we have not heard     * from the remote for RETRMAXAGE.     */    private final static int MAXRETRQSIZE = 100;    /**     *  Initial estimated Round Trip Time     */    private final static long initRTT = 10 * TimeUtils.ASECOND;    private final static MessageElement RETELT =            new StringMessageElement(Defs.RETRY_ELEMENT_NAME,            Defs.RETRY_ELEMENT_VALUE, null);    /**     *  If true then the stream has been closed.     */    private volatile boolean closed = false;    /**     * If true then the stream is being closed.     * It means that it still works completely for all messages already     * queued, but no new message may be enqueued.     */    private volatile boolean closing = false;    /**     *  Sequence number of the message we most recently sent out.     */    private volatile int sequenceNumber = 0;    /**     *  Sequence number of highest sequential ACK.     */    private volatile int maxACK = 0;    /**     *  connection we are working for     */    private Outgoing outgoing = null;    private Retransmitter retrThread = null;    // for retransmission    /**     *  Average round trip time in milliseconds.     */    private volatile long aveRTT = initRTT;    private volatile long remRTT = 0;    /**     * Has aveRTT been set at least once over its initial guesstimate value.     */    private boolean aveRTTreset = false;    /**     *  Number of ACK message received.     */    private int nACKS = 0;    /**     * When to start computing aveRTT     */    private int rttThreshold = 0;    /**     *  Retry Time Out measured in milliseconds.     */    private volatile long RTO = 0;    /**     *  Minimum Retry Timeout measured in milliseconds.     */    private volatile long minRTO = initRTT * 5;    /**     *  Maximum Retry Timeout measured in milliseconds.     */    private volatile long maxRTO = initRTT * 60;    /**     *  absolute time in milliseconds of last sequential ACK.     */    private volatile long lastACKTime = 0;    /**     *  absolute time in milliseconds of last SACK based retransmit.     */    private volatile long sackRetransTime = 0;    /**     *   The collection of messages available for re-transmission.     *     *   elements are {@link RetrQElt}     */    protected List retrQ = new ArrayList();    // running average of receipients Input Queue    private int nIQTests = 0;    private int aveIQSize = 0;    /**     *  Our estimation of the current free space in the remote input queue.     */    private volatile int mrrIQFreeSpace = 0;    /**     *  Our estimation of the maximum sise of the remote input queue.     */    private int rmaxQSize = Defs.MAXQUEUESIZE;    /**     * The flow control module.     */    private final FlowControl fc;    /**     * Cache of the last rwindow recommendation by fc.     */    private volatile int rwindow = 0;    /**     * retrans queue element     */    private static class RetrQElt {        int seqnum;        // sequence number of this message.        long enqueuedAt;        // absolute time of original enqueing        volatile Message msg;        // the message        int marked;        // has been marked as retransmission        long sentAt;        // when this msg was last transmitted        /**         *Constructor for the RetrQElt object         *         * @param  seqnum  sequence number         * @param  msg     the message         */        public RetrQElt(int seqnum, Message msg) {            this.seqnum = seqnum;            this.msg = msg;            this.enqueuedAt = TimeUtils.timeNow();            this.sentAt = this.enqueuedAt;            this.marked = 0;        }    }    /**     *Constructor for the ReliableOutputStream object     *     * @param  outgoing  the outgoing object     */    public ReliableOutputStream(Outgoing outgoing) {        // By default use the old behaviour: fixed fc with a rwin of 20        this(outgoing, new FixedFlowControl(20));    }    /**     *Constructor for the ReliableOutputStream object     *     * @param  outgoing  the outgoing object     * @param  fc        flow-control     */    public ReliableOutputStream(Outgoing outgoing, FlowControl fc) {        this.outgoing = outgoing;        // initial RTO is set to maxRTO so as to give time        // to the receiver to catch-up        this.RTO = maxRTO;        this.mrrIQFreeSpace = rmaxQSize;        this.rttThreshold = rmaxQSize;        // Init last ACK Time to now        this.lastACKTime = TimeUtils.timeNow();        this.sackRetransTime = TimeUtils.timeNow();        // Attach the flowControl module        this.fc = fc;        // Update our initial rwindow to reflect fc's initial value        this.rwindow = fc.getRwindow();        // Start retransmission thread        this.retrThread = new Retransmitter();    }    /**     * {@inheritDoc}     *     *  <p/>We don't current support linger.     */    public synchronized void close() throws IOException {        super.close();        closed = true;        // We have to use a temp because someone else        // might be trying to null out retrThread.        Retransmitter temp = retrThread;        if (null != temp) {            synchronized (temp) {                temp.notifyAll();            }        }        retrQ.clear();    }    /**     * indicate that we're in the process of closing. To respect the semantics     * of close()/isClosed(), we do not set the closed flag, yet. Instead, we     * set the flag "closing", which simply garantees that no new message     * will be queued.     * This, in combination with getSequenceNumber and getMaxAck, and     * waitQevent, enables fine grain control of the tear down process.     */    public void setClosing() {        synchronized (retrQ) {            closing = true;            retrQ.clear();            retrQ.notifyAll();        }    }    /**     * Returns the state of the stream     * @return true if closed     */    public synchronized boolean isClosed() {        return closed;    }    /**     * {@inheritDoc}     */    public void write(int c) throws IOException {        byte[] a = new byte[1];        a[0] = (byte) (c & 0xFF);        write(a, 0, 1);    }    /**     * {@inheritDoc}     */    public void write(byte[] b, int off, int len) throws IOException {        if (closed) {            throw new IOException("stream is closed");        }        if (closing) {            throw new IOException("stream is being closed");        }        if (b == null) {            throw new IllegalArgumentException("buffer is null");        }        if ((off < 0) || (off > b.length) || (len < 0) ||                ((off + len) > b.length) || ((off + len) < 0)) {            throw new IndexOutOfBoundsException();        }        if (len == 0) {            return;        }        // Copy the data since it will be queued, and caller may        // overwrite the same byte[] buffer.        byte[] data = new byte[len];

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -