📄 circularbuffer.java
字号:
package org.osu.ogsa.stream.util;/** This CircularBuffer provides a FIFO communications pipe between threads. The get and put methods have been made "Thread Safe" using synchronized methods */import java.io.*;import java.lang.*;import java.nio.*;import java.util.*;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import java.io.InputStream;import java.io.FileInputStream;import java.util.Enumeration;import java.util.StringTokenizer;import java.lang.reflect.Method;public class CircularBuffer extends Object{ // Counts the number of data values in the buffer private int MAX_SIZE = DefConstants.DEFAULT_BUFFER_SIZE - 1; private int BUFFER_LEN = DefConstants.DEFAULT_BUFFER_SIZE; private boolean OVERRIDE = true; private static Log log = LogFactory.getLog(CircularBuffer.class.getName()); // Counts entry within the circular buffer where the oldest value // is stored. The (base+1) points to the first available element private int base = 0; //the read entry private int readIndex = base; // Stores the data values private byte[] circBuffer;// = new int[MAX_SIZE]; private int size = 0; //constructor public CircularBuffer(int bufSize) throws Exception { if(bufSize <= 0) throw new Exception("the buffer size must be larger than 0"); circBuffer = new byte[bufSize]; if(circBuffer == null) throw new Exception("can't allocate memory for buffers"); MAX_SIZE = bufSize -1; BUFFER_LEN = bufSize; } public CircularBuffer() throws Exception { circBuffer = new byte[BUFFER_LEN]; if(circBuffer == null) throw new Exception("can't allocate memory for buffers"); } public synchronized int getSize() { return size; } // Reset the read public synchronized void resetRead() { readIndex = base; } public synchronized void finalizeRead() { int temp_base = base; base = readIndex; size = size + temp_base - readIndex; } public synchronized void setNewBase(int length) { if(length > size || length <= 0) return; int temp_base = base; base = base + length; size = size - length; readIndex = base; } // Read a byte, the pointer won't change its position public byte read() throws BufferDataException { if(BufferIsEmpty()) { BufferDataException exc = new BufferDataException("the buffer is empty" ); exc.setType(DefConstants.BUFDATA_EXPT_EMPTY); throw exc; } if(((readIndex < base) && ((readIndex + MAX_SIZE) > (base + size))) || ((readIndex > base) && (readIndex > (base + size)))) { BufferDataException exc = new BufferDataException("the read pointer is beyond region" ); exc.setType(DefConstants.BUFDATA_EXPT_READ); throw exc; } readIndex = (readIndex + 1)% BUFFER_LEN; byte temp = circBuffer[readIndex]; return temp; } // Read a byte, the pointer won't change its position public synchronized byte read(boolean bBlock) throws BufferDataException { if(BufferIsEmpty()) { if(!bBlock){ BufferDataException exc = new BufferDataException("the buffer is empty" ); exc.setType(DefConstants.BUFDATA_EXPT_EMPTY); throw exc; } else try{// Thread.currentThread().wait(); wait(); }catch(Exception e){ } } if(((readIndex < base) && ((readIndex + MAX_SIZE) > (base + size))) || ((readIndex > base) && (readIndex > (base + size)))) { BufferDataException exc = new BufferDataException("the read pointer is beyond region" ); exc.setType(DefConstants.BUFDATA_EXPT_READ); throw exc; } readIndex = (readIndex + 1)% BUFFER_LEN; byte temp = circBuffer[readIndex]; try{ //Thread.currentThread().notifyAll(); notifyAll(); }catch(Exception e){ log.info(e); } return temp; } // Counts the number of data values in the buffer /** Get an entry out of the FIFO unless it is empty in which case the thread blocks. */ public synchronized byte get() throws InterruptedException, BufferDataException { if(BufferIsEmpty()) { BufferDataException exc = new BufferDataException("the buffer is empty" ); exc.setType(DefConstants.BUFDATA_EXPT_EMPTY); throw exc; } return getValueFromBuffer(); } /** Put an entry into the FIFO unless it is full in which case the thread blocks. */ public synchronized void put(byte value) throws InterruptedException, BufferDataException { if (bufferIsFull()&&OVERRIDE==false) { BufferDataException exc = new BufferDataException("the buffer is full" ); exc.setType(DefConstants.BUFDATA_EXPT_FULL); throw exc; } putValueIntoBuffer(value); } /** Put an entry into the FIFO unless it is full in which case the thread blocks. */ public synchronized void put(byte value, boolean bBlock) throws InterruptedException, BufferDataException {// log.debug("the value of bBlock is "+ bBlock); if(bufferIsFull()) { log.debug("buffer Is Full " + size); if(bBlock) try{// log.debug("ready to wait, and the value is "+ value); //Thread.currentThread().wait(); wait(); }catch(Exception e){ } else if (OVERRIDE==false) { BufferDataException exc = new BufferDataException("the buffer is full" ); exc.setType(DefConstants.BUFDATA_EXPT_FULL); throw exc; } } log.debug("waken up , and the value is "+ value); putValueIntoBuffer(value); try{ //Thread.currentThread().notifyAll(); notifyAll(); }catch(Exception e){ log.info(e); } } public synchronized int getFromByteBuffer(ByteBuffer buf, int length, boolean bBlock) throws InterruptedException, BufferDataException { if((remainingSize() - length < 0) && OVERRIDE == false ) { BufferDataException exc = new BufferDataException("no space to accommendate data"); exc.setType(DefConstants.BUFDATA_EXPT_FULL); throw exc; } if(!bBlock && (length < 0 || remainingSize() < length)) length = remainingSize();// log.debug("[readToByteBuffer]: length = " + length); for(int i = 0; i < length; i ++) { try{ if(bBlock && bufferIsFull()) { // log.debug("[getFromByteBuffer]: ready to wait"); wait(); } }catch(Exception e){ System.out.println(e); } putValueIntoBuffer(buf.get()); } try{ if(size > 0) notifyAll(); }catch(Exception e){ System.out.println(e); } return length; } public synchronized int readToByteBuffer(ByteBuffer buf, int length, boolean bIndexChange,boolean bBlock) throws InterruptedException, BufferDataException { if(!bBlock) { int curSize; int last = size + base; if(last < BUFFER_LEN) curSize = size; else curSize = MAX_SIZE - base; if(base == MAX_SIZE) curSize = size; if(length < 0 || curSize < length ) length = curSize;// log.debug("curSize = "+ curSize+" the last = "+last + "base: "+base+"size:"+size); }// log.debug("nonBlock [readToByteBuffer]: length = " + length); int rtnLen, j; rtnLen = length; while(length > 0) { if(bIndexChange == true) { try{ if(bBlock) { if(size < MAX_SIZE && size < length) wait(); else { for(j = 0; j < size && j < length; j++) buf.put(getValueFromBuffer()); length -= j; } } else//nonBlock { buf.put(getValueFromBuffer()); length--; } }catch(Exception e){ System.out.println(e); }// log.debug("[readToByteBuffer]: waken up, size = "+ size);// log.debug("[readToByteBuffer]: i = " + i); } else{ buf.put(read(bBlock)); length--; } } try{ if(remainingSize() > 0) notifyAll(); }catch(Exception e){ System.out.println(e); } return rtnLen; } public synchronized int getCapacity() { return remainingSize(); } public synchronized double getLoadedFactor() { return __getLoadedFactor(); } private double __getLoadedFactor() { return (double)size/(double)MAX_SIZE; } private int remainingSize() { return MAX_SIZE - size; } public boolean BufferIsEmpty() { // returns true iff the circular buffer has no values in it. return (size == 0); } private boolean bufferIsFull() { // returns true iff the circular buffer has no more space. return (size == MAX_SIZE); } private byte getValueFromBuffer() { // puts an extra value into the circular buffer base = (base + 1) % BUFFER_LEN; size--; if(size < 0) size = 0; return circBuffer[base]; } private void putValueIntoBuffer(byte value) { // puts a value from the circular buffer int index = (base + size + 1) % BUFFER_LEN; circBuffer[index] = value; size++; if(size > MAX_SIZE) size = MAX_SIZE; //All the elements in the buffer have been overrided //So restore the value of "size" back to the size of //the buffer/* if(size == 2*MAX_SIZE) size = MAX_SIZE;*/ }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -