📄 reliableoutputstream.java
字号:
/* * Copyright (c) 2003-2007 Sun Microsystems, Inc. All rights reserved. * * The Sun Project JXTA(TM) Software License * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * 3. The end-user documentation included with the redistribution, if any, must * include the following acknowledgment: "This product includes software * developed by Sun Microsystems, Inc. for JXTA(TM) technology." * Alternately, this acknowledgment may appear in the software itself, if * and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must * not be used to endorse or promote products derived from this software * without prior written permission. For written permission, please contact * Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", nor may * "JXTA" appear in their name, without prior written permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * JXTA is a registered trademark of Sun Microsystems, Inc. in the United * States and other countries. * * Please see the license information page at : * <http://www.jxta.org/project/www/license.html> for instructions on use of * the license in source files. * * ==================================================================== * * This software consists of voluntary contributions made by many individuals * on behalf of Project JXTA. For more information on Project JXTA, please see * http://www.jxta.org. * * This license is based on the BSD license adopted by the Apache Foundation. */package net.jxta.impl.util.pipe.reliable;import java.io.ByteArrayOutputStream;import net.jxta.endpoint.ByteArrayMessageElement;import net.jxta.endpoint.Message;import net.jxta.endpoint.MessageElement;import net.jxta.endpoint.StringMessageElement;import net.jxta.endpoint.WireFormatMessage;import net.jxta.endpoint.WireFormatMessageFactory;import net.jxta.impl.util.TimeUtils;import net.jxta.logging.Logging;import java.io.DataInputStream;import java.io.IOException;import java.io.OutputStream;import java.util.ArrayList;import java.util.Arrays;import java.util.Iterator;import java.util.List;import java.util.concurrent.atomic.AtomicInteger;import java.util.logging.Level;import java.util.logging.Logger;/** * Accepts data and packages it into messages for sending to the remote. The * messages are kept in a retry queue until the remote peer acknowledges * receipt of the message. */public class ReliableOutputStream extends OutputStream implements Incoming { /** * Logger */ private final static Logger LOG = Logger.getLogger(ReliableOutputStream.class.getName()); /** * Initial estimated Round Trip Time */ private final static long initRTT = 10 * TimeUtils.ASECOND; /** * The default size for the blocks we will chunk the stream into. */ private final static int DEFAULT_MESSAGE_CHUNK_SIZE = 63 * 1024; private final static MessageElement RETELT = new StringMessageElement(Defs.RETRY_ELEMENT_NAME, Defs.RETRY_ELEMENT_VALUE, null); /** * A lock we use to ensure that write operations happen in order. */ private final Object writeLock = new String("writeLock"); /** * The buffer we cache writes to. */ private byte[] writeBuffer = null; /** * Number of bytes written to the write buffer. */ private int writeCount = 0; /** * Set the default write buffer size. */ private int writeBufferSize = DEFAULT_MESSAGE_CHUNK_SIZE; /** * Absolute time in milliseconds at which the write buffer began * accumulating bytes to be written. */ private long writeBufferAge = Long.MAX_VALUE; /** * If less than {@code TimeUtils.timenow()} then we are closed otherwise * this is the absolute time at which we will become closed. We begin by * setting this value as {@Long.MAX_VALUE} until we establish an earlier * close deadline. */ private long closedAt = Long.MAX_VALUE; /** * If {@code true} then we have received a close request from the remote * side. They do not want to receive any more messages from us. */ private volatile boolean remoteClosed = false; /** * If {@code true} then we have closed this stream locally and will not * accept any further messages for sending. Unacknowledged messages will * be retransmitted until the linger delay is passed. */ private volatile boolean localClosed = false; /** * The relative time in milliseconds that we will allow our connection to * linger. */ private long lingerDelay = 120 * TimeUtils.ASECOND; /** * Sequence number of the message we most recently sent out. */ private AtomicInteger sequenceNumber = new AtomicInteger(0); /** * Sequence number of highest sequential ACK. */ private volatile int maxACK = 0; /** * connection we are working for */ private final Outgoing outgoing; /** * The daemon thread that performs retransmissions. */ private Thread retrThread = null; // for retransmission /** * Average round trip time in milliseconds. */ private volatile long aveRTT = initRTT; private volatile long remRTT = 0; /** * Has aveRTT been set at least once over its initial guesstimate value. */ private boolean aveRTTreset = false; /** * Number of ACK message received. */ private AtomicInteger numACKS = new AtomicInteger(0); /** * When to start computing aveRTT */ private int rttThreshold = 0; /** * Retry Time Out measured in milliseconds. */ private volatile long RTO = 0; /** * Minimum Retry Timeout measured in milliseconds. */ private volatile long minRTO = initRTT * 5; /** * Maximum Retry Timeout measured in milliseconds. */ private volatile long maxRTO = initRTT * 60; /** * absolute time in milliseconds of last sequential ACK. */ private volatile long lastACKTime = 0; /** * absolute time in milliseconds of last SACK based retransmit. */ private volatile long sackRetransTime = 0; // running average of receipients Input Queue private int nIQTests = 0; private int aveIQSize = 0; /** * Our estimation of the current free space in the remote input queue. */ private volatile int mrrIQFreeSpace = 0; /** * Our estimation of the maximum size of the remote input queue. */ private int rmaxQSize = Defs.MAXQUEUESIZE; /** * The flow control module. */ private final FlowControl fc; /** * Cache of the last rwindow recommendation by fc. */ private volatile int rwindow = 0; /** * retrans queue element */ private static class RetrQElt { /** * sequence number of this message. */ final int seqnum; /** * the message */ final Message msg; /** * absolute time of original enqueuing */ final long enqueuedAt; /** * has been marked as retransmission */ int marked; /** * absolute time when this msg was last transmitted */ long sentAt; /** * Constructor for the RetrQElt object * * @param seqnum sequence number * @param msg the message */ public RetrQElt(int seqnum, Message msg) { this.seqnum = seqnum; this.msg = msg; this.enqueuedAt = TimeUtils.timeNow(); this.sentAt = this.enqueuedAt; this.marked = 0; } } /** * The collection of messages available for re-transmission. */ protected final List<RetrQElt> retrQ = new ArrayList<RetrQElt>(); /** * Constructor for the ReliableOutputStream object * * @param outgoing the outgoing object */ public ReliableOutputStream(Outgoing outgoing) { // By default use the old behaviour: fixed fc with a rwin of 20 this(outgoing, new FixedFlowControl(20)); } /** * Constructor for the ReliableOutputStream object * * @param outgoing the outgoing object * @param fc flow-control */ public ReliableOutputStream(Outgoing outgoing, FlowControl fc) { this.outgoing = outgoing; // initial RTO is set to maxRTO so as to give time // to the receiver to catch-up
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -