⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 reliableinputstream.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
            }            ByteArrayInputStream bais =                new ByteArrayInputStream(elt.getBytes(false));            msg = WireFormatMessageFactory.fromWire(bais,                                                    Defs.MIME_TYPE_MSG,                                                    null);        } catch (IOException ex) {            if (LOG.isEnabledFor(Level.WARN)) {                LOG.warn("Could not deserialize message " +                         elt.getElementName() + ": " + ex.getMessage());            }            return null;        }        return msg;    }    /**     *  queue messages by sequence number.     */    private void queueIncomingMessage(Message msg) {        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug( "Queue Incoming Message begins for " +  msg );        }        long startEnqueue = TimeUtils.timeNow();        Message.ElementIterator e =            msg.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_BLOCK);        // OK look for jxta message        while (!closed && !closing && e.hasNext()) {            MessageElement elt = (MessageElement) e.next();            e.remove();            int msgSeqn = 0;            try {                msgSeqn = Integer.parseInt(elt.getElementName());            } catch (NumberFormatException n) {                if (LOG.isEnabledFor(Level.WARN)) {                    LOG.warn( "Discarding element (" + elt.getElementName() +                              ") Not one of ours." );                }                continue;            }            IQElt newElt = new IQElt();            newElt.seqnum = msgSeqn;            newElt.elt = elt;            newElt.ackd = false;            // OK we must inqueue:            // Wait until someone dequeues if we are at the size limit            // XXX AA 4/15/2003: where is the check for            // size limit and the code to wait?            // XXX JCH better not to wait anyway. Rely on the sender to not            // do that too badly, and drop the message if it does, or just take it            // (as we do right now).            // see if this is a duplicate            if (newElt.seqnum <= sequenceNumber) {                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("RCVD OLD MESSAGE : Discard seqn#" +                              newElt.seqnum + " now at seqn#" +                              sequenceNumber);                }                break;            }            synchronized (inputQueue) {                // dbl check with the lock held.                if (closing || closed) {                    return;                }                // Insert this message into the input queue.                //   1. Do not add duplicate messages                //   2. Store in increasing sequence nos.                int insertIndex = inputQueue.size();                boolean duplicate = false;                for (int j = 0; j < inputQueue.size(); j++) {                    IQElt iq = (IQElt) inputQueue.get(j);                    if (newElt.seqnum < iq.seqnum) {                        insertIndex = j;                        break;                    } else if (newElt.seqnum == iq.seqnum) {                        duplicate = true;                        break;                    }                }                if (duplicate) {                    if (LOG.isEnabledFor(Level.DEBUG)) {                        LOG.debug("RCVD OLD MESSAGE : " +                                  " Discard duplicate msg, seqn#" +                                  newElt.seqnum);                    }                    break;                }                inputQueue.add(insertIndex, newElt);                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("Enqueued msg with seqn#" + newElt.seqnum +                              " at index " + insertIndex );                }                inputQueue.notifyAll();            }        }        if (listener != null) {            Message newmsg = null;            while (true) {                try {                    newmsg = nextMessage(false);                } catch (IOException io) {                    //do nothing as this exception will never occur                }                if (newmsg == null) {                    break;                }                try {                    if (LOG.isEnabledFor(Level.DEBUG)) {                        LOG.debug("In listener mode, calling back listener");                    }                    listener.processIncomingMessage(newmsg);                } catch (Throwable t) {                    if (LOG.isEnabledFor(Level.WARN)) {                        LOG.warn("Failed to notify message listener", t);                    }                }            }        }        if (LOG.isEnabledFor(Level.DEBUG)) {            long waited = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(),                          startEnqueue);            LOG.debug("Queue Incoming Message for " + msg +                     " completed in " + waited + " msec.");        }    }    long nextRetransRequest =        TimeUtils.toAbsoluteTimeMillis(TimeUtils.ASECOND);    /**     *  Dequeue the message with the desired sequence number waiting as needed     *  until the message is available.     *     *  @param desiredSeqn the sequence number to be dequeued.     *  @return the Message Element with the desired sequence number or null if     *  the queue has been closed.     */    private MessageElement dequeueMessage(int desiredSeqn, boolean blocking) throws IOException {        IQElt iQ = null;        // Wait for incoming message here        long startDequeue = TimeUtils.timeNow();        long whenToTimeout = startDequeue + timeout;        int wct = 0;        synchronized (inputQueue) {            while(!closed) {                if(inputQueue.size() == 0) {                    if(!blocking) {                        return null;                    }                    if (closing) {                        return null;                    }                    try {                        wct++;                        inputQueue.wait(TimeUtils.ASECOND);                        if (whenToTimeout < TimeUtils.timeNow()) {                            throw new SocketTimeoutException("Read timeout reached");                        }                    } catch (InterruptedException e) {                        Thread.interrupted(); // just continue                    }                    // reset retrans request timer since we don't want to immediately                    // request retry after a long wait for out of order messages.                    nextRetransRequest =                        TimeUtils.toAbsoluteTimeMillis(TimeUtils.ASECOND);                    continue;                }                iQ = (IQElt) inputQueue.get(0); // FIFO                if (iQ.seqnum < desiredSeqn) {                    // Ooops a DUPE slipped in the head of the queue undetected                    // (seqnum consistency issue).                    // Just drop it.                    inputQueue.remove(0);                    // if such is the case then notify the other end so that                    // the message does not remain in the retry queue eventually                    // triggering a broken pipe exception                    sendACK(iQ.seqnum);                    continue;                } else if (iQ.seqnum != desiredSeqn) {                    if (TimeUtils.toRelativeTimeMillis(nextRetransRequest) < 0) {                        if (LOG.isEnabledFor(Level.DEBUG)) {                            LOG.debug("Trigger retransmission. Wanted seqn#" +                                      desiredSeqn + " found seqn#" +                                      iQ.seqnum);                        }                        sendACK(desiredSeqn - 1);                        nextRetransRequest =                             TimeUtils.toAbsoluteTimeMillis(TimeUtils.ASECOND);                    }                    if(!blocking) {                        if (LOG.isEnabledFor(Level.DEBUG)) {                            LOG.debug("Message out of sequece in Non-Blocking mode. returning");                        }                        //not the element of interest return nothing                        return null;                    }                    try {                        wct++;                        inputQueue.wait(TimeUtils.ASECOND);                        if (whenToTimeout < TimeUtils.timeNow()) {                            throw new SocketTimeoutException("Read timeout reached");                        }                    } catch (InterruptedException e) {                        throw new InterruptedIOException("IO interrupted ");                    }                    continue;                }                inputQueue.remove(0);                break;            }        }        nextRetransRequest = 0;        // if we are closed then we return null        if (null == iQ) {            return null;        }        sendACK(desiredSeqn);        if (LOG.isEnabledFor(Level.DEBUG)) {            long waited =                TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(),                                               startDequeue);            LOG.debug("DEQUEUED seqn#" + iQ.seqnum + " in " + waited +                     " msec on input queue" );            if (wct > 0) {                LOG.debug("DEQUEUE waited " + wct + " times on input queue");            }        }        if (LOG.isEnabledFor(Level.DEBUG)) {           LOG.debug("DEQUEUE waited " + wct + " times on input queue");        }        return iQ.elt;    }    /**     * {@inheritDoc}     */    public int available() throws IOException {        if(listener != null) {            throw new IOException("available() not supported in async mode");        }        if (closed) {            throw new IOException("Stream closed");        }        synchronized (record) {            if (record.inputStream != null) {                if ((record.size == 0) || (record.nextByte == record.size)) {                    if (inputQueue.size() == 0) {                        return 0;                    }                    // reset the record                    record.resetRecord();      // GC as necessary(inputStream byte[])                    if (LOG.isEnabledFor(Level.DEBUG)) {                        LOG.debug("local_read: getting next data block at seqn#" +                                  (sequenceNumber + 1));                    }                    MessageElement elt = dequeueMessage(sequenceNumber + 1, false);                    if (null == elt) {                        return 0;                    }                    sequenceNumber += 1;       // next msg sequence number                    // Get the length of the Record                    record.size = elt.getByteLength();                    record.inputStream = elt.getStream();                }                return record.inputStream.available();            }        }        return 0;    }    /**     *     */    private int local_read(byte[] buf, int offset, int length) throws IOException {        if(listener != null) {            throw new IOException("read() not supported in async mode");        }        synchronized (record) {            if ((record.size == 0) || (record.nextByte == record.size)) {                // reset the record                record.resetRecord(); // GC as necessary(inputStream byte[])                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("local_read: getting next data block at seqn#" +                              (sequenceNumber + 1));                }                MessageElement elt = dequeueMessage(sequenceNumber + 1, true);                if (null == elt) {                    return -1;                }                sequenceNumber += 1; // next msg sequence number                // Get the length of the Record                record.size = elt.getByteLength();                record.inputStream = elt.getStream();                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("local_read: new seqn#" + sequenceNumber +                              ", bytes = " + record.size );                }            }            // return the requested Record data            // These calls should NEVER ask for more data than is in the            // received Record.            long left = record.size - record.nextByte;            int copyLen = (int) Math.min(length, left);            int copied = 0;            do {                int res = record.inputStream.read(buf, offset + copied,                                                  copyLen - copied);                if (res < 0) {                    break;                }                copied += res;            } while (copied < copyLen);            record.nextByte += copied;            if (DEBUGIO) {                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("local_read: Requested " + length +                              ", Read " + copied + " bytes");                }            }            return copied;        }    }    /**     * Returns the message listener for this pipe     * @return MsgListener     *     */    public MsgListener getListener() {        return listener;    }    /**     *  The listener interface for receiving {@link net.jxta.endpoint.Message}      *      */    public interface MsgListener {        /**         * Called for each message received.         *         * @param message The message to be received.         */        void processIncomingMessage(Message message);    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -