⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 reliableinputstream.java

📁 JXTA&#8482 is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
                LOG.log(Level.WARNING, "sendACK caught IOException:", e);            }        }    }        /**     *  {@inheritDoc}     */    public void recv(Message msg) {        queueIncomingMessage(msg);    }        public boolean hasNextMessage() {        return !inputQueue.isEmpty();    }           Message nextMessage(boolean blocking) throws IOException {        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("nextMessage blocking?  [" + blocking + "]");        }        MessageElement elt = dequeueMessage(sequenceNumber + 1, blocking);        if (null == elt) {            return null;        }        sequenceNumber += 1; // next msg sequence number                Message msg;        try {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("Converting message seqn :" + (sequenceNumber - 1) + "element to message");            }                        msg = WireFormatMessageFactory.fromWire(elt.getStream(), Defs.MIME_TYPE_MSG, null);        } catch (IOException ex) {            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.log(Level.WARNING, "Could not deserialize message " + elt.getElementName(), ex);            }            return null;        }        return msg;    }        /**     *  queue messages by sequence number.     */    private void queueIncomingMessage(Message msg) {                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Queue Incoming Message begins for " + msg);        }                long startEnqueue = TimeUtils.timeNow();                Iterator<MessageElement> eachElement = msg.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_BLOCK);                // OK look for jxta message        while (!closed && !closing && eachElement.hasNext()) {            MessageElement elt = eachElement.next();            eachElement.remove();                        int msgSeqn;            try {                msgSeqn = Integer.parseInt(elt.getElementName());            } catch (NumberFormatException n) {                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                    LOG.warning("Discarding element (" + elt.getElementName() + ") Not one of ours.");                }                continue;            }                        IQElt newElt = new IQElt(msgSeqn, elt);                        // OK we must enqueue                        // We rely on the sender to not to send more than the window size            // because we do not limit the number of elements we allow to be            // enqueued.                        // see if this is a duplicate            if (newElt.seqnum <= sequenceNumber) {                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("RCVD OLD MESSAGE : Discard seqn#" + newElt.seqnum + " now at seqn#" + sequenceNumber);                }                break;            }                        synchronized (inputQueue) {                                // dbl check with the lock held.                if (closing || closed) {                    return;                }                                // Insert this message into the input queue.                // 1. Do not add duplicate messages                // 2. Store in increasing sequence nos.                int insertIndex = inputQueue.size();                boolean duplicate = false;                                for (int j = 0; j < inputQueue.size(); j++) {                    IQElt iq = inputQueue.get(j);                    if (newElt.seqnum < iq.seqnum) {                        insertIndex = j;                        break;                    } else if (newElt.seqnum == iq.seqnum) {                        duplicate = true;                        break;                    }                }                                if (duplicate) {                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.fine("RCVD OLD MESSAGE :  Discard duplicate msg, seqn#" + newElt.seqnum);                    }                    break;                }                                inputQueue.add(insertIndex, newElt);                                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("Enqueued msg with seqn#" + newElt.seqnum + " at index " + insertIndex);                }                                inputQueue.notifyAll();            }        }                if (listener != null) {            Message newmsg = null;            while (true) {                try {                    newmsg = nextMessage(false);                } catch (IOException io) {// do nothing as this exception will never occur                }                if (newmsg == null) {                    break;                }                try {                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.fine("In listener mode, calling back listener");                    }                    listener.processIncomingMessage(newmsg);                } catch (Throwable all) {                    if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                        LOG.log(Level.WARNING, "Uncaught Throwable calling listener", all);                    }                }            }        }                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            long waited = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), startEnqueue);            LOG.fine("Queue Incoming Message for " + msg + " completed in " + waited + " msec.");        }    }        long nextRetransRequest = TimeUtils.toAbsoluteTimeMillis(TimeUtils.ASECOND);        /**     *  Dequeue the message with the desired sequence number waiting as needed     *  until the message is available.     *     *  @param desiredSeqn the sequence number to be dequeued.     *  @param blocking If {@code true} then this method should block while     *  waiting for the specified message sequence number.     *  @return the Message Element with the desired sequence number or null if     *  the queue has been closed.     */    private MessageElement dequeueMessage(int desiredSeqn, boolean blocking) throws IOException {        IQElt iQ = null;                // Wait for incoming message here        long startDequeue = TimeUtils.timeNow();        long timeoutAt = TimeUtils.toAbsoluteTimeMillis(timeout);        int wct = 0;                synchronized (inputQueue) {            while (!closed) {                if (inputQueue.isEmpty()) {                    if (!blocking) {                        return null;                    }                    if (closing) {                        return null;                    }                    try {                        wct++;                        inputQueue.wait(TimeUtils.ASECOND);                        if (timeoutAt < TimeUtils.timeNow()) {                            throw new SocketTimeoutException("Read timeout reached");                        }                    } catch (InterruptedException e) {                        Thread.interrupted();                    }                    // reset retrans request timer since we don't want to immediately                    // request retry after a long wait for out of order messages.                                        nextRetransRequest = TimeUtils.toAbsoluteTimeMillis(TimeUtils.ASECOND);                    continue;                }                                iQ = inputQueue.get(0); // FIFO                                if (iQ.seqnum < desiredSeqn) {                    // Ooops a DUPE slipped in the head of the queue undetected                    // (seqnum consistency issue).                    // Just drop it.                    inputQueue.remove(0);                    // if such is the case then notify the other end so that                    // the message does not remain in the retry queue eventually                    // triggering a broken pipe exception                    sendACK(iQ.seqnum);                    continue;                } else if (iQ.seqnum != desiredSeqn) {                    if (TimeUtils.toRelativeTimeMillis(nextRetransRequest) < 0) {                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                            LOG.fine("Trigger retransmission. Wanted seqn#" + desiredSeqn + " found seqn#" + iQ.seqnum);                        }                        sendACK(desiredSeqn - 1);                        nextRetransRequest = TimeUtils.toAbsoluteTimeMillis(TimeUtils.ASECOND);                    }                    if (!blocking) {                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                            LOG.fine("Message out of sequece in Non-Blocking mode. returning");                        }                        // not the element of interest return nothing                        return null;                    }                    try {                        wct++;                        inputQueue.wait(TimeUtils.ASECOND);                        if (timeoutAt < TimeUtils.timeNow()) {                            throw new SocketTimeoutException("Read timeout reached");                        }                    } catch (InterruptedException e) {                        throw new InterruptedIOException("IO interrupted ");                    }                    continue;                }                inputQueue.remove(0);                break;            }        }        nextRetransRequest = 0;        // if we are closed then we return null        if (null == iQ) {            return null;        }                sendACK(desiredSeqn);                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            long waited = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), startDequeue);            LOG.fine("DEQUEUED seqn#" + iQ.seqnum + " in " + waited + " msec on input queue");            if (wct > 0) {                LOG.fine("DEQUEUE waited " + wct + " times on input queue");            }        }        return iQ.elt;    }        /**     * {@inheritDoc}     */    @Override    public int available() throws IOException {        if (listener != null) {            throw new IOException("available() not supported in async mode");        }        if (closed) {            throw new IOException("Stream closed");        }        synchronized (record) {            if (record.inputStream != null) {                if ((record.size == 0) || (record.nextByte == record.size)) {                    if (inputQueue.isEmpty()) {                        return 0;                    }                    // reset the record                    record.resetRecord(); // GC as necessary(inputStream byte[])                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.fine("Getting next data block at seqn#" + (sequenceNumber + 1));                    }                    MessageElement elt = dequeueMessage(sequenceNumber + 1, false);                    if (null == elt) {                        return 0;                    }                    sequenceNumber += 1; // next msg sequence number                    // Get the length of the Record                    record.size = elt.getByteLength();                    record.inputStream = elt.getStream();                }                return record.inputStream.available();            }        }        return 0;    }        private int local_read(byte[] buf, int offset, int length) throws IOException {                if (listener != null) {            throw new IOException("read() not supported in async mode");        }                synchronized (record) {            if ((record.size == 0) || (record.nextByte == record.size)) {                                // reset the record                record.resetRecord(); // GC as necessary(inputStream byte[])                                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("Getting next data block at seqn#" + (sequenceNumber + 1));                }                                MessageElement elt = dequeueMessage(sequenceNumber + 1, true);                                if (null == elt) {                    return -1;                }                                sequenceNumber += 1; // next msg sequence number                                // Get the length of the Record                record.size = elt.getByteLength();                record.inputStream = elt.getStream();                                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("new seqn#" + sequenceNumber + ", bytes = " + record.size);                }            }                        // return the requested Record data            // These calls should NEVER ask for more data than is in the            // received Record.                        long left = record.size - record.nextByte;            int copyLen = (int) Math.min(length, left);            int copied = 0;                        do {                int res = record.inputStream.read(buf, offset + copied, copyLen - copied);                                if (res < 0) {                    break;                }                copied += res;            } while (copied < copyLen);                        record.nextByte += copied;                        if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {                LOG.finer("Requested " + length + ", Read " + copied + " bytes");            }                        return copied;        }    }        /**     * Returns the message listener for this pipe     * @return MsgListener     *     */    public MsgListener getListener() {        return listener;    }        /**     *  The listener interface for receiving {@link net.jxta.endpoint.Message}     */    public interface MsgListener {                /**         * Called for each message received.         *         * @param message The message to be received.         */        void processIncomingMessage(Message message);    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -