⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 resmonitortask.java

📁 本人历尽千辛万苦找的clustream中的jar包
💻 JAVA
字号:
package org.osu.ogsa.stream.util;import org.osu.ogsa.stream.util.xmlconfig.*;import java.io.*;import java.util.*;import java.lang.Object;import javax.swing.Timer;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import java.awt.*;import java.awt.event.*;import org.w3c.dom.Element;public class ResMonitorTask extends Thread {//	private StreamMonitorImpl stream;        private static Log log = LogFactory.getLog(ResMonitorTask.class.getName());	private boolean bHalt = false;	private int curStage = -1;	/*	private int intAvailMem = 0; 	private int intAveLoad = 0;	private int intCPUSpeed = 0;	private Object synObject;	private int DELAY_MEM = 1000 * 10; //10 sec. 	private int DELAY_LOAD = 1000 * 60; //1 min	private String service_handle = "http://localhost:8080/ogsa/services/base/index/IndexService";	private String strXPathExpLoad = "//ce:Host/ce:ProcessorLoad";	private	String strXPathExpAvailMem = "//ce:Host/ce:MainMemory";	private	String strXPathExpCPUSpeed = "//ce:Host/ce:Processor";	private Timer timer_mem;	private Timer timer_load;	private Element [] elementRes; *//*	public ResMonitorTask(StreamMonitorImpl stream)	{		this.stream = stream;	}*/	///////////////////New ////////////////////////////	SimConnectionContext context = null;	public void setConnectionContext(SimConnectionContext scc)	{		context = scc;		curStage = context.monitor.getCurStage();	}	public ResMonitorTask()	{	}	public ResMonitorTask(SimConnectionContext scc)	{		setConnectionContext(scc);	}		public synchronized void halt()	{		bHalt = true;	}	public synchronized void activate()	{		bHalt = false;	}	public void run()	{		/*timer_load.start();		timer_mem.start();*/		//infinite loop		while(true)		{			try{				synchronized(context)				{				//	log.debug("sleep");					context.taskThreadSleeping();					context.wait();				}				//wake up by Monitor Service				if(bHalt)					continue;//				log.debug("wakeup");				context.taskThreadWaken();			}			catch(Exception e)			{				log.error(e);				continue;			}			if(context.in_or_out != DefConstants.IN_BUFFER &&				context.in_or_out != DefConstants.OUT_BUFFER)				continue;			log.debug("come to here to check if I am a bottleneck:" + context.in_or_out);			int intRet = context.monitor.isBottleneck(null, curStage , context.in_or_out);			if(bHalt || intRet == DefConstants.STAY || intRet == DefConstants.EXCEPTION) 			{				log.debug("stay");				continue; //Stay			}			//Gotta move			if(context.in_or_out == DefConstants.IN_BUFFER)			{				//immigrate				//determine if all of the input buffers are overloaded, 				//if it is, then it might be because the cpu computation				//power is not enough, we need to find another machine 				//and move all connections to another place.				//				//if minority of the input buffers are overloaded, 				//then it might be due to the high arrival rate of data stream, 				//we would like to move this connection alone to another spot.				log.debug("input bottleneck");				if(context.monitor.tryMoveStreamSrv(0))					halt();									}			if(context.in_or_out == DefConstants.OUT_BUFFER)			{				//1 means the network is the bottleneck				//immigration of the corresponding stream service 				//is done by Monitor				log.debug("ready to move ");				if(context.monitor.tryMoveStreamSrv(1))					halt();//					return;			}		}	}/*************************************************************/		//3 min elapse	/*public void actionPerformed(ActionEvent e)	{		if(timer_load.equals(e.getSource()))			calculateAveCpuLoad_Speed();		else if(timer_mem.equals(e.getSource()))			calculateAveMem();		else			log.info("wrong timer!!!");	}*//*	private void calculateAveCpuLoad_Speed()	{		//Query the resources from index services//		log.debug(location);		//Try the service handle with the ip address //		String service_handle = "http://" + location + ":8080/ogsa/services/base/index/IndexService";		//Query the load of cpu		elementRes = QueryServiceDataByXPath.Query("HostScript", service_handle, strXPathExpLoad);		if(elementRes == null)		{			log.error("can't query the processor's load");		}		else		{			int temp;			temp = (Integer.valueOf(elementRes[0].getAttribute("ce:Last1Min"))).intValue();			if(intAveLoad > 0)			{				temp += intAveLoad;				temp /= 2;			}			synchronized(synObject)			{				intAveLoad = temp;			}		}		//Query the frequery of cpu		if(intCPUSpeed > 0)			return;		else		{			elementRes = QueryServiceDataByXPath.Query("HostScript", service_handle, strXPathExpCPUSpeed);			if(elementRes == null)				log.error("can't query the processor's speed");			else				synchronized(synObject)				{					intCPUSpeed = (Integer.valueOf(elementRes[0].getAttribute("ce:ClockSpeed"))).intValue();				}		}	}		private void calculateAveMem()	{		//Query the available memory		int temp;		Element [] elementRes = QueryServiceDataByXPath.Query("HostScript", service_handle, strXPathExpAvailMem);		if(elementRes == null)		{			log.error("can't query the processor's available mem");			return;		}		else			temp = (Integer.valueOf(elementRes[0].getAttribute("ce:RAMAvailable"))).intValue();		if(intAvailMem > 0)		{			temp += intAvailMem;			temp = temp/2;		}				synchronized(synObject)		{			intAvailMem = temp;		}	}	public int getAvailMem()	{		int temp;		synchronized(synObject)		{			temp = intAvailMem;			intAvailMem = 0;		}		return temp;	}	public int getCPUSpeed()	{		int temp;		synchronized(synObject)		{			temp = intCPUSpeed;			intCPUSpeed = 0;		}		return temp;	}	public int getAveCPULoad()	{		int temp;		synchronized(synObject)		{			temp = intAveLoad;			intAveLoad = 0;		}		return temp;	}*/}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -