📄 multiplexinputstream.java
字号:
/**
* Redistribution and use of this software and associated documentation
* ("Software"), with or without modification, are permitted provided
* that the following conditions are met:
*
* 1. Redistributions of source code must retain copyright
* statements and notices. Redistributions must also contain a
* copy of this document.
*
* 2. Redistributions in binary form must reproduce the
* above copyright notice, this list of conditions and the
* following disclaimer in the documentation and/or other
* materials provided with the distribution.
*
* 3. The name "Exolab" must not be used to endorse or promote
* products derived from this Software without prior written
* permission of Exoffice Technologies. For written permission,
* please contact info@exolab.org.
*
* 4. Products derived from this Software may not be called "Exolab"
* nor may "Exolab" appear in their names without prior written
* permission of Exoffice Technologies. Exolab is a registered
* trademark of Exoffice Technologies.
*
* 5. Due credit should be given to the Exolab Project
* (http://www.exolab.org/).
*
* THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
* ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
* NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
* FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
* EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
* OF THE POSSIBILITY OF SUCH DAMAGE.
*
* Copyright 2003-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
*
* $Id: MultiplexInputStream.java,v 1.2 2005/04/02 13:23:12 tanderson Exp $
*/
package org.exolab.jms.net.multiplexer;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
 * An <code>InputStream</code> which reads multiplexed data over a shared
 * physical connection, managed by a {@link Multiplexer}.
 * <p/>
 * Incoming data is handed to this stream through <code>receive()</code>, which
 * stores it in a fixed-size local buffer. <code>read()</code> drains that
 * buffer and, once at least half of the buffer size has been consumed,
 * reports the number of bytes read back through the multiplexer.
 * <p/>
 * <em>NOTE:</em> the <code>InputStream</code> methods of this class are not
 * thread safe: only one thread may read at a time. Buffer state shared
 * between the reader and <code>receive()</code>/<code>disconnected()</code>
 * is guarded by an internal lock.
 *
 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
 * @version $Revision: 1.2 $
 * @see Multiplexer
 */
class MultiplexInputStream extends InputStream implements Constants {

    /**
     * The logger.
     */
    private static final Log _log
            = LogFactory.getLog(MultiplexInputStream.class);

    /**
     * The channel identifier.
     */
    private final int _channelId;

    /**
     * The multiplexer. Set to <code>null</code> by {@link #destroy}.
     */
    private Multiplexer _multiplexer;

    /**
     * The local data buffer. Set to <code>null</code> by {@link #destroy}.
     */
    private byte[] _data;

    /**
     * Temporary buffer for single byte reads.
     */
    private final byte[] _byte = new byte[1];

    /**
     * The index into <code>_data</code> where data starts.
     */
    private int _index = 0;

    /**
     * The number of available bytes in <code>_data</code>.
     */
    private int _available = 0;

    /**
     * Indicates if the underlying connection has been closed.
     */
    private boolean _disconnected = false;

    /**
     * The no. of bytes to read before notifying the remote endpoint.
     */
    private final int _lowWaterMark;

    /**
     * The number of bytes read from this stream since the last
     * <code>notifyRead()</code> call.
     */
    private int _read = 0;

    /**
     * Synchronization helper. Guards the buffer state and is used to signal
     * the reader when data arrives or the connection is closed.
     */
    private final Object _lock = new Object();

    /**
     * Construct a new <code>MultiplexInputStream</code>.
     *
     * @param channelId   the channel identifier
     * @param multiplexer the multiplexer
     * @param size        the size of the local data buffer. The low water
     *                    mark is set to half of this value
     */
    public MultiplexInputStream(int channelId, Multiplexer multiplexer,
                                int size) {
        _channelId = channelId;
        _multiplexer = multiplexer;
        _data = new byte[size];
        _lowWaterMark = size / 2;
    }

    /**
     * This implementation is a no-op, as the stream is re-used.
     */
    public void close() {
    }

    /**
     * Closes this input stream and releases any resources associated with it.
     * <p/>
     * The multiplexer and buffer references are dropped, so the stream must
     * not be used afterwards.
     *
     * @throws IOException if an I/O error occurs
     */
    public void destroy() throws IOException {
        // NOTE(review): the multiplexer is not told about the closure here;
        // a "_multiplexer.closed(this)" call (made only when the connection
        // had not already been disconnected) existed purely as disabled
        // code. Confirm whether it should be reinstated.
        synchronized (_lock) {
            // cleared under the lock so that threads synchronizing on _lock
            // observe the change
            _multiplexer = null;
            _data = null;
        }
    }

    /**
     * Reads the next byte of data from the input stream. The value byte is
     * returned as an <code>int</code> in the range <code>0</code> to
     * <code>255</code>. If no byte is available because the end of the stream
     * has been reached, the value <code>-1</code> is returned. This method
     * blocks until input data is available, the end of the stream is detected,
     * or an exception is thrown.
     *
     * @return the next byte of data, or <code>-1</code> if the end of the
     *         stream is reached.
     * @throws IOException if an I/O error occurs.
     */
    public int read() throws IOException {
        final int mask = 0xFF;
        int count = read(_byte, 0, 1);
        // mask to avoid sign extension of bytes >= 0x80
        return (count == 1) ? _byte[0] & mask : -1;
    }

    /**
     * Reads up to <code>length</code> bytes of data from the input stream into
     * an array of bytes. An attempt is made to read as many as
     * <code>length</code> bytes, but a smaller number may be read, possibly
     * zero. The number of bytes actually read is returned as an integer.
     * <p/>
     * Any buffered data is copied immediately. If that does not satisfy the
     * request, this method blocks until the remainder has arrived, the local
     * buffer cannot hold any more of it, or the connection is disconnected,
     * and then copies whatever is available.
     * <p/>
     * If the calling thread is interrupted while waiting, the wait continues
     * and the thread's interrupt status is restored before returning.
     *
     * @param buffer the buffer into which the data is read
     * @param offset the start offset in array <code>buffer</code> at which the
     *               data is written
     * @param length the maximum number of bytes to read
     * @return the total number of bytes read into the buffer, or
     *         <code>-1</code> if there is no more data because the end of the
     *         stream has been reached.
     * @throws IOException               if an I/O error occurs.
     * @throws IndexOutOfBoundsException if <code>offset</code> is negative, or
     *                                   <code>length</code> is negative, or
     *                                   <code>offset+length</code> is greater
     *                                   than the length of the array
     * @throws NullPointerException      if <code>buffer</code> is null
     */
    public int read(byte[] buffer, int offset, int length) throws IOException {
        // validate up front, so that bad arguments neither block nor fail
        // after part of the stream has already been consumed
        if (buffer == null) {
            throw new NullPointerException("Argument 'buffer' is null");
        }
        if (offset < 0 || length < 0 || length > buffer.length - offset) {
            throw new IndexOutOfBoundsException(
                    "offset=" + offset + ", length=" + length
                    + ", buffer length=" + buffer.length);
        }

        int count = 0;
        if (length > 0) {
            synchronized (_lock) {
                count = (length <= _available) ? length : _available;
                if (_log.isDebugEnabled()) {
                    _log.debug("read(length=" + length + ") [channelId="
                               + _channelId
                               + ", available=" + _available + "]");
                }
                if (count > 0) {
                    // copy the available data into the buffer
                    copy(buffer, offset, count);
                }
                if (count < length) {
                    // wait for more data to become available
                    int more = length - count;
                    waitForData(more);
                    if (_available > 0) {
                        // more data available, so copy it
                        more = (more <= _available) ? more : _available;
                        offset += count;
                        copy(buffer, offset, more);
                        count += more;
                    }
                }
                if ((count == 0) && _disconnected) {
                    // no data was read, and we were disconnected. Indicate
                    // end of stream to user
                    count = -1;
                }
            }
        }
        return count;
    }

    /**
     * Returns the number of bytes that can be read (or skipped over) from this
     * input stream without blocking by the next caller of a method for this
     * input stream.
     *
     * @return the number of bytes that can be read from this input stream
     *         without blocking.
     */
    public int available() {
        synchronized (_lock) {
            return _available;
        }
    }

    /**
     * Invoked when the underlying physical connection is closed.
     * <p/>
     * Marks the stream as disconnected and wakes any thread blocked in
     * <code>read()</code>.
     */
    public void disconnected() {
        synchronized (_lock) {
            _disconnected = true;
            _lock.notifyAll();
        }
    }

    /**
     * Returns a string representation of this.
     *
     * @return a string representation of this
     */
    public String toString() {
        // unsynchronized snapshot of the available byte count
        return "MultiplexInputStream[available=" + _available + "]";
    }

    /**
     * Invoked by {@link Multiplexer} when data is available for this stream.
     * <p/>
     * Reads exactly <code>length</code> bytes from <code>input</code> into
     * the local buffer and wakes any thread blocked in <code>read()</code>.
     *
     * @param input  the stream to read data from
     * @param length the number of bytes to read
     * @throws IOException if <code>length</code> exceeds the free space in
     *                     the local buffer, or an I/O error occurs
     */
    protected void receive(DataInputStream input, int length)
            throws IOException {
        synchronized (_lock) {
            int space = _data.length - _available;
            if (length > space) {
                throw new IOException("Buffer overflow: buffer size="
                                      + _data.length
                                      + ", space available=" + space
                                      + ", requested size=" + length);
            }

            int freeAtEnd = _data.length - (_index + _available);
            if (length > freeAtEnd) {
                // make space at the end of the buffer, by shuffling data
                // to the start
                System.arraycopy(_data, _index, _data, 0, _available);
                _index = 0;
            }
            input.readFully(_data, _index + _available, length);

            if (_log.isDebugEnabled()) {
                _log.debug("receive(length=" + length
                           + ") [channelId=" + _channelId
                           + ", available=" + _available
                           + ", space=" + (_data.length - _available) + "]");
            }

            // only publish the new data once it has been read in full
            _available += length;
            _lock.notifyAll();
        }
    }

    /**
     * Blocks until enough data has been buffered to continue a read, or the
     * connection is disconnected. Must be invoked with <code>_lock</code>
     * held.
     * <p/>
     * Interrupts do not terminate the wait; if one occurs, the thread's
     * interrupt status is restored before this method returns.
     *
     * @param requested the number of bytes still wanted by the reader
     */
    private void waitForData(int requested) {
        // The buffer can never hold more than _data.length bytes, so waiting
        // for a larger amount would block until disconnection.
        // NOTE(review): bytes consumed but not yet reported via notifyRead()
        // are subtracted as well, on the assumption that the remote endpoint
        // withholds data until it is told they were read - confirm against
        // Multiplexer. Waiting for fewer bytes is always safe, as read() is
        // permitted to return less than was asked for.
        int target = Math.min(requested, _data.length - _read);
        if (target < 1) {
            // always wait for at least one byte
            target = 1;
        }

        boolean interrupted = false;
        while ((_available < target) && !_disconnected) {
            if (_log.isDebugEnabled()) {
                _log.debug("read() waiting on data [channelId="
                           + _channelId
                           + ", available=" + _available
                           + ", requested=" + requested + "]");
            }
            try {
                _lock.wait();
            } catch (InterruptedException exception) {
                // keep waiting, but remember the interrupt. Re-asserting it
                // here would make the next wait() fail immediately
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Helper to copy data to a user buffer, notifying the remote endpoint if
     * more data should be sent.
     * <p/>
     * The notification is sent once the number of bytes read since the last
     * notification reaches the low water mark.
     *
     * @param buffer the buffer into which the data is read
     * @param offset the start offset in array <code>buffer</code> at which the
     *               data is written
     * @param length the number of bytes to copy. Must not exceed the number
     *               of available bytes
     * @throws IOException               if the notification fails
     * @throws IndexOutOfBoundsException if <code>offset</code> is negative, or
     *                                   <code>length</code> is negative, or
     *                                   <code>offset+length</code> is greater
     *                                   than the length of the array
     * @throws NullPointerException      if <code>buffer</code> is null
     */
    private void copy(byte[] buffer, int offset, int length)
            throws IOException {
        System.arraycopy(_data, _index, buffer, offset, length);
        _index += length;
        _available -= length;
        _read += length;
        if (_read >= _lowWaterMark) {
            notifyRead();
        }
    }

    /**
     * Notify the remote endpoint of the current no. of bytes read.
     * <p/>
     * The count is only reset if the notification is sent successfully.
     *
     * @throws IOException if the notification fails
     */
    private void notifyRead() throws IOException {
        if (_log.isDebugEnabled()) {
            _log.debug("notifyRead() [channelId=" + _channelId
                       + ", read=" + _read + "]");
        }
        _multiplexer.send(FLOW_READ, _channelId, _read);
        _read = 0;
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -