⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jtlsinputstream.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
            Iterator eachSACK = sackList.iterator();            while (eachSACK.hasNext()) {                int aSack = ((Integer) eachSACK.next()).intValue();                dos.writeInt(aSack);            }            dos.close();            bos.close();            Message ACKMsg = new Message();            MessageElement elt = new ByteArrayMessageElement(JTlsDefs.ACKKEY, JTlsDefs.ACKS, bos.toByteArray(), null);            ACKMsg.addMessageElement(JTlsDefs.TLSNameSpace, elt);            conn.sendToRemoteTls(ACKMsg);            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("SENT ACK, seqn#" + seqnAck + " and " + sackList.size() + " SACKs ");            }        } catch (IOException e) {            if (LOG.isEnabledFor(Level.INFO)) {                LOG.info("sendACK caught IOException:", e);            }        }    }    /**     *  queue messages by sequence number.     */    public void queueIncomingMessage(Message msg) {        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Queue Incoming Message begins for " + msg);        }        long startEnqueue = TimeUtils.timeNow();        Message.ElementIterator e = msg.getMessageElements(JTlsDefs.TLSNameSpace, JTlsDefs.BLOCKS);        // OK look for jxta message        while (!closed && !closing && e.hasNext()) {            MessageElement elt = (MessageElement) e.next();            e.remove();            int msgSeqn = 0;            try {                msgSeqn = Integer.parseInt(elt.getElementName());            } catch (NumberFormatException n) {                if (LOG.isEnabledFor(Level.WARN)) {                    LOG.warn("Discarding element (" + elt.getElementName() + ") Not one of ours.");                }                continue;            }            IQElt newElt = new IQElt();            newElt.seqnum = msgSeqn;            newElt.elt = elt;            newElt.ackd = false;            // OK we must inqueue:            // Wait until someone dequeues if we are at the size limit            // see if this is a duplicate            if (newElt.seqnum <= sequenceNumber) {                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("RCVD OLD MESSAGE : Discard seqn#" +                              newElt.seqnum + " now at seqn#" +                              sequenceNumber);                }                break;            }            synchronized(inputQueue) {                // dbl check with the lock held.                if (closing || closed) {                    return;                }                // Insert this message into the input queue.                // 1. Do not add duplicate messages                // 2. Store in increasing sequence nos.                int insertIndex = inputQueue.size();                boolean duplicate = false;                for (int j = 0; j < inputQueue.size(); j++) {                    IQElt iq = (IQElt) inputQueue.elementAt(j);                    if (newElt.seqnum < iq.seqnum) {                        insertIndex = j;                        break;                    } else if (newElt.seqnum == iq.seqnum) {                            duplicate = true;                            break;                        }                }                if (duplicate) {                    if (LOG.isEnabledFor(Level.DEBUG)) {                        LOG.debug("RCVD OLD MESSAGE : Discard duplicate msg, seqn#" + newElt.seqnum);                    }                    newElt = null;                    break;                }                inputQueue.add(insertIndex, newElt);                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("Enqueued msg with seqn#" + newElt.seqnum + " at index " + insertIndex);                }                inputQueue.notifyAll();                newElt = null;            }        }        if (LOG.isEnabledFor(Level.DEBUG)) {            long waited = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), startEnqueue);            LOG.debug("Queue Incoming Message for " + msg + " completed in " + waited + " msec.");        }    }    /**     *  Dequeue the message with the desired sequence number waiting as needed     *  until the message is available.     *     *  @param desiredSeqn the sequence number to be dequeued.     *  @return the Message Element with the desired sequence number or null if     *  the queue has been closed.     **/    private MessageElement dequeueMessage(int desiredSeqn) throws IOException {        IQElt iQ = null;        // Wait for incoming message here        long startDequeue = TimeUtils.timeNow();        long whenToTimeout = startDequeue + timeout;        int wct = 0;        long nextRetransRequest = TimeUtils.toAbsoluteTimeMillis(TimeUtils.ASECOND);        synchronized (inputQueue) {            while (!closed) {                if (inputQueue.size() == 0) {                    if (closing) {                        return null;                    }                    try {                        wct++;                        inputQueue.wait(TimeUtils.ASECOND);                        if (whenToTimeout < TimeUtils.timeNow()) {                            throw new SocketTimeoutException("Read timeout reached");                        }                    } catch (InterruptedException e) {                        Thread.interrupted(); // just continue                    }                    // we reset the retrans request timer since we don't want to                    // immediately request retry after a long wait for out of                    // order messages.                    nextRetransRequest = TimeUtils.toAbsoluteTimeMillis(TimeUtils.ASECOND);                    continue;                }                iQ = (IQElt) inputQueue.elementAt(0); // FIFO                if (iQ.seqnum < desiredSeqn) {                    // Ooops a DUPE slipped in the head of the queue undetected                    // (seqnum consistency issue).                    // Just drop it.                    inputQueue.remove(0);                    // if such is the case then notify the other end so that                    // the message does not remain in the retry queue eventually                    // triggering a broken pipe exception                    sendACK(iQ.seqnum);                    continue;                } else if (iQ.seqnum != desiredSeqn) {                        if (TimeUtils.toRelativeTimeMillis(nextRetransRequest) < 0) {                            if (LOG.isEnabledFor(Level.DEBUG)) {                                LOG.info("Trigger retransmission. Wanted seqn#" +                                         desiredSeqn + " found seqn#" +                                         iQ.seqnum);                            }                            sendACK(desiredSeqn - 1);                            nextRetransRequest = TimeUtils.toAbsoluteTimeMillis(TimeUtils.ASECOND);                        }                        try {                            wct++;                            inputQueue.wait(TimeUtils.ASECOND);                            if (whenToTimeout < TimeUtils.timeNow()) {                                throw new SocketTimeoutException("Read timeout reached");                            }                        } catch (InterruptedException e) {                            throw new InterruptedIOException("IO interrupted ");                        }                        continue;                    }                inputQueue.remove(0);                break;            }        }        nextRetransRequest = 0;        sendACK(desiredSeqn);        // if we are closed then we return null        if (null == iQ) {            return null;        }        if (LOG.isEnabledFor(Level.INFO)) {            long waited =                TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(),                                               startDequeue);            LOG.info("DEQUEUED seqn#" + iQ.seqnum + " in " + waited +                     " msec on input queue" );            if (wct > 0) {                LOG.debug("DEQUEUE waited " + wct + " times on input queue");            }        }        return iQ.elt;    }    /**     *     */    private int local_read(byte[] a, int offset, int length) throws IOException {        synchronized (jtrec) {            if ((jtrec.size == 0) || (jtrec.nextByte == jtrec.size)) {                // reset the record                jtrec.resetRecord(); // GC as necessary(tlsRecord byte[])                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("local_read: getting next data block at seqn#" + (sequenceNumber + 1));                }                MessageElement elt = null;                try {                    elt = dequeueMessage(sequenceNumber + 1);                } catch (SocketTimeoutException ste) {                    //timed out with no data                    //SSLSocket expects a 0 data in this case                    return 0;                }                if (null == elt) {                    return -1;                }                sequenceNumber += 1; // next msg sequence number                // Get the length of the TLS Record                jtrec.size = elt.getByteLength();                jtrec.tlsRecord = elt.getStream();                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("local_read: new seqn#" + sequenceNumber + ", bytes = " + jtrec.size);                }            }            // return the requested TLS Record data            // These calls should NEVER ask for more data than is in the            // received TLS Record.            long left = jtrec.size - jtrec.nextByte;            int copyLen = (int) Math.min(length, left);            int copied = 0;            do {                int res = jtrec.tlsRecord.read(a, offset + copied, copyLen - copied);                if (res < 0) {                    break;                }                copied += res;            } while (copied < copyLen);            jtrec.nextByte += copied;            if (DEBUGIO) {                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("local_read: Requested " + length + ", Read " + copied + " bytes");                }            }            return copied;        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -