
📄 jtlsoutputstream.java

📁 jxta_src_2.41b: latest JXTA 2.41b source code, from www.jxta.org
💻 JAVA
            if (len == 0) {
                return;
            }

            // Copy the data since it will be queued, and the caller may
            // overwrite the same byte[] buffer.
            byte[] data = new byte[len];
            System.arraycopy(b, off, data, 0, len);

            // sync so that writes don't get out of order.
            synchronized (retrQ) {
                // add TLS record as element
                MessageElement ciphertext = new ByteArrayMessageElement(Integer.toString(++sequenceNumber), JTlsDefs.BLOCKS, data, null);
                jmsg.addMessageElement(JTlsDefs.TLSNameSpace, ciphertext);

                if (LOG.isEnabledFor(Level.DEBUG)) {
                    LOG.debug("TLS CT WRITE : seqn#" + sequenceNumber + " length=" + len);
                }

                // (1) See if the most recent remote input queue size is close to
                // its maximum input queue size.
                // Send only if at least 20% or more of the queue is free.
                // (2) Also, if our retransQ is larger than the remote's inputQ,
                // wait until we've received an ack.
                // We assume some msgs are in transit or the remote system buffers.
                // We do not want to overrun the receiver.
                // (3) We need to release from the loop because of possible deadlocks,
                // e.g. retrQ.size() == 0 and mrrIQFreeSpace forces looping
                // forever because the most recent SACK cleared it, and the receiver
                // is waiting for more data.

                // max of 200ms wait
                int maxwait = Math.min((int) aveRTT, 200);
                // iterations to wait (max 3, min 1)
                int waitCt = Math.max(maxwait / 60, 1);

                // check if the queue has gone dead.
                if (retrQ.size() > 0) {
                    long inQueue = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), ((RetrQElt) retrQ.get(0)).enqueuedAt);

                    if (LOG.isEnabledFor(Level.DEBUG)) {
                        LOG.debug("write : Retry queue idle for " + inQueue);
                    }

                    if (inQueue > tp.RETRMAXAGE) {
                        if (inQueue > (2 * tp.RETRMAXAGE)) {
                            if (LOG.isEnabledFor(Level.INFO)) {
                                LOG.info("Closing stale connection " + conn);
                            }
                            // SPT - set flag for connection close in finally block
                            closeStale = true;
                            throw new IOException("Stale connection closure in progress");
                        } else if (retrQ.size() >= MAXRETRQSIZE) {
                            // if the queue is "full" and we are long idle, delay new writes forever.
                            waitCt = Integer.MAX_VALUE;
                        }
                    }
                }

                int i = 0;
                while (!closed && ((mrrIQFreeSpace < rmaxQSize / 5) || (retrQ.size() > rmaxQSize))) {
                    // see if max. wait has arrived.
                    if (i++ == waitCt) {
                        if (LOG.isEnabledFor(Level.DEBUG)) {
                            LOG.debug("write() wait for ACK, maxwait timer expired while enqueuing seqn#" + sequenceNumber);
                        }
                        break;
                    }

                    if (LOG.isEnabledFor(Level.DEBUG)) {
                        LOG.debug("write() wait 60ms for ACK while enqueuing seqn#" + sequenceNumber
                                  + "\n\tremote IQ free space = " + mrrIQFreeSpace
                                  + "\n\tMIN free space to continue = " + (rmaxQSize / 5)
                                  + "\n\tretQ.size()=" + retrQ.size());
                    }

                    // Less than 20% free queue space is left. Wait.
                    try {
                        retrQ.wait(60);
                    } catch (InterruptedException ignored) {
                        Thread.interrupted();
                    }
                }

                // place copy on retransmission queue
                RetrQElt r = new RetrQElt(sequenceNumber, (Message) jmsg.clone());
                retrQ.add(r);

                if (LOG.isEnabledFor(Level.DEBUG)) {
                    LOG.debug("Retrans Enqueue added seqn#" + sequenceNumber + " retQ.size()=" + retrQ.size());
                }
            }

            // Here we will send the message to the transport
            conn.sendToRemoteTls(jmsg);

            // assume we have now taken a slot
            mrrIQFreeSpace--;

            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("TLS CT SENT : seqn#" + sequenceNumber + " length=" + len);
            }
        } finally {
            if (closeStale) {
                // The retry queue has really gone stale.
                try {
                    setClosing();
                    // in this we close ourself
                    conn.close(HandshakeState.CONNECTIONDEAD);
                } catch (IOException ignored) {
                    ;
                }
            }
        }
    }

    private void calcRTT(long enqueuedAt) {
        long dt = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), enqueuedAt);
        if (dt == 0) {
            dt += 1;
        }

        int n = nACKS;
        nACKS += 1;

        aveRTT = ((n * aveRTT) + dt) / (nACKS);

        // Set retransmission time out: 2.5 x RTT
        RTO = (aveRTT << 1) + (aveRTT >> 1);

        // Enforce a min/max
        RTO = Math.max(RTO, minRTO);
        RTO = Math.min(RTO, maxRTO);

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("TLS!! RTT = " + dt + "ms aveRTT = " + aveRTT + "ms" + " RTO = " + RTO + "ms");
        }
    }

    private int calcAVEIQ(int iq) {
        int n = nIQTests;
        nIQTests += 1;

        aveIQSize = ((n * aveIQSize) + iq) / nIQTests;

        return aveIQSize;
    }

    /**
     * Process an ACK Message. We remove ACKed messages from the retry queue.
     * We only acknowledge messages received in sequence.
     *
     * The seqnum is for the largest unacknowledged seqnum
     * the recipient has received.
     *
     * The sackList is a sequence of all of the received
     * messages in the sender's input Q. All will be sequence numbers higher
     * than the sequential ACK seqnum.
     *
     * Recipients are passive and only ack upon the receipt
     * of an in-sequence message.
     *
     * They depend on our RTO to fill holes in message
     * sequences.
     **/
    void ackReceived(int seqnum, int[] sackList) {
        lastACKTime = TimeUtils.timeNow();
        int numberACKed = 0;

        // remove acknowledged messages from retrans Q.
        synchronized (retrQ) {
            maxACK = Math.max(maxACK, seqnum);

            // dump the current Retry queue and the SACK list
            if (LOG.isEnabledFor(Level.DEBUG)) {
                StringBuffer dumpRETRQ = new StringBuffer("ACK RECEIVE : " + Integer.toString(seqnum));

                if (LOG.isEnabledFor(Level.DEBUG)) {
                    dumpRETRQ.append('\n');
                }
                dumpRETRQ.append("\tRETRQ (size=" + retrQ.size() + ")");
                if (LOG.isEnabledFor(Level.DEBUG)) {
                    dumpRETRQ.append(" : ");
                    for (int y = 0; y < retrQ.size(); y++) {
                        if (0 != y) {
                            dumpRETRQ.append(", ");
                        }
                        RetrQElt r = (RetrQElt) retrQ.get(y);
                        dumpRETRQ.append(r.seqnum);
                    }
                }
                if (LOG.isEnabledFor(Level.DEBUG)) {
                    dumpRETRQ.append('\n');
                }
                dumpRETRQ.append("\tSACKLIST (size=" + sackList.length + ")");
                if (LOG.isEnabledFor(Level.DEBUG)) {
                    dumpRETRQ.append(" : ");
                    for (int y = 0; y < sackList.length; y++) {
                        if (0 != y) {
                            dumpRETRQ.append(", ");
                        }
                        dumpRETRQ.append(sackList[y]);
                    }
                }
                LOG.debug(dumpRETRQ);
            }

            Iterator eachRetryQueueEntry = retrQ.iterator();

            // First remove monotonically increasing seq#s in retrans vector
            while (eachRetryQueueEntry.hasNext()) {
                RetrQElt r = (RetrQElt) eachRetryQueueEntry.next();
                if (r.seqnum > seqnum) {
                    if (LOG.isEnabledFor(Level.DEBUG)) {
                        LOG.debug("r.seqnum :" + r.seqnum + " > seqnum :" + seqnum);
                    }
                    break;
                }

                // Acknowledged
                if (LOG.isEnabledFor(Level.DEBUG)) {
                    LOG.debug("seqnum :" + seqnum);
                    LOG.debug("Removing :" + r.seqnum + " from retransmit queue");
                }
                eachRetryQueueEntry.remove();

                // Update RTT, RTO
                if (0 != r.enqueuedAt) {
                    calcRTT(r.enqueuedAt);
                }

                r.msg.clear();
                r.msg = null;
                r = null;
                numberACKed++;
            }

            // Update last accessed time in response to getting seq acks.
            if (numberACKed > 0) {
                conn.lastAccessed = TimeUtils.timeNow();
            }

            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("TLS!! SEQUENTIALLY ACKD SEQN = " + seqnum + ", (" + numberACKed + " acked)");
            }

            // most recent remote IQ free space
            rmaxQSize = Math.max(rmaxQSize, sackList.length);
            mrrIQFreeSpace = rmaxQSize - sackList.length;

            // let's look at average sacs.size(). If it is big, then this
            // probably means we must back off because the system is slow.
            // Our retrans Queue can be large and we can overwhelm the
            // receiver with retransmissions.
            // We will keep the rwin <= ave real input queue size.
            int aveIQ = calcAVEIQ(sackList.length);

            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("remote IQ free space = " + mrrIQFreeSpace + " remote avg IQ occupancy = " + aveIQ);
            }

            int retrans = 0;

            if (sackList.length > 0) {
                Iterator eachRetrQElement = retrQ.iterator();
                int currentSACK = 0;

                while (eachRetrQElement.hasNext()) {
                    RetrQElt r = (RetrQElt) eachRetrQElement.next();

                    while (sackList[currentSACK] < r.seqnum) {
                        currentSACK++;
                        if (currentSACK == sackList.length) {
                            break;
                        }
                    }

                    if (currentSACK == sackList.length) {
                        break;
                    }

                    if (sackList[currentSACK] == r.seqnum) {
                        eachRetrQElement.remove();

                        // ack counter
                        numberACKed++;

                        // for aveRTT calculation
                        long enqueuetime = r.enqueuedAt;

                        // Update RTT, RTO
                        if (enqueuetime != 0) {
                            calcRTT(enqueuetime);
                        }

                        if (LOG.isEnabledFor(Level.DEBUG)) {
                            LOG.debug("TLS!! SACKD SEQN = " + r.seqnum);
                        }

                        // GC this stuff
                        r.msg.clear();
                        r.msg = null;
                        r = null;
                    } else {
                        // Retransmit? Only if there is a hole in the selected
                        // acknowledgement list. Otherwise let RTO deal.
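
The listing breaks off here, part-way through ackReceived. To make the two acknowledgement passes shown above easier to follow, here is a minimal standalone sketch of the same idea: a cumulative ACK removes every queued entry up to seqnum, then a merge-style walk removes entries named in the SACK list. The class SackDemo and the method applyAck are hypothetical names invented for this illustration, and the queue holds bare sequence numbers instead of RetrQElt objects. Both the queue and the SACK list are assumed to be in ascending order, as the listing appears to assume.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration only; not part of the JXTA sources.
class SackDemo {

    /**
     * Removes acknowledged sequence numbers from retrQ and returns how many
     * were removed. Assumes retrQ and sackList are both sorted ascending.
     */
    static int applyAck(List<Integer> retrQ, int seqnum, int[] sackList) {
        int acked = 0;

        // Pass 1: cumulative ACK. Everything up to and including seqnum is done.
        Iterator<Integer> it = retrQ.iterator();
        while (it.hasNext()) {
            if (it.next() > seqnum) {
                break;
            }
            it.remove();
            acked++;
        }

        // Pass 2: selective ACKs. Walk both ascending sequences in step.
        it = retrQ.iterator();
        int s = 0;
        while (it.hasNext() && s < sackList.length) {
            int queued = it.next();
            // Skip SACK entries that are below the current queue entry.
            while (s < sackList.length && sackList[s] < queued) {
                s++;
            }
            if (s < sackList.length && sackList[s] == queued) {
                it.remove();
                acked++;
            }
        }
        return acked;
    }

    public static void main(String[] args) {
        List<Integer> retrQ = new ArrayList<>(Arrays.asList(3, 4, 5, 6, 7, 8));
        int acked = applyAck(retrQ, 4, new int[] {6, 8});
        // 3 and 4 go in pass 1; 6 and 8 go in pass 2; 5 and 7 remain as holes.
        System.out.println(acked + " acked, still queued: " + retrQ);
    }
}
```

The entries left in the queue after both passes (5 and 7 in the example) are the holes that the else branch in the listing goes on to consider for retransmission.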

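The calcRTT method in the listing keeps a cumulative mean of round-trip samples and derives the retransmission timeout as 2.5 times that mean using shifts, then clamps it. The sketch below isolates that arithmetic so it can be run on its own. RttEstimator, addSample, averageRtt and rto are hypothetical names, and the minRTO and maxRTO values passed in main are made-up examples, since the listing does not show the real defaults.

```java
// Hypothetical illustration only; mirrors the arithmetic of calcRTT above.
class RttEstimator {
    private long aveRTT = 0;   // cumulative mean of samples, in ms
    private int nACKS = 0;     // number of samples seen so far
    private long rto;          // current retransmission timeout, in ms
    private final long minRTO; // assumed bounds; real values are not in the listing
    private final long maxRTO;

    RttEstimator(long minRTO, long maxRTO) {
        this.minRTO = minRTO;
        this.maxRTO = maxRTO;
        this.rto = minRTO;
    }

    void addSample(long dtMillis) {
        long dt = dtMillis;
        if (dt == 0) {
            dt = 1; // same guard as the listing: never record a zero sample
        }

        int n = nACKS;
        nACKS += 1;

        // Cumulative mean using integer division, as in the listing.
        aveRTT = ((n * aveRTT) + dt) / nACKS;

        // 2.5 x mean: (x << 1) is 2x, (x >> 1) is x/2 rounded down.
        long raw = (aveRTT << 1) + (aveRTT >> 1);

        // Clamp into [minRTO, maxRTO].
        rto = Math.min(Math.max(raw, minRTO), maxRTO);
    }

    long averageRtt() {
        return aveRTT;
    }

    long rto() {
        return rto;
    }

    public static void main(String[] args) {
        RttEstimator e = new RttEstimator(50, 5000);
        for (long sample : new long[] {100, 200, 300}) {
            e.addSample(sample);
            System.out.println("sample=" + sample + " aveRTT=" + e.averageRtt() + " RTO=" + e.rto());
        }
    }
}
```

Because the mean weights every sample equally, a long-lived connection reacts slowly to a change in latency. That is a property of the cumulative-mean formula itself and is worth keeping in mind when reading the flow-control code that depends on aveRTT.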