📄 jtlsoutputstream.java

📁 JXTA™ is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 Page 1 of 3
            if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
                throw new IndexOutOfBoundsException();
            }

            if (len == 0) {
                return;
            }

            // Copy the data since it will be queued, and the caller may
            // overwrite the same byte[] buffer.
            byte[] data = new byte[len];
            System.arraycopy(b, off, data, 0, len);

            // sync so that writes don't get out of order.
            synchronized (retrQ) {
                // add TLS record as element
                MessageElement ciphertext = new ByteArrayMessageElement(Integer.toString(++sequenceNumber), JTlsDefs.BLOCKS, data,
                        null);
                jmsg.addMessageElement(JTlsDefs.TLSNameSpace, ciphertext);

                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    LOG.fine("TLS CT WRITE : seqn#" + sequenceNumber + " length=" + len);
                }

                // (1) See if the most recent remote input queue size is close to
                // its maximum input queue size.
                // Send only if at least 20% of the queue is free.
                // (2) Also, if our retransQ is larger than the remote's inputQ,
                // wait until we've received an ack.
                // We assume some msgs are in transit or the remote system buffers.
                // We do not want to overrun the receiver.
                // (3) We need to release from the loop because of possible deadlocks,
                // e.g. retrQ.size() == 0 and mrrIQFreeSpace forces looping
                // forever because the most recent SACK cleared it, and the receiver
                // is waiting for more data.
                // max of 200ms wait
                int maxwait = Math.min((int) aveRTT, 200);
                // iterations to wait (max 3, min 1)
                int waitCt = Math.max(maxwait / 60, 1);

                // check if the queue has gone dead.
                if (retrQ.size() > 0) {
                    long inQueue = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), retrQ.get(0).enqueuedAt);

                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                        LOG.fine("write : Retry queue idle for " + inQueue);
                    }

                    if (inQueue > tp.RETRMAXAGE) {
                        if (inQueue > (2 * tp.RETRMAXAGE)) {
                            if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
                                LOG.info("Closing stale connection " + conn);
                            }
                            // SPT - set flag for connection close in finally block
                            closeStale = true;
                            throw new IOException("Stale connection closure in progress");
                        } else if (retrQ.size() >= MAXRETRQSIZE) {
                            // if the queue is "full" and we are long idle, delay new writes forever.
                            waitCt = Integer.MAX_VALUE;
                        }
                    }
                }

                int i = 0;

                while (!closed && ((mrrIQFreeSpace < rmaxQSize / 5) || (retrQ.size() > rmaxQSize))) {
                    // see if max. wait has arrived.
                    if (i++ == waitCt) {
                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                            LOG.fine("write() wait for ACK, maxwait timer expired while enqueuing seqn#" + sequenceNumber);
                        }
                        break;
                    }

                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                        LOG.fine("write() wait 60ms for ACK while enqueuing seqn#" + sequenceNumber + "\n\tremote IQ free space = "
                                + mrrIQFreeSpace + "\n\tMIN free space to continue = " + (rmaxQSize / 5) + "" + "\n\tretQ.size()="
                                + retrQ.size());
                    }

                    // Less than 20% free queue space is left. Wait.
                    try {
                        retrQ.wait(60);
                    } catch (InterruptedException ignored) {
                        Thread.interrupted();
                    }
                }

                // place copy on retransmission queue
                RetrQElt r = new RetrQElt(sequenceNumber, jmsg.clone());
                retrQ.add(r);

                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    LOG.fine("Retrans Enqueue added seqn#" + sequenceNumber + " retQ.size()=" + retrQ.size());
                }
            }

            // Here we will send the message to the transport
            conn.sendToRemoteTls(jmsg);

            // assume we have now taken a slot
            mrrIQFreeSpace--;

            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("TLS CT SENT : seqn#" + sequenceNumber + " length=" + len);
            }
        } finally {
            if (closeStale) {
                // The retry queue has really gone stale.
                try {
                    setClosing();
                    // in this we close ourself
                    conn.close(HandshakeState.CONNECTIONDEAD);
                } catch (IOException ignored) {
                    ;
                }
            }
        }
    }

    private void calcRTT(long enqueuedAt) {
        long dt = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), enqueuedAt);

        if (dt == 0) {
            dt += 1;
        }

        int n = nACKS;

        nACKS += 1;
        aveRTT = ((n * aveRTT) + dt) / (nACKS);

        // Set retransmission time out: 2.5 x RTT
        RTO = (aveRTT << 1) + (aveRTT >> 1);

        // Enforce a min/max
        RTO = Math.max(RTO, minRTO);
        RTO = Math.min(RTO, maxRTO);

        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("TLS!! RTT = " + dt + "ms aveRTT = " + aveRTT + "ms" + " RTO = " + RTO + "ms");
        }
    }

    private int calcAVEIQ(int iq) {
        int n = nIQTests;

        nIQTests += 1;
        aveIQSize = ((n * aveIQSize) + iq) / nIQTests;
        return aveIQSize;
    }

    /**
     * Process an ACK Message. We remove ACKed messages from the retry queue.
     * We only acknowledge messages received in sequence.
     *
     * The seqnum is for the largest unacknowledged seqnum
     * the recipient has received.
     *
     * The sackList is a sequence of all of the received
     *    messages in the sender's input Q. All will be sequence numbers higher
     *    than the sequential ACK seqnum.
     *
     *    Recipients are passive and only ack upon the receipt
     *    of an in sequence message.
     *
     *    They depend on our RTO to fill holes in message
     *   sequences.
     **/
    void ackReceived(int seqnum, int[] sackList) {
        lastACKTime = TimeUtils.timeNow();
        int numberACKed = 0;

        // remove acknowledged messages from retrans Q.
        synchronized (retrQ) {
            maxACK = Math.max(maxACK, seqnum);

            // dump the current Retry queue and the SACK list
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                StringBuilder dumpRETRQ = new StringBuilder("ACK RECEIVE : " + Integer.toString(seqnum));

                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    dumpRETRQ.append('\n');
                }
                dumpRETRQ.append("\tRETRQ (size=" + retrQ.size() + ")");
                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    dumpRETRQ.append(" : ");
                    for (int y = 0; y < retrQ.size(); y++) {
                        if (0 != y) {
                            dumpRETRQ.append(", ");
                        }
                        RetrQElt r = retrQ.get(y);

                        dumpRETRQ.append(r.seqnum);
                    }
                }
                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    dumpRETRQ.append('\n');
                }
                dumpRETRQ.append("\tSACKLIST (size=" + sackList.length + ")");
                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    dumpRETRQ.append(" : ");
                    for (int y = 0; y < sackList.length; y++) {
                        if (0 != y) {
                            dumpRETRQ.append(", ");
                        }
                        dumpRETRQ.append(sackList[y]);
                    }
                }
                LOG.fine(dumpRETRQ.toString());
            }

            Iterator eachRetryQueueEntry = retrQ.iterator();

            // First remove monotonically increasing seq#s in retrans vector
            while (eachRetryQueueEntry.hasNext()) {
                RetrQElt r = (RetrQElt) eachRetryQueueEntry.next();

                if (r.seqnum > seqnum) {
                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                        LOG.fine("r.seqnum :" + r.seqnum + " > seqnum :" + seqnum);
                    }
                    break;
                }

                // Acknowledged
                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    LOG.fine("seqnum :" + seqnum);
                    LOG.fine("Removing :" + r.seqnum + " from retransmit queue");
                }
                eachRetryQueueEntry.remove();

                // Update RTT, RTO
                if (0 != r.enqueuedAt) {
                    calcRTT(r.enqueuedAt);
                }

                r.msg.clear();
                r.msg = null;
                r = null;
                numberACKed++;
            }

            // Update last accessed time in response to getting seq acks.
            if (numberACKed > 0) {
                conn.lastAccessed = TimeUtils.timeNow();
            }

            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("TLS!! SEQUENTIALLY ACKD SEQN = " + seqnum + ", (" + numberACKed + " acked)");
            }

            // most recent remote IQ free space
            rmaxQSize = Math.max(rmaxQSize, sackList.length);
            mrrIQFreeSpace = rmaxQSize - sackList.length;

            // Let's look at the average sackList.length. If it is big, then this
            // probably means we must back off because the system is slow.
            // Our retrans Queue can be large and we can overwhelm the
            // receiver with retransmissions.
            // We will keep the rwin <= ave real input queue size.
            int aveIQ = calcAVEIQ(sackList.length);

            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("remote IQ free space = " + mrrIQFreeSpace + " remote avg IQ occupancy = " + aveIQ);
            }

            int retrans = 0;

            if (sackList.length > 0) {
                Iterator eachRetrQElement = retrQ.iterator();
                int currentSACK = 0;

                while (eachRetrQElement.hasNext()) {
                    RetrQElt r = (RetrQElt) eachRetrQElement.next();

                    while (sackList[currentSACK] < r.seqnum) {
                        currentSACK++;
                        if (currentSACK == sackList.length) {
                            break;
                        }
                    }

                    if (currentSACK == sackList.length) {
                        break;
                    }

                    if (sackList[currentSACK] == r.seqnum) {
                        eachRetrQElement.remove();

                        // ack counter
                        numberACKed++;

                        // for aveRTT calculation
                        long enqueuetime = r.enqueuedAt;

                        // Update RTT, RTO
                        if (enqueuetime != 0) {
                            calcRTT(enqueuetime);
                        }

                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                            LOG.fine("TLS!! SACKD SEQN = " + r.seqnum);
                        }

                        // GC this stuff
                        r.msg.clear();
                        r.msg = null;
                        r = null;
                    } else {
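
The listing on this page breaks off here. As a reading aid, the back-pressure rule applied in the write path above (wait while less than 20% of the remote input queue is free or while the retransmission queue is larger than the remote queue, for at most three 60 ms polls) can be sketched in isolation. This is a minimal sketch, not the JXTA code: the class name WriteBackoffSketch and the method and parameter names are hypothetical, and only the constants 60, 200 and the divisor 5 come from the listing.

```java
// Hypothetical stand-alone sketch of the write-side back-pressure rule.
// Only the constants (60 ms poll, 200 ms cap, 1/5 free-space threshold)
// are taken from the listing; every name here is an assumption.
final class WriteBackoffSketch {
    /** Poll interval matching the listing's retrQ.wait(60) call, in milliseconds. */
    static final int POLL_MS = 60;

    /** Cap on the total wait, matching the "max of 200ms wait" comment. */
    static final int MAX_WAIT_MS = 200;

    /**
     * Number of 60 ms polls to attempt: at least 1, at most 3.
     * Note: this sketch clamps before narrowing to int, whereas the
     * listing casts aveRTT to int first.
     */
    static int waitCount(long aveRttMs) {
        int maxWait = (int) Math.min(aveRttMs, (long) MAX_WAIT_MS);
        return Math.max(maxWait / POLL_MS, 1);
    }

    /** True while the writer should keep waiting for an ACK. */
    static boolean shouldWait(int remoteFreeSlots, int remoteMaxQueue, int retransQueueSize) {
        boolean lessThan20PercentFree = remoteFreeSlots < remoteMaxQueue / 5;
        boolean retransLargerThanRemote = retransQueueSize > remoteMaxQueue;
        return lessThan20PercentFree || retransLargerThanRemote;
    }

    public static void main(String[] args) {
        System.out.println("polls for aveRTT=30ms:  " + waitCount(30));
        System.out.println("polls for aveRTT=500ms: " + waitCount(500));
        System.out.println("wait with 1 of 20 slots free:  " + shouldWait(1, 20, 5));
        System.out.println("wait with 10 of 20 slots free: " + shouldWait(10, 20, 5));
    }
}
```

Because the poll count is bounded, a writer that never sees an ACK still leaves the loop, which is the deadlock escape described in point (3) of the comments. The one exception in the listing is the case where the queue is full and long idle, where waitCt is set to Integer.MAX_VALUE.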

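The arithmetic in calcRTT above (a cumulative mean of round-trip samples, with the retransmission timeout set to 2.5 times that mean and clamped to a min/max) can also be tried on its own. The sketch below assumes nothing about the surrounding class: RttEstimatorSketch, its constructor arguments and its accessor names are hypothetical, and the bounds passed in main are illustrative values, since minRTO and maxRTO are not defined on this page.

```java
// Hypothetical stand-alone sketch of the calcRTT arithmetic from the listing.
// The min/max RTO bounds are caller-supplied assumptions; the listing's
// actual minRTO and maxRTO values are not shown on this page.
final class RttEstimatorSketch {
    private final long minRtoMs;
    private final long maxRtoMs;
    private long aveRttMs; // cumulative mean of all samples so far
    private long rtoMs;    // current retransmission timeout
    private int nAcks;     // number of samples folded into the mean

    RttEstimatorSketch(long minRtoMs, long maxRtoMs) {
        this.minRtoMs = minRtoMs;
        this.maxRtoMs = maxRtoMs;
        this.rtoMs = minRtoMs;
    }

    /** Fold one round-trip sample (milliseconds) into the mean and recompute the RTO. */
    void addSample(long dtMs) {
        long dt = dtMs;
        if (dt == 0) {
            dt = 1; // as in the listing, a zero sample is counted as 1 ms
        }
        int n = nAcks;
        nAcks += 1;
        aveRttMs = ((n * aveRttMs) + dt) / nAcks;

        // 2.5 x RTT using shifts: 2x plus 0.5x (integer arithmetic)
        long raw = (aveRttMs << 1) + (aveRttMs >> 1);
        rtoMs = Math.min(Math.max(raw, minRtoMs), maxRtoMs);
    }

    long aveRtt() {
        return aveRttMs;
    }

    long rto() {
        return rtoMs;
    }

    public static void main(String[] args) {
        RttEstimatorSketch est = new RttEstimatorSketch(50, 2000);
        for (long sample : new long[] {100, 200, 0}) {
            est.addSample(sample);
            System.out.println("sample=" + sample + "ms aveRTT=" + est.aveRtt() + "ms RTO=" + est.rto() + "ms");
        }
    }
}
```

A cumulative mean weights every sample equally, so it reacts slowly to a change in path latency once many ACKs have been counted. Standard TCP estimators use an exponentially weighted average instead; the sketch deliberately keeps the listing's simpler formula.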