📄 streammonitorimpl.java
字号:
return true; } catch(GridServiceException e) { log.error(e); return false; } catch(RemoteException e) { log.error(e); return false; } catch(IOException e) { log.error(e); return false; } } public void addUpstreamHandle(int upStage, int upPlacement, String handle) { log.debug("adding upstream info: Handle: " + handle + " upStage: " + upStage + " upPlacement" + upPlacement); StreamMonitorPortType monitor = getMonitorByName(handle, upStage, upPlacement, DefConstants.UPSTREAM); if(monitor == null) { log.error("can't get the monitor port type of " + handle); return; } SimConnectionContext scc = (SimConnectionContext)hashStreamMonitorHandles.get(handle); if(scc == null) { log.error("the connection context shouldn't be null"); return; } //Add a task thread for output buffer taskArray.addMonitorTask(scc); log.debug("successfully add upstream info"); } public void resetNumofNotifications(String strNeighborMonitorHandle) { if(strNeighborMonitorHandle == null) return; SimConnectionContext scc = (SimConnectionContext)hashStreamMonitorHandles.get(strNeighborMonitorHandle); if(scc == null) { log.error("can't find the connection context for " + strNeighborMonitorHandle); return; } scc.resetNumofNotifications(); } public void executing(String strNeighborMonitorHandle) { if(strNeighborMonitorHandle == null) { log.error("the handle of neighbour monitor is null"); return; } SimConnectionContext scc = (SimConnectionContext)hashStreamMonitorHandles.get(strNeighborMonitorHandle); if(scc == null) { log.error("can't find the connection context for " + strNeighborMonitorHandle); return; } scc.wakeupThread(); } //this is a recursive function //the return value is the location of the bottleneck public int isBottleneck(String strUpstreamMonitorHandle, int ith_stage, int in_or_out) { //if strUpstreamMonitorHandle is null, then it is called from itself ResMonitorTask/* if(strUpstreamMonitorHandle == null) { log.error("the upstream monitor handle is null"); return DefConstants.EXCEPTION; } *//* 
log.debug("come to check if the following buffer bottleneck *******"); log.debug("the upstream handle is :" + strUpstreamMonitorHandle); log.debug("from which stage:" + ith_stage + " and in_or_out buffer? " + in_or_out); */ if(curStage == numStages) //the last stage { //if the call is from upstream? if(ith_stage == numStages) // from myself return DefConstants.STAY; else return DefConstants.MOVE; } //some stage in the middle else { boolean needCheckInBuffer, needCheckOutBuffer; if(ith_stage < curStage) //Check both input and output buffer of mine needCheckInBuffer = needCheckOutBuffer = true; else if(ith_stage == curStage && in_or_out == DefConstants.IN_BUFFER) { needCheckInBuffer = false; needCheckOutBuffer = true; } else needCheckInBuffer = needCheckOutBuffer = false; //if curStage is at the same node as the downstream stage, don't check output buffer if(location.equals(downstream_location)) needCheckOutBuffer = false; //Check the input Buffer if(needCheckInBuffer) { SimConnectionContext scc_in = (SimConnectionContext)hashStreamMonitorHandles.get(strUpstreamMonitorHandle); if(scc_in == null) { log.error("can't get connection context of " + strUpstreamMonitorHandle); return DefConstants.EXCEPTION; } if(scc_in.isBufOverloaded()) { //log.debug("sorry for the stage " + ith_stage + ":" + location + ": my input buffer is overloaded"); return DefConstants.STAY; } } //check the output buffer if(needCheckOutBuffer) { SimConnectionContext scc_out = (SimConnectionContext)hashStreamMonitorHandles.get(strDownstreamMonitorHandle); if(scc_out == null) { log.error("can't get connection context of " + strDownstreamMonitorHandle); return DefConstants.EXCEPTION; } if(scc_out.isBufOverloaded()) { //log.debug("sorry for the stage " + ith_stage + ":" + location + ": my output buffer is overloaded"); return DefConstants.STAY; } } //now call the downstream try{ StreamMonitorPortType downMonitorStream = (StreamMonitorPortType)getMonitorByName(strDownstreamMonitorHandle, -1 , -1 , 
DefConstants.DOWNSTREAM); if(downMonitorStream == null) { log.error("can't get downstream's port type " + strDownstreamMonitorHandle); return DefConstants.EXCEPTION; } return downMonitorStream.isBottleneck(strMyHandle, ith_stage, DefConstants.IN_BUFFER); } catch(java.rmi.RemoteException e) { System.out.println(e); return DefConstants.EXCEPTION; } } } public void netBandwidthNotification(int bandwidth, double dNetworkUtil, String strDownstreamHandle) { if(curStage == numStages) return; SimConnectionContext scc = (SimConnectionContext)hashStreamMonitorHandles.get(strDownstreamHandle); if(scc == null) { log.debug("can't find the corresponding connection context for " + strDownstreamHandle); return; } else if(!scc.bValid) { log.debug(strDownstreamHandle + "is invalid already"); return; } //Seconds ---> double package_interval = (double)DefConstants.PACKAGE_INTERAL/(double)1000; //In fact ffg_ave is the mean packages per PACKAGE_INTERAL /* * M*PACKAGE_SIZE/PACKAGE_INTERAL = bandwidth*dNetworkUtil; */ double ffg_ave = (double)bandwidth*dNetworkUtil*package_interval/(double)DefConstants.PACKAGE_SIZE; double ffg_variance = 4.0 * ffg_ave; log.info("the network util is " + dNetworkUtil + "ffg_ave is " + ffg_ave); String strStreamHandle = strDownstreamHandle.replaceFirst("StreamMonitorService", "StreamService"); try{ log.debug("seting service's bandwidth"); streamSrv.netBandwidthNotification(ffg_ave, ffg_variance, dNetworkUtil, strStreamHandle); } catch(java.rmi.RemoteException e) { log.error(e); } } public boolean realMoveStreamSrv(String strNewConfig) { if(strNewConfig == null) return false; if(curStage == numStages) return false; setConfigInfo(strNewConfig); try{ streamSrv.setConfigInfo(strNewConfig); } catch(Exception e) { log.error(e); return false; } String strNextConnection = (String)XMLConfigurator.getParameter("stages|stage" + curStage +"|connection" + curPlacement); String [] places = strNextConnection.split(":"); int nextStage = 
Integer.parseInt(places[0].substring(5)); int nextPlacement = Integer.parseInt(places[1].substring(9)); URL urlHandle = (URL)XMLConfigurator.getParameter("stages|stage" + nextStage + "|placement" + nextPlacement); if(urlHandle == null) { log.error("can't find the downstream service of stage " + curStage + " and placment " + curPlacement); return false; } String newDownstreamHandle = urlHandle.toString(); String newDowstreamMonitorHandle = newDownstreamHandle.replaceFirst("StreamService", "StreamMonitorService") + "/stage" + nextStage + "-placement" + nextPlacement; log.debug("new downstream monitor:" + newDowstreamMonitorHandle); log.debug("cur downstream monitor:" + strDownstreamMonitorHandle); try{ if(newDowstreamMonitorHandle.equals(strDownstreamMonitorHandle)) { log.warn("in the new path, the new downstream " + newDownstreamHandle + "is the same as the old one"); StreamMonitorPortType downStream = (StreamMonitorPortType)getMonitorByName(strDownstreamMonitorHandle, downstream_stage, downstream_placement, DefConstants.DOWNSTREAM); return downStream.realMoveStreamSrv(strNewConfig); } else { String strDownstreamHandle = streamSrv.launchNextService(); if(strDownstreamHandle == null) { log.error("can't lanuch up the downstream service"); return false; } int nIndexOutBuf = streamSrv.connectToNextService(); if(nIndexOutBuf < 0) { log.error("can't connect to the downstream service"); return false; } else log.debug("the index of output buffer is " + nIndexOutBuf); if(!streamSrv.shiftToNextService(nIndexOutBuf)) { log.error("can't shift to the next service"); return false; } if(!streamSrv.runDownstreamWorkClass(strDownstreamHandle)) { log.error("can't run the downstream work class"); return false; } //We also need to contact the next Monitor by calling restart() restart(strDownstreamHandle); } } catch(Exception e) { log.error(e); return false; } return true; } public synchronized void setMigratable(boolean flag) { DefConstants.IF_DYNAMIC_ALLOCATION_APPLIED = flag; } 
/**
 * Tries to relocate the downstream part of the stream. Called by ResMonitorTask.
 *
 * Only nKind == 1 triggers a move; the original comments label that case
 * "the network is bottleneck" and the other case "the computation is the
 * bottleneck", for which nothing is done.
 *
 * @param nKind kind of bottleneck detected by the caller
 * @return false when dynamic allocation is switched off, when this stage sits on
 *         the same location as its downstream, on the last stage, when no
 *         environment simulator or no new host is available, or when the move
 *         fails or throws; the environment simulator's own result when the move
 *         is delegated to the first connected data source; true otherwise
 *         (including every nKind other than 1)
 */
public synchronized boolean tryMoveStreamSrv(int nKind) {
    if (!DefConstants.IF_DYNAMIC_ALLOCATION_APPLIED) {
        return false;
    }

    try {
        if (nKind == 1) {
            // if current location is same as the downstream location, return false
            if (location.equals(downstream_location)) {
                return false;
            }

            // the network is bottleneck
            if (curStage == numStages) {
                return false;
            }

            EnvironmentSimulatorPortType env = getEnvironmentSimulatorHandle();
            if (env == null) {
                log.warn("EnvironmentSimulatorPortType is null");
                return false;
            }

            // try to find a new path
            //log.debug("trying to find a new path");
            String strNewConfig = env.findNewPath(location, curStage, curPlacement);
            if (strNewConfig != null) {
                log.debug("new path in the config file is:" + strNewConfig);
                // the XMLConfigurator has been updated
                // find the connection info of the current instance
                if (!realMoveStreamSrv(strNewConfig)) {
                    return false;
                }
            } else if (curStage > 1) {
                // Not one of the data sources: get the first data source connected
                // to the current stage and let the simulator move it. A failure here
                // used to be swallowed silently; it now reaches the handler below,
                // which logs it and returns false as before.
                return env.findFirstConnectedDataSourceMonitorandMove(location, curStage, curPlacement);
            } else {
                log.debug("can't find a new host");
                return false;
            }
        } else {
            // the computation is the bottleneck
        }
    } catch (Exception e) {
        log.error(e);
        return false;
    }
    return true;
}

/**
 * Points this monitor at a new downstream service and calls start().
 *
 * The monitor handle is derived from the service handle by replacing the first
 * "StreamService" with "StreamMonitorService". Stage and placement are parsed
 * from the last path segment, which is split on "-" with 5 and 9 leading
 * characters skipped (presumably ".../stageN-placementM" -- confirm).
 *
 * Everything is parsed before any field is written, so a malformed handle
 * leaves the current downstream settings untouched. A null handle is ignored.
 *
 * @param strNewDownstreamHandle handle of the new downstream stream service
 * @throws NumberFormatException     if stage or placement is not a number
 * @throws IndexOutOfBoundsException if the last path segment is too short or
 *                                   has no "-" separated second part
 */
public void restart(String strNewDownstreamHandle) {
    if (strNewDownstreamHandle == null) {
        return;
    }

    String monitorHandle = strNewDownstreamHandle.replaceFirst("StreamService", "StreamMonitorService");

    // get downstream_stage and downstream_placement from the last path segment
    String[] segments = monitorHandle.split("/");
    String[] stageAndPlacement = segments[segments.length - 1].split("-");
    int stage = Integer.parseInt(stageAndPlacement[0].substring(5));
    int placement = Integer.parseInt(stageAndPlacement[1].substring(9));

    // Commit only after parsing succeeded.
    strDownstreamMonitorHandle = monitorHandle;
    this.downstream_stage = stage;
    this.downstream_placement = placement;
    start();
}

/**
 * Asks the task array to halt the monitor task of the given neighbour.
 *
 * @param strNeighstreamMonitorHandle handle of the neighbour monitor
 */
public void halt(String strNeighstreamMonitorHandle) {
    taskArray.haltMonitorTask(strNeighstreamMonitorHandle);
}

/**
 * @return the stage number of this monitor
 */
public int getCurStage() {
    return curStage;
}

/*
//? mill-seconds elapse
public void actionPerformed(ActionEvent e)
{
    if(!timer.equals(e.getSource()))
        log.warn("different timer");
    envSimulatorSrv.
}
*/
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -