📄 environmentsimulatorimpl.java
字号:
package org.osu.ogsa.stream.services;import org.osu.ogsa.stream.util.*;import org.osu.ogsa.stream.util.xmlconfig.*;import java.io.*;import java.nio.*;import java.nio.channels.*;import java.nio.charset.*;import java.nio.channels.spi.*;import java.nio.charset.*;import java.net.*;import java.util.*;import java.rmi.RemoteException;import java.awt.*;import java.awt.event.*;import javax.swing.Timer;import org.osu.ogsa.stream.Stream.StreamServiceGridLocator;import org.osu.ogsa.stream.Stream.StreamPortType;import org.osu.ogsa.stream.Stream.StreamServiceLocator;import org.osu.ogsa.stream.StreamMonitor.StreamMonitorPortType;import org.osu.ogsa.stream.StreamMonitor.StreamMonitorServiceGridLocator;import org.osu.ogsa.stream.StreamMonitor.StreamMonitorServiceLocator;import org.osu.ogsa.stream.StreamMonitor.StreamMonitorServiceLocator;import org.osu.ogsa.stream.EnvironmentSimulator.EnvironmentSimulatorPortType;import org.osu.ogsa.stream.EnvironmentSimulator.EnvironmentSimulatorServiceGridLocator;import org.osu.ogsa.stream.EnvironmentSimulator.EnvironmentSimulatorService;import org.osu.ogsa.stream.EnvironmentSimulator.EnvironmentSimulatorServiceLocator;import org.globus.ogsa.impl.ogsi.GridServiceImpl;import org.globus.ogsa.GridContext;import org.globus.ogsa.GridServiceException;import org.globus.ogsa.utils.AnyHelper;import org.globus.ogsa.OperationProvider;import org.globus.ogsa.GridServiceCallback;import org.globus.ogsa.GridServiceBase;import org.globus.ogsa.ServiceProperties;import javax.swing.tree.DefaultMutableTreeNode;import org.gridforum.ogsi.ExtensibilityType;import org.gridforum.ogsi.GridService;import org.gridforum.ogsi.HandleType;import org.gridforum.ogsi.OGSIServiceGridLocator;import org.gridforum.ogsi.ServiceDataValuesType;import org.gridforum.ogsi.FaultType;import org.gridforum.ogsi.Factory;import org.gridforum.ogsi.LocatorType;import org.globus.ogsa.utils.GridServiceFactory;import org.apache.axis.client.Stub;import org.apache.axis.utils.XMLUtils;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import javax.xml.namespace.QName;import org.w3c.dom.Element;public class EnvironmentSimulatorImpl extends GridServiceImpl implements EnvironmentSimulatorPortType{ private static Log log = LogFactory.getLog(EnvironmentSimulatorImpl.class.getName()); private FileLog fileLog; private String strConfigInfo; private boolean bStart = false; private Object synObject = new Object(); public int numStages, numInstances , numInstancesSignedIn; public Prim ga_init, ga_config; public Hashtable hashEdge = new Hashtable(); public Hashtable hashNode = new Hashtable(); public Hashtable hashStrDownstream= new Hashtable(); public Hashtable hashXMLConfig = new Hashtable(); public boolean [][] stagePlacements; public String strDest; public String strTopologyFile;// public void queryOverloaded(int direction); public EnvironmentSimulatorImpl() { } public synchronized boolean setConfigInfo(String info) { try { strConfigInfo = info; byte [] tempBytes = info.getBytes(); ByteArrayInputStream byteIn = new ByteArrayInputStream(tempBytes); XMLConfigurator.init(byteIn); } catch(Exception e) { log.error(e); return false; } return true; } public synchronized boolean setConfigFile(String filename) { try { String [] files = new String[1]; files[0] = filename; XMLConfigurator.init(files); } catch(Exception e) { log.error(e); return false; } return true; } public void setTopologyFile(String filename) { strTopologyFile = filename; } public synchronized boolean isAllMonitorsSignedIn() { if(numInstances <= numInstancesSignedIn) return true; else { return false; } } public synchronized boolean monitorSignIn(String strCurMonitorHandle, String strDownstreamMonitorHandle) { int i = 0; numInstancesSignedIn ++; log.debug(strCurMonitorHandle + " " + strDownstreamMonitorHandle); //get location first String start_node = Utilities.getIPAddress(strCurMonitorHandle); String end_node; if(strDownstreamMonitorHandle == null) //the last stage end_node = null; else end_node = Utilities.getIPAddress(strDownstreamMonitorHandle); int nNode = ga_init.findNode(start_node); int nEdge; if(end_node == null) //the last stage nEdge = -1; else nEdge = ga_init.findEdge(start_node, end_node); if(nNode == -1) { log.error("can't find the corresponding edge for " + strCurMonitorHandle + strDownstreamMonitorHandle); return false; } log.debug("nNode's index: " + nNode); log.debug("nEdge's index: " + nEdge); Integer intTemp; //Get Monitor's port type try{ StreamMonitorServiceGridLocator locator = new StreamMonitorServiceGridLocator(); StreamMonitorPortType monitorPortType = locator.getStreamMonitorService(new URL(strCurMonitorHandle)); if(monitorPortType != null) { if(nEdge >= 0) { intTemp = new Integer(nEdge); StreamMonitorPortType [] monArray = (StreamMonitorPortType[])hashEdge.get(intTemp); if(monArray == null) { monArray = new StreamMonitorPortType[10]; monArray[0] = monitorPortType; hashEdge.put(intTemp, monArray); } else { for(i = 0; monArray[i] != null; i ++) ; monArray[i] = monitorPortType; } //Engine needs this hash buffer hashStrDownstream.put(monitorPortType, strDownstreamMonitorHandle); } if(nNode >= 0) { intTemp = new Integer(nNode); hashNode.put(intTemp, monitorPortType); } } else { log.error("can't get monitor's handle"); return false; } } catch(Exception e) { log.error(e); return false; } return true; } public synchronized boolean findFirstConnectedDataSourceMonitorandMove(String location, int nstage, int nplacement) { log.debug(location + ": " + nstage + ":" + nplacement); if(location == null) return false; int i = ga_config.findNode(location); DefaultMutableTreeNode tr; if(i == -1) { log.error("can't be"); return false; }/* else { Hashtable hash = ga_config.d[i].hashTreenode; for(int j = 1; j < 8 ; j ++) for(int k = 1; k < 8 ; k ++) { tr = (DefaultMutableTreeNode)hash.get(new InstanceIndex(j,k)); if(tr != null) log.error(j + ":" + k + ":" + ((DynamicEnv)(tr.getUserObject())).name); else log.error(j + ":" + k + ":null"); } }*/ DefaultMutableTreeNode myself = (DefaultMutableTreeNode)ga_config.getTreeNode(location, nstage, nplacement); if(myself == null) { log.error("can't find the corresponding treenode for " + location + nstage + nplacement); return false; } DefaultMutableTreeNode firstLeaf = myself.getFirstLeaf(); String strLeafName = ((DynamicEnv)firstLeaf.getUserObject()).name; int nNodeIndex = ga_config.findNode(strLeafName); log.debug("leaf's name is " + strLeafName + "its index is " + nNodeIndex); StreamMonitorPortType firstMon = (StreamMonitorPortType)hashNode.get(new Integer(nNodeIndex)); if(firstMon == null) { log.error("can't find the first connected data sources"); return false; } //ask the first data source find a new path and move try{ boolean bReturn = firstMon.tryMoveStreamSrv(1); return bReturn; } catch(Exception e) { return false; } } public synchronized StreamMonitorPortType [] getMonitor(int indexEdge) { return (StreamMonitorPortType[])hashEdge.get(new Integer(indexEdge)); } public boolean start() { int fakeNumStages = ((Integer)XMLConfigurator.getParameter("fakeNumStages")).intValue(); numInstances = 0; numInstancesSignedIn = 0; int i,j; for(i = 1; i <= fakeNumStages; i++) numInstances += ((Integer)XMLConfigurator.getParameter("stages|" + "stage"+i+"|numPlacements")).intValue(); //Destination numStages = ((Integer)XMLConfigurator.getParameter("numStages")).intValue(); log.debug("numStages:"+numStages + " numInstances:" + numInstances); String strDestHandle = ((URL)XMLConfigurator.getParameter("stages|stage" + numStages + "|placement1")).toString(); log.debug(strDestHandle); strDest = Utilities.getIPAddress(strDestHandle); //init graph structures ga_init = new Prim(); ga_init.init(strTopologyFile); ga_init.init_dynamic_variables(0); //static variables has been initilized by ga_init //don't need to call ga_config.init() ga_config = new Prim(); ga_config.init_dynamic_variables(0); stagePlacements = new boolean[50][10]; for(i = 0; i < 50; i ++) for(j = 0; j < 10 ; j++) stagePlacements[i][j] = false; constructTree(ga_config); if(DefConstants.IF_NET_BANDWIDTH_VARING) { if(DefConstants.IF_FOR_EXP) { SimulatorEngineForExp engine = new SimulatorEngineForExp(this); Thread t = new Thread(engine); t.start(); } else { SimulatorEngine engine = new SimulatorEngine(this); Thread t = new Thread(engine); t.start(); } } return true; /* try{ synchronized(synObject) { bStart = true; } return true; } catch(RemoteException e) { log.error(e.getCause()); return false; } catch(IOException e) { log.error(e.getCause()); return false; }*/ } private synchronized boolean arePathesSame(DefaultMutableTreeNode newPathNode, DefaultMutableTreeNode exitingNode) {// log.debug("the number of treenodes at " + gaNode_Source.name + " is " + nodes.length); //check if the two pathes are same DynamicEnv gaNode_Source1 , gaNode_Dest1; boolean bSame = true; while(newPathNode != null) { if(exitingNode == null) { bSame = false; break; } gaNode_Source1 = (DynamicEnv)newPathNode.getUserObject(); gaNode_Dest1 = (DynamicEnv)exitingNode.getUserObject(); if(gaNode_Source1 == null || gaNode_Dest1 == null) { log.error("some thing wrong with the ga_config"); bSame = false; break; } if(!gaNode_Source1.name.equals(gaNode_Dest1.name)) { bSame = false; break; } exitingNode = (DefaultMutableTreeNode)exitingNode.getParent(); newPathNode = (DefaultMutableTreeNode)newPathNode.getParent(); } if(newPathNode == null && exitingNode != null) return false; return bSame; } public synchronized String findNewPath(String strStartingName, int nstage, int nplacement) { if(strStartingName.equals(strDest)) return null; ga_init.calculateKeyPaths(strStartingName); ga_init.constructPath(strDest); //check if the downstream in the new path is same //as the old one DefaultMutableTreeNode parent = (DefaultMutableTreeNode)ga_init.getTreeNode(strStartingName, -1, -1).getParent();/* String strNewDownstreamName = ((DynamicEnv)parent.getUserObject()).name; int nNodeIndex = ga_init.findNode(strStartingName); log.debug("parent's name is " + strNewDownstreamName + "its index is " + nNodeIndex + " nstage:" + nstage + " nplacement:" + nplacement); String strNextConnection = (String)XMLConfigurator.getParameter("stages|stage" + nstage +"|connection" + nplacement); String [] places = strNextConnection.split(":"); int nextStage = Integer.parseInt(places[0].substring(5)); int nextPlacement = Integer.parseInt(places[1].substring(9)); URL urlHandle = (URL)XMLConfigurator.getParameter("stages|stage" + nextStage + "|placement" + nextPlacement); if(urlHandle == null) { log.error("can't find the downstream service of stage " + nstage + " and placment " + nplacement); return null; } String strOldDownstreamHandle = urlHandle.toString(); String strOldDownstreamName = Utilities.getIPAddress(strOldDownstreamHandle); if(strOldDownstreamName.equals(strNewDownstreamName)) { log.debug("the same path is found"); return null; } else log.debug("the old downstream name is " + strOldDownstreamName); */ DefaultMutableTreeNode dest = ga_config.getTreeNode(strStartingName, nstage, nplacement); if(dest == null) { log.error("some thing wrong with the configuration information"); return null; } if(arePathesSame(parent, (DefaultMutableTreeNode)dest.getParent())) {// log.debug("the same path was found"); return null; } //Update hashStagePlacement// deletePlacements(strStartingName, nstage, nplacement); //merge the new path to the tree, and delete the old one updateTree(ga_init, ga_config, strStartingName, nstage, nplacement); String strConfig = generatingConfigString(ga_config); if(setConfigInfo(strConfig))
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -