⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 autofilloutputbuffer.java

📁 本人历尽千辛万苦找的clustream中的jar包
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
					log.debug("don't send anything go back");					delay_para += 1e9*timeConsumed;					millis_delay = (long)(delay_para / 1e6);					nanos_delay = (int)(delay_para - (double)millis_delay * 1e6);					if(millis_delay >= (long)DefConstants.DELAY_THRESHOLD)					{						delay_para = 0.0;						sleep(millis_delay, nanos_delay);					}				}			}			else				newNetBandwidth = -1;						//get the size of data in cirbuffer			int capacity = cirBuffer.getCapacity();			int size = cirBuffer.getSize();			int read;			log.info("the number of bytes in the cirBuffer is " + size);			if(size > BUF_SIZE)				size = BUF_SIZE;				//the data in cirBuffer still there until the setNewBase is called			read = cirBuffer.readToByteBuffer(tempBuf, size, true,false);//block mode			tempBuf.flip();			if((numBytesWrite = channel.write(tempBuf)) != read )				log.info("write part of data to the socket\n" + 						" numBytesWrite = " + numBytesWrite);			else				log.info(" the number of bytes written to the socket is "+numBytesWrite);			if(numBytesWrite > 0 && newNetBandwidth > 0)			{				delay_para += 1e9*(double)numBytesWrite * 8.0/(double)(newNetBandwidth);  				millis_delay = (long)(delay_para / 1e6);				nanos_delay = (int)(delay_para - (double)millis_delay * 1e6);				log.debug(delay_para + ":" + numBytesWrite + "the network delay is " + millis_delay + ":" + nanos_delay);				if(millis_delay >= (long)DefConstants.DELAY_THRESHOLD)				{					delay_para = 0.0;					sleep(millis_delay, nanos_delay);				}			}		 } catch (Exception e){	            		log.error(e);		 }	} */	//this is just intermedia version     	public void writeBytesEvent(SocketChannel channel)	{		 tempBuf.clear();		 int numBytesWrite;		 int bandwidth;		     		 try {						//get the size of data in cirbuffer						int capacity = cirBuffer.getCapacity();			int size = cirBuffer.getSize();			int read;			bandwidth = getNetworkBandwidth();			log.debug("the number of bytes in the cirBuffer is " + size);			if(size > BUF_SIZE)				size = BUF_SIZE;				//the data in cirBuffer still there until the setNewBase is called			read = cirBuffer.readToByteBuffer(tempBuf, size, true,false/*block mode*/);			tempBuf.flip();			if((numBytesWrite = channel.write(tempBuf)) != read )				log.debug("write part of data to the socket\n" + 						" numBytesWrite = " + numBytesWrite);			else				log.debug(" the number of bytes written to the socket is "+numBytesWrite);			newNetBandwidth = (int)((double)bandwidth*(1.0 - getNetUtil()));			log.debug("new network bandwidth is :" + newNetBandwidth);			if(numBytesWrite > 0 && newNetBandwidth > 0)			{				delay_para += 1e9*(double)numBytesWrite * 8.0/(double)(newNetBandwidth);  				millis_delay = (long)(delay_para / 1e6);				nanos_delay = (int)(delay_para - (double)millis_delay * 1e6);				log.debug(delay_para + ":" + numBytesWrite + "the network delay is " + millis_delay + ":" + nanos_delay);				if(millis_delay >= (long)DefConstants.DELAY_THRESHOLD)				{					delay_para = 0.0;					sleep(millis_delay, nanos_delay);				}/*				if(millis_delay != 0 || nanos_delay != 0)					sleep(millis_delay, nanos_delay);*/			}/*			if(((tempLoadedFactor = cirBuffer.getLoadedFactor()) < DefConstants.LIGHT_LOADED_THRESHOLD) && context.bufNotification != null)			{				context.loaded = tempLoadedFactor;				context.bufNotification.light_loaded(context); 			} */			//			cirBuffer.setNewBase(numBytesWrite);		 } catch (Exception e){	            		log.error(e);		 }	}        public boolean isBufferOverloaded()	{		return (cirBuffer.getLoadedFactor() > DefConstants.OVER_LOADED_THRESHOLD);	}	public boolean isBufferLightloaded()	{		return (cirBuffer.getLoadedFactor() < DefConstants.LIGHT_LOADED_THRESHOLD);	}	public void socketConnectedEvent(SelectionKey readyKey, SocketChannel channel){	}        public void initAdaptationPara()        {                n = 0;                context.initAdaptationPara();        }	public void actionPerformed(ActionEvent e)	{		if(timerMonBuf.equals(e.getSource()))			calculateLoad();		if(timerBgTraffic.equals(e.getSource()))			accumulateBgTrafficPkg();		if(timerNetworkBandwidthVaring != null)		{			if(timerNetworkBandwidthVaring.equals(e.getSource()))				changingNetworkBandwidth();		}	}	public void idle()	{		if(cirBuffer.getSize() > 0)			return;		try{		sleep(DefConstants.IDLE_TIME);//		log.info("sleeping");		}		catch(InterruptedException e)		{		}		//calculateLoad();	}	private void calculateLoad()	{		if(!context.bufNotification.isWorkClassRunning())			return;		tempLoadedFactor = cirBuffer.getLoadedFactor();		if(n < DefConstants.LD_FACTOR_WINDOW)		{			context.average_loaded = (context.average_loaded * (double)n + tempLoadedFactor)/ (double)(n+1);			load_factors[point] = tempLoadedFactor;			point ++;			n++;		}		else		{			if(point == DefConstants.LD_FACTOR_WINDOW)				point = 0;			temp_load = load_factors[point];			context.average_loaded += (tempLoadedFactor - temp_load)/(double)DefConstants.LD_FACTOR_WINDOW;			load_factors[point] = tempLoadedFactor;			point ++;		}                log.debug(" average_loaded:" + context.average_loaded);		if(tempLoadedFactor  > DefConstants.OVER_LOADED_THRESHOLD			&& context.bufNotification != null && bValid)		{			context.loaded = tempLoadedFactor;			context.bufNotification.over_loaded(context);		}		else if(tempLoadedFactor < DefConstants.LIGHT_LOADED_THRESHOLD  			&& context.bufNotification != null && bValid)		{			context.loaded = tempLoadedFactor;			context.bufNotification.light_loaded(context);		}                else //this else statement is added on Sep. 14th to make the update of			//severity more realicitic		{			context.loaded = tempLoadedFactor;			context.setLongTermLoadedFactor();		}		context.changingTrafficInfo();						if(bValid)			context.bufNotification.IdleTransmission();	}	public synchronized void setSeqBgTraffic(double [] seqBgTraffic)	{		this.seqBgTraffic = seqBgTraffic;		nIndexOfTrafficArray = 0;		//debug		for(int i = 0 ; i < seqBgTraffic.length ; i ++)		{			log.warn(i + ":" + seqBgTraffic[i]);		}	}	public synchronized void setNetUtil(double util)	{		net_util = util;	}	public synchronized double getNetUtil()	{		return net_util;	}			public synchronized void accumulateBgTrafficPkg()	{		int bandwidth = getNetworkBandwidth();		if(bandwidth == -1)			return;		if(!context.bufNotification.isWorkClassRunning())			return;		if(seqBgTraffic != null) //before the monitor set background secquences		{			nAccumulatedPkg += seqBgTraffic[nIndexOfTrafficArray];			nIndexOfTrafficArray = (nIndexOfTrafficArray + 1) % DefConstants.SIZE_SELF_SIMILAR_ARRAY;			nTimes ++;		}	}		private synchronized int getNTimes()	{		int temp = nTimes;		nTimes = 0;		return temp;	}			public synchronized double getNumAccumulatedPkg()	{		int bandwidth = getNetworkBandwidth();		if(!DefConstants.IF_NET_BANDWIDTH_VARING)			return 0;		if(seqBgTraffic == null && bandwidth != -1) //before the monitor set background secquences		{			double package_interval = (double)DefConstants.PACKAGE_INTERAL/(double)1000;			//In fact ffg_ave is the mean packages per PACKAGE_INTERAL			/* 			 * M*PACKAGE_SIZE/PACKAGE_INTERAL = bandwidth*dNetworkUtil;			 */			//The default 's net utility is 0.2			double tempUtil = getNetUtil();			double ffg_ave = (double)bandwidth*tempUtil*package_interval/(double)DefConstants.PACKAGE_SIZE;			double ffg_variance = 4.0 * ffg_ave;			SelfSimilarSeqGenerator tr_init = new SelfSimilarSeqGenerator(ffg_ave, ffg_variance, DefConstants.SIZE_SELF_SIMILAR_ARRAY);	                seqBgTraffic = tr_init.generating();			//debug			log.warn("initilize pakcetasfa by itself:ffg_ave: " + ffg_ave + "net_util: " + tempUtil);			for(int i = 0 ; i < seqBgTraffic.length ; i ++)			{				log.warn(i + ":" + seqBgTraffic[i]);			}			nIndexOfTrafficArray = 0;			nAccumulatedPkg = 0.0;			return 0.0;		}		double temp = nAccumulatedPkg;		nAccumulatedPkg = 0;		return temp;	}	public void changingNetworkBandwidth()	{		Date temTime = new Date();		long longTime = temTime.getTime();		int rate = getNetworkBandwidth();		log.error(longTime + " the current rate is " + rate);		rate += sign*(int)((double)(DefConstants.maxProducingRate - DefConstants.minProducingRate)/(double)DefConstants.nDivide);		if(rate > DefConstants.maxProducingRate)		{			rate = DefConstants.maxProducingRate;			sign = -1;			rate += sign*(int)((double)(DefConstants.maxProducingRate - DefConstants.minProducingRate)/(double)DefConstants.nDivide);		}		else if(rate < DefConstants.minProducingRate)		{			rate = DefConstants.minProducingRate;			sign = 1;			rate += sign*(int)((double)(DefConstants.maxProducingRate - DefConstants.minProducingRate)/(double)DefConstants.nDivide);		}		log.error(longTime + " the new rate is " + rate);		setNetworkBandwidth(rate);	}	private synchronized int getNetworkBandwidth()	{		return NET_BANDWIDTH;	}	private synchronized void setNetworkBandwidth(int bandwidth)	{		NET_BANDWIDTH = bandwidth;	}	public void handleDefEvents()	{	}}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -