📄 reliableoutputstream.java
字号:
System.arraycopy(b, off, data, 0, len); // allocate new message Message jmsg = new Message(); synchronized (retrQ) { while (true) { if (closing || closed) { throw new IOException("Connection is "+ (closing ? "closing" :"closed")); } if (retrQ.size() > Math.min(rwindow, mrrIQFreeSpace * 2)) { try { retrQ.wait(1000); } catch (InterruptedException ignored) {} continue; } break; } ++sequenceNumber; MessageElement element = new ByteArrayMessageElement(Integer.toString(sequenceNumber), Defs.MIME_TYPE_BLOCK, data, null); jmsg.addMessageElement(Defs.NAMESPACE, element); RetrQElt retrQel = new RetrQElt(sequenceNumber, (Message) jmsg.clone()); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Reliable WRITE : seqn#" + sequenceNumber + " length=" + len); } // place copy on retransmission queue retrQ.add(retrQel); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Retrans Enqueue added seqn#" + sequenceNumber + " retrQ.size()=" + retrQ.size()); } } outgoing.send(jmsg); mrrIQFreeSpace--; // assume we have now taken a slot if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("SENT : seqn#" + sequenceNumber + " length=" + len); } } /** * Serialize a JXTA message as a reliable message * * @param msg message to send * @return message sequence number * @exception IOException if an i/o error occurs */ public int send(Message msg) throws IOException { WireFormatMessage msgSerialized = WireFormatMessageFactory.toWire(msg, Defs.MIME_TYPE_MSG, null); ByteArrayOutputStream baos = new ByteArrayOutputStream((int) msg.getByteLength()); msgSerialized.sendToStream(baos); baos.close(); byte[] msgData = baos.toByteArray(); write(msgData, 0, msgData.length); return sequenceNumber; } /** * Gets the maxAck attribute of the ReliableOutputStream object * * @return The maxAck value */ public int getMaxAck() { return maxACK; } /** * Gets the seqNumber attribute of the ReliableOutputStream object * * @return The seqNumber value */ public int getSeqNumber() { return sequenceNumber; } /** * Gets the queueFull attribute of the ReliableOutputStream object * * @return The queueFull value */ public boolean isQueueFull() { return mrrIQFreeSpace < 1; } /** * Gets the queueEmpty attribute of the ReliableOutputStream object * * @return The queueEmpty value */ public boolean isQueueEmpty() { return retrQ.isEmpty(); } /** * wait for activity on the retry queue * * @param timeout timeout in millis * @exception InterruptedException when interrupted */ public void waitQueueEvent(long timeout) throws InterruptedException { synchronized (retrQ) { retrQ.wait(timeout); } } /** * Calculates a message retransmission time-out * * @param dt base time * @param msgSeqNum Message sequence number */ private void calcRTT(long dt, int msgSeqNum) { nACKS++; if (nACKS == 1) { // First ACK arrived. We can start computing aveRTT on the messages // we send from now on. rttThreshold = sequenceNumber + 1; } if (msgSeqNum > rttThreshold) { // Compute only when it has stabilized a bit // Since the initial mrrIQFreeSpace is small; the first few // messages will be sent early on and may wait a long time // for the return channel to initialize. After that things // start flowing and RTT becomes relevant. // Carefull with the computation: integer division with round-down // causes cumulative damage: the ave never goes up if this is not // taken care of. We keep the reminder from one round to the other. if (!aveRTTreset) { aveRTT = dt; aveRTTreset = true; } else { long tmp = (8 * aveRTT) + ((8 * remRTT) / 9) + dt; aveRTT = tmp / 9; remRTT = tmp - aveRTT * 9; } } // Set retransmission time out: 2.5 x RTT // RTO = (aveRTT << 1) + (aveRTT >> 1); RTO = aveRTT * 2; // Enforce a min/max RTO = Math.max(RTO, minRTO); RTO = Math.min(RTO, maxRTO); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("RTT = " + dt + "ms aveRTT = " + aveRTT + "ms" + " RTO = " + RTO + "ms"); } } /** * @param iq Description of the Parameter * @return Description of the Return Value */ private int calcAVEIQ(int iq) { int n = nIQTests; nIQTests += 1; aveIQSize = ((n * aveIQSize) + iq) / nIQTests; return aveIQSize; } /** * process an incoming message * * @param msg message to process */ public void recv(Message msg) { Iterator eachACK = msg.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_ACK); while (eachACK.hasNext()) { MessageElement elt = (MessageElement) eachACK.next(); eachACK.remove(); int sackCount = ((int) elt.getByteLength() / 4) - 1; try { DataInputStream dis = new DataInputStream(elt.getStream()); int seqack = dis.readInt(); int[] sacs = new int[sackCount]; for (int eachSac = 0; eachSac < sackCount; eachSac++) { sacs[eachSac] = dis.readInt(); } Arrays.sort(sacs); // take care of the ACK here; ackReceived(seqack, sacs); } catch (IOException failed) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Failure processing ACK", failed); } } } } /** * Process an ACK Message. We remove ACKed * messages from the retry queue. We only * acknowledge messages received in sequence. * * The seqnum is for the largest unacknowledged seqnum * the receipient has received. * * The sackList is a sequence of all of the * received messages in the sender's input Q. All * will be sequence numbers higher than the * sequential ACK seqnum. * * Recepients are passive and only ack upon the * receipt of an in sequence message. * * They depend on our RTO to fill holes in message * sequences. * * @param seqnum message sequence number * @param sackList array of message sequence numbers */ public void ackReceived(int seqnum, int[] sackList) { int numberACKed = 0; long rttCalcDt = 0; int rttCalcSeqnum = -1; long fallBackDt = 0; int fallBackSeqnum = -1; // remove acknowledged messages from retrans Q. synchronized (retrQ) { lastACKTime = TimeUtils.timeNow(); fc.ackEventBegin(); maxACK = Math.max(maxACK, seqnum); // dump the current Retry queue and the SACK list if (LOG.isEnabledFor(Level.DEBUG)) { StringBuffer dumpRETRQ = new StringBuffer("ACK RECEIVE : " + Integer.toString(seqnum)); if (LOG.isEnabledFor(Level.DEBUG)) { dumpRETRQ.append('\n'); } dumpRETRQ.append("\tRETRQ (size=" + retrQ.size() + ")"); if (LOG.isEnabledFor(Level.DEBUG)) { dumpRETRQ.append(" : "); for (int y = 0; y < retrQ.size(); y++) { if (0 != y) { dumpRETRQ.append(", "); } RetrQElt r = (RetrQElt) retrQ.get(y); dumpRETRQ.append(r.seqnum); } } if (LOG.isEnabledFor(Level.DEBUG)) { dumpRETRQ.append('\n'); } dumpRETRQ.append("\tSACKLIST (size=" + sackList.length + ")"); if (LOG.isEnabledFor(Level.DEBUG)) { dumpRETRQ.append(" : "); for (int y = 0; y < sackList.length; y++) { if (0 != y) { dumpRETRQ.append(", "); } dumpRETRQ.append(sackList[y]); } } LOG.debug(dumpRETRQ); } Iterator eachRetryQueueEntry = retrQ.iterator(); // First remove monotonically increasing seq#s in retrans vector while (eachRetryQueueEntry.hasNext()) { RetrQElt r = (RetrQElt) eachRetryQueueEntry.next(); if (r.seqnum > seqnum) { break; } // Acknowledged eachRetryQueueEntry.remove(); // Update RTT, RTO. Use only those that where acked // w/o retrans otherwise the number may be phony (ack // of first xmit received just after resending => RTT // seems small). Also, we keep the worst of the bunch // we encounter. If we really can't find a single // non-resent message, we make do with a pessimistic // approximation: we must not be left behind with an // RTT that's too short, we'd keep resending like // crazy. long enqueuetime = r.enqueuedAt; long dt = TimeUtils.toRelativeTimeMillis(lastACKTime, enqueuetime); // Update RTT, RTO if (r.marked == 0) { if (dt > rttCalcDt) { rttCalcDt = dt; rttCalcSeqnum = r.seqnum; } } else { // In case we find no good candidate, make // a guess by dividing by the number of attempts // and keep the worst of them too. Since we // know it may be too short, we will not use it // if shortens rtt. dt /= (r.marked + 1); if (dt > fallBackDt) { fallBackDt = dt; fallBackSeqnum = r.seqnum; } } fc.packetACKed(r.seqnum); r.msg.clear(); r.msg = null; r = null; numberACKed++; } // Update last accessed time in response to getting seq acks. if (numberACKed > 0) { outgoing.setLastAccessed(TimeUtils.timeNow()); } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("SEQUENTIALLY ACKD SEQN = " + seqnum + ", (" + numberACKed + " acked)"); } // most recent remote IQ free space mrrIQFreeSpace = rmaxQSize - sackList.length; // let's look at average sacs.size(). If it is big, then this // probably means we must back off because the system is slow. // Our retrans Queue can be large and we can overwhelm the // receiver with retransmissions. // We will keep the rwin <= ave real input queue size. int aveIQ = calcAVEIQ(sackList.length); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("remote IQ free space = " + mrrIQFreeSpace + " remote avg IQ occupancy = " + aveIQ); } int retrans = 0; if (sackList.length > 0) { Iterator eachRetrQElement = retrQ.iterator(); int currentSACK = 0; while (eachRetrQElement.hasNext()) { RetrQElt r = (RetrQElt) eachRetrQElement.next(); while (sackList[currentSACK] < r.seqnum) { currentSACK++; if (currentSACK == sackList.length) { break; } } if (currentSACK == sackList.length) { break; } if (sackList[currentSACK] == r.seqnum) { fc.packetACKed(r.seqnum); numberACKed++; eachRetrQElement.remove(); // Update RTT, RTO. Use only those that where acked w/o retrans // otherwise the number is completely phony. // Also, we keep the worst of the bunch we encounter.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -