⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jtlsoutputstream.java

📁 JXTA&#8482 is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
                        // Retransmit? Only if there is a hole in the selected                        // acknowledgement list. Otherwise let RTO deal.                        // Given that this SACK acknowledged messages still                        // in the retrQ:                        // seqnum is the max consectively SACKD message.                        // seqnum < r.seqnum means a message has not reached                        // receiver. EG: sacklist == 10,11,13 seqnum == 11                        // We retransmit 12.                        if (seqnum < r.seqnum) {                            retrans++;                            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                                LOG.fine("RETR: Fill hole, SACK, seqn#" + r.seqnum + ", Window =" + retrans);                            }                        }                    }                }                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("TLS!! SELECTIVE ACKD (" + numberACKed + ") " + retrans + " retrans wanted");                }                // retransmit 1 retq mem. only                if (retrans > 0) {                    retransmit(Math.min(RWINDOW, retrans), lastACKTime);                    sackRetransTime = TimeUtils.timeNow();                }            }            retrQ.notify();        }    }    /**     * retransmit unacknowledged  messages     *     *  @param rwin max number of messages to retransmit     *  @return number of messages retransmitted.     **/    private int retransmit(int rwin, long triggerTime) {        List retransMsgs = new ArrayList();        int numberToRetrans;        // build a list of retries.        synchronized (retrQ) {            numberToRetrans = Math.min(retrQ.size(), rwin);            if (numberToRetrans > 0 && Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("RETRANSMITING [rwindow = " + numberToRetrans + "]");            }            for (int j = 0; j < numberToRetrans; j++) {                RetrQElt r = retrQ.get(j);                // Mark message as retransmission                // need to know if a msg was retr or not for RTT eval                if (r.marked == 0) {                    // First time: we're here because this message has not arrived, but                    // the next one has. It may be an out of order message.                    // Experience shows that such a message rarely arrives older than                    // 1.2 * aveRTT. Beyond that, it's lost. It is also rare that we                    // detect a hole within that delay. So, often enough, as soon as                    // a hole is detected, it's time to resend...but not always.                    if (TimeUtils.toRelativeTimeMillis(triggerTime, r.sentAt) < (6 * aveRTT) / 5) {                        // Nothing to worry about, yet.                        continue;                    }                } else {                    // That one has been retransmitted at least once already.                    // So, we don't have much of a clue other than the age of the                    // last transmission. It is unlikely that it arrives before aveRTT/2                    // but we have to anticipate its loss at the risk of making dupes.                    // Otherwise the receiver will reach the hole, and that's really                    // expensive. (Think that we've been trying for a while already.)                    if (TimeUtils.toRelativeTimeMillis(triggerTime, r.sentAt) < aveRTT) {                        // Nothing to worry about, yet.                        continue;                    }                }                r.marked++;                // Make a copy to for sending                retransMsgs.add(r);            }        }        // send the retries.        int retransmitted = 0;        Iterator eachRetrans = retransMsgs.iterator();        while (eachRetrans.hasNext()) {            RetrQElt r = (RetrQElt) eachRetrans.next();            eachRetrans.remove();            try {                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("TLS!! RETRANSMIT seqn#" + r.seqnum);                }                Message sending = r.msg;                // its possible that the message was acked while we were working                // in this case r.msg will have been nulled.                if (null != sending) {                    sending = sending.clone();                    sending.replaceMessageElement(JTlsDefs.TLSNameSpace, RETELT);                    if (conn.sendToRemoteTls(sending)) {                        mrrIQFreeSpace--; // assume we have now taken a slot                        retransmitted++;                    } else {                        break;                    } // don't bother continuing.                }            } catch (IOException e) {                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.log(Level.FINE, "FAILED RETRANS seqn#" + r.seqnum, e);                }                break; // don't bother continuing.            }        }        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("RETRANSMITED " + retransmitted + " of " + numberToRetrans);        }        return retransmitted;    }    /**     * Retransmission daemon thread     **/    private class Retransmitter implements Runnable {        Thread retransmitterThread;        volatile int nretransmitted = 0;        int nAtThisRTO = 0;        public Retransmitter() {            this.retransmitterThread = new Thread(tp.myThreadGroup, this, "JXTA TLS Retransmiter for " + conn.destAddr);            retransmitterThread.setDaemon(true);            retransmitterThread.start();            if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {                LOG.info("STARTED TLS Retransmit thread, RTO = " + RTO);            }        }        public int getRetransCount() {            return nretransmitted;        }        /**         *  {@inheritDoc]         **/        public void run() {            try {                int idleCounter = 0;                while (!closed) {                    long conn_idle = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), conn.lastAccessed);                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.fine("RETRANS : " + conn + " idle for " + conn_idle);                    }                    // check to see if we have not idled out.                    if (tp.CONNECTION_IDLE_TIMEOUT < conn_idle) {                        if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {                            LOG.info("RETRANS : Shutting down idle connection: " + conn);                        }                        try {                            setClosing();                            // the following call eventually closes this stream                            conn.close(HandshakeState.CONNECTIONDEAD);                            // Leave. Otherwise we'll be spinning forever                            return;                        } catch (IOException ignored) {                            ;                        }                        continue;                    }                    synchronized (retrQ) {                        try {                            retrQ.wait(RTO);                        } catch (InterruptedException e) {                            Thread.interrupted();                        }                    }                    if (closed) {                        break;                    }                    // see if we recently did a retransmit triggered by a SACK                    long sinceLastSACKRetr = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), sackRetransTime);                    if (sinceLastSACKRetr < RTO) {                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                            LOG.fine("RETRANS : SACK retrans " + sinceLastSACKRetr + "ms ago");                        }                        continue;                    }                    // See how long we've waited since RTO was set                    long sinceLastACK = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), lastACKTime);                    long oldestInQueueWait;                    synchronized (retrQ) {                        if (retrQ.size() > 0) {                            oldestInQueueWait = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), retrQ.get(0).enqueuedAt);                        } else {                            oldestInQueueWait = 0;                        }                    }                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.fine("RETRANS : Last ACK " + sinceLastACK + "ms ago. Age of oldest in Queue " + oldestInQueueWait + "ms");                    }                    // see if the queue has gone dead                    if (oldestInQueueWait > (tp.RETRMAXAGE * 2)) {                        if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {                            LOG.info("RETRANS : Shutting down stale connection: " + conn);                        }                        try {                            setClosing();                            conn.close(HandshakeState.CONNECTIONDEAD);                            // Leave. Otherwise we'll be spinning forever.                            return;                        } catch (IOException ignored) {                            ;                        }                        continue;                    }                    // get real wait as max of age of oldest in retrQ and                    // lastAck time                    long realWait = Math.max(oldestInQueueWait, sinceLastACK);                    // Retransmit only if RTO has expired.                    // a. real wait time is longer than RTO                    // b. oldest message on Q has been there longer                    // than RTO. This is necessary because we may                    // have just sent a message, and we do not                    // want to overrun the receiver. Also, we                    // do not want to restransmit a message that                    // has not been idle for the RTO.                    if ((realWait >= RTO) && (oldestInQueueWait >= RTO)) {                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                            LOG.fine("RETRANS : RTO RETRANSMISSION [" + RWINDOW + "]");                        }                        // retrasmit                        int retransed = retransmit(RWINDOW, TimeUtils.timeNow());                        // Total                        nretransmitted += retransed;                        // number at this RTO                        nAtThisRTO += retransed;                        // See if real wait is too long and queue is non-empty                        // Remote may be dead - double until max.                        // Double after window restransmitted msgs at this RTO                        // exceeds the RWINDOW, and we've had no response for                        // twice the current RTO.                        if ((retransed > 0) && (realWait >= 2 * RTO) && (nAtThisRTO >= 2 * RWINDOW)) {                            RTO = (realWait > maxRTO ? maxRTO : 2 * RTO);                            nAtThisRTO = 0;                        }                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                            LOG.fine("RETRANS : RETRANSMISSION " + retransed + " retrans " + nAtThisRTO + " at this RTO (" + RTO                                    + ") " + nretransmitted + " total retrans");                        }                    } else {                        idleCounter += 1;                        // reset RTO to min if we are idle                        if (idleCounter == 2) {                            RTO = minRTO;                            idleCounter = 0;                            nAtThisRTO = 0;                        }                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                            LOG.fine("RETRANS : IDLE : RTO=" + RTO + " WAIT=" + realWait);                        }                    }                }            } catch (Throwable all) {                if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {                    LOG.log(Level.SEVERE, "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all);                }            } finally {                if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {                    LOG.info("STOPPED TLS Retransmit thread");                }                retransmitterThread = null;                retransmitter = null;            }        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -