📄 streamserviceprovider.java
字号:
bPfmParaTaken = true; return pfmPara; } adjustPfmPara(lamda, misc_info); return pfmPara; } protected synchronized int adjustPfmPara(double lamda, String debug_info) { pfmPara += pfmParaUnit * lamda; if(Double.compare(pfmPara, pfmParaMin) < 0) { pfmPara = pfmParaMin; if(bFileLog) fileLog.write(debug_info + ":in adjustPfmPara and pfmPara:" + pfmPara +"::lamda:"+lamda); return REACHMIN; } else if(Double.compare(pfmPara, pfmParaMax) > 0) { pfmPara = pfmParaMax; if(bFileLog) fileLog.write(debug_info + ":in adjustPfmPara and pfmPara:" + pfmPara +"::lamda:"+lamda); return REACHMAX; } if(bFileLog) fileLog.write(debug_info + ":in adjustPfmPara and pfmPara:" + pfmPara +"::lamda:"+lamda); return SUCCESS; } protected synchronized void setAccuracyPara(double para) { if(para < accuracyParaMin || para > accuracyParaMax) { log.warn("can't set the accurate parameter: the value beyond the region"); return; } this.accuracyPara = para; this.bAccuracyParaTaken = false; } protected synchronized int adjustAccuracyPara(double lamda, String debug_info) {/* if(!bAccuracyParaTaken) return NOTAKEN; */ accuracyPara += accuracyParaUnit * lamda; if(Double.compare(accuracyPara, accuracyParaMin) < 0) { accuracyPara = accuracyParaMin; if(bFileLog) fileLog.write(debug_info + ":in adjustAccuracyPara and accuracyPara:" + accuracyPara +"::lamda:"+lamda); return REACHMIN; } else if(Double.compare(accuracyPara, accuracyParaMax) > 0) { accuracyPara = accuracyParaMax; if(bFileLog) fileLog.write(debug_info + ":in adjustAccuracyPara and accuracyPara:" + accuracyPara +"::lamda:"+lamda); return REACHMAX; } if(bFileLog) fileLog.write(debug_info + ":in adjustAccuracyPara and accuracyPara:" + accuracyPara +"::lamda:"+lamda); return SUCCESS; } /* protected synchronized int increaseAccuracyPara(double unit, ConnectionContext cc) { double temp = accuracyParaUnit*unit; temp += accuracyPara; if(Double.compare(temp, accuracyParaMax) > 0) { accuracyPara = accuracyParaMax; if(bFileLog) fileLog.write(cc.misc_info +" --increase para ---:"+ accuracyPara); return REACHMAX; } else { accuracyPara = temp; if(bFileLog) fileLog.write(cc.misc_info +" --increase para ---:"+ accuracyPara); return SUCCESS; //increase the value } } protected synchronized int decreaseAccuracyPara(double unit, ConnectionContext cc) { double temp = accuracyParaUnit*unit; temp = accuracyPara - temp; if(Double.compare(temp, accuracyParaMin) < 0) { accuracyPara = accuracyParaMin; if(bFileLog) fileLog.write(cc.misc_info +" --decrease para --:"+ accuracyPara); return REACHMIN; } else { accuracyPara = temp; if(bFileLog) fileLog.write(cc.misc_info +" --decrease para --:"+ accuracyPara); return SUCCESS; //decrease the value } } */ protected synchronized void setpfmPara(double para) { if(para < pfmParaMin || para > pfmParaMax) { log.warn("can't set the performance parameter: the value beyond the region"); return; } this.pfmPara = para; this.bPfmParaTaken = false; } public void setSocketPort(int port) { localServerSocketPort = port; remoteServerSocketPort = port + 1; } public int addSocketServer() { /*int number = inBufArray.howmanyInputBuffers(); int i = 0; int temp, max; max = -1; while(i < number) { temp = inBufArray.getInputBuffer(i).getConnectionContext().rcverPort; if(temp == port) port = localServerSocketPort; i++; }*/ ConnectionContext cc = new ConnectionContext(); cc.iStep = curStage; if(curStage > 1)// cc.neighStream = upStream; //represents for itself cc.neighStreamHandle = curUpstreamHandle; //represents for itself cc.strId = myHandle; //cc1.strId = GSH.toString() + "?" + cc1.rcverPort; //cc1.strId = (String)stream.getProperty(ServiceProperties.HANDLE); //The socket port was set by the prior instance// cc.rcverPort = port; cc.rcverPort = localServerSocketPort; int index = inBufArray.addInputBuffer(cc); log.debug("the index of the auto input buffer is " + index); if(index < 0) { log.error("can't add a input buffer and a socket server"); return -1; } //Update localServerSocketPort. //Why 10, not 1? In case confliction of ports. //localServerSocketPort += 10; return 1; } public boolean runDownstreamWorkClass(String strDownstreamHandle) { if(strDownstreamHandle == null) { log.debug("the downstream handle is null"); return true; } try{ StreamPortType downstream = (StreamPortType)getStreamByName(strDownstreamHandle, DefConstants.DOWNSTREAM); if(!downstream.runWorkClass()) { log.error("can't let the downstream's work class run"); return false; } return true; } catch(Exception e) { log.error(e); return false; } } public boolean runWorkClass() { if(isWorkClassRunning()) return true; else if(!ifLoadedWorkClass) return false; log.info("starting to run class"); try{ if(curStage != numStages) { boolean flagLoad = needLoadWorkClass(numStages, fakeNumStages); log.debug("need to run the downstream's class?" + flagLoad); if(flagLoad) { if(!runDownstreamWorkClass(curDownstreamHandle)) log.error("can't let the downstream's work class run"); } } tc.startProcessing(); setWorkClassRunningFlag(true); return true; } catch(Exception e) { log.error(e); return false; } } private synchronized void setWorkClassRunningFlag(boolean flag) { ifWorkClassRunning = flag; } public synchronized boolean isWorkClassRunning() { return ifWorkClassRunning; } public boolean loadWorkClass() { if(ifLoadedWorkClass) return true; if(!ifNeedLoadWorkClass) return false; log.info("starting to load class"); //Load the working class try{ String strClassLocation = "stages|" + "stage" + (curStage) + "|class|location"; URL[] urlCP = {(URL)XMLConfigurator.getParameter(strClassLocation)}; log.debug(urlCP[0]); ClassLoader classLoader = new URLClassLoader(urlCP); Class work = null; //Now to retrieve the class you only need to do the following: String strClassName = "stages|" + "stage" + (curStage) + "|class|class_name"; String strClass = (String)XMLConfigurator.getParameter(strClassName); log.debug(strClass); work = classLoader.loadClass(strClass); log.debug(work); StreamProcessor streamProcessor = (StreamProcessor)work.newInstance(); log.info("The name of the class loaded is " + streamProcessor.getClass().getName()); tc = new ThreadContainer(this); // tc.startProcessing(streamProcessor); tc.initProcessing(streamProcessor); ifLoadedWorkClass = true; return true; } catch(ClassNotFoundException e) { log.error(e); return false; } catch(InstantiationException e) { log.error(e.getCause()); return false; } catch(IllegalAccessException e) { log.error(e.getCause()); return false; } catch(Exception e) { log.error(e.getCause()); return false; } } private boolean isServiceInstanceExist(String handle) { Object streamPortType1, streamPortType2; //Step 1 streamPortType1 = hashUpstreams.get(handle); streamPortType2 = hashDownstreams.get(handle); if(streamPortType1 != null || streamPortType2 != null) return true; else return false; //Step 2/* try{ StreamServiceGridLocator locator = new StreamServiceGridLocator(); streamPortType = locator.getStreamService(new URL(handle)); if(streamPortType != null) { hashStreamHandles.put(handle, streamPortType); return true; } } catch(Exception e) { log.info(e); return false; //Do nothing here, skip to the next step }*/ } private StreamPortType getStreamByName(String handle, int direction) { boolean [] tempBool = new boolean[1]; return getStreamByName(handle, direction, tempBool); } //The function will //1. Check if the hash table has a web service instance corresponding // to the handle //2. Check if the corresponding web service has an instance, if // the instance exists, add the pair to hash table //3. create a new instance, add the pair to hash table private StreamPortType getStreamByName(String handle, int direction, boolean [] isInsExist) { StreamPortType streamPortType; Hashtable hashStreamHandles;// log.debug(handle+direction); if(direction == DefConstants.UPSTREAM) hashStreamHandles = hashUpstreams; else hashStreamHandles = hashDownstreams; //Step 1 streamPortType = (StreamPortType)hashStreamHandles.get(handle); if(streamPortType != null) { isInsExist[0] = true; return streamPortType; } //Step 2 try{ StreamServiceGridLocator locator = new StreamServiceGridLocator(); streamPortType = locator.getStreamService(new URL(handle)); if(streamPortType != null) { streamPortType.tryInstance(); org.apache.axis.client.Stub s = (Stub) streamPortType; s.setTimeout(500000); // 1 second, in miliseconds hashStreamHandles.put(handle, (Object)streamPortType);// log.debug(handle + "instance exist!!!! "); isInsExist[0] = true; return streamPortType; } } catch(FaultType e) { log.error(e); //Do nothing here, skip to the next step } catch(GridServiceException e) { log.error(e); } catch(RemoteException e) { log.error(e); } catch(MalformedURLException e) { log.error(e); } //Step 3: create a web service instance //Get placement and instance name isInsExist[0] = false; int lastIndex = handle.lastIndexOf('/'); log.debug("lastIndex of //" + lastIndex); if(lastIndex == -1) return null; String strGSH = handle.substring(0, lastIndex); String instanceName = handle.substring(lastIndex + 1); log.debug("GSH is " + strGSH); log.debug("instance name is "+ instanceName); //Create the first service // Get a reference to the StreamService Factory try{ OGSIServiceGridLocator gridLocator = new OGSIServiceGridLocator(); Factory factory = gridLocator.getFactoryPort(new URL(strGSH)); log.debug(factory); GridServiceFactory streamFactory = new GridServiceFactory(factory); // Create a new StreamService instance and get a reference to its to its Stream PortType// LocatorType locator = streamFactory.createService(content); //log.debug(content); LocatorType locator = streamFactory.createService(instanceName); log.debug(locator); StreamServiceGridLocator streamLocator = new StreamServiceGridLocator(); streamPortType = streamLocator.getStreamService(locator); org.apache.axis.client.Stub s = (Stub) streamPortType; s.setTimeout(500000); // 1 second, in miliseconds log.debug(streamPortType); //Add it to hashtable if(streamPortType != null) hashStreamHandles.put(handle, (Object)streamPortType); return streamPortType; } catch(Exception e) { log.error(e); //Do nothing here, skip to the next step return null; } } private boolean needLoadWorkClass(int numStages, int fakeNumStages) { log.debug("numStages:" + numStages+" "+curPlacement +" " + curStage); if(curStage <= fakeNumStages && curStage != numStages ) { String strConnection= (String)XMLConfigurator.getParameter("stages|stage" +curStage+"|connection"+curPlacement); String strMyPlacement = "stage" + curStage + ":placement" + curPlacement; log.debug(strConnection); log.debug(strMyPlacement); DefaultMutableTreeNode parent = (DefaultMutableTreeNode)hashTree.get(strConnection); DefaultMutableTreeNode myNode = (DefaultMutableTreeNode)hashTree.get(strMyPlacement); if(parent == null || myNode == null) log.error("the configuration information is not right"); Integer orderChild = (Integer)myNode.getUserObject(); if(parent.getChildCount() == orderChild.intValue()) return true; else return false; } else return false; } public boolean start() { try{ //All configuration information is in XMLConfigurator log.debug("*********************"+curStage+"***********************"); numStages = ((Integer)XMLConfigurator.getParameter("numStages")).intValue(); fakeNumStages = ((Integer)XMLConfigurator.getParameter("fakeNumStages")).intValue(); log.debug("number of stages is " + numStages); log.debug("fake number of stages is " + fakeNumStages); numPlacements = ((Integer)XMLConfigurator.getParameter("stages|" + "stage"+curStage+"|numPlacements")).intValue(); nDataSources = ((Integer)XMLConfigurator.getParameter("stages|stage1|numPlacements")).intValue(); bFileLog = ((Boolean)XMLConfigurator.getParameter("filelogging")).booleanValue(); log.debug("number of placements is " + numPlacements);// int curStage = ((Integer)XMLConfigurator.getParameter("__currentStage")).intValue(); //get "myHandle" here instead of in postCreate() //because in postCreate(), the IP of "myHandle" is the real ip address //of the service. We want to use its name as its ip address for EnviornmentService myHandle = ((URL)XMLConfigurator.getParameter("stages|stage"+ curStage+ "|placement"+ curPlacement)).toString(); myHandle = myHandle + "/"+ "stage" + curStage + "-placement"+curPlacement; log.debug("myHandle is "+myHandle); myInstanceName = "stage" + curStage + "-placement"+curPlacement; Date tempTime = new Date(); if(bFileLog) { String myLogFileName = myInstanceName + tempTime.getTime(); fileLog = new FileLog(myLogFileName); } if(curStage == numStages) { //This is the last stage in the pipeline, //just output to stdout } else if(curStage <= fakeNumStages) { launchNextService(); } if(curStage != 1) addSocketServer(); //estabish the socket connections to the following instance
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -