📄 streamserviceprovider.java
字号:
log.error(e); return "Fail"; } return "OK"; }/* private void constructTree() { String places[] = null; String strNextConnection = null;; int nSourcePlacements = ((Integer)XMLConfigurator.getParameter("stages|stage1|numPlacements")).intValue(); int nstage, nplacement; DefaultMutableTreeNode treenode, parent, root; int numStages = ((Integer)XMLConfigurator.getParameter("numStages")).intValue(); String strFinalStagePlacement = "stage" + numStages + ":placement1"; nstage = 1; nplacement = 1; treenode = parent = root = null; for(int i = 1; i <= nSourcePlacements; i ++) { nplacement = i; nstage = 1; treenode = new DefaultMutableTreeNode(new Integer(1)); //new String(parent.getChildCount() + 1)); hashTree.put("stage1:placement"+i, treenode); do{ log.debug("stage"+nstage+":placement"+nplacement); //Get the next stage's placement strNextConnection = (String)XMLConfigurator.getParameter("stages|stage" + nstage +"|connection" + nplacement); log.debug(strNextConnection); if(hashTree.containsKey(strNextConnection)) { parent = (DefaultMutableTreeNode)hashTree.get(strNextConnection); parent.add(treenode); treenode.setUserObject(new Integer(parent.getChildCount())); break; } else { parent = new DefaultMutableTreeNode(new Integer(1)); parent.add(treenode); hashTree.put(strNextConnection, parent); treenode = parent; //change nstage and nplacement places = strNextConnection.split(":"); nstage = Integer.parseInt(places[0].substring(5)); //the length of "placement" is 9 nplacement = Integer.parseInt(places[1].substring(9)); //the length of "placement" is 9 } //log.debug(treenode); //log.debug(parent); }while(!strNextConnection.equals(strFinalStagePlacement)); if(strNextConnection.equals(strFinalStagePlacement)) root = parent; } //Debug DefaultMutableTreeNode node; for (Enumeration e = root.breadthFirstEnumeration() ; e.hasMoreElements() ;) { node = (DefaultMutableTreeNode)e.nextElement(); System.out.println(((Integer)(node.getUserObject())).intValue()); } } */ public boolean startMonitorService() { //create a Monitor service //Get the GSH of the corresponding monitor String IPAdd = Utilities.getIPAddress(myHandle); strMonitorHandle = "http://" + IPAdd + ":8080/ogsa/services/Stream/StreamMonitorService"; //Debug region log.debug("****************"); log.debug(strMonitorHandle); log.debug("****************"); try{ OGSIServiceGridLocator gridLocator = new OGSIServiceGridLocator(); Factory factory = gridLocator.getFactoryPort(new URL(strMonitorHandle)); GridServiceFactory monitorFactory = new GridServiceFactory(factory); LocatorType locator = monitorFactory.createService(myInstanceName); log.debug(locator); StreamMonitorServiceGridLocator monitorLocator = new StreamMonitorServiceGridLocator(); monitor = monitorLocator.getStreamMonitorService(locator); log.debug(monitor); //now we can communicate with the monitor monitor.setCommunicationInfo(curStage, curPlacement, numPlacements, myHandle, curDownstreamHandle); if(strEnvSimulatorHandle != null) monitor.setEnvironmentSimulatorStringHandle(strEnvSimulatorHandle); else log.error("didn't set Environment simulator's handle"); //Sep. 11th***** I comment the following statements out, //because monitor and stream service is in the //same machine /*if(strConfigInfo == null) monitor.setConfigFile(cfgURI[0]); else monitor.setConfigInfo(strConfigInfo); */ boolean bStart = monitor.start(); log.debug("result " + bStart); if(bStart) System.out.println("monitor work....."); else System.out.println("can't start the monitor services"); } catch(Exception e) { log.error(e); //Do nothing here, skip to the next step return false; } return true; }// public boolean launchNextService() public String launchNextService() { try{// int numStages = ((Integer)XMLConfigurator.getParameter("numStages")).intValue();// int fakeNumStages = ((Integer)XMLConfigurator.getParameter("fakeNumStages")).intValue(); boolean [] isInsExist = new boolean[1]; String nextPlace = (String)XMLConfigurator.getParameter("stages|" + "stage"+curStage+"|connection"+curPlacement); log.debug(nextPlace); String [] places =nextPlace.split(":"); //the expression of placement is like: // "stageN:placement?" if(places.length != 2) { log.error("The expression of placement in the stage "+ curStage+"isn't correct"); return null; } int nextPlc = Integer.parseInt(places[1].substring(9)); //the length of "placement" is 9 int nextStage = Integer.parseInt(places[0].substring(5)); //the length of "stage" is 5 String strURL = "stages|" + "stage" + nextStage + "|" + places[1]; log.debug(strURL); URL GSH = (URL)XMLConfigurator.getParameter(strURL); if(GSH == null) { log.error("The location of the service can't be find"); return null; } log.debug(GSH); String nextInstanceName = "stage"+ nextStage +"-" + places[1]; curDownstreamHandle = GSH.toString()+"/"+nextInstanceName; downStream = (StreamPortType)getStreamByName(curDownstreamHandle, DefConstants.DOWNSTREAM, isInsExist); log.debug("The next instance "+curDownstreamHandle+" exists?:" + isInsExist[0]); if(downStream == null) { log.fatal("can't find the service target"); return null; } log.debug("my handle is " + myHandle); downStream.addUpstreamHandle(myHandle); boolean flagLoad = needLoadWorkClass(numStages, fakeNumStages); log.info("need load work class?:"+flagLoad); downStream.setLoadWorkClassFlag(flagLoad); downStream.setEnvironmentSimulatorStringHandle(strEnvSimulatorHandle); downStream.setSocketPort(remoteServerSocketPort); if(!isInsExist[0]) {// downStream.setSocketPort(remoteServerSocketPort); downStream.setCurrentStage(nextStage,nextPlc); if(strConfigInfo == null) downStream.setConfigFile(cfgURI[0]); else downStream.setConfigInfo(strConfigInfo); log.debug("now start the next stream service"); //the current service will wait until the next //service have been launched if(downStream.start()) log.info(strURL + "'s instance've been launched...."); else{ log.fatal("can't launch the "+strURL+"instance"); return null; } } else { log.debug("now add a socket server to the next stream service"); int temp = downStream.addSocketServer(); if(temp < 0) { log.error("can't add a socket server at the next service"); return null; } if(flagLoad) downStream.loadWorkClass(); } } catch(GridServiceException e) { log.error(e); return null; } catch(RemoteException e) { log.error(e); return null; } catch(IOException e) { log.error(e); return null; } return curDownstreamHandle; } public int connectToNextService() { //estabish the socket connections to the following instance if(curStage != numStages) { //connect to the next socket server String nextPlace = (String)XMLConfigurator.getParameter("stages|" + "stage"+curStage+"|connection"+curPlacement); String [] places =nextPlace.split(":"); String strURL = "stages|" + places[0] + "|" + places[1]; URL GSH = (URL)XMLConfigurator.getParameter(strURL); log.debug(GSH); //Get the bandwidth between two stages ConnectionContext cc2 = new ConnectionContext(); String strNetBand = (String)XMLConfigurator.getParameter("stages|" + "stage"+curStage+"|bandwidth"+curPlacement); if(strNetBand == null) { cc2.net_bandwidth = -1; cc2.net_util = 3.0; log.info("the network bandwidth is " + cc2.net_bandwidth); } else if(strNetBand.indexOf(':') < 0) { //The constant number is specified cc2.net_bandwidth = (Integer.valueOf(strNetBand)).doubleValue(); cc2.net_util = 3.0; log.info("the network bandwidth is " + cc2.net_bandwidth); } else { //The util has been specified //The function is specified String [] strArray = strNetBand.split(":"); cc2.net_bandwidth = (Integer.valueOf(strArray[0])).doubleValue(); cc2.net_util = (Double.valueOf(strArray[1])).doubleValue(); log.info("the network bandwidth is " + cc2.net_bandwidth + "and the utility is " + cc2.net_util); //log.info("*****haven't implemented it"); //..... } cc2.iStep = curStage; cc2.strId = myHandle; cc2.rcverPort = remoteServerSocketPort; //cc1.strId = (String)stream.getProperty(ServiceProperties.HANDLE); cc2.neighStreamHandle = curDownstreamHandle; cc2.rcverHostName = GSH.getHost(); int index2 = outBufArray.addOutputBuffer(cc2); //Update remoteServerSocketPort remoteServerSocketPort += 10; log.debug(cc2); log.debug("the index is " + index2 ); if(index2 < 0) { log.error("can't add an output buffer and a socket server"); return -1; } return index2; } else return -1; } public boolean shiftToNextService(int index) { log.error("*********ATTENTION**** come to shifting ************"); int size = outBufArray.howmanyOutputBuffers(); if(index < 0 || index >= size) { log.error("the index " + index + "is wrong"); return false; } ConnectionContext cc; for(int i = 0; i < size ; i ++) { if(i != index) { if(!outBufArray.isOutputBufValid(i)) { log.info("the output " + i + "is invalid"); continue; } log.info("set the output " + i + "to be invalid"); outBufArray.setOutputBufStatus(i, false); //notify the downstream srv cc = outBufArray.getOutputBuffer(i).getConnectionContext(); StreamPortType streamPT = (StreamPortType)getStreamByName(cc.neighStreamHandle, DefConstants.DOWNSTREAM); try{ streamPT.halt(myHandle); log.info(myHandle + "is halting " + cc.neighStreamHandle ); } catch(RemoteException e) { log.error(e.getCause()); return false; } } } return true; } public void halt(String strUpstreamHandle) { int size = inBufArray.howmanyInputBuffers(); int i; for(i = 0; i < size ; i ++) { ConnectionContext cc = inBufArray.getInputBuffer(i).getConnectionContext(); if(strUpstreamHandle.equals(cc.neighStreamHandle)) { log.info("set input buffer " + i + " to be invalid"); inBufArray.setInputBufStatus(i, false); break; } } if(i == size) return; if(monitor != null) { String strUpstreamMonitorHandle = strUpstreamHandle.replaceFirst("StreamService", "StreamMonitorService"); try{ monitor.halt(strUpstreamMonitorHandle); } catch(RemoteException e) { log.error(e.getCause()); return; } } } public void netBandwidthNotification(double ffg_ave, double ffg_variance, double net_util, String strDownstreamHandle) { SelfSimilarSeqGenerator generator = new SelfSimilarSeqGenerator(ffg_ave, ffg_variance, DefConstants.SIZE_SELF_SIMILAR_ARRAY); double [] dTempArray = generator.generating(); /** Debug *//* int len = dTempArray.length; for(int i = 0; i < len; i ++) log.debug(i + ":" + dTempArray[i]); */ /** Debug */ outBufArray.setNetBandwidth(strDownstreamHandle, dTempArray, net_util); } public void setEnvironmentSimulatorStringHandle(String strHandle) { strEnvSimulatorHandle = strHandle; } public int setCurrentStage(int curStage, int curPlacement) { this.curStage = curStage; this.curPlacement = curPlacement; return curStage; } public int getCurrentStage() { return curStage; } public void setLoadWorkClassFlag(boolean flag) { ifNeedLoadWorkClass = flag; } public void addUpstreamHandle(String handle) { curUpstreamHandle = handle; } public int getSocketPort() { return localServerSocketPort; } public void startJob() { setStartFlag(); } public void finishJob() { setDoneFlag(); closeAllSock(); } private synchronized void setStartFlag() { bJobStart = true; } private synchronized void setDoneFlag() { bJobDone = true; } private void closeAllSock() { return; } public boolean tryInstance() { return true; } public void postCreate(GridContext context) throws GridServiceException { /*counterData = this.serviceData.create("CounterStatus"); counterData.setValue("initialized"); this.serviceData.add(counterData);*/ //super.postCreate(context);// myHandle = (String)(context.getServiceProperties()).getProperty(ServiceProperties.HANDLE);// log.debug("in postCreate and myHandle is "+myHandle);/* initServiceData(); specifyAccuracyData(); specifyPerformanceData(); */ } public void IdleTransmission() { } public void SocketConnected(ConnectionContext cc) { } // Empty callback methods public void preCreate(GridServiceBase base) throws GridServiceException { int i; } public void preDestroy(GridContext context) throws GridServiceException { int i; } public void activate(GridContext context) throws GridServiceException { int i; } public void deactivate(GridContext context) throws GridServiceException { int i; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -