⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 streammonitorimpl.java

📁 本人历尽千辛万苦找的clustream中的jar包
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
package org.osu.ogsa.stream.services;import org.osu.ogsa.stream.util.*;import org.osu.ogsa.stream.util.xmlconfig.*;import org.osu.ogsa.stream.Stream.StreamServiceGridLocator;import org.osu.ogsa.stream.Stream.StreamPortType;import org.osu.ogsa.stream.Stream.StreamServiceLocator;import org.osu.ogsa.stream.StreamMonitor.StreamMonitorPortType;import org.osu.ogsa.stream.StreamMonitor.StreamMonitorServiceGridLocator;import org.osu.ogsa.stream.StreamMonitor.StreamMonitorServiceLocator;import org.osu.ogsa.stream.EnvironmentSimulator.EnvironmentSimulatorPortType;import org.osu.ogsa.stream.EnvironmentSimulator.EnvironmentSimulatorServiceGridLocator;import org.osu.ogsa.stream.EnvironmentSimulator.EnvironmentSimulatorService;import org.osu.ogsa.stream.EnvironmentSimulator.EnvironmentSimulatorServiceLocator;import java.io.*;import java.nio.*;import java.nio.channels.*;import java.nio.charset.*;import java.nio.channels.spi.*;import java.nio.charset.*;import java.net.*;import java.util.*;import java.rmi.RemoteException;import java.awt.*;import java.awt.event.*;import javax.swing.Timer;import org.globus.ogsa.impl.ogsi.GridServiceImpl;import org.globus.ogsa.GridContext;import org.globus.ogsa.GridServiceException;import org.globus.ogsa.utils.AnyHelper;import org.globus.ogsa.OperationProvider;import org.globus.ogsa.GridServiceCallback;import org.globus.ogsa.GridServiceBase;import org.globus.ogsa.ServiceProperties;import org.gridforum.ogsi.ExtensibilityType;import org.gridforum.ogsi.GridService;import org.gridforum.ogsi.HandleType;import org.gridforum.ogsi.OGSIServiceGridLocator;import org.gridforum.ogsi.ServiceDataValuesType;import org.gridforum.ogsi.FaultType;import org.gridforum.ogsi.Factory;import org.gridforum.ogsi.LocatorType;import org.globus.ogsa.utils.GridServiceFactory;import org.apache.axis.client.Stub;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import javax.xml.namespace.QName;import org.w3c.dom.Element;public class StreamMonitorImpl extends GridServiceImpl implements StreamMonitorPortType, MonitorNotification{	private StreamMonitorPortType monitor = null;//	private String curUpstreamHandle = null;	private String strConfigInfo = null;		//paras for monitor	private ResMonitorTaskArray taskArray = null;	private int intBottleneckTimes = 0;	private Object synBottleneck;	private String strMyHandle = null;	private String strStreamHandle = null;	private StreamPortType streamSrv = null;	private String strEnvSimulatorHandle = null;	private EnvironmentSimulatorPortType envSimulatorSrv = null;		private String strDownstreamMonitorHandle = null;	private	Hashtable hashStreamMonitorHandles = new Hashtable(DefConstants.MAX_NUM_UPSTREAM + DefConstants.MAX_NUM_DOWNSTREAM);//	private	Hashtable hashXMLConfig = new Hashtable();	private int curStage = -1;	private int curPlacement = -1;	private int numStages = -1;	private int numPlacements = -1;	private int fakeNumStages = -1;	private int downstream_stage, downstream_placement;	private String location = null;	private String downstream_location = null;	private boolean bFileLog  = false;	private FileLog fileLog  = null;//	private Random rand = null;	private int intTimeToChangeEnv;	private javax.swing.Timer timer;	private boolean bLastStage;	private static Log log = LogFactory.getLog(StreamMonitorImpl.class.getName());	public StreamMonitorImpl()	{	}        public boolean setConfigFile(String filename)	{		try		{			String [] files = new String[1];			files[0] = filename;			XMLConfigurator.init(files);//			Utilities.constructTree(hashXMLConfig);		}		catch(Exception e)		{			log.error(e);			return false;		}		return true;	}        public boolean setConfigInfo(String info)	{		try		{			byte [] tempBytes = info.getBytes();			ByteArrayInputStream byteIn = new ByteArrayInputStream(tempBytes);			XMLConfigurator.init(byteIn);//			Utilities.constructTree(hashXMLConfig);		}		catch(Exception e)		{			log.error(e);			return false;		}			return true;	}	//ith_stage is the current stage of the service	//ith_placement is the current plac	//location is the IP address of this service	//strHandle is the GRH of stream service	//strConfig is the xml configuration file	public boolean setCommunicationInfo(int ith_stage, int ith_placement, int numPlacements, String strStreamHandle, String strDownstreamHandle)	{		location = Utilities.getIPAddress(strStreamHandle);		curStage = ith_stage;		curPlacement = ith_placement;		this.numPlacements = numPlacements;		//????????????????????***********************				strMyHandle = strStreamHandle.replaceFirst("StreamService", "StreamMonitorService");		log.debug(strMyHandle);		this.strStreamHandle = strStreamHandle;		if(strDownstreamHandle != null)		{			strDownstreamMonitorHandle = strDownstreamHandle.replaceFirst("StreamService", "StreamMonitorService");			//get downstream_stage and ...			String [] places = strDownstreamHandle.split("/");			int length = places.length;			String [] places1 = places[length - 1].split("-");					this.downstream_stage = Integer.parseInt(places1[0].substring(5));			this.downstream_placement = Integer.parseInt(places1[1].substring(9));			this.downstream_location = Utilities.getIPAddress(strDownstreamMonitorHandle);		}	//	strMyHandle = "http://"+location + ":8080/ogsa/services/Stream/StreamMonitorService/stage" + curStage +"-placement" + curPlacement;		//Get the port type of stream service                try{			StreamServiceGridLocator locator =   new StreamServiceGridLocator();			streamSrv = locator.getStreamService(new URL(strStreamHandle));			if(streamSrv == null)			{				log.error("can't get my stream service's handle");				return false;			}			else				log.debug("successfully got the handle of" + strStreamHandle );		}		catch(Exception e)		{			log.error(e);			return false;		}		return true;	}	public void setEnvironmentSimulatorStringHandle(String strHandle)	{		strEnvSimulatorHandle = strHandle;	}	private EnvironmentSimulatorPortType getEnvironmentSimulatorHandle()	{		if(envSimulatorSrv != null)			return envSimulatorSrv;                try{			EnvironmentSimulatorServiceGridLocator locator =  new EnvironmentSimulatorServiceGridLocator();			log.debug(strEnvSimulatorHandle);			envSimulatorSrv = locator.getEnvironmentSimulatorService(new URL(strEnvSimulatorHandle));			if(envSimulatorSrv == null)			{				log.error("can't get my environment Simulator service's handle");				return null;			}		}		catch(Exception e)		{			log.error(e);			return null;		}		return envSimulatorSrv;	}/*	public void addMonitorInstanceHandle(int ith_stage, int ith_placement, String strMonitorHandle) {		if(hashMonitorInstances == null)			hashMonitorInstances = new Hashtable(50);				hashMonitorInstances.put(new InstanceIndex(ith_stage, ith_placement), strMonitorHandle);	}	public String getMonitorInstanceHandle(int ith_stage, int ith_placement)	{		if(hashMonitorInstances == null)		{			log.error("there is no instance information stored in the database");			return null;		}		InstanceIndex insIndex = new InstanceIndex(ith_stage, ith_placement);		String handle;		if((handle= (String)hashMonitorInstances.get(insIndex)) == null)			return null;		else			return handle;	}	public void changeMonitorInstanceHandle(int ith_stage, int ith_placement, String strMonitorHandle)	{		if(hashMonitorInstances == null)		{			log.error("there is no instance information stored in the database");			return;		}		InstanceIndex insIndex = new InstanceIndex(ith_stage, ith_placement);		if((String)hashMonitorInstances.get(insIndex) == null) 			addMonitorInstanceHandle(ith_stage, ith_placement, strMonitorHandle);		else			hashMonitorInstances.put(insIndex, strMonitorHandle);	} */	//The function will 	//1. Check if the hash table has a web service instance corresponding	//   to the handle	//2. Check if the corresponding web service has an instance, if	//   the instance exists, add the pair to hash table	//3. create a new instance, add the pair to hash table	//private StreamMonitorPortType getMonitorByName(String handle, int direction)	private StreamMonitorPortType getMonitorByName(String handle, int nstage, int nplacement, int direction)	{		StreamMonitorPortType monitorPortType = null;		//Step 1		SimConnectionContext scc = (SimConnectionContext)hashStreamMonitorHandles.get(handle);		if(scc != null)			monitorPortType = (StreamMonitorPortType)scc.monitorPortType;		if(monitorPortType != null)		{			return monitorPortType;		}		//else either scc == null or monitorPortType = null		//Step 2		try{			StreamMonitorServiceGridLocator locator = new StreamMonitorServiceGridLocator();			monitorPortType = locator.getStreamMonitorService(new URL(handle));			if(monitorPortType != null)			{				scc = new SimConnectionContext(handle, nstage, nplacement, monitorPortType, this, direction);				hashStreamMonitorHandles.put(handle, (Object)scc);				log.debug(handle + "monitor instance exist!!!! ");			}			return monitorPortType;		}		catch(GridServiceException e)		{			log.error(e);			return null;		}		catch(RemoteException e)		{			log.error(e);			return null;		}		catch(MalformedURLException e)		{			log.error(e);			return null;		}		catch(Exception e)                {			log.error(e);			return null;			//Do nothing here, skip to the next step		}	}	public boolean start() 	{		taskArray = new ResMonitorTaskArray(this);		log.debug(taskArray);		try{			Integer tempInt; 			//All configuration information is in XMLConfigurator			log.debug("*****Monitor*************"+curStage+"***Monitor**********");			tempInt = (Integer)XMLConfigurator.getParameter("numStages");			if(tempInt == null)			{				log.error("can't get numSTages from the configuration file");				return false;			}			numStages = ((Integer)XMLConfigurator.getParameter("numStages")).intValue();			if(numStages == curStage)				bLastStage = true;			else				bLastStage = false;			fakeNumStages = ((Integer)XMLConfigurator.getParameter("fakeNumStages")).intValue();			log.debug("number of stages is " + numStages);			log.debug("fake number of stages is " + fakeNumStages);//			int numPlacements = ((Integer)XMLConfigurator.getParameter("stages|" + "stage"+curStage+"|numPlacements")).intValue();			bFileLog = ((Boolean)XMLConfigurator.getParameter("filelogging")).booleanValue();			log.debug("number of placements is " + numPlacements);			if(bFileLog)			{				Date tempTime = new Date();				String myLogFileName = "Monitorstage" + curStage + "-placement"+curPlacement + tempTime.getTime();				fileLog = new FileLog(myLogFileName);			}			StreamMonitorPortType downstream = null;//			StreamMonitorPortType upstream = null;						if(curStage == numStages)			{				//This is the last stage in the pipeline,				//just output to stdout			}			else if(curStage <= fakeNumStages)			{				//The current service is going to find out 				//the next available placement				StreamMonitorPortType downStream = (StreamMonitorPortType)getMonitorByName(strDownstreamMonitorHandle, downstream_stage, downstream_placement, DefConstants.DOWNSTREAM);				log.debug("get the downstream " + strDownstreamMonitorHandle + " 's handle:" + downStream);				SimConnectionContext scc = (SimConnectionContext)hashStreamMonitorHandles.get(strDownstreamMonitorHandle);				if(scc == null)				{					log.error("the connection context shouldn't be null");					return false;				}				//Add a task thread for output buffer				taskArray.addMonitorTask(scc);//				log.debug("The next instance "+strDownstreamMonitorHandle+" exists?:" + isInsExist);				if(downStream == null)				{					log.fatal("can't find the service target");					return false;				}				downStream.addUpstreamHandle(curStage, curPlacement, strMyHandle);			}			//get environment handle			if(strEnvSimulatorHandle == null)			{				log.error("the handle of envirnoment service is null");				return false;			}			log.debug("come to here");			if(getEnvironmentSimulatorHandle() == null)			{				log.error("can't get environment handle");				return false;			}			else			{				//envsimulatorSrv has been gotten in getEnvironmentSimulatorHandle()				log.debug("come to here" + envSimulatorSrv + strMyHandle + " " + strDownstreamMonitorHandle);				envSimulatorSrv.monitorSignIn(strMyHandle, strDownstreamMonitorHandle);			}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -