📄 jtlsoutputstream.java
字号:
if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) { throw new IndexOutOfBoundsException(); } if (len == 0) { return; } // Copy the data since it will be queued, and caller may // overwrite the same byte[] buffer. byte[] data = new byte[len]; System.arraycopy(b, off, data, 0, len); // sync so that writes don't get out of order. synchronized (retrQ) { // add TLS record as element MessageElement ciphertext = new ByteArrayMessageElement(Integer.toString(++sequenceNumber), JTlsDefs.BLOCKS, data , null); jmsg.addMessageElement(JTlsDefs.TLSNameSpace, ciphertext); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("TLS CT WRITE : seqn#" + sequenceNumber + " length=" + len); } // (1) See if the most recent remote input queue size is close to // it's maximum input queue size // Send only if at least 20% or more of the queue is free. // (2) Also, if our retransQ is larger than the remotes inputQ, // wait until we've received an ack. // We assume some msgs are in transit or the remote system buffers // We do not want to overrun the receiver. // (3) We need to release from the loop because of possible deadlocks // EG: retrQ.size() == 0 and mrrIQFreeSpace forces looping // forever because the most recent SACK cleared it, and the receiver // is waiting for more data. // max of 200ms wait int maxwait = Math.min((int) aveRTT, 200); // iterations to wait (max 3, min 1) int waitCt = Math.max(maxwait / 60, 1); // check if the queue has gone dead. if (retrQ.size() > 0) { long inQueue = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), retrQ.get(0).enqueuedAt); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("write : Retry queue idle for " + inQueue); } if (inQueue > tp.RETRMAXAGE) { if (inQueue > (2 * tp.RETRMAXAGE)) { if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("Closing stale connection " + conn); } // SPT - set flag for connection close in finally block closeStale = true; throw new IOException("Stale connection closure in progress"); } else if (retrQ.size() >= MAXRETRQSIZE) { // if the the queue is "full" and we are long idle, delay new writes forever. waitCt = Integer.MAX_VALUE; } } } int i = 0; while (!closed && ((mrrIQFreeSpace < rmaxQSize / 5) || (retrQ.size() > rmaxQSize))) { // see if max. wait has arrived. if (i++ == waitCt) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("write() wait for ACK, maxwait timer expired while enqueuing seqn#" + sequenceNumber); } break; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("write() wait 60ms for ACK while enqueuing seqn#" + sequenceNumber + "\n\tremote IQ free space = " + mrrIQFreeSpace + "\n\tMIN free space to continue = " + (rmaxQSize / 5) + "" + "\n\tretQ.size()=" + retrQ.size()); } // Less than 20% free queue space is left. Wait. try { retrQ.wait(60); } catch (InterruptedException ignored) { Thread.interrupted(); } } // place copy on retransmission queue RetrQElt r = new RetrQElt(sequenceNumber, jmsg.clone()); retrQ.add(r); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Retrans Enqueue added seqn#" + sequenceNumber + " retQ.size()=" + retrQ.size()); } } // Here we will send the message to the transport conn.sendToRemoteTls(jmsg); // assume we have now taken a slot mrrIQFreeSpace--; if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("TLS CT SENT : seqn#" + sequenceNumber + " length=" + len); } } finally { if (closeStale) { // The retry queue has really gone stale. try { setClosing(); // in this we close ourself conn.close(HandshakeState.CONNECTIONDEAD); } catch (IOException ignored) { ; } } } } private void calcRTT(long enqueuedAt) { long dt = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), enqueuedAt); if (dt == 0) { dt += 1; } int n = nACKS; nACKS += 1; aveRTT = ((n * aveRTT) + dt) / (nACKS); // Set retransmission time out: 2.5 x RTT RTO = (aveRTT << 1) + (aveRTT >> 1); // Enforce a min/max RTO = Math.max(RTO, minRTO); RTO = Math.min(RTO, maxRTO); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("TLS!! RTT = " + dt + "ms aveRTT = " + aveRTT + "ms" + " RTO = " + RTO + "ms"); } } private int calcAVEIQ(int iq) { int n = nIQTests; nIQTests += 1; aveIQSize = ((n * aveIQSize) + iq) / nIQTests; return aveIQSize; } /** * Process an ACK Message. We remove ACKed messages from the retry queue. * We only acknowledge messages received in sequence. * * The seqnum is for the largest unacknowledged seqnum * the receipient has received. * * The sackList is a sequence of all of the received * messages in the sender's input Q. All will be sequence numbers higher * than the sequential ACK seqnum. * * Recepients are passive and only ack upon the receipt * of an in sequence message. * * They depend on our RTO to fill holes in message * sequences. **/ void ackReceived(int seqnum, int[] sackList) { lastACKTime = TimeUtils.timeNow(); int numberACKed = 0; // remove acknowledged messages from retrans Q. synchronized (retrQ) { maxACK = Math.max(maxACK, seqnum); // dump the current Retry queue and the SACK list if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { StringBuilder dumpRETRQ = new StringBuilder("ACK RECEIVE : " + Integer.toString(seqnum)); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { dumpRETRQ.append('\n'); } dumpRETRQ.append("\tRETRQ (size=" + retrQ.size() + ")"); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { dumpRETRQ.append(" : "); for (int y = 0; y < retrQ.size(); y++) { if (0 != y) { dumpRETRQ.append(", "); } RetrQElt r = retrQ.get(y); dumpRETRQ.append(r.seqnum); } } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { dumpRETRQ.append('\n'); } dumpRETRQ.append("\tSACKLIST (size=" + sackList.length + ")"); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { dumpRETRQ.append(" : "); for (int y = 0; y < sackList.length; y++) { if (0 != y) { dumpRETRQ.append(", "); } dumpRETRQ.append(sackList[y]); } } LOG.fine(dumpRETRQ.toString()); } Iterator eachRetryQueueEntry = retrQ.iterator(); // First remove monotonically increasing seq#s in retrans vector while (eachRetryQueueEntry.hasNext()) { RetrQElt r = (RetrQElt) eachRetryQueueEntry.next(); if (r.seqnum > seqnum) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("r.seqnum :" + r.seqnum + " > seqnum :" + seqnum); } break; } // Acknowledged if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("seqnum :" + seqnum); LOG.fine("Removing :" + r.seqnum + " from retransmit queue"); } eachRetryQueueEntry.remove(); // Update RTT, RTO if (0 != r.enqueuedAt) { calcRTT(r.enqueuedAt); } r.msg.clear(); r.msg = null; r = null; numberACKed++; } // Update last accessed time in response to getting seq acks. if (numberACKed > 0) { conn.lastAccessed = TimeUtils.timeNow(); } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("TLS!! SEQUENTIALLY ACKD SEQN = " + seqnum + ", (" + numberACKed + " acked)"); } // most recent remote IQ free space rmaxQSize = Math.max(rmaxQSize, sackList.length); mrrIQFreeSpace = rmaxQSize - sackList.length; // let's look at average sacs.size(). If it is big, then this // probably means we must back off because the system is slow. // Our retrans Queue can be large and we can overwhelm the // receiver with retransmissions. // We will keep the rwin <= ave real input queue size. int aveIQ = calcAVEIQ(sackList.length); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("remote IQ free space = " + mrrIQFreeSpace + " remote avg IQ occupancy = " + aveIQ); } int retrans = 0; if (sackList.length > 0) { Iterator eachRetrQElement = retrQ.iterator(); int currentSACK = 0; while (eachRetrQElement.hasNext()) { RetrQElt r = (RetrQElt) eachRetrQElement.next(); while (sackList[currentSACK] < r.seqnum) { currentSACK++; if (currentSACK == sackList.length) { break; } } if (currentSACK == sackList.length) { break; } if (sackList[currentSACK] == r.seqnum) { eachRetrQElement.remove(); // ack counter numberACKed++; // for aveRTT calculation long enqueuetime = r.enqueuedAt; // Update RTT, RTO if (enqueuetime != 0) { calcRTT(enqueuetime); } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("TLS!! SACKD SEQN = " + r.seqnum); } // GC this stuff r.msg.clear(); r.msg = null; r = null; } else {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -