⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 reliableoutputstream.java

📁 JXTA&#8482 is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
                            LOG.fine("SACKD SEQN = " + retrQElt.seqnum);                        }                                                // GC this stuff                        retrQElt = null;                                            } else {                        // Retransmit? Only if there is a hole in the selected                        // acknowledgement list. Otherwise let RTO deal.                                                // Given that this SACK acknowledged messages still                        // in the retrQ:                        // seqnum is the max consectively SACKD message.                        // seqnum < retrQElt.seqnum means a message has not reached                        // receiver. EG: sacklist == 10,11,13 seqnum == 11                        // We retransmit 12.                        if (seqnum < retrQElt.seqnum) {                            fc.packetMissing(retrQElt.seqnum);                            retrans++;                            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                                LOG.fine("RETR: Fill hole, SACK, seqn#" + retrQElt.seqnum + ", Window =" + retrans);                            }                        }                    }                }                                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("SELECTIVE ACKD (" + numberACKed + ") " + retrans + " retrans wanted");                }            }                        // Compute aveRTT on the most representative message,            // if any. That's the most accurate data.            // Failing that we use the fall back, provided that it not            // more recent than aveRTT ago - that would decrease aveRTT            // and in the absence of solid data, we do not want to take            // that risk.            if (rttCalcSeqnum != -1) {                calcRTT(rttCalcDt, rttCalcSeqnum);                // get fc to recompute rwindow                rwindow = fc.ackEventEnd(rmaxQSize, aveRTT, rttCalcDt);            } else if ((fallBackSeqnum != -1) && (fallBackDt > aveRTT)) {                calcRTT(fallBackDt, fallBackSeqnum);                // get fc to recompute rwindow                rwindow = fc.ackEventEnd(rmaxQSize, aveRTT, fallBackDt);            }            retrQ.notifyAll();        }    }        /**     * retransmit unacknowledged  messages     *     * @param rwin        max number of messages to retransmit     * @param triggerTime base time     * @return number of messages retransmitted.     */    private int retransmit(int rwin, long triggerTime) {                List<RetrQElt> retransMsgs = new ArrayList<RetrQElt>();                int numberToRetrans;                // build a list of retries.        synchronized (retrQ) {            numberToRetrans = Math.min(retrQ.size(), rwin);            if (numberToRetrans > 0 && Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("Number of messages pending retransmit =" + numberToRetrans);            }            for (int j = 0; j < numberToRetrans; j++) {                RetrQElt r = retrQ.get(j);                // Mark message as retransmission                // need to know if a msg was retr or not for RTT eval                if (r.marked == 0) {                    // First time: we're here because this message has not arrived, but                    // the next one has. It may be an out of order message.                    // Experience shows that such a message rarely arrives older than                    // 1.2 * aveRTT. Beyond that, it's lost. It is also rare that we                    // detect a hole within that delay. So, often enough, as soon as                    // a hole is detected, it's time to resend...but not always.                    if (TimeUtils.toRelativeTimeMillis(triggerTime, r.sentAt) < (6 * aveRTT) / 5) {                        // Nothing to worry about, yet.                        continue;                    }                } else {                    // That one has been retransmitted at least once already.                    // So, we don't have much of a clue other than the age of the                    // last transmission. It is unlikely that it arrives before aveRTT/2                    // but we have to anticipate its loss at the risk of making dupes.                    // Otherwise the receiver will reach the hole, and that's really                    // expensive. (Think that we've been trying for a while already.)                                        if (TimeUtils.toRelativeTimeMillis(triggerTime, r.sentAt) < aveRTT) {                        // Nothing to worry about, yet.                        continue;                    }                }                r.marked++;                // Make a copy to for sending                retransMsgs.add(r);            }        }                // send the retries.        int retransmitted = 0;        Iterator<RetrQElt> eachRetrans = retransMsgs.iterator();        while (eachRetrans.hasNext()) {            RetrQElt r = eachRetrans.next();            eachRetrans.remove();            try {                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("RETRANSMIT seqn#" + r.seqnum);                }                Message sending = r.msg;                // its possible that the message was                // acked while we were working in this                // case r.msg will have been nulled.                if (null != sending) {                    sending = sending.clone();                    sending.replaceMessageElement(Defs.NAMESPACE, RETELT);                    if (outgoing.send(sending)) {                        r.sentAt = TimeUtils.timeNow();                        mrrIQFreeSpace--;                        // assume we have now taken a slot                        retransmitted++;                    } else {                        break;                        // don't bother continuing sending now.                    }                }            } catch (IOException e) {                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.log(Level.FINE, "FAILED RETRANS seqn#" + r.seqnum, e);                }                break;                // don't bother continuing.            }        }                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("RETRANSMITED " + retransmitted + " of " + numberToRetrans);        }                return retransmitted;    }        /**     * Retransmission daemon thread     */    private class Retransmitter implements Runnable {                int nAtThisRTO = 0;        volatile int nretransmitted = 0;                /**         * Constructor for the Retransmitter object         */        public Retransmitter() {            if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {                LOG.info("STARTED Reliable Retransmitter, RTO = " + RTO);            }        }                /**         * Gets the retransCount attribute of the Retransmitter object         *         * @return The retransCount value         */        public int getRetransCount() {            return nretransmitted;        }                /**         *  {@inheritDoc}         *         *  <p/>Main processing method for the Retransmitter object         */        public void run() {            try {                int idleCounter = 0;                                while (TimeUtils.toRelativeTimeMillis(closedAt) > 0) {                    long conn_idle = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), outgoing.getLastAccessed());                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.fine(outgoing + " idle for " + conn_idle);                    }                                        // check to see if we have not idled out.                    if (outgoing.getIdleTimeout() < conn_idle) {                        if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {                            LOG.info("Shutting down idle " + "connection " + outgoing);                        }                                                break;                    }                                        long sinceLastACK;                    long oldestInQueueWait;                                        synchronized (retrQ) {                        try {                            if (RTO > 0) {                                retrQ.wait(RTO);                            }                            Thread.currentThread().setName(                                    "JXTA Reliable Retransmiter for " + this + " Queue size : " + retrQ.size());                        } catch (InterruptedException e) {// ignored                        }                                                if (TimeUtils.toRelativeTimeMillis(closedAt) <= 0) {                            break;                        }                                                // see if we recently did a retransmit triggered by a SACK                        long sinceLastSACKRetr = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), sackRetransTime);                        if (sinceLastSACKRetr < RTO) {                            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                                LOG.fine("SACK retrans " + sinceLastSACKRetr + "ms ago");                            }                            continue;                        }                        // See how long we've waited since RTO was set                        sinceLastACK = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), lastACKTime);                                                if (!retrQ.isEmpty()) {                            RetrQElt elt = retrQ.get(0);                            oldestInQueueWait = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), elt.enqueuedAt);                        } else {                            oldestInQueueWait = 0;                        }                    }                                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.fine("Last ACK " + sinceLastACK + "ms ago. Age of oldest in Queue " + oldestInQueueWait + "ms.");                    }                                        // see if the queue has gone dead                    if (oldestInQueueWait > outgoing.getMaxRetryAge()) {                        if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {                            LOG.info("Shutting down stale connection " + outgoing);                        }                                                break;                    }                                        // get real wait as max of age of oldest in retrQ and                    // lastAck time                    long realWait = Math.max(oldestInQueueWait, sinceLastACK);                    // Retransmit only if RTO has expired.                    // a. real wait time is longer than RTO                    // b. oldest message on Q has been there longer                    // than RTO. This is necessary because we may                    // have just sent a message, and we do not                    // want to overrun the receiver. Also, we                    // do not want to restransmit a message that                    // has not been idle for the RTO.                    if ((realWait >= RTO) && (oldestInQueueWait >= RTO)) {                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                            LOG.fine("RTO RETRANSMISSION [" + rwindow + "]");                        }                        // retransmit                        int retransed = retransmit(rwindow, TimeUtils.timeNow());                        // Total                        nretransmitted += retransed;                        // number at this RTO                        nAtThisRTO += retransed;                        // See if real wait is too long and queue is non-empty                        // Remote may be dead - double until max.                        // Double after window restransmitted msgs at this RTO                        // exceeds the rwindow, and we've had no response for                        // twice the current RTO.                        if ((retransed > 0) && (realWait >= 2 * RTO) && (nAtThisRTO >= 2 * rwindow)) {                            RTO = (realWait > maxRTO ? maxRTO : 2 * RTO);                            nAtThisRTO = 0;                        }                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                            LOG.fine(                                    "RETRANSMISSION " + retransed + " retrans " + nAtThisRTO + " at this RTO (" + RTO + ") "                                    + nretransmitted + " total retrans");                        }                    } else {                        idleCounter += 1;                        // reset RTO to min if we are idle                        if (idleCounter == 2) {                            RTO = minRTO;                            idleCounter = 0;                            nAtThisRTO = 0;                        }                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                            LOG.fine("IDLE : RTO=" + RTO + " WAIT=" + realWait);                        }                    }                }            } catch (Throwable all) {                LOG.log(Level.SEVERE, "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all);            } finally {                hardClose();                                retrThread = null;                                if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {                    LOG.info("STOPPED Retransmit thread");                }            }        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -