📄 streamserviceprovider.java
字号:
if(curStage != numStages) { connectToNextService(); } //Create a new instance of monitor service startMonitorService(); //Load the working class loadWorkClass(); return true; } catch(GridServiceException e) { log.error(e.getCause()); return false; } catch(RemoteException e) { log.error(e.getCause()); return false; } catch(IOException e) { log.error(e.getCause()); return false; } } public void over_loaded(ConnectionContext cc) { AutoFillInputBuffer in; AutoFillOutputBuffer out; int i; if(bJobDone || !bJobStart) return; //log.info(cc); if(cc.in_or_out == -1) { log.fatal("the context is not correct"); return; } if(cc.times_overloaded < 100) cc.times_overloaded ++; cc.window ++;// cc.window %= DefConstants.WINDOW_SIZE + 1; if(cc.window > DefConstants.WINDOW_SIZE) cc.window = DefConstants.WINDOW_SIZE; if(cc.in_or_out == DefConstants.IN_BUFFER) cc.misc_info = myHandle + "====Input Buffer====="; else cc.misc_info = myHandle + "====Output Buffer===="; cc.setLongTermLoadedFactor(); if(cc.in_or_out == DefConstants.IN_BUFFER) { //log.info(myHandle + "::the input buffer is overloaded:"+cc.loaded); /* if(bFileLog) fileLog.write(myHandle + "::the input buffer is overloaded:"+cc.loaded);*/ cc.misc_info = myHandle + ":the input buffer is overloaded and the long term loaded factor :"+cc.longTermLoadedFactor + "::" + cc.loaded + "---"; if(bFileLog) fileLog.write(cc.misc_info); if(cc.longTermLoadedFactor > DefConstants.LONGTERM_EXTREMELY_OVER_LOADED_THRESHOLD && DefConstants.IFSPECIFYPARA) //if(adjustAccuracyPara(cc.longTermLoadedFactor, cc) == REACHMAX) //reach the max value {//the input buffer is long term overloaded //call the upstream StreamPortType streamPT = (StreamPortType)getStreamByName(cc.neighStreamHandle, DefConstants.UPSTREAM); if(streamPT == null) { log.fatal("the stream handle is null"); return; } // cc.fillSimContext(simCc); try { //To the neighbour, I am a downstream //streamPT.neighOverloaded(DefConstants.DOWNSTREAM, simCc); streamPT.neighOverloaded(DefConstants.DOWNSTREAM); } catch(Exception e) { log.fatal(e); } //5/25/04 comments the following 'if' statement out //the purpose of doing this is to get buffer's running //condition even applications don't specify the papa, //so that monitor service can have idea about the system //conditions. /*if(!bIfSpecifiedPara) { cc.times_overloaded = 0; cc.times_lightloaded = 0; cc.window = 0; }*/ } //How many the input buffers are overloaded? int number = inBufArray.howmanyInputBuffers(); //if there is only one buffer in the buffer array if(number > 1 && inBufArray.isMajorityOverloaded()) { //at lease two buffer in the array //and the majority of the buffers are overloaded, //change the para.s. // adjustAccuracyPara(cc); } } else { //log.info(myHandle+"the output buffer is overloaded:"+cc.loaded); /*if(bFileLog) fileLog.write(myHandle+"the output buffer is overloaded:"+cc.loaded);*/ cc.misc_info = myHandle + ":the output buffer is overloaded and the long term loaded factor :"+cc.longTermLoadedFactor + "::" + cc.loaded + "---"; if(bFileLog) fileLog.write(cc.misc_info); //adjust the local parameters //the reason is to try to locally //solve the traffic if(cc.longTermLoadedFactor > DefConstants.LONGTERM_EXTREMELY_OVER_LOADED_THRESHOLD && DefConstants.IFSPECIFYPARA) //if(adjustAccuracyPara(cc) == REACHMAX) //reach the max value { //call the downstream StreamPortType streamPT = (StreamPortType)getStreamByName(cc.neighStreamHandle, DefConstants.DOWNSTREAM); if(streamPT == null) { log.fatal("the stream handle is null"); return; }// cc.fillSimContext(simCc); try { //streamPT.neighOverloaded(DefConstants.UPSTREAM, simCc); streamPT.neighOverloaded(DefConstants.UPSTREAM); } catch(Exception e) { log.fatal(e); } //5/25/04 comments the following 'if' statement out //the purpose of doing this is to get buffer's running //condition even applications don't specify the papa, //so that monitor service can have idea about the system //conditions. /*if(!bIfSpecifiedPara) { cc.times_overloaded = 0; cc.times_lightloaded = 0; cc.window = 0; }*/ //DO I need to flip accuracyParaTake??????????????? //here I can get the notification from the upstream that the corresponding para has been changed } } } public void light_loaded(ConnectionContext cc) { AutoFillInputBuffer in; AutoFillOutputBuffer out; int i; if(bJobDone || !bJobStart) return; //log.info(cc); if(cc.in_or_out == -1) { log.fatal("the context is not correct"); return; } if(cc.times_lightloaded < 100) cc.times_lightloaded ++; cc.window --; if(cc.window < -(DefConstants.WINDOW_SIZE)) cc.window = -(DefConstants.WINDOW_SIZE);// cc.window %= DefConstants.WINDOW_SIZE + 1; cc.misc_info = myHandle; cc.setLongTermLoadedFactor(); if(cc.in_or_out == DefConstants.IN_BUFFER) { //log.info("the input buffer is lightloaded"); //log.info(myHandle+"the input buffer is light loaded:" + cc.loaded); cc.misc_info = myHandle + ":the input buffer is lightloaded and the long term loaded factor :"+cc.longTermLoadedFactor + "::" + cc.loaded + "---"; if(bFileLog) fileLog.write(cc.misc_info); if(cc.longTermLoadedFactor < DefConstants.LONGTERM_EXTREMELY_LIGHT_LOADED_THRESHOLD && DefConstants.IFSPECIFYPARA) { StreamPortType streamPT = (StreamPortType)getStreamByName(cc.neighStreamHandle, DefConstants.UPSTREAM); if(streamPT == null) { log.fatal("the stream handle is null"); return; } // cc.fillSimContext(simCc); try { //To the neighbour, I am a downstream streamPT.neighLightloaded(DefConstants.DOWNSTREAM); } catch(Exception e) { log.fatal(e); } //5/25/04 comments the following 'if' statement out //the purpose of doing this is to get buffer's running //condition even applications don't specify the papa, //so that monitor service can have idea about the system //conditions. /*if(!bIfSpecifiedPara) { cc.times_overloaded = 0; cc.times_lightloaded = 0; cc.window = 0; }*/ } //How many the input buffers are lightloaded? int number = inBufArray.howmanyInputBuffers(); //if there is only one buffer in the buffer array if(number >1 && inBufArray.isMajorityLightloaded()) { //At lease two buffers in the array //the majority of the buffers are lightly loaded, //change the para.s. //adjustAccuracyPara(1, cc); } } else //Output buffer { //log.info(myHandle+ ":the output buffer is lightly loaded:"+cc.loaded); cc.misc_info = myHandle + ":the output buffer is lightloaded and the long term loaded factor :"+cc.longTermLoadedFactor + "::" + cc.loaded + "---"; if(bFileLog) fileLog.write(cc.misc_info); //adjust the local parameters //the reason is to try to locally //solve the traffic if(cc.longTermLoadedFactor < DefConstants.LONGTERM_EXTREMELY_LIGHT_LOADED_THRESHOLD && DefConstants.IFSPECIFYPARA) //if(adjustAccuracyPara(cc) == REACHMIN) //reach the min value { //call the downstream StreamPortType streamPT = (StreamPortType)getStreamByName(cc.neighStreamHandle, DefConstants.DOWNSTREAM); if(streamPT == null) { log.fatal("the stream handle is null"); return; }// cc.fillSimContext(simCc); try { //streamPT.neighOverloaded(DefConstants.UPSTREAM, simCc); streamPT.neighLightloaded(DefConstants.UPSTREAM); } catch(Exception e) { log.fatal(e); } //5/25/04 comments the following 'if' statement out //the purpose of doing this is to get buffer's running //condition even applications don't specify the papa, //so that monitor service can have idea about the system //conditions. /*if(!bIfSpecifiedPara) { cc.times_overloaded = 0; cc.times_lightloaded = 0; cc.window = 0; }*/ } } } public void neighLightloaded(int direction) { int i; //5/25/04 comments the following 'if' statement out //the purpose of doing this is to get buffer's running //condition even applications don't specify the papa, //so that monitor service can have idea about the system //conditions. //if(!ifLoadedWorkClass || !bIfSpecifiedPara) if(!ifLoadedWorkClass) return; // log.debug(myHandle + "come to neighLightLoaded"); boolean bAdjustPara = false; ConnectionContext cc = new ConnectionContext(); if(direction == DefConstants.UPSTREAM) {//upstream lightloaded upstream_lightloaded_times ++; inBufArray.adjustRtnBufSizeTimes(cc);/* cc.misc_info = "upstream lightloaded"; if(!inBufArray.isMajorityOverloaded()) adjustAccuracyPara(1, cc.misc_info); //Tune it slowly */ } else //downstream lightloaded { downstream_lightloaded_times ++; downstream_loaded_times_window --; outBufArray.adjustRtnBufSizeTimes(cc); cc.misc_info = "downstream lightloaded";/* if(outBufArray.isMajorityOverloaded()) adjustAccuracyPara(-1, cc.misc_info); //Tune it fastly*/ } } public void neighOverloaded(int direction) { int i; //5/25/04 comments the following 'if' statement out //the purpose of doing this is to get buffer's running //condition even applications don't specify the papa, //so that monitor service can have idea about the system //conditions. //if(!ifLoadedWorkClass || !bIfSpecifiedPara) if(!ifLoadedWorkClass) return; // log.debug(myHandle + "come to neighOverLoaded"); boolean bAdjustPara = false; ConnectionContext cc = new ConnectionContext(); if(direction == DefConstants.UPSTREAM) {//upstream overloaded upstream_overloaded_times ++; inBufArray.adjustRtnBufSizeTimes(cc); cc.misc_info = "upstream overloaded"; /* if(inBufArray.isMajorityOverloaded()) adjustAccuracyPara(-1, cc.misc_info); //Tune it fastly*/ } else {//downstream overloaded downstream_overloaded_times ++; downstream_loaded_times_window ++; outBufArray.adjustRtnBufSizeTimes(cc); cc.misc_info = "downstream overloaded";/* if(!outBufArray.isMajorityOverloaded()) adjustAccuracyPara(1, cc.misc_info); //Tune it slowly */ } } public void handleBottleneck(ConnectionContext cc) { if(cc.neighStreamHandle == null) return; String strMonHandle = cc.neighStreamHandle.replaceFirst("StreamService", "StreamMonitorService");/* if(cc.getDegradingFlag()) //Degrading from CRITICAL to SEVERE { //changing the flag in the SimConnectionContext of the monitor try{ monitor.resetNumofNotifications(strMonHandle); }catch(java.rmi.RemoteException e) { log.error(e); } cc.setDegradingFlag(false); log.info("the traffic of the buffer " + cc.in_or_out + " degrade "); } */ if(cc.severity == DefConstants.LV_CRITICAL) {// log.info("the traffic of the buffer " + cc.in_or_out + " turns to CRITICAL"); try{ monitor.executing(strMonHandle); } catch(java.rmi.RemoteException e) { log.error(e.getCause()); } } else if(cc.severity == DefConstants.LV_SEVERE) { if(!cc.getCalledFlag()) { try{ monitor.executing(strMonHandle); } catch(java.rmi.RemoteException e) { log.error(e.getCause()); } cc.setCalledFlag(true); } } } public int howmanyTimesBottleneck(boolean bFlush) { int temp; synchronized(synBottleneck) { temp = intBottleneckTimes; if(bFlush && intBottleneckTimes != 0) intBottleneckTimes = 0; } return temp; } /* private boolean isBottleneck() { boolean bBufOverLoaded = false; //How many the input buffers are overloaded? int number = inBufArray.howmanyInputBuffers(); //if there is only one buffer in the buffer array int intOk, i; for(i = 0, intOk = 0; i < number ; i++) if(inBufArray.getInputBuffer(i).getConnectionContext().longTermLoadedFactor > DefConstants.LONGTERM_EXTREMELY_OVER_LOADED_THRESHOLD) intOk ++; if(intOk >= number/2 + 1) bBufOverLoaded = true; else bBufOverLoaded = false; if(bBufOverLoaded) { Double doublePfmUnit = new Double(pfmParaUnit); Double doubleAccuracyUnit = new Double(accuracyParaUnit); if(Double.compare(pfmParaUnit, 0.0) > 0) { //the performance para has been specified if(Double.compare(pfmPara, pfmParaMax) >= 0) //the processing rate has been tuned to be fastest return true; else return false; } else if(Double.compare(accuracyParaUnit, 0.0) >= 0) { if(Double.compare(accuracyPara, accuracyParaMin) <= 0) //the processing rate has been tuned to be fastest return true; else return false; } else { double tempLamda = calculateLamda(true); if(Double.compare(tempLamda, 0.9) >=0) return true; else return false; } } else return false; } */ public String setConfigFile(String add) { try { cfgURI[0] = add; XMLConfigurator.init(cfgURI); hashTree = new Hashtable(); Utilities.constructTree(hashTree); } catch(Exception e) { log.error(e); } return add; } public String setConfigInfo(String info) { try { strConfigInfo = info; log.error("**********"+info); byte [] tempBytes = info.getBytes(); ByteArrayInputStream byteIn = new ByteArrayInputStream(tempBytes); XMLConfigurator.init(byteIn); hashTree = new Hashtable(); Utilities.constructTree(hashTree); } catch(Exception e) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -