📄 multiplexinputstream.java

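📁 OpenJMS is an open-source implementation of the Java Message Service API 1.0.2 specification. It includes the following features: *. It supports both the point-to-point (PTP) model and the publish/sub...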
💻 JAVA
/**
 * Redistribution and use of this software and associated documentation
 * ("Software"), with or without modification, are permitted provided
 * that the following conditions are met:
 *
 * 1. Redistributions of source code must retain copyright
 *    statements and notices.  Redistributions must also contain a
 *    copy of this document.
 *
 * 2. Redistributions in binary form must reproduce the
 *    above copyright notice, this list of conditions and the
 *    following disclaimer in the documentation and/or other
 *    materials provided with the distribution.
 *
 * 3. The name "Exolab" must not be used to endorse or promote
 *    products derived from this Software without prior written
 *    permission of Exoffice Technologies.  For written permission,
 *    please contact info@exolab.org.
 *
 * 4. Products derived from this Software may not be called "Exolab"
 *    nor may "Exolab" appear in their names without prior written
 *    permission of Exoffice Technologies. Exolab is a registered
 *    trademark of Exoffice Technologies.
 *
 * 5. Due credit should be given to the Exolab Project
 *    (http://www.exolab.org/).
 *
 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
 * OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Copyright 2003-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
 *
 * $Id: MultiplexInputStream.java,v 1.2 2005/04/02 13:23:12 tanderson Exp $
 */
package org.exolab.jms.net.multiplexer;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;


/**
 * An <code>InputStream</code> which reads multiplexed data over a shared
 * physical connection, managed by a {@link Multiplexer}.
 * <p/>
 * <em>NOTE:</em> the <code>InputStream</code> methods of this class are not
 * thread safe.
 *
 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
 * @version $Revision: 1.2 $
 * @see Multiplexer
 */
class MultiplexInputStream extends InputStream implements Constants {

    /**
     * The channel identifier.
     */
    private final int _channelId;

    /**
     * The multiplexer.
     */
    private Multiplexer _multiplexer;

    /**
     * The local data buffer.
     */
    private byte[] _data;

    /**
     * Temporary buffer for single byte reads.
     */
    private byte[] _byte = new byte[1];

    /**
     * The index into <code>_data</code> where data starts.
     */
    private int _index = 0;

    /**
     * The number of available bytes in <code>_data</code>.
     */
    private int _available = 0;

    /**
     * Indicates if the underlying connection has been closed.
     */
    private boolean _disconnected = false;

    /**
     * The no. of bytes to read before notifying the remote endpoint.
     */
    private final int _lowWaterMark;

    /**
     * The number of bytes read from this stream since the last
     * <code>notifyRead()</code> call.
     */
    private int _read = 0;

    /**
     * Synchronization helper.
     */
    private final Object _lock = new Object();

    /**
     * The logger.
     */
    private final Log _log = LogFactory.getLog(MultiplexInputStream.class);


    /**
     * Construct a new <code>MultiplexInputStream</code>.
     *
     * @param channelId   the channel identifier
     * @param multiplexer the multiplexer
     * @param size        the size of the local data buffer
     */
    public MultiplexInputStream(int channelId, Multiplexer multiplexer,
                                int size) {
        _channelId = channelId;
        _multiplexer = multiplexer;
        _data = new byte[size];
        _lowWaterMark = size / 2;
    }

    /**
     * This implementation is a no-op, as the stream is re-used.
     */
    public void close() {
    }

    /**
     * Closes this input stream and releases any resources associated with it.
     *
     * @throws IOException if an I/O error occurs
     */
    public void destroy() throws IOException {
        // notify the endpoint iff it hasn't notified this of disconnection
        synchronized (_lock) {
            if (!_disconnected) {
                //_multiplexer.closed(this);
            }
        }
        _multiplexer = null;
        _data = null;
    }

    /**
     * Reads the next byte of data from the input stream. The value byte is
     * returned as an <code>int</code> in the range <code>0</code> to
     * <code>255</code>. If no byte is available because the end of the stream
     * has been reached, the value <code>-1</code> is returned. This method
     * blocks until input data is available, the end of the stream is detected,
     * or an exception is thrown.
     *
     * @return the next byte of data, or <code>-1</code> if the end of the
     *         stream is reached.
     * @throws IOException if an I/O error occurs.
     */
    public int read() throws IOException {
        final int mask = 0xFF;
        int count = read(_byte, 0, 1);
        return (count == 1) ? _byte[0] & mask : -1;
    }

    /**
     * Reads up to <code>length</code> bytes of data from the input stream into
     * an array of bytes.  An attempt is made to read as many as
     * <code>length</code> bytes, but a smaller number may be read, possibly
     * zero. The number of bytes actually read is returned as an integer.
     * <p/>
     * <p> If the first byte cannot be read for any reason other than end of
     * file, then an <code>IOException</code> is thrown. In particular, an
     * <code>IOException</code> is thrown if the input stream has been closed.
     *
     * @param buffer the buffer into which the data is read
     * @param offset the start offset in array <code>buffer</code> at which the
     *               data is written
     * @param length the maximum number of bytes to read
     * @return the total number of bytes read into the buffer, or
     *         <code>-1</code> if there is no more data because the end of the
     *         stream has been reached.
     * @throws IOException               if an I/O error occurs.
     * @throws IndexOutOfBoundsException if <code>offset</code> is negative, or
     *                                   <code>length</code> is negative, or
     *                                   <code>offset+length</code> is greater
     *                                   than the length of the array
     * @throws NullPointerException      if <code>buffer</code> is null
     */
    public int read(byte[] buffer, int offset, int length) throws IOException {
        int count = 0;
        if (length > 0) {
            synchronized (_lock) {
                count = (length <= _available) ? length : _available;
                if (_log.isDebugEnabled()) {
                    _log.debug("read(length=" + length + ") [channelId="
                               + _channelId
                               + ", available=" + _available + "]");
                }

                if (count > 0) {
                    // copy the available data into the buffer
                    copy(buffer, offset, count);
                }

                if (count < length) {
                    // wait for more data to become available
                    int more = length - count;
                    while ((_available < more) && !_disconnected) {
                        if (_log.isDebugEnabled()) {
                            _log.debug("read() waiting on data [channelId="
                                       + _channelId
                                       + ", available=" + _available
                                       + ", requested=" + more + "]");
                        }
                        try {
                            _lock.wait();
                        } catch (InterruptedException ignore) {
                        }
                    }
                    if (_available > 0) {
                        // more data available, so copy it
                        more = (more <= _available) ? more : _available;
                        offset += count;
                        copy(buffer, offset, more);
                        count += more;
                    }
                }

                if ((count == 0) && _disconnected) {
                    // no data was read, and we were disconnected. Indicate
                    // end of stream to user
                    count = -1;
                }
            }
        }
        return count;
    }

    /**
     * Returns the number of bytes that can be read (or skipped over) from this
     * input stream without blocking by the next caller of a method for this
     * input stream.
     *
     * @return the number of bytes that can be read from this input stream
     *         without blocking.
     */
    public int available() {
        int result;
        synchronized (_lock) {
            result = _available;
        }
        return result;
    }

    /**
     * Invoked when the underlying physical connection is closed.
     */
    public void disconnected() {
        synchronized (_lock) {
            _disconnected = true;
            _lock.notifyAll();
        }
    }

    /**
     * Returns a string representation of this.
     *
     * @return a string representation of this
     */
    public String toString() {
        return "MultiplexInputStream[available=" + _available + "]";
    }

    /**
     * Invoked by {@link Multiplexer} when data is available for this stream.
     *
     * @param input  the stream to read data from
     * @param length the number of bytes to read
     * @throws IOException if an I/O error occurs
     */
    protected void receive(DataInputStream input, int length)
            throws IOException {
        synchronized (_lock) {
            int space = _data.length - _available;
            if (length > space) {
                throw new IOException("Buffer overflow: buffer size="
                                      + _data.length
                                      + ", space available=" + space
                                      + ", requested size=" + length);
            }

            int freeAtEnd = _data.length - (_index + _available);
            if (length > freeAtEnd) {
                // make space at the end of the buffer, by shuffling data
                // to the start
                System.arraycopy(_data, _index, _data, 0, _available);
                _index = 0;
            }
            input.readFully(_data, _index + _available, length);

            if (_log.isDebugEnabled()) {
                _log.debug("receive(length=" + length
                           + ") [channelId=" + _channelId
                           + ", available=" + _available
                           + ", space=" + (_data.length - _available) + "]");
/*
                StringBuffer buf = new StringBuffer();
                for (int i = 0; i < length; ++i) {
                    if (i > 0) {
                      buf.append(", ");
                    }
                    final int mask = 0xff;
                    int value = _data[_index + i + _available] & mask;
                    buf.append(Integer.toHexString(value));
                }
                _log.debug("receive[channelId=" + _channelId
                           + "], length=" + length + ", data=" + buf);
*/
            }

            _available += length;
            _lock.notifyAll();
        }
    }

    /**
     * Helper to copy data to a user buffer, notifying the remote endpoint if
     * more data should be sent.
     *
     * @param buffer the buffer into which the data is read
     * @param offset the start offset in array <code>buffer</code> at which the
     *               data is written
     * @param length the maximum number of bytes to read
     * @throws IOException               if an I/O error occurs.
     * @throws IndexOutOfBoundsException if <code>offset</code> is negative, or
     *                                   <code>length</code> is negative, or
     *                                   <code>offset+length</code> is greater
     *                                   than the length of the array
     * @throws NullPointerException      if <code>buffer</code> is null
     */
    private void copy(byte[] buffer, int offset, int length)
            throws IOException {
        System.arraycopy(_data, _index, buffer, offset, length);
        _index += length;
        _available -= length;
        _read += length;
        if (_read >= _lowWaterMark) {
            notifyRead();
        }
    }

    /**
     * Notify the remote endpoint of the current no. of bytes read.
     *
     * @throws IOException if the notification fails
     */
    private void notifyRead() throws IOException {
        if (_log.isDebugEnabled()) {
            _log.debug("notifyRead() [channelId=" + _channelId
                       + ", read=" + _read + "]");
        }
        _multiplexer.send(FLOW_READ, _channelId, _read);
        _read = 0;
    }

}
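
The buffering scheme in receive() and copy() can be hard to follow inside the full class, so here is a minimal single-threaded sketch of just that part: compacting the buffer when the tail is too small, and signalling once half the buffer size has been consumed. It is not the OpenJMS code. The class name FlowBufferSketch and the IntConsumer callback are hypothetical stand-ins (the callback takes the place of the call to _multiplexer.send(FLOW_READ, ...)), and locking, blocking, and logging are left out on purpose.

```java
import java.util.function.IntConsumer;

/**
 * Hypothetical, simplified illustration of the buffer handling used by
 * MultiplexInputStream. Single-threaded: no locking and no blocking reads.
 */
class FlowBufferSketch {

    private final byte[] data;
    private final int lowWaterMark;
    // Assumption: stands in for notifying the remote endpoint.
    private final IntConsumer onLowWaterMark;
    private int index = 0;      // where unread data starts
    private int available = 0;  // number of unread bytes
    private int read = 0;       // bytes consumed since the last notification

    FlowBufferSketch(int size, IntConsumer onLowWaterMark) {
        this.data = new byte[size];
        this.lowWaterMark = size / 2;
        this.onLowWaterMark = onLowWaterMark;
    }

    /** Appends src, shuffling unread data to the start if the tail is too small. */
    void receive(byte[] src) {
        int space = data.length - available;
        if (src.length > space) {
            throw new IllegalStateException("Buffer overflow");
        }
        int freeAtEnd = data.length - (index + available);
        if (src.length > freeAtEnd) {
            System.arraycopy(data, index, data, 0, available);
            index = 0;
        }
        System.arraycopy(src, 0, data, index + available, src.length);
        available += src.length;
    }

    /** Copies up to length bytes into dest and returns the number copied. */
    int read(byte[] dest, int offset, int length) {
        int count = Math.min(length, available);
        if (count > 0) {
            System.arraycopy(data, index, dest, offset, count);
            index += count;
            available -= count;
            read += count;
            if (read >= lowWaterMark) {
                // enough consumed: tell the sender how much was read
                onLowWaterMark.accept(read);
                read = 0;
            }
        }
        return count;
    }

    int available() {
        return available;
    }

    public static void main(String[] args) {
        FlowBufferSketch buf = new FlowBufferSketch(
                8, n -> System.out.println("notify read=" + n));
        byte[] out = new byte[8];
        buf.receive(new byte[]{1, 2, 3, 4, 5, 6});
        buf.read(out, 0, 5);                        // crosses the low water mark
        buf.receive(new byte[]{7, 8, 9, 10, 11});   // forces compaction
        System.out.println("copied=" + buf.read(out, 0, 8));
    }
}
```

In the sketch, as in the original, the notification threshold is half the buffer size, so the sender can refill the buffer before the reader drains it completely. The real class additionally blocks in read() until data arrives or the connection is reported as disconnected.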
