⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 autofilloutputbuffer.java

📁 本人历尽千辛万苦找的clustream中的jar包
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
package org.osu.ogsa.stream.util;import java.io.*;import java.nio.*;import java.nio.channels.*;import java.nio.channels.spi.*;import java.nio.charset.*;import java.net.*;import java.util.*;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import javax.swing.Timer;import java.awt.event.*;public class AutoFillOutputBuffer extends Thread implements SocketNotification, ActionListener{	private final static int BUF_SIZE = 1024;//	private final static float OVER_LOADED_THRESHOLD = (float)0.80;//     	private final static float LIGHT_LOADED_THRESHOLD = (float)0.2;	private CircularBuffer cirBuffer;//	private byte[] tempBuf = new byte[BUF_SIZE];	private ByteBuffer tempBuf = ByteBuffer.allocate(BUF_SIZE);	private NonBlockingClient clientSocket;	private BufferNotification bufNotification = null;	private ConnectionContext context = null;	private double tempLoadedFactor;         private int n = 0; //the times the idle() was called	private int point = 0;	private double load_factors[] = new double[DefConstants.LD_FACTOR_WINDOW];	private double temp_load;        private double delay_para = 0;        private long millis_delay;	private int nanos_delay;	private int NET_BANDWIDTH = -1;	private int sign = -1;	private double [] seqBgTraffic;	private int nIndexOfTrafficArray = 0;	private double nAccumulatedPkg = 0;	private int nTimes = 0;	private boolean bValid = true;	private double net_util = 0.2;	private int occupiedBandwidth = 0; 	private int newNetBandwidth = 0;	private javax.swing.Timer timerMonBuf, timerBgTraffic, timerNetworkBandwidthVaring;	private static Log log = LogFactory.getLog(AutoFillOutputBuffer.class.getName());	public AutoFillOutputBuffer()	{		try{			cirBuffer = new CircularBuffer();			timerMonBuf = new javax.swing.Timer(DefConstants.DELAY_MONITOR_BUF, this);			timerBgTraffic = new javax.swing.Timer(DefConstants.PACKAGE_INTERAL, this);			timerNetworkBandwidthVaring = null;		}catch(Exception e)		{			log.error(e);		}	} 	public AutoFillOutputBuffer(int bufSize)	{		try{			cirBuffer = new CircularBuffer(bufSize);			timerMonBuf = new javax.swing.Timer(DefConstants.DELAY_MONITOR_BUF, this);			timerBgTraffic = new javax.swing.Timer(DefConstants.PACKAGE_INTERAL, this);			timerNetworkBandwidthVaring = null;		}catch(Exception e)		{			log.error(e);		}	}         public void setConnectionContext(ConnectionContext cc)	{		context = cc;		if(context.net_bandwidth > 0)			setNetworkBandwidth((int)context.net_bandwidth);		else if(DefConstants.NET_BANDWIDTH > 0)			setNetworkBandwidth((int)DefConstants.NET_BANDWIDTH);		else if(context.net_bandwidth == 0 || DefConstants.NET_BANDWIDTH == 0)		{			if(DefConstants.timeIntervalForVaringBandwidth > 0 			   && DefConstants.minProducingRate > 0 			   && DefConstants.maxProducingRate > 0 			   && DefConstants.minProducingRate != DefConstants.maxProducingRate			   && DefConstants.nDivide > 0)			{				try{					timerNetworkBandwidthVaring = new javax.swing.Timer(DefConstants.timeIntervalForVaringBandwidth, this);				}catch(Exception e)				{					log.error(e);				}								setNetworkBandwidth(DefConstants.maxProducingRate);				sign = -1;			}			else timerNetworkBandwidthVaring = null;		}		else 			setNetworkBandwidth(-1);		if(context.net_util < 1.0)			setNetUtil(context.net_util);				}        public ConnectionContext getConnectionContext()	{		return context;	}	public int put(byte[] buf, int length)	{		int i = 0;		//make sure the data in the buffer 		//can put into cirbuffer		int capacity = cirBuffer.getCapacity();		int size;		if(capacity < length)			size = capacity;		else			size = length;		try		{			while( i < size )			{				cirBuffer.put(buf[i]);/*				if(((tempLoadedFactor = cirBuffer.getLoadedFactor()) > DefConstants.OVER_LOADED_THRESHOLD) && context.bufNotification != null)				{					context.loaded = tempLoadedFactor;					context.bufNotification.over_loaded(context);				}*/				i = i + 1;			}		}		catch(BufferDataException e)		{			log.error(e);			return -1;		}		catch(InterruptedException e)		{			log.error(e);			return -1;		}		return size;	}				public int put(ByteBuffer buf, int length, boolean bBlock)	{		int i = 0;		if(bBlock == false)		//make sure the data in the buffer 		//can put into cirbuffer		{				int size;			try			{			int capacity = cirBuffer.getCapacity();				if(capacity < length)				size = capacity;			else				size = length;				while( i < size )				{					cirBuffer.put(buf.get(i));/*					if(((tempLoadedFactor = cirBuffer.getLoadedFactor()) > DefConstants.OVER_LOADED_THRESHOLD) && context.bufNotification != null)					{						context.loaded = tempLoadedFactor;						context.bufNotification.over_loaded(context);					}*/						i = i + 1;				}			}			catch(BufferDataException e)			{				log.error(e);				return -1;			}			catch(InterruptedException e)			{				log.error(e);				return -1;			}			return size;		}		else //blocking mode		{			try			{				while( i < length)				{					//log.debug("The value gotten from cirBuffer is"+buf.get(i)+"and i is "+ i);					cirBuffer.put(buf.get(i), true/*block mode*/);/*					if(((tempLoadedFactor = cirBuffer.getLoadedFactor()) > DefConstants.OVER_LOADED_THRESHOLD) && context.bufNotification != null)					{						context.loaded = tempLoadedFactor;						context.bufNotification.over_loaded(context);					} */						i = i + 1;				}			}			catch(BufferDataException e)			{				log.error(e);				return -1;			}			catch(InterruptedException e)			{				log.error(e);				return -1;			}			return length;		}	}	public void run()	{		setName("buffer thread");	        try {                	if(context == null)			{				log.error("initialize the ConnectionContext object first");				return;			}			log.debug(context.rcverHostName+context.rcverPort);			clientSocket = new NonBlockingClient(context.rcverHostName, context.rcverPort);			clientSocket.addCallback(this);			SocketChannel sChannel = clientSocket.initialize();			//System.out.println(sChannel);			//InetSocketAddress saddr = (InetSocketAddress)sChannel.socket().getLocalSocketAddress();			//InetSocketAddress saddr_remote = (InetSocketAddress)sChannel.socket().getRemoteSocketAddress();			//context.rcverPort = saddr.getPort();			//context.rcverAddr = saddr.getAddress();			//context.senderPort = saddr_remote.getPort();			//context.senderAddr = saddr_remote.getAddress();			context.bufNotification.SocketConnected(context);        	} catch ( IOException e ) {	            e.printStackTrace();       		    System.exit( -1 );        	}		timerMonBuf.start();		timerBgTraffic.start();		if(timerNetworkBandwidthVaring != null)			timerNetworkBandwidthVaring.start();		while(true){	        try {       		     clientSocket.handleEvents();		     idle();		     handleDefEvents();        	}        	catch ( IOException e ) {	            e.printStackTrace();       		     log.error( e );        	}        	catch ( InterruptedException e ) {	            e.printStackTrace();       		     log.error( e );        	}		}	}	public synchronized void setBufferStatus(boolean bValid)	{		this.bValid = bValid;	}	public synchronized boolean getBufferStatus(boolean bValid)	{		return bValid;	}			public void readBytesEvent(SocketChannel channel)	{}	//Keep it for dynamic one/*     	public void writeBytesEvent(SocketChannel channel)	{		 tempBuf.clear();		 int numBytesWrite;		     		 try {			if(NET_BANDWIDTH > 0)			{				//(1e9 (ns) * 8 (bits)) / bandwidth								double dTempPkg = getNumAccumulatedPkg();				int timeConsumed = DefConstants.PACKAGE_INTERAL * getNTimes();				if(timeConsumed != 0)				{					occupiedBandwidth = (int)((double)(dTempPkg*DefConstants.PACKAGE_SIZE*1000)/(double)timeConsumed); 					System.out.println("the occupied netbandwidth is " + occupiedBandwidth);				}				else				{					occupiedBandwidth = 0;					System.out.println("the occupied netbandwidth is 0 ");				}//				delay_para += 1e9*(numBytesWrite * 8.0 + getNumAccumulatedPkg()*DefConstants.PACKAGE_SIZE)/(double)(NET_BANDWIDTH);  //				delay_para += 1e9*(numBytesWrite * 8.0 + dTempPkg*DefConstants.PACKAGE_SIZE)/(double)(NET_BANDWIDTH);				newNetBandwidth = NET_BANDWIDTH - occupiedBandwidth;				log.debug("new network bandwidth is :" + newNetBandwidth);				if(newNetBandwidth  <= 0 )				{

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -