⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 streamserviceprovider.java

📁 本人历尽千辛万苦找的clustream中的jar包
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
			bPfmParaTaken = true;			return pfmPara;		}		adjustPfmPara(lamda, misc_info); 		return pfmPara;	}	protected synchronized int adjustPfmPara(double lamda, String debug_info)	{		pfmPara += pfmParaUnit * lamda;		if(Double.compare(pfmPara, pfmParaMin) < 0)		{			pfmPara = pfmParaMin;			if(bFileLog)				fileLog.write(debug_info + ":in adjustPfmPara and pfmPara:" + pfmPara +"::lamda:"+lamda);			return REACHMIN;		}		else if(Double.compare(pfmPara, pfmParaMax) > 0)		{			pfmPara = pfmParaMax;			if(bFileLog)				fileLog.write(debug_info + ":in adjustPfmPara and pfmPara:" + pfmPara +"::lamda:"+lamda);			return REACHMAX; 		}				if(bFileLog)				fileLog.write(debug_info + ":in adjustPfmPara and pfmPara:" + pfmPara +"::lamda:"+lamda);		return SUCCESS;	}	protected synchronized void setAccuracyPara(double para)	{		if(para < accuracyParaMin || para > accuracyParaMax)		{			log.warn("can't set the accurate parameter: the value beyond the region");			return;		}		this.accuracyPara = para;		this.bAccuracyParaTaken = false;	}	protected synchronized int adjustAccuracyPara(double lamda, String debug_info)	{/*		if(!bAccuracyParaTaken)			return NOTAKEN; */		accuracyPara += accuracyParaUnit * lamda;		if(Double.compare(accuracyPara, accuracyParaMin) < 0)		{			accuracyPara = accuracyParaMin;			if(bFileLog)				fileLog.write(debug_info + ":in adjustAccuracyPara and accuracyPara:" + accuracyPara +"::lamda:"+lamda);			return REACHMIN;		}		else if(Double.compare(accuracyPara, accuracyParaMax) > 0)		{			accuracyPara = accuracyParaMax;			if(bFileLog)				fileLog.write(debug_info + ":in adjustAccuracyPara and accuracyPara:" + accuracyPara +"::lamda:"+lamda);			return REACHMAX; 		}				if(bFileLog)				fileLog.write(debug_info + ":in adjustAccuracyPara and accuracyPara:" + accuracyPara +"::lamda:"+lamda);		return SUCCESS;	}		/*	protected synchronized int increaseAccuracyPara(double unit, ConnectionContext cc)	{		double temp = accuracyParaUnit*unit;		temp += accuracyPara;		if(Double.compare(temp, accuracyParaMax) > 0)		{			accuracyPara = accuracyParaMax;			if(bFileLog)				fileLog.write(cc.misc_info +" --increase para ---:"+ accuracyPara);			return REACHMAX;		}		else		{			accuracyPara = temp;			if(bFileLog)				fileLog.write(cc.misc_info +" --increase para ---:"+ accuracyPara);			return SUCCESS; //increase the value		}	} 	protected synchronized int decreaseAccuracyPara(double unit, ConnectionContext cc)	{		double temp = accuracyParaUnit*unit;		temp = accuracyPara - temp;		if(Double.compare(temp, accuracyParaMin) < 0)		{			accuracyPara = accuracyParaMin;			if(bFileLog)				fileLog.write(cc.misc_info +" --decrease para --:"+ accuracyPara);			return REACHMIN;		}		else		{			accuracyPara = temp;			if(bFileLog)				fileLog.write(cc.misc_info +" --decrease para --:"+ accuracyPara);			return SUCCESS; //decrease the value 		}	} */	protected synchronized void setpfmPara(double para)	{		if(para < pfmParaMin || para > pfmParaMax)		{			log.warn("can't set the performance parameter: the value beyond the region");			return;		}		this.pfmPara = para;		this.bPfmParaTaken = false;	}	public void setSocketPort(int port)	{		localServerSocketPort = port;		remoteServerSocketPort = port + 1;	}	public int addSocketServer()	{		/*int number = inBufArray.howmanyInputBuffers();		int i = 0;		int temp, max;		max = -1;		while(i < number)		{			temp = inBufArray.getInputBuffer(i).getConnectionContext().rcverPort;			if(temp == port)				port = localServerSocketPort;			i++;		}*/		ConnectionContext cc = new ConnectionContext();		cc.iStep = curStage;		if(curStage > 1)//			cc.neighStream = upStream; //represents for itself			cc.neighStreamHandle = curUpstreamHandle; //represents for itself		cc.strId = myHandle;		//cc1.strId = GSH.toString() + "?" + cc1.rcverPort;		//cc1.strId = (String)stream.getProperty(ServiceProperties.HANDLE);		//The socket port was set by the prior instance//		cc.rcverPort = port;		cc.rcverPort = localServerSocketPort;		int index = inBufArray.addInputBuffer(cc);		log.debug("the index of the auto input buffer is " + index);		if(index < 0)		{			log.error("can't add a input buffer and a socket server");			return -1;		}		//Update localServerSocketPort.		//Why 10, not 1? In case confliction of ports.		//localServerSocketPort += 10;		return 1;	}	public boolean runDownstreamWorkClass(String strDownstreamHandle)	{		if(strDownstreamHandle == null)		{			log.debug("the downstream handle is null");			return true;		}		try{			StreamPortType downstream = (StreamPortType)getStreamByName(strDownstreamHandle, DefConstants.DOWNSTREAM);			if(!downstream.runWorkClass())			{				log.error("can't let the downstream's work class run");				return false;			}			return true;		}		catch(Exception e)		{			log.error(e);			return false;		}	}	public boolean runWorkClass()	{		if(isWorkClassRunning())			return true;		else if(!ifLoadedWorkClass)			return false;		log.info("starting to run class");		try{			if(curStage != numStages)			{				boolean flagLoad = needLoadWorkClass(numStages, fakeNumStages);				log.debug("need to run the downstream's class?" + flagLoad);				if(flagLoad)				{					if(!runDownstreamWorkClass(curDownstreamHandle))						log.error("can't let the downstream's work class run");				}			}			tc.startProcessing();			setWorkClassRunningFlag(true);			return true;		}		catch(Exception e)		{			log.error(e);			return false;		}	}	private synchronized void setWorkClassRunningFlag(boolean flag)	{		ifWorkClassRunning = flag;	}	public synchronized boolean isWorkClassRunning()	{		return ifWorkClassRunning;	}	public boolean loadWorkClass()	{		if(ifLoadedWorkClass)			return true;		if(!ifNeedLoadWorkClass)			return false;		log.info("starting to load class");		//Load the working class		try{			String strClassLocation = "stages|" + "stage" + (curStage) + "|class|location";			URL[] urlCP = {(URL)XMLConfigurator.getParameter(strClassLocation)};			log.debug(urlCP[0]);			ClassLoader classLoader = new URLClassLoader(urlCP);			Class work = null;			//Now to retrieve the class you only need to do the following:			String strClassName = "stages|" + "stage" + (curStage) + "|class|class_name";			String strClass = (String)XMLConfigurator.getParameter(strClassName);			log.debug(strClass);			work = classLoader.loadClass(strClass);			log.debug(work);			StreamProcessor streamProcessor = (StreamProcessor)work.newInstance();				log.info("The name of the class loaded is " + streamProcessor.getClass().getName()); 			tc = new ThreadContainer(this);		//	tc.startProcessing(streamProcessor);			tc.initProcessing(streamProcessor);			ifLoadedWorkClass = true;			return true;		}		catch(ClassNotFoundException e)		{			log.error(e);			return false;		}		catch(InstantiationException e)		{			log.error(e.getCause());			return false;		}		catch(IllegalAccessException e)		{			log.error(e.getCause());			return false;		}		catch(Exception e)		{			log.error(e.getCause());			return false;		}	}					private boolean isServiceInstanceExist(String handle)	{		Object streamPortType1, streamPortType2;		//Step 1		streamPortType1 = hashUpstreams.get(handle);		streamPortType2 = hashDownstreams.get(handle);		if(streamPortType1 != null || streamPortType2 != null)			return true;		else 			return false;		//Step 2/*		try{			StreamServiceGridLocator locator =   new StreamServiceGridLocator();			streamPortType = locator.getStreamService(new URL(handle));			if(streamPortType != null)			{				hashStreamHandles.put(handle, streamPortType);				return true;			}		}		catch(Exception e)		{			log.info(e);			return false;			//Do nothing here, skip to the next step		}*/	}	private StreamPortType getStreamByName(String handle, int direction)	{		boolean [] tempBool = new boolean[1];		return getStreamByName(handle, direction, tempBool);	}	//The function will 	//1. Check if the hash table has a web service instance corresponding	//   to the handle	//2. Check if the corresponding web service has an instance, if	//   the instance exists, add the pair to hash table	//3. create a new instance, add the pair to hash table	private StreamPortType getStreamByName(String handle, int direction, boolean [] isInsExist)	{		StreamPortType streamPortType;		Hashtable hashStreamHandles;//		log.debug(handle+direction);		if(direction == DefConstants.UPSTREAM)			hashStreamHandles = hashUpstreams;		else			hashStreamHandles = hashDownstreams;		//Step 1		streamPortType = (StreamPortType)hashStreamHandles.get(handle);		if(streamPortType != null)		{			isInsExist[0] = true;			return streamPortType;		}		//Step 2		try{			StreamServiceGridLocator locator =   new StreamServiceGridLocator();			streamPortType = locator.getStreamService(new URL(handle));			if(streamPortType != null)			{				streamPortType.tryInstance();				org.apache.axis.client.Stub s = (Stub) streamPortType;				s.setTimeout(500000);  // 1 second, in miliseconds				hashStreamHandles.put(handle, (Object)streamPortType);//				log.debug(handle + "instance exist!!!! ");				isInsExist[0] = true;				return streamPortType;			}		}	        catch(FaultType e)		{			log.error(e);		        //Do nothing here, skip to the next step		}		catch(GridServiceException e)		{			log.error(e);		}		catch(RemoteException e)		{			log.error(e);		}		catch(MalformedURLException e)		{			log.error(e);		}		//Step 3: create a web service instance		//Get placement and instance name		isInsExist[0] = false;		int lastIndex = handle.lastIndexOf('/');		log.debug("lastIndex of //" + lastIndex);		if(lastIndex == -1)			return null;		String strGSH = handle.substring(0, lastIndex);		String instanceName = handle.substring(lastIndex + 1);		log.debug("GSH is " + strGSH);		log.debug("instance name is "+ instanceName);		//Create the first service		// Get a reference to the StreamService Factory		try{			OGSIServiceGridLocator gridLocator = new OGSIServiceGridLocator();			Factory factory = gridLocator.getFactoryPort(new URL(strGSH));			log.debug(factory);			GridServiceFactory streamFactory = new GridServiceFactory(factory);			// Create a new StreamService instance and get a reference to its to its Stream PortType//			LocatorType locator = streamFactory.createService(content);			//log.debug(content);			LocatorType locator = streamFactory.createService(instanceName);			log.debug(locator);			StreamServiceGridLocator streamLocator =   new StreamServiceGridLocator();			streamPortType = streamLocator.getStreamService(locator);			org.apache.axis.client.Stub s = (Stub) streamPortType;			s.setTimeout(500000);  // 1 second, in miliseconds			log.debug(streamPortType);			//Add it to hashtable			if(streamPortType != null)				hashStreamHandles.put(handle, (Object)streamPortType);			return streamPortType; 		}		catch(Exception e)		{			log.error(e);			//Do nothing here, skip to the next step			return null;		}	}	private boolean needLoadWorkClass(int numStages, int fakeNumStages)	{		log.debug("numStages:" + numStages+" "+curPlacement +" " + curStage);		if(curStage <= fakeNumStages && curStage != numStages )		{			String strConnection= (String)XMLConfigurator.getParameter("stages|stage" +curStage+"|connection"+curPlacement);			String strMyPlacement = "stage" + curStage + ":placement" + curPlacement;			log.debug(strConnection);			log.debug(strMyPlacement);			DefaultMutableTreeNode parent = (DefaultMutableTreeNode)hashTree.get(strConnection);			DefaultMutableTreeNode myNode = (DefaultMutableTreeNode)hashTree.get(strMyPlacement);			if(parent == null || myNode == null)				log.error("the configuration information is not right");			Integer orderChild = (Integer)myNode.getUserObject();			if(parent.getChildCount() == orderChild.intValue())				return true;			else				return false;		}		else			return false;	}	public boolean start() 	{				try{		//All configuration information is in XMLConfigurator		log.debug("*********************"+curStage+"***********************");		numStages = ((Integer)XMLConfigurator.getParameter("numStages")).intValue();		fakeNumStages = ((Integer)XMLConfigurator.getParameter("fakeNumStages")).intValue();		log.debug("number of stages is " + numStages);		log.debug("fake number of stages is " + fakeNumStages);		numPlacements = ((Integer)XMLConfigurator.getParameter("stages|" + "stage"+curStage+"|numPlacements")).intValue();		nDataSources = ((Integer)XMLConfigurator.getParameter("stages|stage1|numPlacements")).intValue();				bFileLog = ((Boolean)XMLConfigurator.getParameter("filelogging")).booleanValue();		log.debug("number of placements is " + numPlacements);//		int curStage = ((Integer)XMLConfigurator.getParameter("__currentStage")).intValue();		//get "myHandle" here instead of in postCreate()		//because in postCreate(), the IP of "myHandle" is the real ip address		//of the service. We want to use its name as its ip address for EnviornmentService		myHandle = ((URL)XMLConfigurator.getParameter("stages|stage"+ curStage+ "|placement"+ curPlacement)).toString();		myHandle = myHandle + "/"+ "stage" + curStage + "-placement"+curPlacement;		log.debug("myHandle is "+myHandle);		myInstanceName = "stage" + curStage + "-placement"+curPlacement;		Date tempTime = new Date();		if(bFileLog)		{			String myLogFileName = myInstanceName + tempTime.getTime();			fileLog = new FileLog(myLogFileName);		}		if(curStage == numStages)		{			//This is the last stage in the pipeline,			//just output to stdout		}		else if(curStage <= fakeNumStages)		{			launchNextService();		}				if(curStage != 1)			addSocketServer();		//estabish the socket connections to the following instance

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -