📄 reliableoutputstream.java
字号:
/* * $Id: ReliableOutputStream.java,v 1.26 2006/07/29 00:26:05 hamada Exp $ * * Copyright (c) 2003-2006 Sun Microsystems, Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Sun Microsystems, Inc. for Project JXTA." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" * must not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", * nor may "JXTA" appear in their name, without prior written * permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL SUN MICROSYSTEMS OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of Project JXTA. For more * information on Project JXTA, please see * <http://www.jxta.org/>. * * This license is based on the BSD license adopted by the Apache Foundation. */package net.jxta.impl.util.pipe.reliable;import java.io.ByteArrayOutputStream;import java.io.DataInputStream;import java.io.IOException;import java.io.OutputStream;import java.util.ArrayList;import java.util.Arrays;import java.util.Iterator;import java.util.List;import net.jxta.endpoint.ByteArrayMessageElement;import net.jxta.endpoint.Message;import net.jxta.endpoint.MessageElement;import net.jxta.endpoint.StringMessageElement;import net.jxta.endpoint.WireFormatMessage;import net.jxta.endpoint.WireFormatMessageFactory;import net.jxta.impl.util.TimeUtils;import org.apache.log4j.Level;import org.apache.log4j.Logger;/** * Accepts data and packages it into messages for * sending to the remote. The messages are kept in a * retry queue until the remote peer acknowledges * receipt of the message. */public class ReliableOutputStream extends OutputStream implements Incoming { /** * Log4J Logger */ private final static Logger LOG = Logger.getLogger(ReliableOutputStream.class.getName()); /** * This maximum is only enforced if we have not heard * from the remote for RETRMAXAGE. */ private final static int MAXRETRQSIZE = 100; /** * Initial estimated Round Trip Time */ private final static long initRTT = 10 * TimeUtils.ASECOND; private final static MessageElement RETELT = new StringMessageElement(Defs.RETRY_ELEMENT_NAME, Defs.RETRY_ELEMENT_VALUE, null); /** * If true then the stream has been closed. */ private volatile boolean closed = false; /** * If true then the stream is being closed. * It means that it still works completely for all messages already * queued, but no new message may be enqueued. */ private volatile boolean closing = false; /** * Sequence number of the message we most recently sent out. */ private volatile int sequenceNumber = 0; /** * Sequence number of highest sequential ACK. */ private volatile int maxACK = 0; /** * connection we are working for */ private Outgoing outgoing = null; private Retransmitter retrThread = null; // for retransmission /** * Average round trip time in milliseconds. */ private volatile long aveRTT = initRTT; private volatile long remRTT = 0; /** * Has aveRTT been set at least once over its initial guesstimate value. */ private boolean aveRTTreset = false; /** * Number of ACK message received. */ private int nACKS = 0; /** * When to start computing aveRTT */ private int rttThreshold = 0; /** * Retry Time Out measured in milliseconds. */ private volatile long RTO = 0; /** * Minimum Retry Timeout measured in milliseconds. */ private volatile long minRTO = initRTT * 5; /** * Maximum Retry Timeout measured in milliseconds. */ private volatile long maxRTO = initRTT * 60; /** * absolute time in milliseconds of last sequential ACK. */ private volatile long lastACKTime = 0; /** * absolute time in milliseconds of last SACK based retransmit. */ private volatile long sackRetransTime = 0; /** * The collection of messages available for re-transmission. * * elements are {@link RetrQElt} */ protected List retrQ = new ArrayList(); // running average of receipients Input Queue private int nIQTests = 0; private int aveIQSize = 0; /** * Our estimation of the current free space in the remote input queue. */ private volatile int mrrIQFreeSpace = 0; /** * Our estimation of the maximum sise of the remote input queue. */ private int rmaxQSize = Defs.MAXQUEUESIZE; /** * The flow control module. */ private final FlowControl fc; /** * Cache of the last rwindow recommendation by fc. */ private volatile int rwindow = 0; /** * retrans queue element */ private static class RetrQElt { int seqnum; // sequence number of this message. long enqueuedAt; // absolute time of original enqueing volatile Message msg; // the message int marked; // has been marked as retransmission long sentAt; // when this msg was last transmitted /** *Constructor for the RetrQElt object * * @param seqnum sequence number * @param msg the message */ public RetrQElt(int seqnum, Message msg) { this.seqnum = seqnum; this.msg = msg; this.enqueuedAt = TimeUtils.timeNow(); this.sentAt = this.enqueuedAt; this.marked = 0; } } /** *Constructor for the ReliableOutputStream object * * @param outgoing the outgoing object */ public ReliableOutputStream(Outgoing outgoing) { // By default use the old behaviour: fixed fc with a rwin of 20 this(outgoing, new FixedFlowControl(20)); } /** *Constructor for the ReliableOutputStream object * * @param outgoing the outgoing object * @param fc flow-control */ public ReliableOutputStream(Outgoing outgoing, FlowControl fc) { this.outgoing = outgoing; // initial RTO is set to maxRTO so as to give time // to the receiver to catch-up this.RTO = maxRTO; this.mrrIQFreeSpace = rmaxQSize; this.rttThreshold = rmaxQSize; // Init last ACK Time to now this.lastACKTime = TimeUtils.timeNow(); this.sackRetransTime = TimeUtils.timeNow(); // Attach the flowControl module this.fc = fc; // Update our initial rwindow to reflect fc's initial value this.rwindow = fc.getRwindow(); // Start retransmission thread this.retrThread = new Retransmitter(); } /** * {@inheritDoc} * * <p/>We don't current support linger. */ public synchronized void close() throws IOException { super.close(); closed = true; // We have to use a temp because someone else // might be trying to null out retrThread. Retransmitter temp = retrThread; if (null != temp) { synchronized (temp) { temp.notifyAll(); } } retrQ.clear(); } /** * indicate that we're in the process of closing. To respect the semantics * of close()/isClosed(), we do not set the closed flag, yet. Instead, we * set the flag "closing", which simply garantees that no new message * will be queued. * This, in combination with getSequenceNumber and getMaxAck, and * waitQevent, enables fine grain control of the tear down process. */ public void setClosing() { synchronized (retrQ) { closing = true; retrQ.clear(); retrQ.notifyAll(); } } /** * Returns the state of the stream * @return true if closed */ public synchronized boolean isClosed() { return closed; } /** * {@inheritDoc} */ public void write(int c) throws IOException { byte[] a = new byte[1]; a[0] = (byte) (c & 0xFF); write(a, 0, 1); } /** * {@inheritDoc} */ public void write(byte[] b, int off, int len) throws IOException { if (closed) { throw new IOException("stream is closed"); } if (closing) { throw new IOException("stream is being closed"); } if (b == null) { throw new IllegalArgumentException("buffer is null"); } if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) { throw new IndexOutOfBoundsException(); } if (len == 0) { return; } // Copy the data since it will be queued, and caller may // overwrite the same byte[] buffer. byte[] data = new byte[len];
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -