⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 autofillinputbuffer.java

📁 本人历尽千辛万苦找的clustream中的jar包
💻 JAVA
字号:
package org.osu.ogsa.stream.util;import java.io.*;import java.nio.*;import java.nio.channels.*;import java.nio.channels.spi.*;import java.nio.charset.*;import java.net.*;import java.util.*;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import java.awt.event.*;import javax.swing.Timer;public class AutoFillInputBuffer extends Thread implements SocketNotification, ActionListener{//	private final static float OVER_LOADED_THRESHOLD = (float)0.40;//	private final static float LIGHT_LOADED_THRESHOLD = (float)0.10;//	private final static float CAPABLE = (float)0.20;	private int BUF_SIZE = DefConstants.DEFAULT_BUFFER_SIZE;	private final static int STATE_NULL = 0;	private final static int STATE_CONNECTED = STATE_NULL + 1;	private final static int STATE_TRANSMISSION = STATE_CONNECTED + 1;	private ConnectionContext context = null;/*      private ServerSocketChannel selectableChannel = null;	private Selector selector = null; */	private CircularBuffer cirBuffer;	private ByteBuffer tempBuf = ByteBuffer.allocate(BUF_SIZE);	private NonBlockingChannel sockChannel;	private int state = STATE_NULL;	//The tuning para.	private int rtnBufSizeTimes = 1;	private double tempLoadedFactor;	private int n = 0; //the times the idle() was called	private int point = 0;	private double load_factors[] = new double[DefConstants.LD_FACTOR_WINDOW];	private double temp_load;	private double delay_para = 0;	private long millis_delay;	private int nanos_delay;        private javax.swing.Timer timerMonBuf;	private boolean bValid = true;	private static Log log = LogFactory.getLog(AutoFillInputBuffer.class.getName());	public AutoFillInputBuffer()	{		try{			timerMonBuf = new javax.swing.Timer(DefConstants.DELAY_MONITOR_BUF, this);			cirBuffer = new CircularBuffer();		}catch(Exception e) {			log.error(e);		}	} 	public AutoFillInputBuffer(int bufSize)	{		try{			cirBuffer = new CircularBuffer(bufSize);			timerMonBuf = new javax.swing.Timer(DefConstants.DELAY_MONITOR_BUF, this);		}catch(Exception e) {			log.error(e);		}	} 		public void setConnectionContext(ConnectionContext cc)	{		context = cc;	}	public int getBufferSize()	{		return cirBuffer.getSize();	}	public ConnectionContext getConnectionContext()	{		return context;	}		        public boolean init_sock() throws InterruptedException, IOException {		if(context == null)		{			log.error("initialize the ConnectionContext object first");			return false;		}		Selector selector = SelectorProvider.provider().openSelector();		ServerSocketChannel selectableChannel = ServerSocketChannel.open();		selectableChannel.configureBlocking(true);		log.info("the port socket server is listening to is "+context.rcverPort);		if(context.rcverPort > 0)		{			InetSocketAddress isa = new InetSocketAddress(context.rcverPort );			selectableChannel.socket().bind(isa);		}		else			selectableChannel.socket().bind(null);		log.info("the socket has successfully been bound:"+selectableChannel.socket().isBound());	     	SocketChannel sChannel = selectableChannel.accept();	     	if(sChannel == null)		{			log.error("accept client connection error");			throw new InterruptedException();		}		if(!sChannel.isConnected())		{			log.error("accept client connection error");			return false;		}		//selectableChannel.close();		sockChannel = new NonBlockingChannel(sChannel);		sockChannel.addCallback(this);		sockChannel.initialize();		//Set the conext object		InetSocketAddress saddr = (InetSocketAddress)sChannel.socket().getLocalSocketAddress();		InetSocketAddress saddr_remote = (InetSocketAddress)sChannel.socket().getRemoteSocketAddress();/*		log.debug(saddr);		log.debug(saddr_remote); */				context.bufNotification.SocketConnected(context);		return true;		    	}/*	public void setupChannel(SocketChannel s, Hashtable ht) throws IOException, ClosedChannelException	{		streamNameTable = ht;		sockChannel = new NonBlockingChannel(s);		sockChannel.addCallback(this);		sockChannel.initialize();	}	public void setBufferNotificationObject(BufferNotification bufNot, Object attachment)	{		bufNotification = bufNot;		Object temp = attached_object;		attached_object = (BufferEventHandler)attachment;	}	public void setBufferNotificationObject(BufferNotification bufNot)	{		bufNotification = bufNot;	}*/        public synchronized void setBufferStatus(boolean bValid)	{		this.bValid = bValid;	}        public synchronized boolean getBufferStatus(boolean bValid)	{		return bValid;	}	public int get(byte[] buf, int length)	{		int i = 0;		if(buf.length < length)			return -1;		try		{			while( i < length)			{				if(cirBuffer.BufferIsEmpty() == true)					return i;				buf[i] = cirBuffer.get();				/*if((cirBuffer.getLoadedFactor() < LIGHT_LOADED_THRESHOLD) 						&& bufNotification != null						&& state == STATE_TRANSMISSION)*//*				if(((tempLoadedFactor = cirBuffer.getLoadedFactor()) < DefConstants.LIGHT_LOADED_THRESHOLD) 						&& context.bufNotification != null)				{					context.loaded = tempLoadedFactor;					context.bufNotification.light_loaded(context);				} */				i = i + 1;			}		}		catch(BufferDataException e)		{			log.error(e);			return -1;		}		catch(InterruptedException e)		{			log.error(e);			return -1;		}		return i;	}				public int get(ByteBuffer buf, int length)	{		int i = 0;		if(buf.limit()< length)			return -1;		buf.clear();		try		{			while( i < length)			{				if(cirBuffer.BufferIsEmpty() == true)					return i;				buf.put(cirBuffer.get());				/*if((cirBuffer.getLoadedFactor() < LIGHT_LOADED_THRESHOLD) 						&& bufNotification != null						&& state == STATE_TRANSMISSION)*//*				if(((tempLoadedFactor = cirBuffer.getLoadedFactor()) < DefConstants.LIGHT_LOADED_THRESHOLD) 						&& context.bufNotification != null)				{					context.loaded = tempLoadedFactor;					context.bufNotification.light_loaded(context);				} */				i = i + 1;			}//			buf.rewind();		}		catch(BufferDataException e)		{			log.error(e);			return -1;		}		catch(InterruptedException e)		{			log.error(e);			return -1;		}		return i;	}	public int readToByteBuffer(ByteBuffer buf, int length, boolean bBlock)	{		int lenRet;		try		{		/*	if(cirBuffer.BufferIsEmpty() == true)				return -1;*/			lenRet = cirBuffer.readToByteBuffer(buf, length,true,bBlock);			/*if((cirBuffer.getLoadedFactor() < LIGHT_LOADED_THRESHOLD) 					&& bufNotification != null 					&& state == STATE_TRANSMISSION)*//*			if(((tempLoadedFactor = cirBuffer.getLoadedFactor()) < DefConstants.LIGHT_LOADED_THRESHOLD) 					&& context.bufNotification != null)			{				context.loaded = tempLoadedFactor;				context.bufNotification.light_loaded(context);			} */		}		catch(BufferDataException e)		{			log.error(e);			return -1;		}		catch(InterruptedException e)		{			log.error(e);			return -1;		}		return lenRet;	}	public void run()	{		setName("buffer thread");	        try {            		init_sock();        	} catch ( IOException e ) {	            e.printStackTrace();       		    return;        	}        	catch ( InterruptedException e ) {	            e.printStackTrace();       		     log.error( e );		     return;		}		log.fatal("start timer....");		timerMonBuf.start();	   while(true){	        try {       		     sockChannel.handleEvents();		     idle();		     handleDefEvents();        	}        	catch ( IOException e ) {	            e.printStackTrace();       		     log.error( e );        	}        	catch ( InterruptedException e ) {	            e.printStackTrace();       		     log.error( e );        	}	   }	}	public void handleDefEvents()	{	}	//Check if the buffer is over-loaded	public double getLoadedFactor()	{		return cirBuffer.getLoadedFactor();	}	//Read data from the socket channel and 	//put them to circular object     	public void readBytesEvent(SocketChannel channel)	{		 tempBuf.clear();		 int numBytesRead, toCir;		     		 int nRemaining = cirBuffer.getCapacity();		 if(nRemaining == 0)			 return;		 else		 {			 tempBuf.position(BUF_SIZE - nRemaining);			 tempBuf.mark();		 }//		 log.info("readByteEvent");		 try 		 {		// Clear the buffer and read bytes from socket			if((numBytesRead = channel.read(tempBuf)) > 0)			{				// To read the bytes, flip the buffer 				log.debug("the number of read from socket is" + numBytesRead);				//simulate the possible network delay. 				//the bandwidth is DefConstants.NET_BANDWIDTH bps/*				if(DefConstants.NET_BANDWIDTH > 0)				{					//(1e9 (ns) * 8 (bits)) / bandwidth					delay_para += (1e9 * numBytesRead * 8.0 ) / (double)(DefConstants.NET_BANDWIDTH * 2);  					millis_delay = (long)(delay_para / 1e6);					nanos_delay = (int)(delay_para - (double)millis_delay * 1e6);					log.debug(delay_para + ":" + numBytesRead + "the network delay is " + millis_delay + ":" + nanos_delay);					if(millis_delay >= (long)DefConstants.DELAY_THRESHOLD)					{						sleep(millis_delay, nanos_delay);						delay_para = 0.0;					}										}*/				tempBuf.reset();								toCir = numBytesRead;				while(toCir > 0)				{					// Read the bytes from the buffer ...;					toCir -= cirBuffer.getFromByteBuffer(tempBuf, toCir ,false/*blocking mode*/);					/*if(cirBuffer.getLoadedFactor() > OVER_LOADED_THRESHOLD 						&& bufNotification != null 						&& state == STATE_TRANSMISSION)*//*					if((tempLoadedFactor = cirBuffer.getLoadedFactor()) > DefConstants.OVER_LOADED_THRESHOLD 						&& context.bufNotification != null )					{						log.warn("buffer OVERLOADED" + tempLoadedFactor);						context.loaded = tempLoadedFactor;						context.bufNotification.over_loaded(context);						//attached_object.over_loaded(attached_object);					} */				}			}		 } catch (Exception e) 		 {			 	log.info("hi, I do't why I am here");	            		log.error(e);		 }	}	public boolean isBufferOverloaded()	{		return (cirBuffer.getLoadedFactor() > DefConstants.OVER_LOADED_THRESHOLD);	}	public boolean isBufferLightloaded()	{		return (cirBuffer.getLoadedFactor() < DefConstants.LIGHT_LOADED_THRESHOLD);	}	public void initAdaptationPara()	{		n = 0;		context.initAdaptationPara();	}	public void writeBytesEvent(SocketChannel channel ){}	public void socketConnectedEvent(SelectionKey readyKey, SocketChannel channel)	{	//	state = STATE_CONNECTED;		}	//50 ms elapse	public void actionPerformed(ActionEvent e)	{		calculateLoad();	}	public void idle()	{                if(cirBuffer.getSize() > 0)			return; 		try{		sleep(DefConstants.IDLE_TIME);		}catch(InterruptedException e)		{		}	//	calculateLoad();	}	private void calculateLoad()	{                if(!context.bufNotification.isWorkClassRunning())			return;		tempLoadedFactor = cirBuffer.getLoadedFactor();		log.info("tempLoadedFactor:" + tempLoadedFactor + " average_loaded:" + context.average_loaded);		if(n < DefConstants.LD_FACTOR_WINDOW)		{			context.average_loaded = (context.average_loaded * (double)n + tempLoadedFactor)/ (double)(n+1);			load_factors[point] = tempLoadedFactor;			point ++;			n++;		}		else		{			if(point == DefConstants.LD_FACTOR_WINDOW)				point = 0;			temp_load = load_factors[point];			context.average_loaded += (tempLoadedFactor - temp_load)/(double)DefConstants.LD_FACTOR_WINDOW;			load_factors[point] = tempLoadedFactor;			point ++;		}		for(int i = 0; i < DefConstants.LD_FACTOR_WINDOW; i++)		{			log.info(i + ":" + load_factors[i]);		}		//log.info(" average_loaded:" + context.average_loaded + "_________" + point);		log.warn(" average_loaded:" + context.average_loaded);		if(tempLoadedFactor  > DefConstants.OVER_LOADED_THRESHOLD 			&& context.bufNotification != null && bValid)		{			log.debug("buffer OVERLOADED" + tempLoadedFactor);			context.loaded = tempLoadedFactor;			context.bufNotification.over_loaded(context);			//attached_object.over_loaded(attached_object);		}		else if(tempLoadedFactor  < DefConstants.LIGHT_LOADED_THRESHOLD 			&& context.bufNotification != null && bValid)		{			context.loaded = tempLoadedFactor;			context.bufNotification.light_loaded(context);		}		else //this else statement is added on Sep. 14th to make the update of 		//severity more realicitic		{			context.loaded = tempLoadedFactor;			context.setLongTermLoadedFactor();		}		context.changingTrafficInfo();		log.warn(" longtermfactor:" + context.longTermLoadedFactor);		if(bValid)			context.bufNotification.IdleTransmission();/*		if(state == STATE_CONNECTED)		{			state = STATE_TRANSMISSION;                        //Read the handle and id from the buffer*//*			cirBuffer.readToByteBuffer(specifiedStreamName, 100);			if(streamNameTable.containsKey(specifiedStreamName))			{				String strHandleId = streamNameTable.get(specifiedStreamName);				//attached_object = new BufferEventHandler(strHandleId);				bufNotification.setConnectedHandle(strHandleId);				if(bufNotification.sendAck())					state = STATE_TRANSMISSION;			}*//*		}		if(state == STATE_TRANSMISSION)		{			bufNotification.idleTransmission();		} */	}	public synchronized void setRntBufSizeTimes(int times)	{		if(times < 1)		{			log.info("the times of returning buf size must be larger than 1");			return;		}		rtnBufSizeTimes = times;	}	public synchronized int getRntBufSizeTimes(int times)	{			return rtnBufSizeTimes;	}	//The following function is added for tuning	//the get() function put the data to internal 	//buffer and pass the reference of the buf	//to the invoker. The reture value is the number 	//of bytes in the buffer	//**************************************	//This number of bytes in the buffer has	//the following possible values:	//1. any value less than 1Kb	//	if the availbe bytes in the circularbuf	//	is less than 1k, all the bytes will be 	//	returned	//2. nKb n is a integer( 0<n) 	//	if the available bytes in the circularbuf	//	is more than 1Kb, the bytes passed to the	//	invoker depends on the current tuning para.	//	The current tuning para. is set by whatever	//	Grid service instance via the AutoFillBufferArray	public int get(byte[] buf)	{		int i = 0;		//How many bytes in the cirBuffer are available		int availLen = cirBuffer.getSize();		if(availLen <= 0)			return -1;		int times = availLen / BUF_SIZE;		if(times  >= rtnBufSizeTimes)			availLen = BUF_SIZE * rtnBufSizeTimes;		else if (times > 0)			availLen = BUF_SIZE* times;		buf = new byte[availLen];		try		{			while( i < availLen)			{				buf[i] = cirBuffer.get();				/*if((cirBuffer.getLoadedFactor() < LIGHT_LOADED_THRESHOLD) 						&& bufNotification != null						&& state == STATE_TRANSMISSION)*//*				if(((tempLoadedFactor = cirBuffer.getLoadedFactor()) < DefConstants.LIGHT_LOADED_THRESHOLD) 						&& context.bufNotification != null)				{					context.loaded = tempLoadedFactor;					context.bufNotification.light_loaded(context);				} */				i = i + 1;				if(cirBuffer.BufferIsEmpty() == true)					return i;			}		}		catch(BufferDataException e)		{			log.error(e);			return -1;		}		catch(InterruptedException e)		{			log.error(e);			return -1;		}		return i;	}}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -