📄 circularbytebuffer.java
字号:
public CircularByteBuffer(int size, boolean blockingWrite){
    if (size == INFINITE_SIZE){
        // An "infinite" buffer starts at the default capacity; the write
        // methods below call resize() whenever they need more room.
        buffer = new byte[DEFAULT_SIZE];
        infinite = true;
    } else {
        buffer = new byte[size];
        infinite = false;
    }
    this.blockingWrite = blockingWrite;
}

/**
 * Class for reading from a circular byte buffer.
 * <p>
 * Every method that touches the shared buffer state does so while holding
 * the monitor of the enclosing CircularByteBuffer instance.
 *
 * @since ostermillerutils 1.00.00
 */
protected class CircularByteBufferInputStream extends InputStream {

    /**
     * Returns the number of bytes that can be read (or skipped over) from this
     * input stream without blocking by the next caller of a method for this input
     * stream. The next caller might be the same thread or another thread.
     *
     * @return the number of bytes that can be read from this input stream without blocking.
     * @throws IOException if the stream is closed.
     *
     * @since ostermillerutils 1.00.00
     */
    public int available() throws IOException {
        synchronized (CircularByteBuffer.this){
            if (inputStreamClosed) throw new IOException("InputStream has been closed, it is not ready.");
            return (CircularByteBuffer.this.available());
        }
    }

    /**
     * Close the stream. Once a stream has been closed, further read(), available(),
     * skip(), or reset() invocations will throw an IOException. Closing a
     * previously-closed stream, however, has no effect.
     *
     * @throws IOException never.
     *
     * @since ostermillerutils 1.00.00
     */
    public void close() throws IOException {
        synchronized (CircularByteBuffer.this){
            inputStreamClosed = true;
        }
    }

    /**
     * Mark the present position in the stream. Subsequent calls to reset() will
     * attempt to reposition the stream to this point.
     * <p>
     * The readAheadLimit must be smaller than the buffer length minus one,
     * otherwise this method has no effect.
     *
     * @param readAheadLimit Limit on the number of bytes that may be read while
     *    still preserving the mark. After reading this many bytes, attempting to
     *    reset the stream will fail.
     *
     * @since ostermillerutils 1.00.00
     */
    public void mark(int readAheadLimit) {
        synchronized (CircularByteBuffer.this){
            // InputStream.mark() cannot declare IOException, so a closed
            // stream is deliberately not reported from here.
            if (buffer.length - 1 > readAheadLimit) {
                markSize = readAheadLimit;
                markPosition = readPosition;
            }
        }
    }

    /**
     * Tell whether this stream supports the mark() operation.
     *
     * @return true, mark is supported.
     *
     * @since ostermillerutils 1.00.00
     */
    public boolean markSupported() {
        return true;
    }

    /**
     * Read a single byte.
     * This method will block until a byte is available, an I/O error occurs,
     * or the end of the stream is reached.
     *
     * @return The byte read, as an integer in the range 0 to 255 (0x00-0xff),
     *    or -1 if the end of the stream has been reached
     * @throws IOException if the stream is closed, or the wait for data is interrupted.
     *
     * @since ostermillerutils 1.00.00
     */
    public int read() throws IOException {
        while (true){
            synchronized (CircularByteBuffer.this){
                if (inputStreamClosed) throw new IOException("InputStream has been closed; cannot read from a closed InputStream.");
                int available = CircularByteBuffer.this.available();
                if (available > 0){
                    int result = buffer[readPosition] & 0xff;
                    readPosition++;
                    if (readPosition == buffer.length){
                        // Wrap around to the start of the backing array.
                        readPosition = 0;
                    }
                    ensureMark();
                    return result;
                } else if (outputStreamClosed){
                    // Nothing buffered and the writer is done: end of stream.
                    return -1;
                }
            }
            // The lock is released while waiting so a writer can make progress.
            pauseBeforeRetry();
        }
    }

    /**
     * Read bytes into an array.
     * This method will block until some input is available,
     * an I/O error occurs, or the end of the stream is reached.
     *
     * @param cbuf Destination buffer.
     * @return The number of bytes read, or -1 if the end of
     *    the stream has been reached
     * @throws IOException if the stream is closed, or the wait for data is interrupted.
     *
     * @since ostermillerutils 1.00.00
     */
    public int read(byte[] cbuf) throws IOException {
        return read(cbuf, 0, cbuf.length);
    }

    /**
     * Read bytes into a portion of an array.
     * This method will block until some input is available,
     * an I/O error occurs, or the end of the stream is reached.
     * A request for zero bytes returns 0 immediately without blocking.
     *
     * @param cbuf Destination buffer.
     * @param off Offset at which to start storing bytes.
     * @param len Maximum number of bytes to read.
     * @return The number of bytes read, or -1 if the end of
     *    the stream has been reached
     * @throws IOException if the stream is closed, or the wait for data is interrupted.
     *
     * @since ostermillerutils 1.00.00
     */
    public int read(byte[] cbuf, int off, int len) throws IOException {
        while (true){
            synchronized (CircularByteBuffer.this){
                if (inputStreamClosed) throw new IOException("InputStream has been closed; cannot read from a closed InputStream.");
                // A zero-length read must never wait for data to arrive.
                if (len == 0) return 0;
                int available = CircularByteBuffer.this.available();
                if (available > 0){
                    int length = Math.min(len, available);
                    // The readable region may wrap: copy the part up to the end
                    // of the backing array first, then the part at its start.
                    int firstLen = Math.min(length, buffer.length - readPosition);
                    int secondLen = length - firstLen;
                    System.arraycopy(buffer, readPosition, cbuf, off, firstLen);
                    if (secondLen > 0){
                        System.arraycopy(buffer, 0, cbuf, off+firstLen, secondLen);
                        readPosition = secondLen;
                    } else {
                        readPosition += length;
                    }
                    if (readPosition == buffer.length) {
                        readPosition = 0;
                    }
                    ensureMark();
                    return length;
                } else if (outputStreamClosed){
                    return -1;
                }
            }
            pauseBeforeRetry();
        }
    }

    /**
     * Reset the stream.
     * If the stream has been marked, then attempt to reposition it
     * at the mark. If the stream has not been marked, or more bytes
     * than the readAheadLimit have been read, this method has no effect.
     *
     * @throws IOException if the stream is closed.
     *
     * @since ostermillerutils 1.00.00
     */
    public void reset() throws IOException {
        synchronized (CircularByteBuffer.this){
            if (inputStreamClosed) throw new IOException("InputStream has been closed; cannot reset a closed InputStream.");
            readPosition = markPosition;
        }
    }

    /**
     * Skip bytes.
     * This method will block until some bytes are available,
     * an I/O error occurs, or the end of the stream is reached.
     * A request to skip zero bytes returns 0 immediately without blocking.
     *
     * @param n The number of bytes to skip
     * @return The number of bytes actually skipped
     * @throws IllegalArgumentException if n is negative.
     * @throws IOException if the stream is closed, or the wait for data is interrupted.
     *
     * @since ostermillerutils 1.00.00
     */
    public long skip(long n) throws IOException, IllegalArgumentException {
        if (n < 0) throw new IllegalArgumentException("Cannot skip a negative number of bytes: " + n);
        while (true){
            synchronized (CircularByteBuffer.this){
                if (inputStreamClosed) throw new IOException("InputStream has been closed; cannot skip bytes on a closed InputStream.");
                if (n == 0) return 0;
                int available = CircularByteBuffer.this.available();
                if (available > 0){
                    // Clamp while still in long arithmetic: narrowing n to int
                    // first could wrap a large request to zero or a negative
                    // value and corrupt readPosition.
                    int length = (int)Math.min(n, (long)available);
                    int firstLen = Math.min(length, buffer.length - readPosition);
                    int secondLen = length - firstLen;
                    if (secondLen > 0){
                        readPosition = secondLen;
                    } else {
                        readPosition += length;
                    }
                    if (readPosition == buffer.length) {
                        readPosition = 0;
                    }
                    // Same bookkeeping call the read methods make after
                    // advancing readPosition.
                    ensureMark();
                    return length;
                } else if (outputStreamClosed){
                    return 0;
                }
            }
            pauseBeforeRetry();
        }
    }

    /**
     * Sleep for 100 milliseconds before the caller polls the buffer again.
     *
     * @throws IOException if the sleeping thread is interrupted; the thread's
     *    interrupt status is restored and the InterruptedException is attached
     *    as the cause.
     */
    private void pauseBeforeRetry() throws IOException {
        try {
            Thread.sleep(100);
        } catch (InterruptedException x){
            // Keep the interrupt visible to code further up the call stack.
            Thread.currentThread().interrupt();
            IOException failure = new IOException("Blocking read operation interrupted.");
            failure.initCause(x);
            throw failure;
        }
    }
}

/**
 * Class for writing to a circular byte buffer.
 * If the buffer is full, the writes will either block
 * until there is some space available or throw an IOException
 * based on the CircularByteBuffer's preference.
 *
 * @since ostermillerutils 1.00.00
 */
protected class CircularByteBufferOutputStream extends OutputStream {

    /**
     * Close the stream, flushing it first.
     * This will cause the InputStream associated with this circular buffer
     * to read its last bytes once it empties the buffer.
     * Once a stream has been closed, further write() or flush() invocations
     * will cause an IOException to be thrown. Closing a previously-closed stream,
     * however, has no effect.
     *
     * @throws IOException if the flush performed before closing fails because
     *    the InputStream side has already been closed. The stream is marked
     *    closed even in that case.
     *
     * @since ostermillerutils 1.00.00
     */
    public void close() throws IOException {
        synchronized (CircularByteBuffer.this){
            if (!outputStreamClosed){
                try {
                    flush();
                } finally {
                    // Record the close even when flush() rejects the call,
                    // so a failed close cannot leave the stream open.
                    outputStreamClosed = true;
                }
            }
        }
    }

    /**
     * Flush the stream.
     * Nothing is buffered outside the circular buffer, so this only
     * validates that both ends are still open.
     *
     * @throws IOException if the stream is closed, or the InputStream side is closed.
     *
     * @since ostermillerutils 1.00.00
     */
    public void flush() throws IOException {
        // Read the closed flags under the same lock every other method uses.
        synchronized (CircularByteBuffer.this){
            if (outputStreamClosed) throw new IOException("OutputStream has been closed; cannot flush a closed OutputStream.");
            if (inputStreamClosed) throw new IOException("Buffer closed by inputStream; cannot flush.");
        }
    }

    /**
     * Write an array of bytes.
     * If the buffer allows blocking writes, this method will block until
     * all the data has been written rather than throw an IOException.
     *
     * @param cbuf Array of bytes to be written
     * @throws BufferOverflowException if buffer does not allow blocking writes
     *    and the buffer is full. If the exception is thrown, no data
     *    will have been written since the buffer was set to be non-blocking.
     * @throws IOException if the stream is closed, or the write is interrupted.
     *
     * @since ostermillerutils 1.00.00
     */
    public void write(byte[] cbuf) throws IOException {
        write(cbuf, 0, cbuf.length);
    }

    /**
     * Write a portion of an array of bytes.
     * If the buffer allows blocking writes, this method will block until
     * all the data has been written rather than throw an IOException.
     *
     * @param cbuf Array of bytes
     * @param off Offset from which to start writing bytes
     * @param len Number of bytes to write
     * @throws BufferOverflowException if buffer does not allow blocking writes
     *    and the buffer is full. If the exception is thrown, no data
     *    will have been written since the buffer was set to be non-blocking.
     * @throws IOException if the stream is closed, or the write is interrupted.
     *
     * @since ostermillerutils 1.00.00
     */
    public void write(byte[] cbuf, int off, int len) throws IOException {
        while (len > 0){
            synchronized (CircularByteBuffer.this){
                if (outputStreamClosed) throw new IOException("OutputStream has been closed; cannot write to a closed OutputStream.");
                if (inputStreamClosed) throw new IOException("Buffer closed by InputStream; cannot write to a closed buffer.");
                int spaceLeft = spaceLeft();
                // An infinite buffer grows until the whole request fits.
                while (infinite && spaceLeft < len){
                    resize();
                    spaceLeft = spaceLeft();
                }
                // Non-blocking mode is all-or-nothing: reject before copying anything.
                if (!blockingWrite && spaceLeft < len) throw new BufferOverflowException("CircularByteBuffer is full; cannot write " + len + " bytes");
                int realLen = Math.min(len, spaceLeft);
                // First chunk runs up to the end of the backing array; the
                // wrapped chunk at the start is additionally capped by
                // buffer.length - markPosition - 1.
                int firstLen = Math.min(realLen, buffer.length - writePosition);
                int secondLen = Math.min(realLen - firstLen, buffer.length - markPosition - 1);
                int written = firstLen + secondLen;
                if (firstLen > 0){
                    System.arraycopy(cbuf, off, buffer, writePosition, firstLen);
                }
                if (secondLen > 0){
                    System.arraycopy(cbuf, off+firstLen, buffer, 0, secondLen);
                    writePosition = secondLen;
                } else {
                    writePosition += written;
                }
                if (writePosition == buffer.length) {
                    writePosition = 0;
                }
                off += written;
                len -= written;
            }
            if (len > 0){
                // Wait outside the lock so a reader can free up space.
                pauseBeforeRetry();
            }
        }
    }

    /**
     * Write a single byte.
     * The byte to be written is contained in the 8 low-order bits of the
     * given integer value; the 24 high-order bits are ignored.
     * If the buffer allows blocking writes, this method will block until
     * all the data has been written rather than throw an IOException.
     *
     * @param c int specifying a byte to be written.
     * @throws BufferOverflowException if buffer does not allow blocking writes
     *    and the buffer is full.
     * @throws IOException if the stream is closed, or the write is interrupted.
     *
     * @since ostermillerutils 1.00.00
     */
    public void write(int c) throws IOException {
        boolean written = false;
        while (!written){
            synchronized (CircularByteBuffer.this){
                if (outputStreamClosed) throw new IOException("OutputStream has been closed; cannot write to a closed OutputStream.");
                if (inputStreamClosed) throw new IOException("Buffer closed by InputStream; cannot write to a closed buffer.");
                int spaceLeft = spaceLeft();
                while (infinite && spaceLeft < 1){
                    resize();
                    spaceLeft = spaceLeft();
                }
                if (!blockingWrite && spaceLeft < 1) throw new BufferOverflowException("CircularByteBuffer is full; cannot write 1 byte");
                if (spaceLeft > 0){
                    buffer[writePosition] = (byte)(c & 0xff);
                    writePosition++;
                    if (writePosition == buffer.length) {
                        writePosition = 0;
                    }
                    written = true;
                }
            }
            if (!written){
                // Wait outside the lock so a reader can free up space.
                pauseBeforeRetry();
            }
        }
    }

    /**
     * Sleep for 100 milliseconds before the caller checks for free space again.
     *
     * @throws IOException if the sleeping thread is interrupted; the thread's
     *    interrupt status is restored and the InterruptedException is attached
     *    as the cause.
     */
    private void pauseBeforeRetry() throws IOException {
        try {
            Thread.sleep(100);
        } catch (InterruptedException x){
            // Keep the interrupt visible to code further up the call stack.
            Thread.currentThread().interrupt();
            IOException failure = new IOException("Waiting for available space in buffer interrupted.");
            failure.initCause(x);
            throw failure;
        }
    }
}
} // closes the enclosing class, whose header lies above this section
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -