⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 reliableoutputstream.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
        System.arraycopy(b, off, data, 0, len);        // allocate new message        Message jmsg = new Message();        synchronized (retrQ) {            while (true) {                if (closing || closed) {                    throw new IOException("Connection is "+ (closing ? "closing" :"closed"));                }                if (retrQ.size() > Math.min(rwindow, mrrIQFreeSpace * 2)) {                    try {                        retrQ.wait(1000);                    } catch (InterruptedException ignored) {}                    continue;                }                break;            }            ++sequenceNumber;            MessageElement element =                    new ByteArrayMessageElement(Integer.toString(sequenceNumber),                    Defs.MIME_TYPE_BLOCK, data, null);            jmsg.addMessageElement(Defs.NAMESPACE, element);            RetrQElt retrQel = new RetrQElt(sequenceNumber, (Message) jmsg.clone());            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("Reliable WRITE : seqn#" + sequenceNumber + " length=" + len);            }            // place copy on retransmission queue            retrQ.add(retrQel);            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("Retrans Enqueue added seqn#" + sequenceNumber + " retrQ.size()=" + retrQ.size());            }        }        outgoing.send(jmsg);        mrrIQFreeSpace--;        // assume we have now taken a slot        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("SENT : seqn#" + sequenceNumber + " length=" + len);        }    }    /**     *  Serialize a JXTA message as a reliable message     *     * @param  msg              message to send     * @return                  message sequence number     * @exception  IOException  if an i/o error occurs     */    public int send(Message msg) throws IOException {        WireFormatMessage msgSerialized =                WireFormatMessageFactory.toWire(msg, Defs.MIME_TYPE_MSG, null);        ByteArrayOutputStream baos =                new ByteArrayOutputStream((int) msg.getByteLength());        msgSerialized.sendToStream(baos);        baos.close();        byte[] msgData = baos.toByteArray();        write(msgData, 0, msgData.length);        return sequenceNumber;    }    /**     *  Gets the maxAck attribute of the ReliableOutputStream object     *     * @return    The maxAck value     */    public int getMaxAck() {        return maxACK;    }    /**     *  Gets the seqNumber attribute of the ReliableOutputStream object     *     * @return    The seqNumber value     */    public int getSeqNumber() {        return sequenceNumber;    }    /**     *  Gets the queueFull attribute of the ReliableOutputStream object     *     * @return    The queueFull value     */    public boolean isQueueFull() {        return mrrIQFreeSpace < 1;    }    /**     *  Gets the queueEmpty attribute of the ReliableOutputStream object     *     * @return    The queueEmpty value     */    public boolean isQueueEmpty() {        return retrQ.isEmpty();    }    /**     *  wait for activity on the retry queue     *     * @param  timeout                   timeout in millis      * @exception  InterruptedException  when interrupted     */    public void waitQueueEvent(long timeout) throws InterruptedException {        synchronized (retrQ) {            retrQ.wait(timeout);        }    }    /**     *  Calculates a message retransmission time-out     *     * @param  dt         base time     * @param  msgSeqNum  Message sequence number     */    private void calcRTT(long dt, int msgSeqNum) {        nACKS++;        if (nACKS == 1) {            // First ACK arrived. We can start computing aveRTT on the messages            // we send from now on.            rttThreshold = sequenceNumber + 1;        }        if (msgSeqNum > rttThreshold) {            // Compute only when it has stabilized a bit            // Since the initial mrrIQFreeSpace is small; the first few            // messages will be sent early on and may wait a long time            // for the return channel to initialize. After that things            // start flowing and RTT becomes relevant.            // Carefull with the computation: integer division with round-down            // causes cumulative damage: the ave never goes up if this is not            // taken care of. We keep the reminder from one round to the other.            if (!aveRTTreset) {                aveRTT = dt;                aveRTTreset = true;            } else {                long tmp = (8 * aveRTT) + ((8 * remRTT) / 9) + dt;                aveRTT = tmp / 9;                remRTT = tmp - aveRTT * 9;            }        }        // Set retransmission time out: 2.5 x RTT        //        RTO = (aveRTT << 1) + (aveRTT >> 1);        RTO = aveRTT * 2;        // Enforce a min/max        RTO = Math.max(RTO, minRTO);        RTO = Math.min(RTO, maxRTO);        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("RTT = " + dt + "ms aveRTT = " + aveRTT + "ms" +                    " RTO = " + RTO + "ms");        }    }    /**     * @param  iq  Description of the Parameter     * @return     Description of the Return Value     */    private int calcAVEIQ(int iq) {        int n = nIQTests;        nIQTests += 1;        aveIQSize = ((n * aveIQSize) + iq) / nIQTests;        return aveIQSize;    }    /**     *  process an incoming message     *     * @param  msg  message to process     */    public void recv(Message msg) {        Iterator eachACK =                msg.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_ACK);        while (eachACK.hasNext()) {            MessageElement elt = (MessageElement) eachACK.next();            eachACK.remove();            int sackCount = ((int) elt.getByteLength() / 4) - 1;            try {                DataInputStream dis = new DataInputStream(elt.getStream());                int seqack = dis.readInt();                int[] sacs = new int[sackCount];                for (int eachSac = 0; eachSac < sackCount; eachSac++) {                    sacs[eachSac] = dis.readInt();                }                Arrays.sort(sacs);                // take care of the ACK here;                ackReceived(seqack, sacs);            } catch (IOException failed) {                if (LOG.isEnabledFor(Level.WARN)) {                    LOG.warn("Failure processing ACK", failed);                }            }        }    }    /**     * Process an ACK Message. We remove ACKed     * messages from the retry queue.  We only     * acknowledge messages received in sequence.     *     * The seqnum is for the largest unacknowledged seqnum     * the receipient has received.     *     * The sackList is a sequence of all of the     * received messages in the sender's input Q. All     * will be sequence numbers higher than the     * sequential ACK seqnum.     *     * Recepients are passive and only ack upon the     * receipt of an in sequence message.     *     * They depend on our RTO to fill holes in message     * sequences.     *     * @param  seqnum    message sequence number     * @param  sackList  array of message sequence numbers     */    public void ackReceived(int seqnum, int[] sackList) {        int numberACKed = 0;        long rttCalcDt = 0;        int rttCalcSeqnum = -1;        long fallBackDt = 0;        int fallBackSeqnum = -1;        // remove acknowledged messages from retrans Q.        synchronized (retrQ) {            lastACKTime = TimeUtils.timeNow();            fc.ackEventBegin();            maxACK = Math.max(maxACK, seqnum);            // dump the current Retry queue and the SACK list            if (LOG.isEnabledFor(Level.DEBUG)) {                StringBuffer dumpRETRQ =                        new StringBuffer("ACK RECEIVE : " + Integer.toString(seqnum));                if (LOG.isEnabledFor(Level.DEBUG)) {                    dumpRETRQ.append('\n');                }                dumpRETRQ.append("\tRETRQ (size=" + retrQ.size() + ")");                if (LOG.isEnabledFor(Level.DEBUG)) {                    dumpRETRQ.append(" : ");                    for (int y = 0; y < retrQ.size(); y++) {                        if (0 != y) {                            dumpRETRQ.append(", ");                        }                        RetrQElt r = (RetrQElt) retrQ.get(y);                        dumpRETRQ.append(r.seqnum);                    }                }                if (LOG.isEnabledFor(Level.DEBUG)) {                    dumpRETRQ.append('\n');                }                dumpRETRQ.append("\tSACKLIST (size=" + sackList.length + ")");                if (LOG.isEnabledFor(Level.DEBUG)) {                    dumpRETRQ.append(" : ");                    for (int y = 0; y < sackList.length; y++) {                        if (0 != y) {                            dumpRETRQ.append(", ");                        }                        dumpRETRQ.append(sackList[y]);                    }                }                LOG.debug(dumpRETRQ);            }            Iterator eachRetryQueueEntry = retrQ.iterator();            // First remove monotonically increasing seq#s in retrans vector            while (eachRetryQueueEntry.hasNext()) {                RetrQElt r = (RetrQElt) eachRetryQueueEntry.next();                if (r.seqnum > seqnum) {                    break;                }                // Acknowledged                eachRetryQueueEntry.remove();                // Update RTT, RTO. Use only those that where acked                // w/o retrans otherwise the number may be phony (ack                // of first xmit received just after resending => RTT                // seems small).  Also, we keep the worst of the bunch                // we encounter.  If we really can't find a single                // non-resent message, we make do with a pessimistic                // approximation: we must not be left behind with an                // RTT that's too short, we'd keep resending like                // crazy.                long enqueuetime = r.enqueuedAt;                long dt = TimeUtils.toRelativeTimeMillis(lastACKTime, enqueuetime);                // Update RTT, RTO                if (r.marked == 0) {                    if (dt > rttCalcDt) {                        rttCalcDt = dt;                        rttCalcSeqnum = r.seqnum;                    }                } else {                    // In case we find no good candidate, make                    // a guess by dividing by the number of attempts                    // and keep the worst of them too. Since we                    // know it may be too short, we will not use it                    // if shortens rtt.                    dt /= (r.marked + 1);                    if (dt > fallBackDt) {                        fallBackDt = dt;                        fallBackSeqnum = r.seqnum;                    }                }                fc.packetACKed(r.seqnum);                r.msg.clear();                r.msg = null;                r = null;                numberACKed++;            }            // Update last accessed time in response to getting seq acks.            if (numberACKed > 0) {                outgoing.setLastAccessed(TimeUtils.timeNow());            }            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("SEQUENTIALLY ACKD SEQN = " + seqnum +                        ", (" + numberACKed + " acked)");            }            // most recent remote IQ free space            mrrIQFreeSpace = rmaxQSize - sackList.length;            // let's look at average sacs.size(). If it is big, then this            // probably means we must back off because the system is slow.            // Our retrans Queue can be large and we can overwhelm the            // receiver with retransmissions.            // We will keep the rwin <= ave real input queue size.            int aveIQ = calcAVEIQ(sackList.length);            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("remote IQ free space = " + mrrIQFreeSpace +                        " remote avg IQ occupancy = " + aveIQ);            }            int retrans = 0;            if (sackList.length > 0) {                Iterator eachRetrQElement = retrQ.iterator();                int currentSACK = 0;                while (eachRetrQElement.hasNext()) {                    RetrQElt r = (RetrQElt) eachRetrQElement.next();                    while (sackList[currentSACK] < r.seqnum) {                        currentSACK++;                        if (currentSACK == sackList.length) {                            break;                        }                    }                    if (currentSACK == sackList.length) {                        break;                    }                    if (sackList[currentSACK] == r.seqnum) {                        fc.packetACKed(r.seqnum);                        numberACKed++;                        eachRetrQElement.remove();                        // Update RTT, RTO. Use only those that where acked w/o retrans                        // otherwise the number is completely phony.                        // Also, we keep the worst of the bunch we encounter.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -