
📄 reliableoutputstream.java

📁 JXTA™ is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
        synchronized (retrQ) {
            return retrQ.isEmpty();
        }
    }

    /**
     * Waits for the retransmit queue to become empty.
     *
     * @param timeout The relative time in milliseconds to wait for the queue to
     *                become empty.
     * @return {@code true} if the queue is empty otherwise {@code false}.
     * @throws InterruptedException if interrupted
     */
    public boolean waitQueueEmpty(long timeout) throws InterruptedException {
        long timeoutAt = TimeUtils.toAbsoluteTimeMillis(timeout);

        synchronized (retrQ) {
            while (!retrQ.isEmpty() && (TimeUtils.timeNow() < timeoutAt)) {
                long sleepTime = TimeUtils.toRelativeTimeMillis(timeoutAt);

                if (sleepTime > 0) {
                    retrQ.wait(sleepTime);
                }
            }

            return retrQ.isEmpty();
        }
    }

    /**
     * Waits for activity on the retry queue.
     *
     * @param timeout timeout in milliseconds
     * @throws InterruptedException when interrupted
     */
    public void waitQueueEvent(long timeout) throws InterruptedException {
        synchronized (retrQ) {
            retrQ.wait(timeout);
        }
    }

    /**
     * Calculates a message retransmission time-out.
     *
     * @param dt        base time
     * @param msgSeqNum Message sequence number
     */
    private void calcRTT(long dt, int msgSeqNum) {

        if (numACKS.incrementAndGet() == 1) {
            // First ACK arrived. We can start computing aveRTT on the messages
            // we send from now on.
            rttThreshold = sequenceNumber.get() + 1;
        }

        if (msgSeqNum > rttThreshold) {
            // Compute only when it has stabilized a bit.
            // Since the initial mrrIQFreeSpace is small, the first few
            // messages will be sent early on and may wait a long time
            // for the return channel to initialize. After that things
            // start flowing and RTT becomes relevant.
            // Careful with the computation: integer division with round-down
            // causes cumulative damage: the ave never goes up if this is not
            // taken care of. We keep the remainder from one round to the other.

            if (!aveRTTreset) {
                aveRTT = dt;
                aveRTTreset = true;
            } else {
                long tmp = (8 * aveRTT) + ((8 * remRTT) / 9) + dt;
                aveRTT = tmp / 9;
                remRTT = tmp - aveRTT * 9;
            }
        }

        // Set retransmission time out: 2 x aveRTT
        // (the disabled line below would compute 2.5 x aveRTT instead)
        // RTO = (aveRTT << 1) + (aveRTT >> 1);
        RTO = aveRTT * 2;

        // Enforce a min/max
        RTO = Math.max(RTO, minRTO);
        RTO = Math.min(RTO, maxRTO);

        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("RTT = " + dt + "ms aveRTT = " + aveRTT + "ms" + " RTO = " + RTO + "ms");
        }
    }

    /**
     * Updates the running average of the remote input queue occupancy.
     *
     * @param iq the latest observed input queue size (the SACK list length)
     * @return the updated average input queue size
     */
    private int calcAVEIQ(int iq) {
        int n = nIQTests;
        nIQTests += 1;
        aveIQSize = ((n * aveIQSize) + iq) / nIQTests;
        return aveIQSize;
    }

    /**
     * Processes an incoming message.
     *
     * @param msg message to process
     */
    public void recv(Message msg) {

        Iterator<MessageElement> eachACK = msg.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_ACK);

        while (eachACK.hasNext()) {
            MessageElement elt = eachACK.next();
            eachACK.remove();
            // The element holds one sequential ACK followed by the SACK entries,
            // each encoded as a 4-byte int.
            int sackCount = ((int) elt.getByteLength() / 4) - 1;

            try {
                DataInputStream dis = new DataInputStream(elt.getStream());
                int seqack = dis.readInt();
                int[] sacs = new int[sackCount];
                for (int eachSac = 0; eachSac < sackCount; eachSac++) {
                    sacs[eachSac] = dis.readInt();
                }
                Arrays.sort(sacs);
                // take care of the ACK here;
                ackReceived(seqack, sacs);
            } catch (IOException failed) {
                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                    LOG.log(Level.WARNING, "Failure processing ACK", failed);
                }
            }
        }
    }

    /**
     * Process an ACK Message. We remove ACKed
     * messages from the retry queue.  We only
     * acknowledge messages received in sequence.
     * <p/>
     * The seqnum is for the largest unacknowledged seqnum
     * the recipient has received.
     * <p/>
     * The sackList is a sequence of all of the
     * received messages in the sender's input Q. All
     * will be sequence numbers higher than the
     * sequential ACK seqnum.
     * <p/>
     * Recipients are passive and only ack upon the
     * receipt of an in sequence message.
     * <p/>
     * They depend on our RTO to fill holes in message
     * sequences.
     *
     * @param seqnum   message sequence number
     * @param sackList array of message sequence numbers
     */
    public void ackReceived(int seqnum, int[] sackList) {

        int numberACKed = 0;
        long rttCalcDt = 0;
        int rttCalcSeqnum = -1;
        long fallBackDt = 0;
        int fallBackSeqnum = -1;

        // remove acknowledged messages from retrans Q.
        synchronized (retrQ) {
            lastACKTime = TimeUtils.timeNow();
            fc.ackEventBegin();
            maxACK = Math.max(maxACK, seqnum);

            // dump the current Retry queue and the SACK list
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                StringBuilder dumpRETRQ = new StringBuilder("ACK RECEIVE : " + Integer.toString(seqnum));

                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    dumpRETRQ.append('\n');
                }
                dumpRETRQ.append("\tRETRQ (size=").append(retrQ.size()).append(")");
                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    dumpRETRQ.append(" : ");
                    for (int y = 0; y < retrQ.size(); y++) {
                        if (0 != y) {
                            dumpRETRQ.append(", ");
                        }
                        RetrQElt r = retrQ.get(y);

                        dumpRETRQ.append(r.seqnum);
                    }
                }
                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    dumpRETRQ.append('\n');
                }
                dumpRETRQ.append("\tSACKLIST (size=").append(sackList.length).append(")");
                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    dumpRETRQ.append(" : ");
                    for (int y = 0; y < sackList.length; y++) {
                        if (0 != y) {
                            dumpRETRQ.append(", ");
                        }
                        dumpRETRQ.append(sackList[y]);
                    }
                }
                LOG.fine(dumpRETRQ.toString());
            }

            Iterator<RetrQElt> eachRetryQueueEntry = retrQ.iterator();
            // First remove monotonically increasing seq#s in retrans vector
            while (eachRetryQueueEntry.hasNext()) {
                RetrQElt retrQElt = eachRetryQueueEntry.next();
                if (retrQElt.seqnum > seqnum) {
                    break;
                }
                // Acknowledged
                eachRetryQueueEntry.remove();

                // Update RTT, RTO. Use only those that were acked
                // w/o retrans otherwise the number may be phony (ack
                // of first xmit received just after resending => RTT
                // seems small).  Also, we keep the worst of the bunch
                // we encounter.  If we really can't find a single
                // non-resent message, we make do with a pessimistic
                // approximation: we must not be left behind with an
                // RTT that's too short, we'd keep resending like
                // crazy.
                long enqueuetime = retrQElt.enqueuedAt;
                long dt = TimeUtils.toRelativeTimeMillis(lastACKTime, enqueuetime);
                // Update RTT, RTO
                if (retrQElt.marked == 0) {
                    if (dt > rttCalcDt) {
                        rttCalcDt = dt;
                        rttCalcSeqnum = retrQElt.seqnum;
                    }
                } else {
                    // In case we find no good candidate, make
                    // a guess by dividing by the number of attempts
                    // and keep the worst of them too. Since we
                    // know it may be too short, we will not use it
                    // if it shortens rtt.
                    dt /= (retrQElt.marked + 1);
                    if (dt > fallBackDt) {
                        fallBackDt = dt;
                        fallBackSeqnum = retrQElt.seqnum;
                    }
                }
                fc.packetACKed(retrQElt.seqnum);
                retrQElt = null;
                numberACKed++;
            }
            // Update last accessed time in response to getting seq acks.
            if (numberACKed > 0) {
                outgoing.setLastAccessed(TimeUtils.timeNow());
            }
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("SEQUENTIALLY ACKD SEQN = " + seqnum + ", (" + numberACKed + " acked)");
            }
            // most recent remote IQ free space
            mrrIQFreeSpace = rmaxQSize - sackList.length;
            // let's look at average sacs.size(). If it is big, then this
            // probably means we must back off because the system is slow.
            // Our retrans Queue can be large and we can overwhelm the
            // receiver with retransmissions.
            // We will keep the rwin <= ave real input queue size.
            int aveIQ = calcAVEIQ(sackList.length);
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("remote IQ free space = " + mrrIQFreeSpace + " remote avg IQ occupancy = " + aveIQ);
            }

            int retrans = 0;
            if (sackList.length > 0) {
                Iterator<RetrQElt> eachRetrQElement = retrQ.iterator();
                int currentSACK = 0;
                while (eachRetrQElement.hasNext()) {
                    RetrQElt retrQElt = eachRetrQElement.next();
                    // Skip SACK entries below this queue element; both
                    // sequences are in ascending order.
                    while (sackList[currentSACK] < retrQElt.seqnum) {
                        currentSACK++;
                        if (currentSACK == sackList.length) {
                            break;
                        }
                    }
                    if (currentSACK == sackList.length) {
                        break;
                    }
                    if (sackList[currentSACK] == retrQElt.seqnum) {
                        fc.packetACKed(retrQElt.seqnum);
                        numberACKed++;
                        eachRetrQElement.remove();

                        // Update RTT, RTO. Use only those that were acked w/o retrans
                        // otherwise the number is completely phony.
                        // Also, we keep the worst of the bunch we encounter.
                        long enqueuetime = retrQElt.enqueuedAt;
                        long dt = TimeUtils.toRelativeTimeMillis(lastACKTime, enqueuetime);
                        // Update RTT, RTO
                        if (retrQElt.marked == 0) {
                            if (dt > rttCalcDt) {
                                rttCalcDt = dt;
                                rttCalcSeqnum = retrQElt.seqnum;
                            }
                        } else {
                            // In case we find no good candidate, make
                            // a guess by dividing by the number of attempts
                            // and keep the worst of them too. Since we
                            // know it may be too short, we will not use it
                            // if it shortens rtt.
                            dt /= (retrQElt.marked + 1);
                            if (dt > fallBackDt) {
                                fallBackDt = dt;
                                fallBackSeqnum = retrQElt.seqnum;
                            }
                        }
                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
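
The comment in calcRTT warns that integer division with round-down keeps the average from ever rising. The sketch below isolates that update so the effect can be observed. RttAverager and naiveUpdate are hypothetical names introduced here for illustration and are not part of the JXTA source; only the three-line update with aveRTT and remRTT is taken from the listing.

```java
/** Hypothetical standalone holder for the smoothed RTT (not a JXTA class). */
final class RttAverager {
    private long aveRTT;          // smoothed round-trip time, in ms
    private long remRTT;          // remainder carried over from the last division
    private boolean initialized;  // plays the role of aveRTTreset in the listing

    /** Folds one RTT sample into the average, same arithmetic as calcRTT. */
    long update(long dt) {
        if (!initialized) {
            aveRTT = dt;
            initialized = true;
        } else {
            long tmp = (8 * aveRTT) + ((8 * remRTT) / 9) + dt;
            aveRTT = tmp / 9;
            remRTT = tmp - aveRTT * 9;
        }
        return aveRTT;
    }

    /** Assumed "naive" variant for comparison: the remainder is thrown away. */
    static long naiveUpdate(long ave, long dt) {
        return (8 * ave + dt) / 9;
    }

    public static void main(String[] args) {
        // Naive form: an average of 100 fed with samples of 108 stays at 100,
        // because (800 + 108) / 9 rounds down to 100 every time.
        long naive = 100;
        for (int i = 0; i < 5; i++) {
            naive = naiveUpdate(naive, 108);
        }
        System.out.println("naive = " + naive); // prints "naive = 100"

        // Remainder-carrying form: the lost fraction accumulates and the
        // average starts to climb on the second sample of 108.
        RttAverager avg = new RttAverager();
        avg.update(100);
        avg.update(108);
        System.out.println("carried = " + avg.update(108)); // prints "carried = 101"
    }
}
```

With a weight of 8/9 on the old average, any sample less than 9 ms above the current average is rounded away in the naive form, which is the "cumulative damage" the comment describes.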

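
The recv method above reads an ACK element as a run of 4-byte ints: the sequential ACK first, then the SACK entries, which it sorts before calling ackReceived. The sketch below reproduces that layout over a plain byte array so it can be run without JXTA. AckPayload, decode and encode are hypothetical names, and the explicit length check is an added assumption: the listing itself does not guard against an element shorter than 4 bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

/** Hypothetical value class for a decoded ACK element (not a JXTA class). */
final class AckPayload {
    final int seqAck;   // highest sequence number acknowledged in order
    final int[] sacks;  // selectively acknowledged sequence numbers, ascending

    AckPayload(int seqAck, int[] sacks) {
        this.seqAck = seqAck;
        this.sacks = sacks;
    }

    /** Decodes [seqAck, sack0, sack1, ...] written as big-endian ints. */
    static AckPayload decode(byte[] body) {
        int sackCount = (body.length / 4) - 1;
        if (sackCount < 0) {
            // Added guard (assumption): reject bodies with no room for seqAck.
            throw new IllegalArgumentException("ACK body too short: " + body.length + " bytes");
        }
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(body))) {
            int seqAck = dis.readInt();
            int[] sacks = new int[sackCount];
            for (int i = 0; i < sackCount; i++) {
                sacks[i] = dis.readInt();
            }
            Arrays.sort(sacks); // the listing sorts before calling ackReceived
            return new AckPayload(seqAck, sacks);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Test helper (assumption): builds a body in the layout decode expects. */
    static byte[] encode(int seqAck, int... sacks) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bytes)) {
            dos.writeInt(seqAck);
            for (int sack : sacks) {
                dos.writeInt(sack);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        AckPayload ack = decode(encode(5, 9, 7, 8));
        System.out.println(ack.seqAck);                 // prints "5"
        System.out.println(Arrays.toString(ack.sacks)); // prints "[7, 8, 9]"
    }
}
```

Sorting matters because the SACK pass in ackReceived walks the retry queue and the SACK list in step, which only works when both are in ascending order.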