📄 streammonitorimpl.java
字号:
package org.osu.ogsa.stream.services;import org.osu.ogsa.stream.util.*;import org.osu.ogsa.stream.util.xmlconfig.*;import org.osu.ogsa.stream.Stream.StreamServiceGridLocator;import org.osu.ogsa.stream.Stream.StreamPortType;import org.osu.ogsa.stream.Stream.StreamServiceLocator;import org.osu.ogsa.stream.StreamMonitor.StreamMonitorPortType;import org.osu.ogsa.stream.StreamMonitor.StreamMonitorServiceGridLocator;import org.osu.ogsa.stream.StreamMonitor.StreamMonitorServiceLocator;import org.osu.ogsa.stream.EnvironmentSimulator.EnvironmentSimulatorPortType;import org.osu.ogsa.stream.EnvironmentSimulator.EnvironmentSimulatorServiceGridLocator;import org.osu.ogsa.stream.EnvironmentSimulator.EnvironmentSimulatorService;import org.osu.ogsa.stream.EnvironmentSimulator.EnvironmentSimulatorServiceLocator;import java.io.*;import java.nio.*;import java.nio.channels.*;import java.nio.charset.*;import java.nio.channels.spi.*;import java.nio.charset.*;import java.net.*;import java.util.*;import java.rmi.RemoteException;import java.awt.*;import java.awt.event.*;import javax.swing.Timer;import org.globus.ogsa.impl.ogsi.GridServiceImpl;import org.globus.ogsa.GridContext;import org.globus.ogsa.GridServiceException;import org.globus.ogsa.utils.AnyHelper;import org.globus.ogsa.OperationProvider;import org.globus.ogsa.GridServiceCallback;import org.globus.ogsa.GridServiceBase;import org.globus.ogsa.ServiceProperties;import org.gridforum.ogsi.ExtensibilityType;import org.gridforum.ogsi.GridService;import org.gridforum.ogsi.HandleType;import org.gridforum.ogsi.OGSIServiceGridLocator;import org.gridforum.ogsi.ServiceDataValuesType;import org.gridforum.ogsi.FaultType;import org.gridforum.ogsi.Factory;import org.gridforum.ogsi.LocatorType;import org.globus.ogsa.utils.GridServiceFactory;import org.apache.axis.client.Stub;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import javax.xml.namespace.QName;import org.w3c.dom.Element;public class StreamMonitorImpl extends GridServiceImpl implements StreamMonitorPortType, MonitorNotification{ private StreamMonitorPortType monitor = null;// private String curUpstreamHandle = null; private String strConfigInfo = null; //paras for monitor private ResMonitorTaskArray taskArray = null; private int intBottleneckTimes = 0; private Object synBottleneck; private String strMyHandle = null; private String strStreamHandle = null; private StreamPortType streamSrv = null; private String strEnvSimulatorHandle = null; private EnvironmentSimulatorPortType envSimulatorSrv = null; private String strDownstreamMonitorHandle = null; private Hashtable hashStreamMonitorHandles = new Hashtable(DefConstants.MAX_NUM_UPSTREAM + DefConstants.MAX_NUM_DOWNSTREAM);// private Hashtable hashXMLConfig = new Hashtable(); private int curStage = -1; private int curPlacement = -1; private int numStages = -1; private int numPlacements = -1; private int fakeNumStages = -1; private int downstream_stage, downstream_placement; private String location = null; private String downstream_location = null; private boolean bFileLog = false; private FileLog fileLog = null;// private Random rand = null; private int intTimeToChangeEnv; private javax.swing.Timer timer; private boolean bLastStage; private static Log log = LogFactory.getLog(StreamMonitorImpl.class.getName()); public StreamMonitorImpl() { } public boolean setConfigFile(String filename) { try { String [] files = new String[1]; files[0] = filename; XMLConfigurator.init(files);// Utilities.constructTree(hashXMLConfig); } catch(Exception e) { log.error(e); return false; } return true; } public boolean setConfigInfo(String info) { try { byte [] tempBytes = info.getBytes(); ByteArrayInputStream byteIn = new ByteArrayInputStream(tempBytes); XMLConfigurator.init(byteIn);// Utilities.constructTree(hashXMLConfig); } catch(Exception e) { log.error(e); return false; } return true; } //ith_stage is the current stage of the service //ith_placement is the current plac //location is the IP address of this service //strHandle is the GRH of stream service //strConfig is the xml configuration file public boolean setCommunicationInfo(int ith_stage, int ith_placement, int numPlacements, String strStreamHandle, String strDownstreamHandle) { location = Utilities.getIPAddress(strStreamHandle); curStage = ith_stage; curPlacement = ith_placement; this.numPlacements = numPlacements; //????????????????????*********************** strMyHandle = strStreamHandle.replaceFirst("StreamService", "StreamMonitorService"); log.debug(strMyHandle); this.strStreamHandle = strStreamHandle; if(strDownstreamHandle != null) { strDownstreamMonitorHandle = strDownstreamHandle.replaceFirst("StreamService", "StreamMonitorService"); //get downstream_stage and ... String [] places = strDownstreamHandle.split("/"); int length = places.length; String [] places1 = places[length - 1].split("-"); this.downstream_stage = Integer.parseInt(places1[0].substring(5)); this.downstream_placement = Integer.parseInt(places1[1].substring(9)); this.downstream_location = Utilities.getIPAddress(strDownstreamMonitorHandle); } // strMyHandle = "http://"+location + ":8080/ogsa/services/Stream/StreamMonitorService/stage" + curStage +"-placement" + curPlacement; //Get the port type of stream service try{ StreamServiceGridLocator locator = new StreamServiceGridLocator(); streamSrv = locator.getStreamService(new URL(strStreamHandle)); if(streamSrv == null) { log.error("can't get my stream service's handle"); return false; } else log.debug("successfully got the handle of" + strStreamHandle ); } catch(Exception e) { log.error(e); return false; } return true; } public void setEnvironmentSimulatorStringHandle(String strHandle) { strEnvSimulatorHandle = strHandle; } private EnvironmentSimulatorPortType getEnvironmentSimulatorHandle() { if(envSimulatorSrv != null) return envSimulatorSrv; try{ EnvironmentSimulatorServiceGridLocator locator = new EnvironmentSimulatorServiceGridLocator(); log.debug(strEnvSimulatorHandle); envSimulatorSrv = locator.getEnvironmentSimulatorService(new URL(strEnvSimulatorHandle)); if(envSimulatorSrv == null) { log.error("can't get my environment Simulator service's handle"); return null; } } catch(Exception e) { log.error(e); return null; } return envSimulatorSrv; }/* public void addMonitorInstanceHandle(int ith_stage, int ith_placement, String strMonitorHandle) { if(hashMonitorInstances == null) hashMonitorInstances = new Hashtable(50); hashMonitorInstances.put(new InstanceIndex(ith_stage, ith_placement), strMonitorHandle); } public String getMonitorInstanceHandle(int ith_stage, int ith_placement) { if(hashMonitorInstances == null) { log.error("there is no instance information stored in the database"); return null; } InstanceIndex insIndex = new InstanceIndex(ith_stage, ith_placement); String handle; if((handle= (String)hashMonitorInstances.get(insIndex)) == null) return null; else return handle; } public void changeMonitorInstanceHandle(int ith_stage, int ith_placement, String strMonitorHandle) { if(hashMonitorInstances == null) { log.error("there is no instance information stored in the database"); return; } InstanceIndex insIndex = new InstanceIndex(ith_stage, ith_placement); if((String)hashMonitorInstances.get(insIndex) == null) addMonitorInstanceHandle(ith_stage, ith_placement, strMonitorHandle); else hashMonitorInstances.put(insIndex, strMonitorHandle); } */ //The function will //1. Check if the hash table has a web service instance corresponding // to the handle //2. Check if the corresponding web service has an instance, if // the instance exists, add the pair to hash table //3. create a new instance, add the pair to hash table //private StreamMonitorPortType getMonitorByName(String handle, int direction) private StreamMonitorPortType getMonitorByName(String handle, int nstage, int nplacement, int direction) { StreamMonitorPortType monitorPortType = null; //Step 1 SimConnectionContext scc = (SimConnectionContext)hashStreamMonitorHandles.get(handle); if(scc != null) monitorPortType = (StreamMonitorPortType)scc.monitorPortType; if(monitorPortType != null) { return monitorPortType; } //else either scc == null or monitorPortType = null //Step 2 try{ StreamMonitorServiceGridLocator locator = new StreamMonitorServiceGridLocator(); monitorPortType = locator.getStreamMonitorService(new URL(handle)); if(monitorPortType != null) { scc = new SimConnectionContext(handle, nstage, nplacement, monitorPortType, this, direction); hashStreamMonitorHandles.put(handle, (Object)scc); log.debug(handle + "monitor instance exist!!!! "); } return monitorPortType; } catch(GridServiceException e) { log.error(e); return null; } catch(RemoteException e) { log.error(e); return null; } catch(MalformedURLException e) { log.error(e); return null; } catch(Exception e) { log.error(e); return null; //Do nothing here, skip to the next step } } public boolean start() { taskArray = new ResMonitorTaskArray(this); log.debug(taskArray); try{ Integer tempInt; //All configuration information is in XMLConfigurator log.debug("*****Monitor*************"+curStage+"***Monitor**********"); tempInt = (Integer)XMLConfigurator.getParameter("numStages"); if(tempInt == null) { log.error("can't get numSTages from the configuration file"); return false; } numStages = ((Integer)XMLConfigurator.getParameter("numStages")).intValue(); if(numStages == curStage) bLastStage = true; else bLastStage = false; fakeNumStages = ((Integer)XMLConfigurator.getParameter("fakeNumStages")).intValue(); log.debug("number of stages is " + numStages); log.debug("fake number of stages is " + fakeNumStages);// int numPlacements = ((Integer)XMLConfigurator.getParameter("stages|" + "stage"+curStage+"|numPlacements")).intValue(); bFileLog = ((Boolean)XMLConfigurator.getParameter("filelogging")).booleanValue(); log.debug("number of placements is " + numPlacements); if(bFileLog) { Date tempTime = new Date(); String myLogFileName = "Monitorstage" + curStage + "-placement"+curPlacement + tempTime.getTime(); fileLog = new FileLog(myLogFileName); } StreamMonitorPortType downstream = null;// StreamMonitorPortType upstream = null; if(curStage == numStages) { //This is the last stage in the pipeline, //just output to stdout } else if(curStage <= fakeNumStages) { //The current service is going to find out //the next available placement StreamMonitorPortType downStream = (StreamMonitorPortType)getMonitorByName(strDownstreamMonitorHandle, downstream_stage, downstream_placement, DefConstants.DOWNSTREAM); log.debug("get the downstream " + strDownstreamMonitorHandle + " 's handle:" + downStream); SimConnectionContext scc = (SimConnectionContext)hashStreamMonitorHandles.get(strDownstreamMonitorHandle); if(scc == null) { log.error("the connection context shouldn't be null"); return false; } //Add a task thread for output buffer taskArray.addMonitorTask(scc);// log.debug("The next instance "+strDownstreamMonitorHandle+" exists?:" + isInsExist); if(downStream == null) { log.fatal("can't find the service target"); return false; } downStream.addUpstreamHandle(curStage, curPlacement, strMyHandle); } //get environment handle if(strEnvSimulatorHandle == null) { log.error("the handle of envirnoment service is null"); return false; } log.debug("come to here"); if(getEnvironmentSimulatorHandle() == null) { log.error("can't get environment handle"); return false; } else { //envsimulatorSrv has been gotten in getEnvironmentSimulatorHandle() log.debug("come to here" + envSimulatorSrv + strMyHandle + " " + strDownstreamMonitorHandle); envSimulatorSrv.monitorSignIn(strMyHandle, strDownstreamMonitorHandle); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -