⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 circularbuffer.java

📁 本人历尽千辛万苦找的clustream中的jar包
💻 JAVA
字号:
package org.osu.ogsa.stream.util;/**   This CircularBuffer provides a FIFO communications pipe between   threads.  The get and put methods have been made "Thread Safe"   using synchronized methods */import java.io.*;import java.lang.*;import java.nio.*;import java.util.*;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import java.io.InputStream;import java.io.FileInputStream;import java.util.Enumeration;import java.util.StringTokenizer;import java.lang.reflect.Method;public class CircularBuffer extends Object{   // Counts the number of data values in the buffer   private int MAX_SIZE = DefConstants.DEFAULT_BUFFER_SIZE - 1;   private int BUFFER_LEN = DefConstants.DEFAULT_BUFFER_SIZE;   private boolean OVERRIDE = true;   private static Log log = LogFactory.getLog(CircularBuffer.class.getName());   // Counts entry within the circular buffer where the oldest value   // is stored. The (base+1) points to the first available element   private int base = 0;   //the read entry   private int readIndex = base;      // Stores the data values   private byte[] circBuffer;// = new int[MAX_SIZE];   private int size = 0;   //constructor   public CircularBuffer(int bufSize) throws Exception   {	if(bufSize <= 0)		throw new Exception("the buffer size must be larger than 0");	 		circBuffer = new byte[bufSize];	if(circBuffer == null)		throw new Exception("can't allocate memory for buffers");	MAX_SIZE = bufSize -1;	BUFFER_LEN = bufSize;    }   public CircularBuffer() throws Exception   {		circBuffer = new byte[BUFFER_LEN];	if(circBuffer == null)		throw new Exception("can't allocate memory for buffers");   }   public synchronized int getSize()   {	   return size;   }   // Reset the read   public synchronized void resetRead()   {	   readIndex = base;   }   public synchronized void finalizeRead()   {	   int temp_base = base;	   base = readIndex;	   size = size + temp_base - readIndex;   }   public synchronized void setNewBase(int length)   {	   if(length > size || length <= 0)		   return;	   int temp_base = base;	   base = base + length;	   size = size - length;	   readIndex = base;   }   // Read a byte, the pointer won't change its position   public byte read() throws BufferDataException   {      if(BufferIsEmpty()) {	BufferDataException exc = new BufferDataException("the buffer is empty" );	exc.setType(DefConstants.BUFDATA_EXPT_EMPTY);	throw exc;      }      if(((readIndex < base) && ((readIndex + MAX_SIZE) > (base + size)))	      || ((readIndex > base) && (readIndex > (base + size))))      {	BufferDataException exc = new BufferDataException("the read pointer is beyond region" );	exc.setType(DefConstants.BUFDATA_EXPT_READ);	throw exc;      }      readIndex = (readIndex + 1)% BUFFER_LEN;      byte temp = circBuffer[readIndex];      return temp;   }   // Read a byte, the pointer won't change its position   public synchronized byte read(boolean bBlock) throws BufferDataException   {      if(BufferIsEmpty()) {	if(!bBlock){		BufferDataException exc = new BufferDataException("the buffer is empty" );		exc.setType(DefConstants.BUFDATA_EXPT_EMPTY);		throw exc;      	}	else 		try{//			Thread.currentThread().wait();			wait();		}catch(Exception e){		}      }      if(((readIndex < base) && ((readIndex + MAX_SIZE) > (base + size)))	      || ((readIndex > base) && (readIndex > (base + size))))      {	BufferDataException exc = new BufferDataException("the read pointer is beyond region" );	exc.setType(DefConstants.BUFDATA_EXPT_READ);	throw exc;      }      readIndex = (readIndex + 1)% BUFFER_LEN;      byte temp = circBuffer[readIndex];      try{	      //Thread.currentThread().notifyAll();	      notifyAll();      }catch(Exception e){	      log.info(e);      }      return temp;   }   // Counts the number of data values in the buffer   /**      Get an entry out of the FIFO unless it is empty in which case      the thread blocks.   */   public synchronized byte get()      throws InterruptedException, BufferDataException   {      if(BufferIsEmpty()) {	BufferDataException exc = new BufferDataException("the buffer is empty" );	exc.setType(DefConstants.BUFDATA_EXPT_EMPTY);	throw exc;      }      return getValueFromBuffer();   }   /**      Put an entry into the FIFO unless it is full in which case      the thread blocks.   */   public synchronized void put(byte value)      throws InterruptedException, BufferDataException   {      if (bufferIsFull()&&OVERRIDE==false) {	BufferDataException exc = new BufferDataException("the buffer is full" );	exc.setType(DefConstants.BUFDATA_EXPT_FULL);	throw exc;      }      putValueIntoBuffer(value);   }      /**      Put an entry into the FIFO unless it is full in which case      the thread blocks.   */   public synchronized void put(byte value, boolean bBlock)      throws InterruptedException, BufferDataException   {//	log.debug("the value of bBlock is "+ bBlock);	if(bufferIsFull())	{		log.debug("buffer Is Full " + size);		if(bBlock)			try{//			log.debug("ready to wait, and the value is "+ value);			//Thread.currentThread().wait();			wait();			}catch(Exception e){			}		else if (OVERRIDE==false) 		{			BufferDataException exc = new BufferDataException("the buffer is full" );			exc.setType(DefConstants.BUFDATA_EXPT_FULL);			throw exc;		}	}	log.debug("waken up , and the value is "+ value);	putValueIntoBuffer(value);	try{	//Thread.currentThread().notifyAll();	notifyAll();	}catch(Exception e){		log.info(e);	}   }   public synchronized int getFromByteBuffer(ByteBuffer buf, int length, boolean bBlock)      throws InterruptedException, BufferDataException   {      if((remainingSize() - length < 0) && OVERRIDE == false ) {	BufferDataException exc = new BufferDataException("no space to accommendate data");	exc.setType(DefConstants.BUFDATA_EXPT_FULL);	throw exc;      }      if(!bBlock && (length < 0 || remainingSize() < length))	      length = remainingSize();//	log.debug("[readToByteBuffer]: length = " + length);      for(int i = 0; i < length; i ++)      {	      try{		      if(bBlock && bufferIsFull())		      {			   //   log.debug("[getFromByteBuffer]: ready to wait");			      wait();		      }	      }catch(Exception e){			System.out.println(e);	      }		putValueIntoBuffer(buf.get());      }      try{      if(size  > 0)	     	notifyAll();      }catch(Exception e){			System.out.println(e);      }      return length;   }   public synchronized int readToByteBuffer(ByteBuffer buf, int length, boolean bIndexChange,boolean bBlock)      throws InterruptedException, BufferDataException   {	if(!bBlock)	{		int curSize;		int last = size + base;		if(last < BUFFER_LEN)			curSize = size;		else 			curSize = MAX_SIZE  - base;		if(base == MAX_SIZE)			curSize = size;		if(length < 0 || curSize < length )			length = curSize;//		log.debug("curSize = "+ curSize+" the last = "+last + "base: "+base+"size:"+size);	}//	log.debug("nonBlock [readToByteBuffer]: length = " + length);	int rtnLen, j;	rtnLen = length;		while(length > 0)	{		if(bIndexChange == true)		{			try{				if(bBlock)				{ 					if(size < MAX_SIZE && size < length)						wait();					else					{						for(j = 0; j < size && j < length; j++)							buf.put(getValueFromBuffer());						length -= j;					}				}				else//nonBlock				{					buf.put(getValueFromBuffer());					length--;				}			}catch(Exception e){				System.out.println(e);			}//			log.debug("[readToByteBuffer]: waken up, size = "+ size);//			log.debug("[readToByteBuffer]: i = " + i);		}       		else{			buf.put(read(bBlock));			length--; 		}	}      	try{	if(remainingSize() > 0)		notifyAll();	}catch(Exception e){		System.out.println(e);	}	return rtnLen;   }   public synchronized int getCapacity()   {	   return remainingSize();   }   public synchronized double getLoadedFactor()   {	   return __getLoadedFactor();   }   private double __getLoadedFactor()   {	   return (double)size/(double)MAX_SIZE;   }   private int remainingSize()   {	   return MAX_SIZE - size;   }   public boolean BufferIsEmpty()   {      // returns true iff the circular buffer has no values in it.      return (size == 0);   }   private boolean bufferIsFull()   {      // returns true iff the circular buffer has no more space.      return (size == MAX_SIZE);   }   private byte getValueFromBuffer()   {      // puts an extra value into the circular buffer      base = (base + 1) % BUFFER_LEN;      size--;      if(size < 0)	      size = 0;      return circBuffer[base];   }   private void putValueIntoBuffer(byte value)   {      // puts a value from the circular buffer      int index = (base + size + 1) % BUFFER_LEN;      circBuffer[index] = value;      size++;      if(size > MAX_SIZE)	      size = MAX_SIZE;      //All the elements in the buffer have been overrided      //So restore the value of "size" back to the size of       //the buffer/*      if(size == 2*MAX_SIZE)	      size = MAX_SIZE;*/   }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -