⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 reliableoutputstream.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
                        long enqueuetime = r.enqueuedAt;                        long dt = TimeUtils.toRelativeTimeMillis(lastACKTime, enqueuetime);                        // Update RTT, RTO                        if (r.marked == 0) {                            if (dt > rttCalcDt) {                                rttCalcDt = dt;                                rttCalcSeqnum = r.seqnum;                            }                        } else {                            // In case we find no good candidate, make                            // a guess by dividing by the number of attempts                            // and keep the worst of them too. Since we                            // know it may be too short, we will not use it                            // if shortens rtt.                            dt /= (r.marked + 1);                            if (dt > fallBackDt) {                                fallBackDt = dt;                                fallBackSeqnum = r.seqnum;                            }                        }                        if (LOG.isEnabledFor(Level.DEBUG)) {                            LOG.debug("SACKD SEQN = " + r.seqnum);                        }                        // GC this stuff                        r.msg.clear();                        r.msg = null;                        r = null;                    } else {                        // Retransmit? Only if there is a hole in the selected                        // acknowledgement list. Otherwise let RTO deal.                        //    Given that this SACK acknowledged messages still                        //    in the retrQ:                        //      seqnum is the max consectively SACKD message.                        //      seqnum < r.seqnum means a message has not reached                        //      receiver. EG: sacklist == 10,11,13 seqnum == 11                        //                  We retransmit 12.                        if (seqnum < r.seqnum) {                            fc.packetMissing(r.seqnum);                            retrans++;                            if (LOG.isEnabledFor(Level.DEBUG)) {                                LOG.debug("RETR: Fill hole, SACK, seqn#" +                                        r.seqnum +                                        ", Window =" + retrans);                            }                        }                    }                }                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("SELECTIVE ACKD (" + numberACKed + ") " +                            retrans + " retrans wanted");                }            }            // Compute aveRTT on the most representative message,            // if any. That's the most accurate data.            // Failing that we use the fall back, provided that it not            // more recent than aveRTT ago - that would decrease aveRTT            // and in the absence of solid data, we do not want to take            // that risk.            if (rttCalcSeqnum != -1) {                calcRTT(rttCalcDt, rttCalcSeqnum);                // get fc to recompute rwindow                rwindow = fc.ackEventEnd(rmaxQSize, aveRTT, rttCalcDt);            } else if ((fallBackSeqnum != -1) && (fallBackDt > aveRTT)) {                calcRTT(fallBackDt, fallBackSeqnum);                // get fc to recompute rwindow                rwindow = fc.ackEventEnd(rmaxQSize, aveRTT, fallBackDt);            }            retrQ.notifyAll();        }    }    /**     * retransmit unacknowledged  messages     *     * @param  rwin         max number of messages to retransmit     * @param  triggerTime  base time     * @return              number of messages retransmitted.     */    private int retransmit(int rwin, long triggerTime) {        List retransMsgs = new ArrayList();        int numberToRetrans;        // build a list of retries.        synchronized (retrQ) {            numberToRetrans = Math.min(retrQ.size(), rwin);            if (numberToRetrans > 0 && LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("Number of messages pending retransmit =" + numberToRetrans);            }            for (int j = 0; j < numberToRetrans; j++) {                RetrQElt r = (RetrQElt) retrQ.get(j);                // Mark message as retransmission                // need to know if a msg was retr or not for RTT eval                if (r.marked == 0) {                    // First time: we're here because this message has not arrived, but                    // the next one has. It may be an out of order message.                    // Experience shows that such a message rarely arrives older than                    // 1.2 * aveRTT. Beyond that, it's lost. It is also rare that we                    // detect a hole within that delay. So, often enough, as soon as                    // a hole is detected, it's time to resend...but not always.                    if (TimeUtils.toRelativeTimeMillis(triggerTime, r.sentAt)                             < (6 * aveRTT) / 5) {                        // Nothing to worry about, yet.                        continue;                    }                } else {                    // That one has been retransmitted at least once already.                    // So, we don't have much of a clue other than the age of the                    // last transmission. It is unlikely that it arrives before aveRTT/2                    // but we have to anticipate its loss at the risk of making dupes.                    // Otherwise the receiver will reach the hole, and that's really                    // expensive. (Think that we've been trying for a while already.)                    if (TimeUtils.toRelativeTimeMillis(triggerTime, r.sentAt)                             < aveRTT) {                        // Nothing to worry about, yet.                        continue;                    }                }                r.marked++;                // Make a copy to for sending                retransMsgs.add(r);            }        }        // send the retries.        int retransmitted = 0;        Iterator eachRetrans = retransMsgs.iterator();        while (eachRetrans.hasNext()) {            RetrQElt r = (RetrQElt) eachRetrans.next();            eachRetrans.remove();            try {                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("RETRANSMIT seqn#" + r.seqnum);                }                Message sending = (Message) r.msg;                // its possible that the message was                // acked while we were working in this                // case r.msg will have been nulled.                if (null != sending) {                    sending = (Message) sending.clone();                    sending.replaceMessageElement(Defs.NAMESPACE, RETELT);                    if (outgoing.send(sending)) {                        r.sentAt = TimeUtils.timeNow();                        mrrIQFreeSpace--;                        // assume we have now taken a slot                        retransmitted++;                    } else {                        break;                        // don't bother continuing.                    }                }            } catch (IOException e) {                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("FAILED RETRANS seqn#" + r.seqnum, e);                }                break;                // don't bother continuing.            }        }        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("RETRANSMITED " + retransmitted +                    " of " + numberToRetrans);        }        return retransmitted;    }    /**     * Retransmission daemon thread     */    private class Retransmitter implements Runnable {        Thread th;        int nAtThisRTO = 0;        volatile int nretransmitted = 0;        /**         * Constructor for the Retransmitter object         */        public Retransmitter() {            this.th = new Thread(this, "JXTA Reliable Retransmiter for " + outgoing);            th.setDaemon(true);            th.start();            if (LOG.isEnabledFor(Level.INFO)) {                LOG.info("RETRANS : STARTED Reliable Retransmit thread, " + "RTO = " + RTO);            }        }        /**         *  Gets the retransCount attribute of the Retransmitter object         *         * @return    The retransCount value         */        public int getRetransCount() {            return nretransmitted;        }        /**         *  Main processing method for the Retransmitter object         */        public void run() {            try {                int idleCounter = 0;                while (!closed) {                    long conn_idle =                            TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(),                            outgoing.getLastAccessed());                    if (LOG.isEnabledFor(Level.DEBUG)) {                        LOG.debug("RETRANS : " + outgoing + " idle for " + conn_idle);                    }                    // check to see if we have not idled out.                    if (outgoing.getIdleTimeout() < conn_idle) {                        if (LOG.isEnabledFor(Level.INFO)) {                            LOG.info("RETRANS : Shutting down idle " +                                    "connection " + outgoing);                        }                        try {                            // in this we close ourself                            outgoing.close();                            setClosing();                            // Leave. Otherwise we'll be spinning forever.                            return;                        } catch (IOException ignored) {}                        continue;                    }                    synchronized (retrQ) {                        try {                            retrQ.wait(RTO);                            th.setName("JXTA Reliable Retransmiter for " +                                outgoing +" Queue size : "+retrQ.size());                        } catch (InterruptedException e) {}                    }                    if (closed) {                        break;                    }                    // see if we recently did a retransmit triggered by a SACK                    long sinceLastSACKRetr =                            TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(),                            sackRetransTime);                    if (sinceLastSACKRetr < RTO) {                        if (LOG.isEnabledFor(Level.DEBUG)) {                            LOG.debug("RETRANS : SACK retrans " +                                    sinceLastSACKRetr + "ms ago");                        }                        continue;                    }                    // See how long we've waited since RTO was set                    long sinceLastACK =                            TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(),                            lastACKTime);                    long oldestInQueueWait;                    synchronized (retrQ) {                        if (retrQ.size() > 0) {                            RetrQElt elt = (RetrQElt) retrQ.get(0);                            oldestInQueueWait =                                    TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(),                                    elt.enqueuedAt);                        } else {                            oldestInQueueWait = 0;                        }                    }                    if (LOG.isEnabledFor(Level.DEBUG)) {                        LOG.debug("RETRANS : Last ACK " + sinceLastACK +                                "ms ago. Age of oldest in Queue " +                                oldestInQueueWait + "ms");                    }                    // see if the queue has gone dead                    if (oldestInQueueWait > (outgoing.getMaxRetryAge() * 2)) {                        if (LOG.isEnabledFor(Level.INFO)) {                            LOG.info("RETRANS : Shutting down stale " +                                    "connection " + outgoing);                        }                        try {                            // in this we close ourself                            outgoing.close();                            setClosing();                            // Leave. Otherwise we'll be spinning forever.                            return;                        } catch (IOException ignored) {}                        continue;                    }                    // get real wait as max of age of oldest in retrQ and                    // lastAck time                    long realWait = Math.max(oldestInQueueWait, sinceLastACK);                    // Retransmit only if RTO has expired.                    //   a. real wait time is longer than RTO                    //   b. oldest message on Q has been there longer                    //      than RTO. This is necessary because we may                    //      have just sent a message, and we do not                    //      want to overrun the receiver. Also, we                    //      do not want to restransmit a message that                    //      has not been idle for the RTO.                    if ((realWait >= RTO) && (oldestInQueueWait >= RTO)) {                        if (LOG.isEnabledFor(Level.DEBUG)) {                            LOG.debug("RETRANS : RTO RETRANSMISSION [" +                                    rwindow + "]");                        }                        // retransmit                        int retransed = retransmit(rwindow, TimeUtils.timeNow());                        // Total                        nretransmitted += retransed;                        // number at this RTO                        nAtThisRTO += retransed;                        // See if real wait is too long and queue is non-empty                        //   Remote may be dead - double until max.                        //   Double after window restransmitted msgs at this RTO                        //   exceeds the rwindow, and we've had no response for                        //   twice the current RTO.                        if ((retransed > 0) &&                                (realWait >= 2 * RTO) &&                                (nAtThisRTO >= 2 * rwindow)) {                            RTO = (realWait > maxRTO ? maxRTO : 2 * RTO);                            nAtThisRTO = 0;                        }                        if (LOG.isEnabledFor(Level.DEBUG)) {                            LOG.debug("RETRANS : RETRANSMISSION "                                     + retransed + " retrans "                                     + nAtThisRTO + " at this RTO (" +                                       RTO + ") "                                     + nretransmitted + " total retrans");                        }                    } else {                        idleCounter += 1;                        // reset RTO to min if we are idle                        if (idleCounter == 2) {                            RTO = minRTO;                            idleCounter = 0;                            nAtThisRTO = 0;                        }                        if (LOG.isEnabledFor(Level.DEBUG)) {                            LOG.debug("RETRANS : IDLE : RTO=" + RTO +                                    " WAIT=" + realWait);                        }                    }                }                if (LOG.isEnabledFor(Level.INFO)) {                    LOG.info("Retransmit thread closing");                }            } catch (Throwable all) {                LOG.fatal("Uncaught Throwable in thread :" +                        Thread.currentThread().getName(), all);            }            if (LOG.isEnabledFor(Level.INFO)) {                LOG.info("STOPPED Retransmit thread");            }            retrThread = null;            th = null;        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -