⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jtlsinputstream.java

📁 JXTA&#8482 is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
        try {            dos.writeInt(seqnAck);                        Iterator<Integer> eachSACK = sackList.iterator();                        while (eachSACK.hasNext()) {                int aSack = (eachSACK.next()).intValue();                dos.writeInt(aSack);            }            dos.close();            bos.close();                        Message ACKMsg = new Message();            MessageElement elt = new ByteArrayMessageElement(JTlsDefs.ACKKEY, JTlsDefs.ACKS, bos.toByteArray(), null);                        ACKMsg.addMessageElement(JTlsDefs.TLSNameSpace, elt);                        conn.sendToRemoteTls(ACKMsg);                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("SENT ACK, seqn#" + seqnAck + " and " + sackList.size() + " SACKs ");            }        } catch (IOException e) {            if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {                LOG.log(Level.INFO, "sendACK caught IOException:", e);            }        }    }        /**     *  queue messages by sequence number.     */    public void queueIncomingMessage(Message msg) {                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Queue Incoming Message begins for " + msg);        }                long startEnqueue = TimeUtils.timeNow();                Message.ElementIterator e = msg.getMessageElements(JTlsDefs.TLSNameSpace, JTlsDefs.BLOCKS);                // OK look for jxta message        while (!closed && !closing && e.hasNext()) {            MessageElement elt = e.next();            e.remove();                        int msgSeqn = 0;                        try {                msgSeqn = Integer.parseInt(elt.getElementName());            } catch (NumberFormatException n) {                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                    LOG.warning("Discarding element (" + elt.getElementName() + ") Not one of ours.");                }                continue;            }                        IQElt newElt = new IQElt();                        newElt.seqnum = msgSeqn;            newElt.elt = elt;            newElt.ackd = false;                        // OK we must inqueue:            // Wait until someone dequeues if we are at the size limit            // see if this is a duplicate            if (newElt.seqnum <= sequenceNumber) {                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("RCVD OLD MESSAGE : Discard seqn#" + newElt.seqnum + " now at seqn#" + sequenceNumber);                }                break;            }            synchronized (inputQueue) {                // dbl check with the lock held.                if (closing || closed) {                    return;                }                                // Insert this message into the input queue.                // 1. Do not add duplicate messages                // 2. Store in increasing sequence nos.                int insertIndex = inputQueue.size();                boolean duplicate = false;                                for (int j = 0; j < inputQueue.size(); j++) {                    IQElt iq = inputQueue.elementAt(j);                    if (newElt.seqnum < iq.seqnum) {                        insertIndex = j;                        break;                    } else if (newElt.seqnum == iq.seqnum) {                        duplicate = true;                        break;                    }                }                                if (duplicate) {                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.fine("RCVD OLD MESSAGE : Discard duplicate msg, seqn#" + newElt.seqnum);                    }                    newElt = null;                    break;                }                                inputQueue.add(insertIndex, newElt);                                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("Enqueued msg with seqn#" + newElt.seqnum + " at index " + insertIndex);                }                                inputQueue.notifyAll();                newElt = null;            }        }        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            long waited = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), startEnqueue);                        LOG.fine("Queue Incoming Message for " + msg + " completed in " + waited + " msec.");        }    }        /**     *  Dequeue the message with the desired sequence number waiting as needed     *  until the message is available.     *     *  @param desiredSeqn the sequence number to be dequeued.     *  @return the Message Element with the desired sequence number or null if     *  the queue has been closed.     **/    private MessageElement dequeueMessage(int desiredSeqn) throws IOException {        IQElt iQ = null;                // Wait for incoming message here        long startDequeue = TimeUtils.timeNow();        long whenToTimeout = startDequeue + timeout;        int wct = 0;                long nextRetransRequest = TimeUtils.toAbsoluteTimeMillis(TimeUtils.ASECOND);                synchronized (inputQueue) {            while (!closed) {                if (inputQueue.size() == 0) {                    if (closing) {                        return null;                    }                    try {                        wct++;                        inputQueue.wait(TimeUtils.ASECOND);                        if (whenToTimeout < TimeUtils.timeNow()) {                            throw new SocketTimeoutException("Read timeout reached");                        }                    } catch (InterruptedException e) {                        Thread.interrupted(); // just continue                    }                    // we reset the retrans request timer since we don't want to                    // immediately request retry after a long wait for out of                    // order messages.                                        nextRetransRequest = TimeUtils.toAbsoluteTimeMillis(TimeUtils.ASECOND);                    continue;                }                                iQ = inputQueue.elementAt(0); // FIFO                                if (iQ.seqnum < desiredSeqn) {                    // Ooops a DUPE slipped in the head of the queue undetected                    // (seqnum consistency issue).                    // Just drop it.                    inputQueue.remove(0);                    // if such is the case then notify the other end so that                    // the message does not remain in the retry queue eventually                    // triggering a broken pipe exception                    sendACK(iQ.seqnum);                    continue;                } else if (iQ.seqnum != desiredSeqn) {                    if (TimeUtils.toRelativeTimeMillis(nextRetransRequest) < 0) {                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                            LOG.fine("Trigger retransmission. Wanted seqn#" + desiredSeqn + " found seqn#" + iQ.seqnum);                        }                        sendACK(desiredSeqn - 1);                        nextRetransRequest = TimeUtils.toAbsoluteTimeMillis(TimeUtils.ASECOND);                    }                                        try {                        wct++;                        inputQueue.wait(TimeUtils.ASECOND);                        if (whenToTimeout < TimeUtils.timeNow()) {                            throw new SocketTimeoutException("Read timeout reached");                        }                    } catch (InterruptedException e) {                        throw new InterruptedIOException("IO interrupted ");                    }                    continue;                }                                inputQueue.remove(0);                break;            }        }                nextRetransRequest = 0;        sendACK(desiredSeqn);        // if we are closed then we return null        if (null == iQ) {            return null;        }                if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {            long waited = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), startDequeue);                        LOG.info("DEQUEUED seqn#" + iQ.seqnum + " in " + waited + " msec on input queue");            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                if (wct > 0) {                    LOG.fine("DEQUEUE waited " + wct + " times on input queue");                }            }        }                return iQ.elt;    }        /**     *     */    private int local_read(byte[] a, int offset, int length) throws IOException {                synchronized (jtrec) {            if ((jtrec.size == 0) || (jtrec.nextByte == jtrec.size)) {                                // reset the record                jtrec.resetRecord(); // GC as necessary(tlsRecord byte[])                                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("local_read: getting next data block at seqn#" + (sequenceNumber + 1));                }                                MessageElement elt = null;                try {                    elt = dequeueMessage(sequenceNumber + 1);                } catch (SocketTimeoutException ste) {                    // timed out with no data                    // SSLSocket expects a 0 data in this case                    return 0;                }                                if (null == elt) {                    return -1;                }                                sequenceNumber += 1; // next msg sequence number                                // Get the length of the TLS Record                jtrec.size = elt.getByteLength();                jtrec.tlsRecord = elt.getStream();                                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("local_read: new seqn#" + sequenceNumber + ", bytes = " + jtrec.size);                }            }                        // return the requested TLS Record data            // These calls should NEVER ask for more data than is in the            // received TLS Record.                        long left = jtrec.size - jtrec.nextByte;            int copyLen = (int) Math.min(length, left);            int copied = 0;                        do {                int res = jtrec.tlsRecord.read(a, offset + copied, copyLen - copied);                                if (res < 0) {                    break;                }                                copied += res;            } while (copied < copyLen);                        jtrec.nextByte += copied;                        if (DEBUGIO) {                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("local_read: Requested " + length + ", Read " + copied + " bytes");                }            }                        return copied;        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -