📄 streamserviceprovider.java
字号:
package org.osu.ogsa.stream.services;

import org.osu.ogsa.stream.util.*;
import org.osu.ogsa.stream.util.xmlconfig.*;
import org.osu.ogsa.stream.Stream.StreamServiceGridLocator;
import org.osu.ogsa.stream.Stream.StreamPortType;
import org.osu.ogsa.stream.Stream.StreamServiceLocator;
import org.osu.ogsa.stream.StreamMonitor.StreamMonitorServiceGridLocator;
import org.osu.ogsa.stream.StreamMonitor.StreamMonitorPortType;
import org.osu.ogsa.stream.StreamMonitor.StreamMonitorServiceLocator;
import org.osu.ogsa.stream.EnvironmentSimulator.EnvironmentSimulatorPortType;
import org.osu.ogsa.stream.EnvironmentSimulator.EnvironmentSimulatorServiceGridLocator;
import org.osu.ogsa.stream.EnvironmentSimulator.EnvironmentSimulatorService;
import org.osu.ogsa.stream.EnvironmentSimulator.EnvironmentSimulatorServiceLocator;
import java.rmi.RemoteException;
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.channels.spi.*;
import java.nio.charset.*;
import java.net.*;
import java.util.*;
import java.rmi.RemoteException;
import javax.swing.tree.DefaultMutableTreeNode;
import org.globus.ogsa.impl.ogsi.GridServiceImpl;
import org.globus.ogsa.GridContext;
import org.globus.ogsa.GridServiceException;
import org.globus.ogsa.utils.AnyHelper;
import org.globus.ogsa.OperationProvider;
import org.globus.ogsa.GridServiceCallback;
import org.globus.ogsa.GridServiceBase;
import org.globus.ogsa.ServiceProperties;
import org.gridforum.ogsi.ExtensibilityType;
import org.gridforum.ogsi.GridService;
import org.gridforum.ogsi.HandleType;
import org.gridforum.ogsi.OGSIServiceGridLocator;
import org.gridforum.ogsi.ServiceDataValuesType;
import org.gridforum.ogsi.FaultType;
import org.gridforum.ogsi.Factory;
import org.gridforum.ogsi.LocatorType;
import org.globus.ogsa.utils.GridServiceFactory;
import org.apache.log4j.*;
import org.apache.axis.client.Stub;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.xml.namespace.QName;

/**
 * Operation provider for a stream grid service.
 *
 * <p>The class registers itself as the notification target of its input and
 * output buffer arrays (see {@link #initialize}), stores an "accuracy" and a
 * "performance" tuning parameter together with their bounds and step size,
 * and derives an adjustment value ("lamda") from the load of its input
 * buffers and from the over-loaded / light-loaded counts recorded for the
 * downstream side.
 *
 * <p>Notes carried over from the original header comments:
 * <ul>
 *   <li>the BufferNotification interface includes "over_loaded" and "light_loaded";</li>
 *   <li>the OperationProvider interface includes "initialized";</li>
 *   <li>GridServiceCallback includes "pre, postcreate, activate and deactivate".</li>
 * </ul>
 */
public class StreamServiceProvider extends Object implements BufferNotification, OperationProvider, GridServiceCallback {
//  UserDefServiceData defServiceData = new UserDefServiceData();
    // Local socket port and the port assumed for the remote side (local + 1).
    int localServerSocketPort = DefConstants.SOCKET_PORT;
    int remoteServerSocketPort = localServerSocketPort + 1;

    // Raw (untyped) tables; presumably keyed by service handle -- confirm against the code that fills them.
    private Hashtable hashUpstreams = new Hashtable(DefConstants.MAX_NUM_UPSTREAM);
    private Hashtable hashDownstreams = new Hashtable(DefConstants.MAX_NUM_DOWNSTREAM);
    private Hashtable hashTree = new Hashtable();

    private OGSIServiceGridLocator gridLocator = new OGSIServiceGridLocator();
    private StreamServiceGridLocator streamLocator = new StreamServiceGridLocator();

    // Buffer arrays; this provider is installed as their notification target in initialize().
    public AutoFillInputBufferArray inBufArray = new AutoFillInputBufferArray();
    public AutoFillOutputBufferArray outBufArray = new AutoFillOutputBufferArray();
//  private static Category log = Category.getInstance(StreamServiceProvider.class.getName());
    private static Log log = LogFactory.getLog(StreamServiceProvider.class.getName());
    // Optional file logger; only written to when bFileLog is true.
    private FileLog fileLog;

    // For loading and running the work class
    private boolean bJobStart = false;
    private boolean bJobDone = false;
    private boolean ifNeedLoadWorkClass = true;
    private boolean ifLoadedWorkClass = false;
    private boolean ifWorkClassRunning = false;
    private ThreadContainer tc = null;
    // Set to true by specifyAccuracyPara() / specifyPerformancePara().
    private boolean bIfSpecifiedPara = false;
    private boolean bFileLog;
//  private boolean isInsExist = false;

    // Accuracy parameter: current value, bounds and step size.
    // bAccuracyParaTaken is false until the value has been handed out once.
    private boolean bAccuracyParaTaken = false;
    private double accuracyPara;
    private double accuracyParaMin;
    private double accuracyParaMax;
    private double accuracyParaUnit = -1;

    // Performance parameter: current value, bounds and step size.
    private boolean bPfmParaTaken = false;
    private double pfmPara ;
    private double pfmParaMin ;
    private double pfmParaMax;
    private double pfmParaUnit = -1;
    // Weight applied to the input-buffer term when no downstream events were recorded.
    private double inputbuffer_factor;

    // Constants for the accuracy and performance parameters.
    // NOTE(review): REACHMIN and REACHMAX are both 0, so the two outcomes
    // cannot be told apart by value -- confirm this is intended.
    private static int NOTAKEN = -1;
    private static int REACHMIN = 0;
    private static int REACHMAX = 0;
    private static int SUCCESS = 1;
    // Step sizes at or below this threshold are replaced by (max - min) / 20.
    private static double ZERO = 0.001;
    // Number of past samples kept in the *_array history buffers.
    private static int HISTORY = 3;

    // Independent random sources, seeded in initialize().
    private Random rand;
    private Random rand1;
    private Random rand2;
    private Random rand3;

    // Variables for the accuracy and performance parameters
    private int downstream_overloaded_times = 0;
    private int downstream_lightloaded_times = 0;
    private int downstream_loaded_times_window = 0;
    // History of the last samples: average input load and downstream counts.
    private double temp1_array[] = new double[HISTORY];
    private int overloaded_times_array[] = new int[HISTORY];
    private int lightloaded_times_array[] = new int[HISTORY];
    // Next write position in the history arrays (wraps at HISTORY).
    private int point = 0;
    // Number of valid entries in the history arrays (0..HISTORY).
    private int n = 0;
    // Damping factors; they persist between calls because some branches multiply into them.
    private double temp1Factor;
    private double overloaded_times_factor ;
    private double lightloaded_times_factor;
    private double recordMaxTemp1, recordMinTemp1;
    // Largest downstream counts seen so far.
    private int recordMaxOverLoadedTimes, recordMaxLightLoadedTimes;
    private int upstream_overloaded_times = 0;
    private int upstream_lightloaded_times = 0;

    private StreamPortType downStream = null;
    private String curDownstreamHandle = null;
    private StreamPortType upStream = null;
    private String curUpstreamHandle = null;
    public String myHandle = null;
    public String myInstanceName = null;
    public int curStage = -1;
    public int curPlacement = -1;
    private int numStages, fakeNumStages, numPlacements;
    private String cfgURI[] = new String[1];
    private String strConfigInfo = null;
    // Human-readable summary of the last calculateLamda() run.
    private String misc_info;

    // Parameters for the monitor
    private int intBottleneckTimes = 0;
    private Object synBottleneck;
    private String strMonitorHandle = null;
    private String strEnvSimulatorHandle = null;
    private StreamMonitorPortType monitor = null;

    // For applications
    public int nDataSources = -1;
//  private SimConnectionContext simCc = new SimConnectionContext();
    //abstract private initServiceData();
    //abstract private specifyAccuracyData();
    //abstract private specifyPerformanceData();
    //public StreamService(){}

    // Internally defined class for storing a service's handle and id (disabled)
    /*private class HandleId{
        String strHandle;
        String strId;
        public HandleId(String strHandle, String strId)
        {
            this.strHandle = strHandle;
            this.strId = strId;
        }
    }*/

    // Standard stuff for the grid provider
    // Operation provider properties: a single wildcard QName.
    private static final QName[] operations = new QName[]{new QName("", "*")};
    private GridServiceBase base;
//  private XMLConfigurator config = null;
    byte [] buffer = new byte[1000];

    // Operation Provider methods

    /**
     * Stores the service base, registers this object as the notification
     * target of both buffer arrays, and creates the random sources and the
     * bottleneck lock object.
     *
     * <p>The four {@link Random} instances are seeded from the current time
     * with offsets 0, 10, 100 and 1000.
     *
     * @param base the service base this provider is attached to; kept in {@code this.base}
     * @throws GridServiceException declared by the signature; the visible body does not throw it explicitly
     */
    public void initialize(GridServiceBase base) throws GridServiceException {
        this.base = base;
        /* ExtensibilityType extension = (ExtensibilityType)this.base.getProperty("__customerProperty");
        log.debug("instance:[initialize] : extension" + extension);
        config = (XMLConfigurator)AnyHelper.getAsSingleObject(extension);*/
        // init the two buffer arrays
        inBufArray.setNotification(this);
        outBufArray.setNotification(this);
        Date forRand = new Date();
        rand = new Random(forRand.getTime());
        rand1 = new Random(forRand.getTime() + 10);
        rand2 = new Random(forRand.getTime() + 100);
        rand3 = new Random(forRand.getTime() + 1000);
        synBottleneck = new Object();
    }

    /**
     * Returns the operations handled by this provider.
     *
     * @return the shared static array holding the single wildcard QName {@code ("", "*")}
     */
    public QName[] getOperations() {
        return operations;
    }

    /* public StreamServiceProvider(String strTitle)
    {
        super(String strTitle);
        for(int i = 0; i < in.length ; i ++)
            in[i] = null;
        for(int i = 0; i < out.length; i ++)
            out[i] = null;
    } */

    /**
     * Sets the accuracy parameter, its bounds and its step size, and marks
     * the value as not yet taken.
     *
     * @param accPara initial value of the accuracy parameter
     * @param min     lower bound
     * @param max     upper bound
     * @param unit    step size; when {@code unit <= ZERO} (0.001) the step
     *                becomes {@code (max - min) / 20.0}
     */
    public void specifyAccuracyPara(double accPara, double min, double max, double unit) {
        bIfSpecifiedPara = true;
        this.accuracyPara = accPara;
        this.accuracyParaMin = min;
        this.accuracyParaMax = max;
        if(unit <= ZERO)
            this.accuracyParaUnit = (max - min)/20.0;
        else
            this.accuracyParaUnit = unit;
        this.bAccuracyParaTaken = false;
    }

    /**
     * Sets the performance parameter, its bounds and its step size, marks the
     * value as not yet taken, and resets the input-buffer weight, the history
     * arrays and the recorded extremes.
     *
     * <p>NOTE(review): the history arrays are cleared here but {@code n} and
     * {@code point} are not reset, so after a second call the cleared
     * entries still count as valid samples -- confirm this is intended.
     *
     * @param pfmPara initial value of the performance parameter
     * @param min     lower bound
     * @param max     upper bound
     * @param unit    step size; when {@code unit <= ZERO} (0.001) the step
     *                becomes {@code (max - min) / 20.0}
     */
    public void specifyPerformancePara(double pfmPara, double min, double max, double unit) {
        this.bIfSpecifiedPara = true;
        this.pfmPara = pfmPara;
        this.pfmParaMin = min;
        this.pfmParaMax = max;
        if(unit <= ZERO)
            this.pfmParaUnit = (max - min)/20.0;
        else
            this.pfmParaUnit = unit;
        this.bPfmParaTaken = false;
        inputbuffer_factor = DefConstants.INPUTBUFFER_FACTOR;
        for(int i = 0; i < HISTORY; i ++) {
            temp1_array[i] = -2;
            overloaded_times_array[i] = 0;
            lightloaded_times_array[i] = 0;
        }
        recordMaxTemp1 = -2;
        recordMinTemp1 = 0;
        recordMaxOverLoadedTimes = -1;
        recordMaxLightLoadedTimes = -1;
    }

    /**
     * Computes the adjustment value used to tune the accuracy or performance
     * parameter and updates the sample history as a side effect.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>{@code temp1} becomes the mean {@code longTermLoadedFactor} over
     *       all input buffers (0 when there are none).</li>
     *   <li>The recorded maxima of the downstream counts are raised if the
     *       current counts exceed them.</li>
     *   <li>With fewer than two stored samples all three damping factors are
     *       set to 1. Otherwise min / max / mean over the stored samples and
     *       the variance over the stored samples plus the current one are
     *       computed, and each factor is set from them.</li>
     *   <li>The current sample is stored in the history.</li>
     *   <li>If file logging is on, the statistics are written out.</li>
     *   <li>The input-buffer term and the downstream term are combined into
     *       the result.</li>
     *   <li>{@code misc_info} is refreshed and both downstream counters are
     *       reset to 0.</li>
     * </ol>
     *
     * <p>{@code Utilities.Biased_Coin} is defined outside this file;
     * presumably it returns {@code HEAD} with the given probability -- confirm.
     * The order in which the random sources are consumed affects the result.
     *
     * @param isPfmPara {@code true} for the performance parameter (sign +1),
     *                  {@code false} for the accuracy parameter (sign -1)
     * @return the adjustment value
     */
    private double calculateLamda(boolean isPfmPara) {
        double lamda, temp1, temp2, part1, part2;
        // How many of the input buffers are overloaded?
        int inNumber = inBufArray.howmanyInputBuffers();
//      int outNumber = outBufArray.howmanyOutputBuffers();
        int i;
        temp1 = 0.0;
        temp2 = 0.0;
        // Average long-term load factor over all input buffers.
        for(i = 0; i < inNumber; i ++)
            temp1 += inBufArray.getInputBuffer(i).getConnectionContext().longTermLoadedFactor;
        if(inNumber > 0)
            temp1 = temp1/(double)inNumber;
        // Track the largest downstream counts ever observed.
        if(recordMaxOverLoadedTimes < downstream_overloaded_times)
            recordMaxOverLoadedTimes = downstream_overloaded_times;
        if(recordMaxLightLoadedTimes < downstream_lightloaded_times)
            recordMaxLightLoadedTimes = downstream_lightloaded_times;
        double maxTemp1, minTemp1, aveTemp1;
        int max_overloaded_times, min_overloaded_times, ave_overloaded_times;
        int max_lightloaded_times, min_lightloaded_times, ave_lightloaded_times;
        int max_overloaded_times_index, max_lightloaded_times_index, maxTemp1Index;
        double sqrTempDiff,sqrTempDiff1, sqrOverLoadedDiff1, sqrLightLoadedDiff1;
        // Sentinels chosen so that any stored sample replaces them.
        maxTemp1 = -1.1;
        minTemp1 = 1.1;
        aveTemp1 = 0;
        max_overloaded_times = -10000;
        min_overloaded_times = 10000;
        ave_overloaded_times = 0;
        max_lightloaded_times = -10000;
        min_lightloaded_times = 10000;
        ave_lightloaded_times = 0;
        max_overloaded_times_index = -1;
        max_lightloaded_times_index = -1;
        maxTemp1Index = -1;
        sqrTempDiff = 0;
        sqrTempDiff1 = 0;
        sqrOverLoadedDiff1 = 0;
        sqrLightLoadedDiff1 = 0;
        if(n == 0 || n == 1) {
            // Not enough history yet: no damping.
            temp1Factor = 1;
            overloaded_times_factor = 1;
            lightloaded_times_factor = 1;
        } else {
            // Min / max (with index of the max) and running sums over the stored samples.
            for(i = 0; i < n; i ++) {
                if(minTemp1 > temp1_array[i])
                    minTemp1 = temp1_array[i];
                if(maxTemp1 < temp1_array[i]) {
                    maxTemp1 = temp1_array[i];
                    maxTemp1Index = i;
                }
                if(min_overloaded_times > overloaded_times_array[i])
                    min_overloaded_times = overloaded_times_array[i];
                if(max_overloaded_times < overloaded_times_array[i]) {
                    max_overloaded_times = overloaded_times_array[i];
                    max_overloaded_times_index = i;
                }
                if(min_lightloaded_times > lightloaded_times_array[i])
                    min_lightloaded_times = lightloaded_times_array[i];
                if(max_lightloaded_times < lightloaded_times_array[i]) {
                    max_lightloaded_times = lightloaded_times_array[i];
                    max_lightloaded_times_index = i;
                }
                aveTemp1 += temp1_array[i];
                // Squared distance of the current sample from each stored sample.
                sqrTempDiff += (temp1 - temp1_array[i])*(temp1 - temp1_array[i]);
                ave_overloaded_times += overloaded_times_array[i];
                ave_lightloaded_times += lightloaded_times_array[i];
            }
            // Means over the stored samples plus the current one (n + 1 values).
            double ave1 = (aveTemp1 + temp1)/(n+1);
            double ave2 = (double)(ave_overloaded_times + downstream_overloaded_times)/(double)(n+1);
            double ave3 = (double)(ave_lightloaded_times + downstream_lightloaded_times)/(double)(n+1);
            double semi_sum = 0;
            double semi_sum2 = 0;
            double semi_sum3 = 0;
            for(i = 0 ; i < n; i ++) {
                semi_sum += (temp1_array[i] - ave1)*(temp1_array[i]-ave1);
                semi_sum2 += (overloaded_times_array[i] - ave2)*(overloaded_times_array[i] - ave2);
                semi_sum3 += (lightloaded_times_array[i] - ave3)*(lightloaded_times_array[i] - ave3);
            }
            semi_sum += (temp1 - ave1)*(temp1 - ave1);
            semi_sum2 += (downstream_overloaded_times - ave2)*(downstream_overloaded_times - ave2);
            semi_sum3 += (downstream_lightloaded_times - ave3)*(downstream_lightloaded_times - ave3);
            // Variances over the n + 1 values.
            sqrTempDiff1 = semi_sum / (double)(n+1);
            sqrOverLoadedDiff1 = semi_sum2 / (double)(n+1);
            sqrLightLoadedDiff1 = semi_sum3 / (double)(n+1);
            // Means over the stored samples only; the two count means use integer division.
            aveTemp1 /= n;
            ave_overloaded_times /= n;
            ave_lightloaded_times /= n;
            sqrTempDiff /= n;

            // Factor for the input-buffer term.
            if(Double.compare(maxTemp1, minTemp1) == 0 || Double.compare(sqrTempDiff, 0.05*0.05) < 0)
                temp1Factor = 0;
//          else if(Utilities.Biased_Coin(rand1, absTemp1/recordMaxTemp1) == DefConstants.HEAD)
            else if(Double.compare(temp1 ,maxTemp1) <= 0 && Double.compare(temp1 , minTemp1) >= 0)
                // Current sample lies inside the historic range: scale the previous factor.
                temp1Factor *= Math.abs(temp1 - aveTemp1)/(maxTemp1 - minTemp1);
            else if(Utilities.Biased_Coin(rand1, sqrTempDiff1/0.0125) == DefConstants.HEAD)
                temp1Factor = 1;
            else
                //temp1Factor = absTemp1/recordMaxTemp1;
                temp1Factor = sqrTempDiff1 / 0.0125;

            /* if(max_loaded_times == min_loaded_times || sqrLoadedDiff <= 3*3)*/
            // Factor for the downstream over-loaded count.
            // NOTE(review): unlike the temp1 chain, the coin is flipped before the in-range check.
            if((downstream_overloaded_times == max_overloaded_times && max_overloaded_times == min_overloaded_times) || Double.compare(sqrOverLoadedDiff1, 5) < 0)
                overloaded_times_factor = 0;
            else if(Utilities.Biased_Coin(rand2, (double)downstream_overloaded_times/(double)recordMaxOverLoadedTimes) == DefConstants.HEAD)
                overloaded_times_factor = 1;
            else if(downstream_overloaded_times <= max_overloaded_times && downstream_overloaded_times >= min_overloaded_times)
                overloaded_times_factor *= (double)Math.abs(downstream_overloaded_times - ave_overloaded_times)/(double)(max_overloaded_times - min_overloaded_times);
            else
                overloaded_times_factor = (double)downstream_overloaded_times/(double)recordMaxOverLoadedTimes;

            // Factor for the downstream light-loaded count.
            // NOTE(review): this chain uses "<= 0" against 5 where the over-loaded chain uses "< 0",
            // and it draws from rand2 as well -- confirm both are intended.
            if((downstream_lightloaded_times == max_lightloaded_times && max_lightloaded_times == min_lightloaded_times) ||Double.compare(sqrLightLoadedDiff1, 5) <= 0)
                lightloaded_times_factor = 0;
            else if(Utilities.Biased_Coin(rand2, (double)downstream_lightloaded_times/(double)recordMaxLightLoadedTimes) == DefConstants.HEAD)
                lightloaded_times_factor = 1;
            else if(downstream_lightloaded_times <= max_lightloaded_times && downstream_lightloaded_times >= min_lightloaded_times) {
                // A factor that was zeroed earlier is restarted instead of multiplied (it would stay 0).
                if(Double.compare(lightloaded_times_factor, 0.0) == 0)
                    lightloaded_times_factor = (double)Math.abs(downstream_lightloaded_times - ave_lightloaded_times)/(double)(max_lightloaded_times - min_lightloaded_times);
                else
                    lightloaded_times_factor *= (double)Math.abs(downstream_lightloaded_times - ave_lightloaded_times)/(double)(max_lightloaded_times - min_lightloaded_times);
            }
            else
                lightloaded_times_factor = (double)downstream_lightloaded_times/(double)recordMaxLightLoadedTimes;
        }

        // History update. The coin is flipped on every call, even when the history is not yet full.
        if(Utilities.Biased_Coin(rand3, 0.7) == DefConstants.HEAD && n == HISTORY) {
            // Full history: overwrite the stored maximum when the current sample is smaller.
            if(Double.compare(temp1, maxTemp1) < 0)
                temp1_array[maxTemp1Index] = temp1;
            if(downstream_overloaded_times < max_overloaded_times)
                overloaded_times_array[max_overloaded_times_index] = downstream_overloaded_times;
            if(downstream_lightloaded_times < max_lightloaded_times)
                lightloaded_times_array[max_lightloaded_times_index] = downstream_lightloaded_times;
        } else {
            // Otherwise write the sample at the ring position and advance it.
            if(n < HISTORY)
                n++;
            temp1_array[point] = temp1;
            overloaded_times_array[point] = downstream_overloaded_times;
            lightloaded_times_array[point] = downstream_lightloaded_times;
            point ++;
            if(point == HISTORY)
                point = 0;
        }

        if(bFileLog) {
            fileLog.write("sqrOverLoadedDiff1" + sqrOverLoadedDiff1 + "sqrLightLoadedDiff1" + sqrLightLoadedDiff1
                + "ave_temp1:" + aveTemp1+ " maxTemp1:" + maxTemp1 + " minTemp1:" + minTemp1
                + " ave_overloaded_times:"+ ave_overloaded_times + " ave_lightloaded_times: " + ave_lightloaded_times+
                " max_overloaded_times:" + max_overloaded_times + " min_overloaded_times:" + min_overloaded_times
                + " max_lightloaded_times:" + max_lightloaded_times+ " min_lightloaded_times:" + min_lightloaded_times
                + "recordMaxOverLoadedTimes" + recordMaxOverLoadedTimes + " recordMaxLightLoadedTimes" + recordMaxLightLoadedTimes);
            for(i = 0; i < n ; i ++) {
                fileLog.write(i + ":" + temp1_array[i]);
                fileLog.write(i + ":" + overloaded_times_array[i]);
                fileLog.write(i + ":" + lightloaded_times_array[i]);
            }
        }

        // Calculate the load factor of the downstream side
        int sign;
        if(isPfmPara)
            sign = 1;
        else
            sign = -1;
        if(downstream_overloaded_times != 0 || downstream_lightloaded_times != 0) {
            // Normalised imbalance between over-loaded and light-loaded events, in [0, 1].
            part1 = (double)(downstream_overloaded_times - downstream_lightloaded_times)/
                (double)(downstream_overloaded_times + downstream_lightloaded_times);
            part1 = Math.abs(part1);
            /* if(loaded_times > DefConstants.WINDOW_SIZE)
                loaded_times = DefConstants.WINDOW_SIZE;
            if(downstream_loaded_times_window == 0)
                part2 = 1;
            else
                part2 = (downstream_loaded_times_window/Math.abs(downstream_loaded_times_window))
                    *(double)(Math.exp(Math.abs(downstream_loaded_times_window)- DefConstants.WINDOW_SIZE))/(double)Math.pow(2, (Math.abs(downstream_loaded_times_window)- DefConstants.WINDOW_SIZE));
            part2 = 1; */
            // part2 is assigned but not read in the active code.
            part2 = 1;
            temp1 *= sign* temp1Factor;
            temp2 = -1 * sign * (overloaded_times_factor - lightloaded_times_factor) * part1;
            // The coin decides which of the two terms receives the INPUTBUFFER_FACTOR weight.
            if(Utilities.Biased_Coin(rand, DefConstants.INPUTBUFFER_THRESHOLD) == DefConstants.TAIL)
                lamda = temp1 * DefConstants.INPUTBUFFER_FACTOR + temp2*(1 - DefConstants.INPUTBUFFER_FACTOR);
            else
                lamda = temp2 * DefConstants.INPUTBUFFER_FACTOR + temp1*(1 - DefConstants.INPUTBUFFER_FACTOR);
            inputbuffer_factor = DefConstants.INPUTBUFFER_FACTOR;
        } else {
            // No downstream events: halve inputbuffer_factor with probability DECAY_PRO
            if(Utilities.Biased_Coin(rand, DefConstants.DECAY_PRO) == DefConstants.HEAD)
                inputbuffer_factor /= 2;
            temp1 *= sign * temp1Factor;
            lamda = temp1 * inputbuffer_factor;
        }
        // temp1 in this message is the value after it was multiplied by sign and temp1Factor.
        misc_info = "input buffers' average long term loaded:"+temp1+" and the downstream buffers's average is:"+temp2+
            "input-factor : " + inputbuffer_factor + " downstream_overloaded_times:" + downstream_overloaded_times
            + " downstream_lightloaded_times " + downstream_lightloaded_times + " temp1Factor:" + temp1Factor
            + " overloaded_times_factor:" + overloaded_times_factor + " lightloaded_times_factor:"+ lightloaded_times_factor;
        // Start a fresh counting window for the next call.
        downstream_overloaded_times = 0;
        downstream_lightloaded_times = 0;
        return lamda;
    }

    /**
     * Returns the accuracy parameter to use next.
     *
     * <p>{@code calculateLamda(false)} runs on every call, so the history and
     * the downstream counters are updated even on the first call. On the first
     * call after {@link #specifyAccuracyPara} the stored value is returned
     * unchanged; on later calls {@code adjustAccuracyPara} (defined outside
     * this view) is invoked first with the computed value and {@code misc_info}.
     *
     * @return the current accuracy parameter
     */
    public synchronized double getSuggestedAccuracyPara() {
        double lamda = calculateLamda(false);
        if(!this.bAccuracyParaTaken) {
            bAccuracyParaTaken = true;
            return accuracyPara;
        }
        adjustAccuracyPara(lamda, misc_info);
        return accuracyPara;
    }

    public synchronized double getSuggestedPerformancePara() {
        double lamda = calculateLamda(true);
        if(!this.bPfmParaTaken) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -