📄 reliableoutputstream.java
字号:
long enqueuetime = r.enqueuedAt; long dt = TimeUtils.toRelativeTimeMillis(lastACKTime, enqueuetime); // Update RTT, RTO if (r.marked == 0) { if (dt > rttCalcDt) { rttCalcDt = dt; rttCalcSeqnum = r.seqnum; } } else { // In case we find no good candidate, make // a guess by dividing by the number of attempts // and keep the worst of them too. Since we // know it may be too short, we will not use it // if shortens rtt. dt /= (r.marked + 1); if (dt > fallBackDt) { fallBackDt = dt; fallBackSeqnum = r.seqnum; } } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("SACKD SEQN = " + r.seqnum); } // GC this stuff r.msg.clear(); r.msg = null; r = null; } else { // Retransmit? Only if there is a hole in the selected // acknowledgement list. Otherwise let RTO deal. // Given that this SACK acknowledged messages still // in the retrQ: // seqnum is the max consectively SACKD message. // seqnum < r.seqnum means a message has not reached // receiver. EG: sacklist == 10,11,13 seqnum == 11 // We retransmit 12. if (seqnum < r.seqnum) { fc.packetMissing(r.seqnum); retrans++; if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("RETR: Fill hole, SACK, seqn#" + r.seqnum + ", Window =" + retrans); } } } } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("SELECTIVE ACKD (" + numberACKed + ") " + retrans + " retrans wanted"); } } // Compute aveRTT on the most representative message, // if any. That's the most accurate data. // Failing that we use the fall back, provided that it not // more recent than aveRTT ago - that would decrease aveRTT // and in the absence of solid data, we do not want to take // that risk. if (rttCalcSeqnum != -1) { calcRTT(rttCalcDt, rttCalcSeqnum); // get fc to recompute rwindow rwindow = fc.ackEventEnd(rmaxQSize, aveRTT, rttCalcDt); } else if ((fallBackSeqnum != -1) && (fallBackDt > aveRTT)) { calcRTT(fallBackDt, fallBackSeqnum); // get fc to recompute rwindow rwindow = fc.ackEventEnd(rmaxQSize, aveRTT, fallBackDt); } retrQ.notifyAll(); } } /** * retransmit unacknowledged messages * * @param rwin max number of messages to retransmit * @param triggerTime base time * @return number of messages retransmitted. */ private int retransmit(int rwin, long triggerTime) { List retransMsgs = new ArrayList(); int numberToRetrans; // build a list of retries. synchronized (retrQ) { numberToRetrans = Math.min(retrQ.size(), rwin); if (numberToRetrans > 0 && LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Number of messages pending retransmit =" + numberToRetrans); } for (int j = 0; j < numberToRetrans; j++) { RetrQElt r = (RetrQElt) retrQ.get(j); // Mark message as retransmission // need to know if a msg was retr or not for RTT eval if (r.marked == 0) { // First time: we're here because this message has not arrived, but // the next one has. It may be an out of order message. // Experience shows that such a message rarely arrives older than // 1.2 * aveRTT. Beyond that, it's lost. It is also rare that we // detect a hole within that delay. So, often enough, as soon as // a hole is detected, it's time to resend...but not always. if (TimeUtils.toRelativeTimeMillis(triggerTime, r.sentAt) < (6 * aveRTT) / 5) { // Nothing to worry about, yet. continue; } } else { // That one has been retransmitted at least once already. // So, we don't have much of a clue other than the age of the // last transmission. It is unlikely that it arrives before aveRTT/2 // but we have to anticipate its loss at the risk of making dupes. // Otherwise the receiver will reach the hole, and that's really // expensive. (Think that we've been trying for a while already.) if (TimeUtils.toRelativeTimeMillis(triggerTime, r.sentAt) < aveRTT) { // Nothing to worry about, yet. continue; } } r.marked++; // Make a copy to for sending retransMsgs.add(r); } } // send the retries. int retransmitted = 0; Iterator eachRetrans = retransMsgs.iterator(); while (eachRetrans.hasNext()) { RetrQElt r = (RetrQElt) eachRetrans.next(); eachRetrans.remove(); try { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("RETRANSMIT seqn#" + r.seqnum); } Message sending = (Message) r.msg; // its possible that the message was // acked while we were working in this // case r.msg will have been nulled. if (null != sending) { sending = (Message) sending.clone(); sending.replaceMessageElement(Defs.NAMESPACE, RETELT); if (outgoing.send(sending)) { r.sentAt = TimeUtils.timeNow(); mrrIQFreeSpace--; // assume we have now taken a slot retransmitted++; } else { break; // don't bother continuing. } } } catch (IOException e) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("FAILED RETRANS seqn#" + r.seqnum, e); } break; // don't bother continuing. } } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("RETRANSMITED " + retransmitted + " of " + numberToRetrans); } return retransmitted; } /** * Retransmission daemon thread */ private class Retransmitter implements Runnable { Thread th; int nAtThisRTO = 0; volatile int nretransmitted = 0; /** * Constructor for the Retransmitter object */ public Retransmitter() { this.th = new Thread(this, "JXTA Reliable Retransmiter for " + outgoing); th.setDaemon(true); th.start(); if (LOG.isEnabledFor(Level.INFO)) { LOG.info("RETRANS : STARTED Reliable Retransmit thread, " + "RTO = " + RTO); } } /** * Gets the retransCount attribute of the Retransmitter object * * @return The retransCount value */ public int getRetransCount() { return nretransmitted; } /** * Main processing method for the Retransmitter object */ public void run() { try { int idleCounter = 0; while (!closed) { long conn_idle = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), outgoing.getLastAccessed()); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("RETRANS : " + outgoing + " idle for " + conn_idle); } // check to see if we have not idled out. if (outgoing.getIdleTimeout() < conn_idle) { if (LOG.isEnabledFor(Level.INFO)) { LOG.info("RETRANS : Shutting down idle " + "connection " + outgoing); } try { // in this we close ourself outgoing.close(); setClosing(); // Leave. Otherwise we'll be spinning forever. return; } catch (IOException ignored) {} continue; } synchronized (retrQ) { try { retrQ.wait(RTO); th.setName("JXTA Reliable Retransmiter for " + outgoing +" Queue size : "+retrQ.size()); } catch (InterruptedException e) {} } if (closed) { break; } // see if we recently did a retransmit triggered by a SACK long sinceLastSACKRetr = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), sackRetransTime); if (sinceLastSACKRetr < RTO) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("RETRANS : SACK retrans " + sinceLastSACKRetr + "ms ago"); } continue; } // See how long we've waited since RTO was set long sinceLastACK = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), lastACKTime); long oldestInQueueWait; synchronized (retrQ) { if (retrQ.size() > 0) { RetrQElt elt = (RetrQElt) retrQ.get(0); oldestInQueueWait = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), elt.enqueuedAt); } else { oldestInQueueWait = 0; } } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("RETRANS : Last ACK " + sinceLastACK + "ms ago. Age of oldest in Queue " + oldestInQueueWait + "ms"); } // see if the queue has gone dead if (oldestInQueueWait > (outgoing.getMaxRetryAge() * 2)) { if (LOG.isEnabledFor(Level.INFO)) { LOG.info("RETRANS : Shutting down stale " + "connection " + outgoing); } try { // in this we close ourself outgoing.close(); setClosing(); // Leave. Otherwise we'll be spinning forever. return; } catch (IOException ignored) {} continue; } // get real wait as max of age of oldest in retrQ and // lastAck time long realWait = Math.max(oldestInQueueWait, sinceLastACK); // Retransmit only if RTO has expired. // a. real wait time is longer than RTO // b. oldest message on Q has been there longer // than RTO. This is necessary because we may // have just sent a message, and we do not // want to overrun the receiver. Also, we // do not want to restransmit a message that // has not been idle for the RTO. if ((realWait >= RTO) && (oldestInQueueWait >= RTO)) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("RETRANS : RTO RETRANSMISSION [" + rwindow + "]"); } // retransmit int retransed = retransmit(rwindow, TimeUtils.timeNow()); // Total nretransmitted += retransed; // number at this RTO nAtThisRTO += retransed; // See if real wait is too long and queue is non-empty // Remote may be dead - double until max. // Double after window restransmitted msgs at this RTO // exceeds the rwindow, and we've had no response for // twice the current RTO. if ((retransed > 0) && (realWait >= 2 * RTO) && (nAtThisRTO >= 2 * rwindow)) { RTO = (realWait > maxRTO ? maxRTO : 2 * RTO); nAtThisRTO = 0; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("RETRANS : RETRANSMISSION " + retransed + " retrans " + nAtThisRTO + " at this RTO (" + RTO + ") " + nretransmitted + " total retrans"); } } else { idleCounter += 1; // reset RTO to min if we are idle if (idleCounter == 2) { RTO = minRTO; idleCounter = 0; nAtThisRTO = 0; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("RETRANS : IDLE : RTO=" + RTO + " WAIT=" + realWait); } } } if (LOG.isEnabledFor(Level.INFO)) { LOG.info("Retransmit thread closing"); } } catch (Throwable all) { LOG.fatal("Uncaught Throwable in thread :" + Thread.currentThread().getName(), all); } if (LOG.isEnabledFor(Level.INFO)) { LOG.info("STOPPED Retransmit thread"); } retrThread = null; th = null; } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -