📄 circularobjectbuffer.java
字号:
* not be able to be written to the buffer. * <p> * Note that if the buffer is of INFINITE_SIZE it will * neither block or throw exceptions, but rather grow * without bound. * * @param size desired capacity of the buffer in Objects or CircularObjectBuffer.INFINITE_SIZE. * @param blockingWrite true writing to a full buffer should block * until space is available, false if an exception should * be thrown instead. * * @since ostermillerutils 1.00.00 */ public CircularObjectBuffer(int size, boolean blockingWrite){ if (size == INFINITE_SIZE){ buffer = new Object[DEFAULT_SIZE]; infinite = true; } else { buffer = new Object[size]; infinite = false; } this.blockingWrite = blockingWrite; } /** * Get a single Object from this buffer. This method should be called * by the consumer. * This method will block until a Object is available or no more * objects are available. * * @return The Object read, or null if there are no more objects * @throws InterruptedException if the thread is inturrupted while waiting. * * @since ostermillerutils 1.00.00 */ public Object read() throws InterruptedException { while (true){ synchronized (this){ int available = available(); if (available > 0){ Object result = buffer[readPosition]; readPosition++; if (readPosition == buffer.length){ readPosition = 0; } return result; } else if (inputDone){ return null; } } Thread.sleep(100); } } /** * Get Objects into an array from this buffer. This method should * be called by the consumer. * This method will block until some input is available, * or there is no more input. * * @param buf Destination buffer. * @return The number of Objects read, or -1 there will * be no more objects available. * @throws InterruptedException if the thread is inturrupted while waiting. * * @since ostermillerutils 1.00.00 */ public int read(Object[] buf) throws InterruptedException { return read(buf, 0, buf.length); } /** * Get Objects into a portion of an array from this buffer. This * method should be called by the consumer. * This method will block until some input is available, * an I/O error occurs, or the end of the stream is reached. * * @param buf Destination buffer. * @param off Offset at which to start storing Objects. * @param len Maximum number of Objects to read. * @return The number of Objects read, or -1 there will * be no more objects available. * @throws InterruptedException if the thread is inturrupted while waiting. * * @since ostermillerutils 1.00.00 */ public int read(Object[] buf, int off, int len) throws InterruptedException { while (true){ synchronized (this){ int available = available(); if (available > 0){ int length = Math.min(len, available); int firstLen = Math.min(length, buffer.length - readPosition); int secondLen = length - firstLen; System.arraycopy(buffer, readPosition, buf, off, firstLen); if (secondLen > 0){ System.arraycopy(buffer, 0, buf, off+firstLen, secondLen); readPosition = secondLen; } else { readPosition += length; } if (readPosition == buffer.length) { readPosition = 0; } return length; } else if (inputDone){ return -1; } } Thread.sleep(100); } } /** * Skip Objects. This method should be used by the consumer * when it does not care to examine some number of Objects. * This method will block until some Objects are available, * or there will be no more Objects available. * * @param n The number of Objects to skip * @return The number of Objects actually skipped * @throws IllegalArgumentException if n is negative. * @throws InterruptedException if the thread is inturrupted while waiting. * * @since ostermillerutils 1.00.00 */ public long skip(long n) throws InterruptedException, IllegalArgumentException { while (true){ synchronized (this){ int available = available(); if (available > 0){ int length = Math.min((int)n, available); int firstLen = Math.min(length, buffer.length - readPosition); int secondLen = length - firstLen; if (secondLen > 0){ readPosition = secondLen; } else { readPosition += length; } if (readPosition == buffer.length) { readPosition = 0; } return length; } else if (inputDone){ return 0; } } Thread.sleep(100); } } /** * This method should be used by the producer to signal to the consumer * that the producer is done producing objects and that the consumer * should stop asking for objects once it has used up buffered objects. * <p> * Once the producer has signaled that it is done, further write() invocations * will cause an IllegalStateException to be thrown. Calling done() multiple times, * however, has no effect. * * @since ostermillerutils 1.00.00 */ public void done(){ synchronized (this){ inputDone = true; } } /** * Fill this buffer with array of Objects. This method should be called * by the producer. * If the buffer allows blocking writes, this method will block until * all the data has been written rather than throw a BufferOverflowException. * * @param buf Array of Objects to be written * @throws BufferOverflowException if buffer does not allow blocking writes * and the buffer is full. If the exception is thrown, no data * will have been written since the buffer was set to be non-blocking. * @throws IllegalStateException if done() has been called. * @throws InterruptedException if the write is interrupted. * * @since ostermillerutils 1.00.00 */ public void write(Object[] buf) throws BufferOverflowException, IllegalStateException, InterruptedException { write(buf, 0, buf.length); } /** * Fill this buffer with a portion of an array of Objects. * This method should be called by the producer. * If the buffer allows blocking writes, this method will block until * all the data has been written rather than throw an IOException. * * @param buf Array of Objects * @param off Offset from which to start writing Objects * @param len - Number of Objects to write * @throws BufferOverflowException if buffer does not allow blocking writes * and the buffer is full. If the exception is thrown, no data * will have been written since the buffer was set to be non-blocking. * @throws IllegalStateException if done() has been called. * @throws InterruptedException if the write is interrupted. * * @since ostermillerutils 1.00.00 */ public void write(Object[] buf, int off, int len) throws BufferOverflowException, IllegalStateException, InterruptedException { while (len > 0){ synchronized (CircularObjectBuffer.this){ if (inputDone) throw new IllegalStateException("CircularObjectBuffer.done() has been called, CircularObjectBuffer.write() failed."); int spaceLeft = spaceLeft(); while (infinite && spaceLeft < len){ resize(); spaceLeft = spaceLeft(); } if (!blockingWrite && spaceLeft < len) throw new BufferOverflowException("CircularObjectBuffer is full; cannot write " + len + " Objects"); int realLen = Math.min(len, spaceLeft); int firstLen = Math.min(realLen, buffer.length - writePosition); int secondLen = Math.min(realLen - firstLen, buffer.length - readPosition - 1); int written = firstLen + secondLen; if (firstLen > 0){ System.arraycopy(buf, off, buffer, writePosition, firstLen); } if (secondLen > 0){ System.arraycopy(buf, off+firstLen, buffer, 0, secondLen); writePosition = secondLen; } else { writePosition += written; } if (writePosition == buffer.length) { writePosition = 0; } off += written; len -= written; } if (len > 0){ Thread.sleep(100); } } } /** * Add a single Object to this buffer. This method should be * called by the producer. * If the buffer allows blocking writes, this method will block until * all the data has been written rather than throw an IOException. * * @param o Object to be written. * @throws BufferOverflowException if buffer does not allow blocking writes * and the buffer is full. If the exception is thrown, no data * will have been written since the buffer was set to be non-blocking. * @throws IllegalStateException if done() has been called. * @throws InterruptedException if the write is interrupted. * * @since ostermillerutils 1.00.00 */ public void write(Object o) throws BufferOverflowException, IllegalStateException, InterruptedException { boolean written = false; while (!written){ synchronized (CircularObjectBuffer.this){ if (inputDone) throw new IllegalStateException("CircularObjectBuffer.done() has been called, CircularObjectBuffer.write() failed."); int spaceLeft = spaceLeft(); while (infinite && spaceLeft < 1){ resize(); spaceLeft = spaceLeft(); } if (!blockingWrite && spaceLeft < 1) throw new BufferOverflowException("CircularObjectBuffer is full; cannot write 1 Object"); if (spaceLeft > 0){ buffer[writePosition] = o; writePosition++; if (writePosition == buffer.length) { writePosition = 0; } written = true; } } if (!written){ Thread.sleep(100); } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -