⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 streamserviceprovider.java

📁 本人历尽千辛万苦找的clustream中的jar包
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
package org.osu.ogsa.stream.services;import org.osu.ogsa.stream.util.*;import org.osu.ogsa.stream.util.xmlconfig.*;import org.osu.ogsa.stream.Stream.StreamServiceGridLocator;import org.osu.ogsa.stream.Stream.StreamPortType;import org.osu.ogsa.stream.Stream.StreamServiceLocator;import org.osu.ogsa.stream.StreamMonitor.StreamMonitorServiceGridLocator;import org.osu.ogsa.stream.StreamMonitor.StreamMonitorPortType;import org.osu.ogsa.stream.StreamMonitor.StreamMonitorServiceLocator;import org.osu.ogsa.stream.EnvironmentSimulator.EnvironmentSimulatorPortType;import org.osu.ogsa.stream.EnvironmentSimulator.EnvironmentSimulatorServiceGridLocator;import org.osu.ogsa.stream.EnvironmentSimulator.EnvironmentSimulatorService;import org.osu.ogsa.stream.EnvironmentSimulator.EnvironmentSimulatorServiceLocator;import java.rmi.RemoteException;import java.io.*;import java.nio.*;import java.nio.channels.*;import java.nio.channels.spi.*;import java.nio.charset.*;import java.net.*;import java.util.*;import java.rmi.RemoteException;import javax.swing.tree.DefaultMutableTreeNode;import org.globus.ogsa.impl.ogsi.GridServiceImpl;import org.globus.ogsa.GridContext;import org.globus.ogsa.GridServiceException;import org.globus.ogsa.utils.AnyHelper;import org.globus.ogsa.OperationProvider;import org.globus.ogsa.GridServiceCallback;import org.globus.ogsa.GridServiceBase;import org.globus.ogsa.ServiceProperties;import org.gridforum.ogsi.ExtensibilityType;import org.gridforum.ogsi.GridService;import org.gridforum.ogsi.HandleType;import org.gridforum.ogsi.OGSIServiceGridLocator;import org.gridforum.ogsi.ServiceDataValuesType;import org.gridforum.ogsi.FaultType;import org.gridforum.ogsi.Factory;import org.gridforum.ogsi.LocatorType;import org.globus.ogsa.utils.GridServiceFactory;import org.apache.log4j.*;import org.apache.axis.client.Stub;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import javax.xml.namespace.QName;//BufferNotification interface includes "over_loaded" and light_loaded"//OperationProvide interface includes "initialized"//GridServiceCallback includes "pre, postcreate, activate and deactivate"public class StreamServiceProvider extends Object implements BufferNotification, OperationProvider, GridServiceCallback {//	UserDefServiceData defServiceData = new UserDefServiceData();	int localServerSocketPort = DefConstants.SOCKET_PORT; 	int remoteServerSocketPort = localServerSocketPort + 1;	private Hashtable hashUpstreams = new Hashtable(DefConstants.MAX_NUM_UPSTREAM);	private Hashtable hashDownstreams = new Hashtable(DefConstants.MAX_NUM_DOWNSTREAM);	private Hashtable hashTree = new Hashtable();	private OGSIServiceGridLocator gridLocator = new OGSIServiceGridLocator();        private StreamServiceGridLocator streamLocator = new StreamServiceGridLocator();        public AutoFillInputBufferArray inBufArray = new AutoFillInputBufferArray();	public AutoFillOutputBufferArray outBufArray = new AutoFillOutputBufferArray();//	private static Category log = Category.getInstance(StreamServiceProvider.class.getName());	private static Log log = LogFactory.getLog(StreamServiceProvider.class.getName());	private FileLog fileLog;	//For loading and running work class	private boolean bJobStart = false;	private boolean bJobDone = false;	private boolean ifNeedLoadWorkClass = true;	private boolean ifLoadedWorkClass = false;	private boolean ifWorkClassRunning = false;	private ThreadContainer tc = null;	private boolean bIfSpecifiedPara = false;	private	boolean bFileLog;//	private	boolean isInsExist = false;	//Accuracy para.	private boolean bAccuracyParaTaken = false;	private double accuracyPara;	private double accuracyParaMin;	private double accuracyParaMax;	private double accuracyParaUnit = -1;	//Perfomance Para.	private boolean bPfmParaTaken = false;	private double pfmPara ;	private double pfmParaMin ;	private double pfmParaMax;	private double pfmParaUnit = -1;	private double inputbuffer_factor;	//Constants for accuracy and performance para.	private static int NOTAKEN = -1;	private static int REACHMIN = 0;	private static int REACHMAX = 0;	private static int SUCCESS = 1;	private static double ZERO = 0.001;	private static int HISTORY = 3;	private Random rand;	private Random rand1;	private Random rand2;	private Random rand3;	//variable for accuracy and performance paras	private int downstream_overloaded_times = 0;	private int downstream_lightloaded_times = 0;	private int downstream_loaded_times_window = 0;	private double temp1_array[] = new double[HISTORY];	private int overloaded_times_array[] = new int[HISTORY];	private int lightloaded_times_array[] = new int[HISTORY];	private int point = 0;	private int n = 0;	private double temp1Factor;	private double overloaded_times_factor ;	private double lightloaded_times_factor;	private double recordMaxTemp1, recordMinTemp1;	private int recordMaxOverLoadedTimes, recordMaxLightLoadedTimes;	private int upstream_overloaded_times = 0;	private int upstream_lightloaded_times = 0;	private StreamPortType downStream = null;	private String curDownstreamHandle = null;	private StreamPortType upStream = null;	private String curUpstreamHandle = null;	public String myHandle = null;	public String myInstanceName = null;	public int curStage = -1;	public int curPlacement = -1;	private int numStages, fakeNumStages, numPlacements;	private String cfgURI[] = new String[1];	private String strConfigInfo = null;	private String misc_info;		//paras for monitor	private int intBottleneckTimes = 0;	private Object synBottleneck;	private String strMonitorHandle = null;	private String strEnvSimulatorHandle = null;	private StreamMonitorPortType monitor = null;	//for applications	public  int nDataSources = -1;//	private	SimConnectionContext simCc = new SimConnectionContext();	//abstract private initServiceData();	//abstract private specifyAccuracyData();	//abstract private specifyPerformanceData(); 	//public StreamService(){} 	//Internal defined class for storing services' handle and Id	/*private class HandleId{		String strHandle;		String strId;		public HandleId(String strHandle, String strId)		{			this.strHandle = strHandle;			this.strId = strId;		}	}*/	//Standard staff for the grid provider        // Operation provider properties	private static final QName[] operations = new QName[]{new QName("", "*")};	private GridServiceBase base;//	private XMLConfigurator config = null;	byte [] buffer = new byte[1000];	// Operation Provider methods	public void initialize(GridServiceBase base) throws GridServiceException	{       		this.base = base;	/*	ExtensibilityType extension = (ExtensibilityType)this.base.getProperty("__customerProperty");		log.debug("instance:[initialize] : extension" + extension);		config = (XMLConfigurator)AnyHelper.getAsSingleObject(extension);*/		//init two arrays		inBufArray.setNotification(this);		outBufArray.setNotification(this);		Date forRand = new Date();		rand = new Random(forRand.getTime());		rand1 = new Random(forRand.getTime() + 10);		rand2 = new Random(forRand.getTime() + 100);		rand3 = new Random(forRand.getTime() + 1000);		synBottleneck = new Object();	}	public QName[] getOperations()	{		return operations;	}/*	public StreamServiceProvider(String strTitle)	{		super(String strTitle);		for(int i  = 0; i < in.length ; i ++)			in[i] = null;		for(int i = 0; i < out.length; i ++)			out[i] = null;	} */	public void specifyAccuracyPara(double accPara, double min, double max, double unit)	{		bIfSpecifiedPara = true;		this.accuracyPara = accPara;		this.accuracyParaMin = min;		this.accuracyParaMax = max;		if(unit <= ZERO)			this.accuracyParaUnit = (max - min)/20.0;		else 			this.accuracyParaUnit = unit;		this.bAccuracyParaTaken = false;	}	public void specifyPerformancePara(double pfmPara, double min, double max, double unit) 	{		this.bIfSpecifiedPara = true;		this.pfmPara = pfmPara;		this.pfmParaMin = min;		this.pfmParaMax = max;		if(unit <= ZERO)			this.pfmParaUnit = (max - min)/20.0;		else			this.pfmParaUnit = unit;		this.bPfmParaTaken = false;		inputbuffer_factor = DefConstants.INPUTBUFFER_FACTOR;		for(int i = 0; i < HISTORY; i ++)		{			temp1_array[i] = -2;			overloaded_times_array[i] = 0;			lightloaded_times_array[i] = 0;		}	        recordMaxTemp1 = -2;		recordMinTemp1 = 0;		recordMaxOverLoadedTimes = -1;		recordMaxLightLoadedTimes = -1;	}	private double calculateLamda(boolean isPfmPara)	{		double lamda, temp1, temp2, part1, part2;		//How many the input buffers are overloaded?		int inNumber = inBufArray.howmanyInputBuffers();//		int outNumber = outBufArray.howmanyOutputBuffers();		int i;		temp1 = 0.0;		temp2 = 0.0;		for(i = 0; i < inNumber; i ++)			temp1 += inBufArray.getInputBuffer(i).getConnectionContext().longTermLoadedFactor;		if(inNumber > 0)			temp1 = temp1/(double)inNumber;		if(recordMaxOverLoadedTimes < downstream_overloaded_times)			recordMaxOverLoadedTimes = downstream_overloaded_times;		if(recordMaxLightLoadedTimes < downstream_lightloaded_times)			recordMaxLightLoadedTimes = downstream_lightloaded_times;		double maxTemp1, minTemp1, aveTemp1;		int max_overloaded_times, min_overloaded_times, ave_overloaded_times;		int max_lightloaded_times, min_lightloaded_times, ave_lightloaded_times;		int max_overloaded_times_index, max_lightloaded_times_index, maxTemp1Index;		double sqrTempDiff,sqrTempDiff1, sqrOverLoadedDiff1, sqrLightLoadedDiff1;		maxTemp1 = -1.1;		minTemp1 = 1.1;		aveTemp1 = 0;		max_overloaded_times = -10000;		min_overloaded_times = 10000;		ave_overloaded_times = 0;		max_lightloaded_times = -10000;		min_lightloaded_times = 10000;		ave_lightloaded_times = 0;		max_overloaded_times_index = -1;		max_lightloaded_times_index = -1;		maxTemp1Index = -1;		sqrTempDiff = 0;		sqrTempDiff1 = 0;		sqrOverLoadedDiff1 = 0;		sqrLightLoadedDiff1 = 0;		if(n == 0 || n == 1)		{			temp1Factor = 1;			overloaded_times_factor = 1;			lightloaded_times_factor = 1;		}		else		{			for(i = 0; i < n; i ++)			{				if(minTemp1 > temp1_array[i])					minTemp1 = temp1_array[i];				if(maxTemp1 < temp1_array[i])				{					maxTemp1 = temp1_array[i];					maxTemp1Index = i;				}				if(min_overloaded_times > overloaded_times_array[i])					min_overloaded_times = overloaded_times_array[i];				if(max_overloaded_times < overloaded_times_array[i])				{					max_overloaded_times = overloaded_times_array[i];					max_overloaded_times_index = i;				}				if(min_lightloaded_times > lightloaded_times_array[i])					min_lightloaded_times = lightloaded_times_array[i];				if(max_lightloaded_times < lightloaded_times_array[i])				{					max_lightloaded_times = lightloaded_times_array[i];					max_lightloaded_times_index = i;				}				aveTemp1 += temp1_array[i];				sqrTempDiff += (temp1 - temp1_array[i])*(temp1 - temp1_array[i]);				ave_overloaded_times += overloaded_times_array[i];				ave_lightloaded_times += lightloaded_times_array[i];			}			double ave1 = (aveTemp1 + temp1)/(n+1);			double ave2 = (double)(ave_overloaded_times + downstream_overloaded_times)/(double)(n+1);			double ave3 = (double)(ave_lightloaded_times + downstream_lightloaded_times)/(double)(n+1);			double semi_sum = 0;			double semi_sum2 = 0;			double semi_sum3 = 0;			for(i = 0 ; i < n; i ++)			{				semi_sum += (temp1_array[i] - ave1)*(temp1_array[i]-ave1);					semi_sum2 += (overloaded_times_array[i] - ave2)*(overloaded_times_array[i] - ave2);				semi_sum3 += (lightloaded_times_array[i] - ave3)*(lightloaded_times_array[i] - ave3);			}			semi_sum += (temp1 - ave1)*(temp1 - ave1);			semi_sum2 += (downstream_overloaded_times - ave2)*(downstream_overloaded_times - ave2);			semi_sum3 += (downstream_lightloaded_times - ave3)*(downstream_lightloaded_times - ave3);			sqrTempDiff1 = semi_sum / (double)(n+1);			sqrOverLoadedDiff1 = semi_sum2 / (double)(n+1);			sqrLightLoadedDiff1 = semi_sum3 / (double)(n+1);			aveTemp1 /= n;			ave_overloaded_times /= n;			ave_lightloaded_times /= n;			sqrTempDiff /= n;			if(Double.compare(maxTemp1, minTemp1) == 0 					|| Double.compare(sqrTempDiff,  0.05*0.05) < 0)				temp1Factor = 0;//			else if(Utilities.Biased_Coin(rand1, absTemp1/recordMaxTemp1) == DefConstants.HEAD)			else if(Double.compare(temp1 ,maxTemp1) <= 0 && Double.compare(temp1 , minTemp1) >= 0)				temp1Factor *= Math.abs(temp1 - aveTemp1)/(maxTemp1 - minTemp1);			else if(Utilities.Biased_Coin(rand1, sqrTempDiff1/0.0125) == DefConstants.HEAD)				temp1Factor = 1; 			else				//temp1Factor = absTemp1/recordMaxTemp1;				temp1Factor = sqrTempDiff1 / 0.0125;/*			if(max_loaded_times == min_loaded_times || sqrLoadedDiff <= 3*3)*/			if((downstream_overloaded_times == max_overloaded_times				&& max_overloaded_times == min_overloaded_times)				|| Double.compare(sqrOverLoadedDiff1, 5) < 0) 				overloaded_times_factor = 0;			else if(Utilities.Biased_Coin(rand2, (double)downstream_overloaded_times/(double)recordMaxOverLoadedTimes) == DefConstants.HEAD)				overloaded_times_factor = 1;			else if(downstream_overloaded_times <= max_overloaded_times && downstream_overloaded_times >= min_overloaded_times)				overloaded_times_factor *= (double)Math.abs(downstream_overloaded_times - ave_overloaded_times)/(double)(max_overloaded_times - min_overloaded_times);			else				overloaded_times_factor = (double)downstream_overloaded_times/(double)recordMaxOverLoadedTimes;			if((downstream_lightloaded_times == max_lightloaded_times				&& max_lightloaded_times == min_lightloaded_times)				||Double.compare(sqrLightLoadedDiff1, 5) <= 0) 				lightloaded_times_factor = 0;			else if(Utilities.Biased_Coin(rand2, (double)downstream_lightloaded_times/(double)recordMaxLightLoadedTimes) == DefConstants.HEAD)				lightloaded_times_factor = 1;			else if(downstream_lightloaded_times <= max_lightloaded_times && downstream_lightloaded_times >= min_lightloaded_times)			{				if(Double.compare(lightloaded_times_factor, 0.0) == 0) 					lightloaded_times_factor = (double)Math.abs(downstream_lightloaded_times - ave_lightloaded_times)/(double)(max_lightloaded_times - min_lightloaded_times);				else					lightloaded_times_factor *= (double)Math.abs(downstream_lightloaded_times - ave_lightloaded_times)/(double)(max_lightloaded_times - min_lightloaded_times);			}			else				lightloaded_times_factor = (double)downstream_lightloaded_times/(double)recordMaxLightLoadedTimes;		}		if(Utilities.Biased_Coin(rand3, 0.7) == DefConstants.HEAD && n == HISTORY)		{		if(Double.compare(temp1, maxTemp1) < 0)			temp1_array[maxTemp1Index] = temp1;		if(downstream_overloaded_times < max_overloaded_times)			overloaded_times_array[max_overloaded_times_index] = downstream_overloaded_times;		if(downstream_lightloaded_times < max_lightloaded_times)			lightloaded_times_array[max_lightloaded_times_index] = downstream_lightloaded_times;		}		else		{			if(n < HISTORY)					n++;			temp1_array[point] = temp1;			overloaded_times_array[point] = downstream_overloaded_times;			lightloaded_times_array[point] = downstream_lightloaded_times;			point ++;			if(point == HISTORY)				point = 0;		}				if(bFileLog)		{			fileLog.write("sqrOverLoadedDiff1" + sqrOverLoadedDiff1 + "sqrLightLoadedDiff1" + sqrLightLoadedDiff1 + "ave_temp1:" + aveTemp1+ " maxTemp1:" + maxTemp1 + " minTemp1:" + minTemp1 + " ave_overloaded_times:"+ ave_overloaded_times + " ave_lightloaded_times: " + ave_lightloaded_times+ " max_overloaded_times:" + max_overloaded_times + " min_overloaded_times:" + min_overloaded_times + " max_lightloaded_times:" + max_lightloaded_times+ " min_lightloaded_times:" + min_lightloaded_times + "recordMaxOverLoadedTimes" + recordMaxOverLoadedTimes + " recordMaxLightLoadedTimes" + recordMaxLightLoadedTimes); 			for(i = 0; i < n ; i ++)			{			fileLog.write(i + ":" + temp1_array[i]);			fileLog.write(i + ":" + overloaded_times_array[i]);			fileLog.write(i + ":" + lightloaded_times_array[i]);			}		}		//Calculate the load factor of downstream		int sign;		if(isPfmPara)			sign = 1;		else 			sign = -1;		if(downstream_overloaded_times != 0 || downstream_lightloaded_times != 0)		{			part1 = (double)(downstream_overloaded_times - downstream_lightloaded_times)/ (double)(downstream_overloaded_times + downstream_lightloaded_times);			part1 = Math.abs(part1);/*			if(loaded_times > DefConstants.WINDOW_SIZE)				loaded_times = DefConstants.WINDOW_SIZE; 			if(downstream_loaded_times_window == 0)				part2 = 1;			else	                        part2 = (downstream_loaded_times_window/Math.abs(downstream_loaded_times_window)) *(double)(Math.exp(Math.abs(downstream_loaded_times_window)- DefConstants.WINDOW_SIZE))/(double)Math.pow(2, (Math.abs(downstream_loaded_times_window)- DefConstants.WINDOW_SIZE));	                        part2 = 1; */			part2 = 1;			temp1 *= sign* temp1Factor;			temp2 = -1 * sign * (overloaded_times_factor - lightloaded_times_factor) * part1;			if(Utilities.Biased_Coin(rand, DefConstants.INPUTBUFFER_THRESHOLD) == DefConstants.TAIL)				lamda = temp1 *  DefConstants.INPUTBUFFER_FACTOR + temp2*(1 -  DefConstants.INPUTBUFFER_FACTOR);			else				lamda = temp2 *  DefConstants.INPUTBUFFER_FACTOR + temp1*(1 -  DefConstants.INPUTBUFFER_FACTOR);			inputbuffer_factor =  DefConstants.INPUTBUFFER_FACTOR;		}		else		{			//Decrease inputbuffer_factor by 2 times in DECRY_PRO probability			if(Utilities.Biased_Coin(rand, DefConstants.DECAY_PRO) == DefConstants.HEAD)				inputbuffer_factor /= 2;			temp1 *= sign * temp1Factor;			lamda = temp1 * inputbuffer_factor;		}		misc_info = "input buffers' average long term loaded:"+temp1+" and the downstream buffers's average is:"+temp2+ "input-factor : " + inputbuffer_factor + " downstream_overloaded_times:" + downstream_overloaded_times + " downstream_lightloaded_times " + downstream_lightloaded_times + " temp1Factor:" + temp1Factor + " overloaded_times_factor:" + overloaded_times_factor + " lightloaded_times_factor:"+ lightloaded_times_factor;		downstream_overloaded_times = 0;		downstream_lightloaded_times = 0;		return lamda;	}	public synchronized double getSuggestedAccuracyPara()	{		double lamda = calculateLamda(false);		if(!this.bAccuracyParaTaken)		{			bAccuracyParaTaken = true;			return accuracyPara;		}		adjustAccuracyPara(lamda, misc_info); 		return  accuracyPara;	}	public synchronized double getSuggestedPerformancePara()	{		double lamda = calculateLamda(true);		if(!this.bPfmParaTaken)		{

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -