⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 streamserviceprovider.java

📁 本人历尽千辛万苦找的clustream中的jar包
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
		if(curStage != numStages)		{			connectToNextService();		}		//Create a new instance of monitor service		startMonitorService();		//Load the working class		loadWorkClass();		return true;		}		catch(GridServiceException e)		{			log.error(e.getCause());			return false;		}		catch(RemoteException e)		{			log.error(e.getCause());			return false;		}		catch(IOException e)		{			log.error(e.getCause());			return false;		}	}			public void over_loaded(ConnectionContext cc)	{		AutoFillInputBuffer in;		AutoFillOutputBuffer out;		int i;		if(bJobDone || !bJobStart)			return;		//log.info(cc);		if(cc.in_or_out == -1)		{			log.fatal("the context is not correct");			return;		}		if(cc.times_overloaded < 100)			cc.times_overloaded ++;		cc.window ++;//		cc.window %= DefConstants.WINDOW_SIZE + 1;		if(cc.window > DefConstants.WINDOW_SIZE)			cc.window = DefConstants.WINDOW_SIZE;		if(cc.in_or_out == DefConstants.IN_BUFFER)			cc.misc_info = myHandle + "====Input Buffer=====";  		else			cc.misc_info = myHandle + "====Output Buffer====";		cc.setLongTermLoadedFactor();		if(cc.in_or_out == DefConstants.IN_BUFFER)		{			//log.info(myHandle + "::the input buffer is overloaded:"+cc.loaded);		/*	if(bFileLog)				fileLog.write(myHandle + "::the input buffer is overloaded:"+cc.loaded);*/			cc.misc_info = myHandle + ":the input buffer is overloaded and the long term loaded factor :"+cc.longTermLoadedFactor + "::" + cc.loaded + "---";			if(bFileLog)				fileLog.write(cc.misc_info); 			if(cc.longTermLoadedFactor > DefConstants.LONGTERM_EXTREMELY_OVER_LOADED_THRESHOLD && DefConstants.IFSPECIFYPARA)			//if(adjustAccuracyPara(cc.longTermLoadedFactor, cc) == REACHMAX) //reach the max value			{//the input buffer is long term overloaded			//call the upstream				StreamPortType streamPT = (StreamPortType)getStreamByName(cc.neighStreamHandle, DefConstants.UPSTREAM);				if(streamPT == null)				{					log.fatal("the stream handle is null");					return;				}					//				cc.fillSimContext(simCc);				try				{					//To the neighbour, I am a downstream					//streamPT.neighOverloaded(DefConstants.DOWNSTREAM, simCc);					streamPT.neighOverloaded(DefConstants.DOWNSTREAM);				}				catch(Exception e)				{					log.fatal(e);				}				//5/25/04 comments the following 'if' statement out				//the purpose of doing this is to get buffer's running 				//condition even applications don't specify the papa,				//so that monitor service can have idea about the system				//conditions.				/*if(!bIfSpecifiedPara)				{					cc.times_overloaded = 0;					cc.times_lightloaded = 0;					cc.window = 0;				}*/			}			//How many the input buffers are overloaded?			int number = inBufArray.howmanyInputBuffers();			//if there is only one buffer in the buffer array			if(number > 1 && inBufArray.isMajorityOverloaded())			{				//at lease two buffer in the array				//and the majority of the buffers are overloaded, 				//change the para.s.			//	adjustAccuracyPara(cc); 			}		}		else		{ 			//log.info(myHandle+"the output buffer is overloaded:"+cc.loaded);			/*if(bFileLog)				fileLog.write(myHandle+"the output buffer is overloaded:"+cc.loaded);*/			cc.misc_info = myHandle + ":the output buffer is overloaded and the long term loaded factor :"+cc.longTermLoadedFactor + "::" + cc.loaded + "---";			if(bFileLog)				fileLog.write(cc.misc_info);			//adjust the local parameters			//the reason is to try to locally 			//solve the traffic 			if(cc.longTermLoadedFactor > DefConstants.LONGTERM_EXTREMELY_OVER_LOADED_THRESHOLD && DefConstants.IFSPECIFYPARA)			//if(adjustAccuracyPara(cc) == REACHMAX) //reach the max value			{				//call the downstream				StreamPortType streamPT = (StreamPortType)getStreamByName(cc.neighStreamHandle, DefConstants.DOWNSTREAM);					if(streamPT == null)					{						log.fatal("the stream handle is null");						return;					}//				cc.fillSimContext(simCc);				try				{					//streamPT.neighOverloaded(DefConstants.UPSTREAM, simCc);					streamPT.neighOverloaded(DefConstants.UPSTREAM);				}				catch(Exception e)				{					log.fatal(e);				}				//5/25/04 comments the following 'if' statement out				//the purpose of doing this is to get buffer's running 				//condition even applications don't specify the papa,				//so that monitor service can have idea about the system				//conditions.				/*if(!bIfSpecifiedPara)				{				cc.times_overloaded = 0;				cc.times_lightloaded = 0;				cc.window = 0;				}*/				//DO I need to flip accuracyParaTake???????????????				//here I can get the notification from the upstream that the corresponding para has been changed			}		}	}	public void light_loaded(ConnectionContext cc)	{		AutoFillInputBuffer in;		AutoFillOutputBuffer out;		int i;		if(bJobDone || !bJobStart)			return;		//log.info(cc);		if(cc.in_or_out == -1)		{			log.fatal("the context is not correct");			return;		}		if(cc.times_lightloaded < 100)			cc.times_lightloaded ++;		cc.window --;		if(cc.window < -(DefConstants.WINDOW_SIZE))			cc.window = -(DefConstants.WINDOW_SIZE);//		cc.window %= DefConstants.WINDOW_SIZE + 1;		cc.misc_info = myHandle;  		cc.setLongTermLoadedFactor();		if(cc.in_or_out == DefConstants.IN_BUFFER)		{			//log.info("the input buffer is lightloaded");			//log.info(myHandle+"the input buffer is light loaded:" + cc.loaded);			cc.misc_info = myHandle + ":the input buffer is lightloaded and the long term loaded factor :"+cc.longTermLoadedFactor + "::" + cc.loaded + "---";			if(bFileLog)				fileLog.write(cc.misc_info);			if(cc.longTermLoadedFactor < DefConstants.LONGTERM_EXTREMELY_LIGHT_LOADED_THRESHOLD  && DefConstants.IFSPECIFYPARA)			{				StreamPortType streamPT = (StreamPortType)getStreamByName(cc.neighStreamHandle, DefConstants.UPSTREAM);				if(streamPT == null)				{					log.fatal("the stream handle is null");					return;				}					//				cc.fillSimContext(simCc);				try				{					//To the neighbour, I am a downstream					streamPT.neighLightloaded(DefConstants.DOWNSTREAM);				}				catch(Exception e)				{					log.fatal(e);				}				//5/25/04 comments the following 'if' statement out				//the purpose of doing this is to get buffer's running 				//condition even applications don't specify the papa,				//so that monitor service can have idea about the system				//conditions.				/*if(!bIfSpecifiedPara)				{				cc.times_overloaded = 0;				cc.times_lightloaded = 0;				cc.window = 0;				}*/			}			//How many the input buffers are lightloaded?			int number = inBufArray.howmanyInputBuffers();			//if there is only one buffer in the buffer array			if(number >1 && inBufArray.isMajorityLightloaded())			{				//At lease two buffers in the array				//the majority of the buffers are lightly loaded, 				//change the para.s.				//adjustAccuracyPara(1, cc);			}		}		else //Output buffer		{			//log.info(myHandle+ ":the output buffer is lightly loaded:"+cc.loaded);			cc.misc_info = myHandle + ":the output buffer is lightloaded and the long term loaded factor :"+cc.longTermLoadedFactor + "::" + cc.loaded + "---";			if(bFileLog)				fileLog.write(cc.misc_info);			//adjust the local parameters			//the reason is to try to locally 			//solve the traffic 			if(cc.longTermLoadedFactor < DefConstants.LONGTERM_EXTREMELY_LIGHT_LOADED_THRESHOLD && DefConstants.IFSPECIFYPARA)			//if(adjustAccuracyPara(cc) == REACHMIN) //reach the min value			{				//call the downstream				StreamPortType streamPT = (StreamPortType)getStreamByName(cc.neighStreamHandle, DefConstants.DOWNSTREAM);					if(streamPT == null)					{						log.fatal("the stream handle is null");						return;					}//				cc.fillSimContext(simCc);				try				{					//streamPT.neighOverloaded(DefConstants.UPSTREAM, simCc);					streamPT.neighLightloaded(DefConstants.UPSTREAM);				}				catch(Exception e)				{					log.fatal(e);				}				//5/25/04 comments the following 'if' statement out				//the purpose of doing this is to get buffer's running 				//condition even applications don't specify the papa,				//so that monitor service can have idea about the system				//conditions.				/*if(!bIfSpecifiedPara)				{				cc.times_overloaded = 0;				cc.times_lightloaded = 0;				cc.window = 0;				}*/			}		}	}	public void neighLightloaded(int direction)	{		int i; 				//5/25/04 comments the following 'if' statement out				//the purpose of doing this is to get buffer's running 				//condition even applications don't specify the papa,				//so that monitor service can have idea about the system				//conditions.		//if(!ifLoadedWorkClass || !bIfSpecifiedPara)		if(!ifLoadedWorkClass)			return; //		log.debug(myHandle + "come to neighLightLoaded");		boolean bAdjustPara = false;		ConnectionContext cc = new ConnectionContext();		if(direction == DefConstants.UPSTREAM)		{//upstream lightloaded			upstream_lightloaded_times ++;			inBufArray.adjustRtnBufSizeTimes(cc);/*			cc.misc_info = "upstream lightloaded";			if(!inBufArray.isMajorityOverloaded())				adjustAccuracyPara(1, cc.misc_info);  //Tune it slowly */		}		else //downstream lightloaded		{			downstream_lightloaded_times ++;			downstream_loaded_times_window --;			outBufArray.adjustRtnBufSizeTimes(cc);			cc.misc_info = "downstream lightloaded";/*			if(outBufArray.isMajorityOverloaded())				adjustAccuracyPara(-1, cc.misc_info); //Tune it fastly*/		}	}			public void neighOverloaded(int direction)	{		int i;				//5/25/04 comments the following 'if' statement out				//the purpose of doing this is to get buffer's running 				//condition even applications don't specify the papa,				//so that monitor service can have idea about the system				//conditions.		//if(!ifLoadedWorkClass || !bIfSpecifiedPara)		if(!ifLoadedWorkClass)			return; //		log.debug(myHandle + "come to neighOverLoaded");		boolean bAdjustPara = false;		ConnectionContext cc = new ConnectionContext();		if(direction == DefConstants.UPSTREAM)		{//upstream overloaded			upstream_overloaded_times ++;			inBufArray.adjustRtnBufSizeTimes(cc);			cc.misc_info = "upstream overloaded";		/*	if(inBufArray.isMajorityOverloaded())				adjustAccuracyPara(-1, cc.misc_info); //Tune it fastly*/		}		else 		{//downstream overloaded			downstream_overloaded_times ++;			downstream_loaded_times_window ++;			outBufArray.adjustRtnBufSizeTimes(cc);			cc.misc_info = "downstream overloaded";/*			if(!outBufArray.isMajorityOverloaded())				adjustAccuracyPara(1, cc.misc_info); //Tune it slowly */		}	}	public void handleBottleneck(ConnectionContext cc)	{		if(cc.neighStreamHandle == null)			return;		String strMonHandle = cc.neighStreamHandle.replaceFirst("StreamService", "StreamMonitorService");/*		if(cc.getDegradingFlag()) //Degrading from CRITICAL to SEVERE		{			//changing the flag in the SimConnectionContext of the monitor			try{				monitor.resetNumofNotifications(strMonHandle);			}catch(java.rmi.RemoteException e)			{				log.error(e);			}			cc.setDegradingFlag(false);			log.info("the traffic of the buffer " + cc.in_or_out + " degrade ");		} */		if(cc.severity == DefConstants.LV_CRITICAL)		{//			log.info("the traffic of the buffer " + cc.in_or_out + " turns to CRITICAL");			try{				monitor.executing(strMonHandle);			}			catch(java.rmi.RemoteException e)			{				log.error(e.getCause());			}		}		else if(cc.severity == DefConstants.LV_SEVERE)		{			if(!cc.getCalledFlag())			{				try{					monitor.executing(strMonHandle);				}				catch(java.rmi.RemoteException e)				{					log.error(e.getCause());				}				cc.setCalledFlag(true);			}		}	}		public int howmanyTimesBottleneck(boolean bFlush)	{		int temp;		synchronized(synBottleneck)		{			temp = intBottleneckTimes;			if(bFlush && intBottleneckTimes != 0)				intBottleneckTimes = 0;		}		return temp;	}			/*	private boolean isBottleneck()		{		boolean bBufOverLoaded = false;		//How many the input buffers are overloaded?		int number = inBufArray.howmanyInputBuffers();		//if there is only one buffer in the buffer array		int intOk, i;		for(i = 0, intOk = 0; i < number ; i++)			if(inBufArray.getInputBuffer(i).getConnectionContext().longTermLoadedFactor >  DefConstants.LONGTERM_EXTREMELY_OVER_LOADED_THRESHOLD)				intOk ++;		if(intOk >= number/2 + 1)			bBufOverLoaded = true;		else			bBufOverLoaded = false;		if(bBufOverLoaded)		{			Double doublePfmUnit = new Double(pfmParaUnit);			Double doubleAccuracyUnit = new Double(accuracyParaUnit);			if(Double.compare(pfmParaUnit, 0.0) > 0)			{				//the performance para has been specified				if(Double.compare(pfmPara, pfmParaMax) >= 0)					//the processing rate has been tuned to be fastest					return true;				else					return false;			}			else if(Double.compare(accuracyParaUnit, 0.0) >= 0)			{				if(Double.compare(accuracyPara, accuracyParaMin) <= 0)				//the processing rate has been tuned to be fastest					return true;				else					return false;			}			else			{				double tempLamda = calculateLamda(true);				if(Double.compare(tempLamda, 0.9) >=0)					return true;				else					return false;			}		}		else 			return false;	} */        public String setConfigFile(String add)	{		try		{			cfgURI[0] = add;			XMLConfigurator.init(cfgURI);			hashTree = new Hashtable();			Utilities.constructTree(hashTree);		}		catch(Exception e)		{			log.error(e);		}		return add;	}        public String setConfigInfo(String info)	{		try		{			strConfigInfo = info;			log.error("**********"+info);			byte [] tempBytes = info.getBytes();			ByteArrayInputStream byteIn = new ByteArrayInputStream(tempBytes);			XMLConfigurator.init(byteIn);			hashTree = new Hashtable();			Utilities.constructTree(hashTree);		}		catch(Exception e)		{

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -