⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 streammonitorimpl.java

📁 本人历尽千辛万苦找的clustream中的jar包
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
			return true;		}		catch(GridServiceException e)		{			log.error(e);			return false;		}		catch(RemoteException e)		{			log.error(e);			return false;		}		catch(IOException e)		{			log.error(e);			return false;		}	}			public void addUpstreamHandle(int upStage, int upPlacement, String handle)	{		log.debug("adding upstream info: Handle: " + handle + " upStage: " + upStage + " upPlacement" + upPlacement);		StreamMonitorPortType monitor = getMonitorByName(handle, upStage, upPlacement, DefConstants.UPSTREAM);		if(monitor == null)		{			log.error("can't get the monitor port type of " + handle);			return;		}		SimConnectionContext scc = (SimConnectionContext)hashStreamMonitorHandles.get(handle);		if(scc == null)		{			log.error("the connection context shouldn't be null");			return;		}		//Add a task thread for output buffer		taskArray.addMonitorTask(scc);		log.debug("successfully add upstream info");	}			public void resetNumofNotifications(String strNeighborMonitorHandle)	{		if(strNeighborMonitorHandle == null)			return;		SimConnectionContext scc = (SimConnectionContext)hashStreamMonitorHandles.get(strNeighborMonitorHandle);		if(scc == null)		{			log.error("can't find the connection context for " + strNeighborMonitorHandle);			return;		}				scc.resetNumofNotifications();	}	public void executing(String strNeighborMonitorHandle)	{		if(strNeighborMonitorHandle == null)		{			log.error("the handle of neighbour monitor is null");			return;		}		SimConnectionContext scc = (SimConnectionContext)hashStreamMonitorHandles.get(strNeighborMonitorHandle);		if(scc == null)		{			log.error("can't find the connection context for " + strNeighborMonitorHandle);			return;		}				scc.wakeupThread();	}	//this is a recursive function	//the return value is the location of the bottleneck	public int isBottleneck(String strUpstreamMonitorHandle, int ith_stage, int in_or_out)	{		//if strUpstreamMonitorHandle is null, then it is called from itself ResMonitorTask/*		if(strUpstreamMonitorHandle == null)		{			log.error("the upstream monitor handle is null");			return DefConstants.EXCEPTION;		} *//*		log.debug("come to check if the following buffer bottleneck *******");		log.debug("the upstream handle is :" + strUpstreamMonitorHandle);		log.debug("from which stage:" + ith_stage + " and in_or_out buffer? " + in_or_out); */		if(curStage == numStages)		//the last stage		{			//if the call is from upstream?			if(ith_stage == numStages) // from myself				return DefConstants.STAY;			else				return DefConstants.MOVE;		}		//some stage in the middle		else		{			boolean needCheckInBuffer, needCheckOutBuffer;			if(ith_stage < curStage)			//Check both input and output buffer of mine				needCheckInBuffer = needCheckOutBuffer = true;			else if(ith_stage == curStage && in_or_out == DefConstants.IN_BUFFER)			{				needCheckInBuffer = false;				needCheckOutBuffer = true;			}			else				needCheckInBuffer = needCheckOutBuffer = false;			//if curStage is at the same node as the downstream stage, don't check output buffer			if(location.equals(downstream_location))				needCheckOutBuffer = false;			//Check the input Buffer			if(needCheckInBuffer)			{				SimConnectionContext scc_in = (SimConnectionContext)hashStreamMonitorHandles.get(strUpstreamMonitorHandle);				if(scc_in == null)				{					log.error("can't get connection context of " + strUpstreamMonitorHandle);					return DefConstants.EXCEPTION;				}				if(scc_in.isBufOverloaded())				{					//log.debug("sorry for the stage " + ith_stage + ":" + location + ": my input buffer is overloaded");					return DefConstants.STAY;				}			}			//check the output buffer			if(needCheckOutBuffer)			{									SimConnectionContext scc_out = (SimConnectionContext)hashStreamMonitorHandles.get(strDownstreamMonitorHandle);				if(scc_out == null)				{					log.error("can't get connection context of " + strDownstreamMonitorHandle);					return DefConstants.EXCEPTION;				}				if(scc_out.isBufOverloaded())				{					//log.debug("sorry for the stage " + ith_stage + ":" + location + ": my output buffer is overloaded");					return DefConstants.STAY;				}			}			//now call the downstream			try{				StreamMonitorPortType downMonitorStream = (StreamMonitorPortType)getMonitorByName(strDownstreamMonitorHandle, -1 , -1 , DefConstants.DOWNSTREAM);				if(downMonitorStream == null)				{					log.error("can't get downstream's port type " + strDownstreamMonitorHandle);					return DefConstants.EXCEPTION;				}				return downMonitorStream.isBottleneck(strMyHandle, ith_stage, DefConstants.IN_BUFFER);			}			catch(java.rmi.RemoteException e)			{				System.out.println(e);				return DefConstants.EXCEPTION;			}		}	}	public void netBandwidthNotification(int bandwidth, double dNetworkUtil, String strDownstreamHandle)	{		if(curStage == numStages)			return;		SimConnectionContext scc = (SimConnectionContext)hashStreamMonitorHandles.get(strDownstreamHandle);		if(scc == null)		{			log.debug("can't find the corresponding connection context for " + strDownstreamHandle);			return;		}		else if(!scc.bValid)		{			log.debug(strDownstreamHandle + "is invalid already");			return;		}		//Seconds --->		double package_interval = (double)DefConstants.PACKAGE_INTERAL/(double)1000;		//In fact ffg_ave is the mean packages per PACKAGE_INTERAL		/* 		 * M*PACKAGE_SIZE/PACKAGE_INTERAL = bandwidth*dNetworkUtil;		 */		double ffg_ave = (double)bandwidth*dNetworkUtil*package_interval/(double)DefConstants.PACKAGE_SIZE;		double ffg_variance = 4.0 * ffg_ave;		log.info("the network util is " + dNetworkUtil + "ffg_ave is " + ffg_ave);		String strStreamHandle = strDownstreamHandle.replaceFirst("StreamMonitorService", "StreamService");		try{			log.debug("seting service's bandwidth");			streamSrv.netBandwidthNotification(ffg_ave, ffg_variance, dNetworkUtil, strStreamHandle);		}		catch(java.rmi.RemoteException e)		{			log.error(e);		}	}	public boolean realMoveStreamSrv(String strNewConfig)	{		if(strNewConfig == null)			return false;		if(curStage == numStages)			return false;		setConfigInfo(strNewConfig);                        		try{			streamSrv.setConfigInfo(strNewConfig);		}		catch(Exception e)		{			log.error(e);			return false;		}		String strNextConnection = (String)XMLConfigurator.getParameter("stages|stage" + curStage +"|connection" + curPlacement);		String [] places = strNextConnection.split(":");		int nextStage = Integer.parseInt(places[0].substring(5));		int nextPlacement = Integer.parseInt(places[1].substring(9));		URL urlHandle = (URL)XMLConfigurator.getParameter("stages|stage" + nextStage + "|placement" + nextPlacement);		if(urlHandle == null)		{			log.error("can't find the downstream service of stage " + curStage + " and placment " + curPlacement);			return false;		}		String newDownstreamHandle = urlHandle.toString();		String newDowstreamMonitorHandle = newDownstreamHandle.replaceFirst("StreamService", "StreamMonitorService") + "/stage" + nextStage + "-placement" + nextPlacement;		log.debug("new downstream monitor:" + newDowstreamMonitorHandle);		log.debug("cur downstream monitor:" + strDownstreamMonitorHandle);		try{		if(newDowstreamMonitorHandle.equals(strDownstreamMonitorHandle))		{			log.warn("in the new path, the new downstream " + newDownstreamHandle + "is the same as the old one");			StreamMonitorPortType downStream = (StreamMonitorPortType)getMonitorByName(strDownstreamMonitorHandle, downstream_stage, downstream_placement, DefConstants.DOWNSTREAM);			return downStream.realMoveStreamSrv(strNewConfig);		}		else		{			String strDownstreamHandle = streamSrv.launchNextService();			if(strDownstreamHandle == null)			{				log.error("can't lanuch up the downstream service");				return false;			}			int nIndexOutBuf = streamSrv.connectToNextService();			if(nIndexOutBuf < 0)			{				log.error("can't connect to the downstream service");				return false;			}			else				log.debug("the index of output buffer is " + nIndexOutBuf);			if(!streamSrv.shiftToNextService(nIndexOutBuf))			{				log.error("can't shift to the next service");				return false;			}			if(!streamSrv.runDownstreamWorkClass(strDownstreamHandle))			{				log.error("can't run the downstream work class");				return false;			}			//We also need to contact the next Monitor by calling restart()			restart(strDownstreamHandle);		}		}		catch(Exception e)		{			log.error(e);			return false;		}		return true;	}	public synchronized void setMigratable(boolean flag)	{		DefConstants.IF_DYNAMIC_ALLOCATION_APPLIED = flag;	}	//Called by ResMonitorTask	public synchronized boolean tryMoveStreamSrv(int nKind)	{		if(!DefConstants.IF_DYNAMIC_ALLOCATION_APPLIED)			return false;		try{		if(nKind == 1)		{			//if current location is same as the downstream location 			//return false			if(location.equals(downstream_location))				return false;						//the network is bottleneck			if(curStage == numStages)				return false;			EnvironmentSimulatorPortType env = getEnvironmentSimulatorHandle();			if(env == null)			{				log.warn("EnvironmentSimulatorPortType is null");				return false;			}			//try to find a new path//			log.debug("trying to find a new path");			String strNewConfig = env.findNewPath(location, curStage, curPlacement);						if(strNewConfig != null)			{							log.debug("new path in the config file is:" + strNewConfig);				//the XMLConfigurator has been updated				//find the connection infor of the current instance				if(!realMoveStreamSrv(strNewConfig))					return false;			}			else if(curStage > 1) //not one of data sources			{				//get the first data source connected to the current 				try{					boolean bReturn = env.findFirstConnectedDataSourceMonitorandMove(location, curStage, curPlacement);					return bReturn;				}				catch(Exception e)				{					return false;				}			}			else			{				log.debug("can't find a new host");				return false;			}		}		else		{			//the computation is the bottleneck		}		}catch(Exception e)		{			log.error(e);			return false;		}		return true;	}	public void restart(String strNewDownstreamHandle)	{		if(strNewDownstreamHandle != null)		{			strDownstreamMonitorHandle = strNewDownstreamHandle.replaceFirst("StreamService", "StreamMonitorService");			//get downstream_stage and ...			String [] places = strDownstreamMonitorHandle.split("/");			int length = places.length;			String [] places1 = places[length - 1].split("-");					this.downstream_stage = Integer.parseInt(places1[0].substring(5));			this.downstream_placement = Integer.parseInt(places1[1].substring(9));			start();		}	}			public void halt(String strNeighstreamMonitorHandle)	{		taskArray.haltMonitorTask(strNeighstreamMonitorHandle);	}	public int getCurStage()	{		return curStage;	}/*	//? mill-seconds elapse	public void actionPerformed(ActionEvent e)	{		if(!timer.equals(e.getSource()))			log.warn("different timer");		envSimulatorSrv.			} */	}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -