⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 streamserviceprovider.java

📁 本人历尽千辛万苦找的clustream中的jar包
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
			log.error(e);			return "Fail";		}			return "OK";	}/*	private void constructTree()	{		String places[] = null;		String strNextConnection = null;;		int nSourcePlacements = ((Integer)XMLConfigurator.getParameter("stages|stage1|numPlacements")).intValue();		int nstage, nplacement;		DefaultMutableTreeNode treenode, parent, root; 		int numStages = ((Integer)XMLConfigurator.getParameter("numStages")).intValue();		String strFinalStagePlacement = "stage" + numStages + ":placement1";		nstage = 1;		nplacement = 1;		treenode = parent = root = null;		for(int i = 1; i <= nSourcePlacements; i ++)		{			nplacement = i;			nstage = 1;			treenode = new DefaultMutableTreeNode(new Integer(1)); //new String(parent.getChildCount() + 1));			hashTree.put("stage1:placement"+i, treenode);			do{				log.debug("stage"+nstage+":placement"+nplacement);				//Get the next stage's placement				strNextConnection = (String)XMLConfigurator.getParameter("stages|stage" + nstage +"|connection" + nplacement);				log.debug(strNextConnection);				if(hashTree.containsKey(strNextConnection))				{					parent = (DefaultMutableTreeNode)hashTree.get(strNextConnection);					parent.add(treenode);					treenode.setUserObject(new Integer(parent.getChildCount()));					break;				}				else				{					parent = new DefaultMutableTreeNode(new Integer(1));					parent.add(treenode);					hashTree.put(strNextConnection, parent);					treenode = parent;					//change nstage and nplacement					places = strNextConnection.split(":");					nstage = Integer.parseInt(places[0].substring(5)); //the length of "placement" is 9					nplacement = Integer.parseInt(places[1].substring(9)); //the length of "placement" is 9				}				//log.debug(treenode);				//log.debug(parent);			}while(!strNextConnection.equals(strFinalStagePlacement));			if(strNextConnection.equals(strFinalStagePlacement))				root = parent;		}		//Debug 				DefaultMutableTreeNode node;		for (Enumeration e = root.breadthFirstEnumeration() ; e.hasMoreElements() ;) 		{			node = (DefaultMutableTreeNode)e.nextElement();			System.out.println(((Integer)(node.getUserObject())).intValue());		}	} */	public boolean startMonitorService()	{		//create a Monitor service		//Get the GSH of the corresponding monitor		String IPAdd = Utilities.getIPAddress(myHandle);		strMonitorHandle = "http://" + IPAdd + ":8080/ogsa/services/Stream/StreamMonitorService"; 		//Debug region		log.debug("****************");		log.debug(strMonitorHandle);		log.debug("****************");					try{			OGSIServiceGridLocator gridLocator = new OGSIServiceGridLocator();			Factory factory = gridLocator.getFactoryPort(new URL(strMonitorHandle));			GridServiceFactory monitorFactory = new GridServiceFactory(factory);			LocatorType locator = monitorFactory.createService(myInstanceName);			log.debug(locator);			StreamMonitorServiceGridLocator monitorLocator =  new StreamMonitorServiceGridLocator();			monitor = monitorLocator.getStreamMonitorService(locator);			log.debug(monitor);							//now we can communicate with the monitor			monitor.setCommunicationInfo(curStage, curPlacement, numPlacements, myHandle, curDownstreamHandle);			if(strEnvSimulatorHandle != null)				monitor.setEnvironmentSimulatorStringHandle(strEnvSimulatorHandle);			else 				log.error("didn't set Environment simulator's handle");			//Sep. 11th***** I comment the following statements out, 			//because monitor and stream service is in the 			//same machine			/*if(strConfigInfo == null)				monitor.setConfigFile(cfgURI[0]);			else				monitor.setConfigInfo(strConfigInfo); */			boolean bStart = monitor.start();			log.debug("result " + bStart);			if(bStart)				System.out.println("monitor work.....");			else				System.out.println("can't start the monitor services");		}		catch(Exception e)		{			log.error(e);			//Do nothing here, skip to the next step			return false;		}		return true;	}//	public boolean launchNextService()	public String launchNextService()	{		try{//			int numStages = ((Integer)XMLConfigurator.getParameter("numStages")).intValue();//			int fakeNumStages = ((Integer)XMLConfigurator.getParameter("fakeNumStages")).intValue();			boolean [] isInsExist = new boolean[1];			String nextPlace = (String)XMLConfigurator.getParameter("stages|" + "stage"+curStage+"|connection"+curPlacement);			log.debug(nextPlace);			String [] places =nextPlace.split(":");			//the expression of placement is like:			//  "stageN:placement?"			if(places.length != 2)			{				log.error("The expression of placement in the stage "+ curStage+"isn't correct");				return null;			}			int nextPlc = Integer.parseInt(places[1].substring(9)); //the length of "placement" is 9			int nextStage = Integer.parseInt(places[0].substring(5)); //the length of "stage" is 5			String strURL = "stages|" + "stage" + nextStage + "|" + places[1];			log.debug(strURL);			URL GSH = (URL)XMLConfigurator.getParameter(strURL);			if(GSH == null)			{				log.error("The location of the service can't be find");				return null;			}			log.debug(GSH);			String nextInstanceName = "stage"+ nextStage +"-" + places[1];			curDownstreamHandle = GSH.toString()+"/"+nextInstanceName;			downStream = (StreamPortType)getStreamByName(curDownstreamHandle, DefConstants.DOWNSTREAM, isInsExist);			log.debug("The next instance "+curDownstreamHandle+" exists?:" + isInsExist[0]);			if(downStream == null)			{				log.fatal("can't find the service target");				return null;			}			log.debug("my handle is " + myHandle);			downStream.addUpstreamHandle(myHandle);			boolean flagLoad = needLoadWorkClass(numStages, fakeNumStages);			log.info("need load work class?:"+flagLoad);			downStream.setLoadWorkClassFlag(flagLoad);						downStream.setEnvironmentSimulatorStringHandle(strEnvSimulatorHandle);			downStream.setSocketPort(remoteServerSocketPort);			if(!isInsExist[0])			{//				downStream.setSocketPort(remoteServerSocketPort);				downStream.setCurrentStage(nextStage,nextPlc);				if(strConfigInfo == null)					downStream.setConfigFile(cfgURI[0]);				else					downStream.setConfigInfo(strConfigInfo);				log.debug("now start the next stream service");								//the current service will wait until the next 				//service have been launched				if(downStream.start())					log.info(strURL + "'s instance've been launched....");				else{					log.fatal("can't launch the "+strURL+"instance");					return null;				}			}			else			{				log.debug("now add a socket server to the next stream service");				int temp = downStream.addSocketServer();				if(temp < 0)				{					log.error("can't add a socket server at the next service");					return null;				}				if(flagLoad)					downStream.loadWorkClass();			}		}		catch(GridServiceException e)		{			log.error(e);			return null;		}		catch(RemoteException e)		{			log.error(e);			return null;		}		catch(IOException e)		{			log.error(e);			return null;		}		return curDownstreamHandle;	}	public int connectToNextService()	{		//estabish the socket connections to the following instance		if(curStage != numStages)		{			//connect to the next socket server 			String nextPlace = (String)XMLConfigurator.getParameter("stages|" + "stage"+curStage+"|connection"+curPlacement);			String [] places =nextPlace.split(":");			String strURL = "stages|" + places[0] + "|" + places[1];			URL GSH = (URL)XMLConfigurator.getParameter(strURL);			log.debug(GSH);			//Get the bandwidth between two stages			ConnectionContext cc2 = new ConnectionContext();			String strNetBand =  (String)XMLConfigurator.getParameter("stages|" + "stage"+curStage+"|bandwidth"+curPlacement);			if(strNetBand == null)			{				cc2.net_bandwidth = -1;				cc2.net_util = 3.0;				log.info("the network bandwidth is " + cc2.net_bandwidth);			}			else if(strNetBand.indexOf(':') < 0)			{				//The constant number is specified				cc2.net_bandwidth = (Integer.valueOf(strNetBand)).doubleValue();				cc2.net_util = 3.0;				log.info("the network bandwidth is " + cc2.net_bandwidth);			}			else 			{				//The util has been specified				//The function is specified				String [] strArray = strNetBand.split(":");				cc2.net_bandwidth = (Integer.valueOf(strArray[0])).doubleValue();				cc2.net_util = (Double.valueOf(strArray[1])).doubleValue();				log.info("the network bandwidth is " + cc2.net_bandwidth + "and the utility is " + cc2.net_util);				//log.info("*****haven't implemented it");				//.....			}			cc2.iStep = curStage;			cc2.strId = myHandle;			cc2.rcverPort = remoteServerSocketPort;			//cc1.strId = (String)stream.getProperty(ServiceProperties.HANDLE);			cc2.neighStreamHandle = curDownstreamHandle;			cc2.rcverHostName = GSH.getHost();			int index2 = outBufArray.addOutputBuffer(cc2);			//Update remoteServerSocketPort			remoteServerSocketPort += 10;			log.debug(cc2);			log.debug("the index is " + index2 );			if(index2 < 0)			{				log.error("can't add an output buffer and a socket server");				return -1;			}			return index2;		}		else 			return -1;	}	public boolean shiftToNextService(int index)	{		log.error("*********ATTENTION**** come to shifting ************");		int size = outBufArray.howmanyOutputBuffers();		if(index < 0 || index >= size)		{			log.error("the index " + index + "is wrong");			return false;		}		ConnectionContext cc;		for(int i = 0; i < size ; i ++)		{			if(i != index)			{				if(!outBufArray.isOutputBufValid(i))				{					log.info("the output " + i + "is invalid");					continue;				}				log.info("set the output " + i + "to be invalid");				outBufArray.setOutputBufStatus(i, false);				//notify the downstream srv				cc = outBufArray.getOutputBuffer(i).getConnectionContext();				StreamPortType streamPT = (StreamPortType)getStreamByName(cc.neighStreamHandle, DefConstants.DOWNSTREAM);								try{					streamPT.halt(myHandle);					log.info(myHandle + "is halting " + cc.neighStreamHandle );				}				catch(RemoteException e)				{					log.error(e.getCause());					return false;				}			}		}		return true;	}	public void halt(String strUpstreamHandle)	{		int size = inBufArray.howmanyInputBuffers();		int i;		for(i = 0; i < size ; i ++)		{			ConnectionContext cc = inBufArray.getInputBuffer(i).getConnectionContext();			if(strUpstreamHandle.equals(cc.neighStreamHandle))			{				log.info("set input buffer " + i + " to be invalid");				inBufArray.setInputBufStatus(i, false);				break;			}		}		if(i == size)			return;		if(monitor != null)		{			String strUpstreamMonitorHandle = strUpstreamHandle.replaceFirst("StreamService", "StreamMonitorService");			try{				monitor.halt(strUpstreamMonitorHandle);			}			catch(RemoteException e)			{				log.error(e.getCause());				return;			}		}	}	public void netBandwidthNotification(double ffg_ave, double ffg_variance, double net_util, String strDownstreamHandle)	{		SelfSimilarSeqGenerator generator = new SelfSimilarSeqGenerator(ffg_ave, ffg_variance, DefConstants.SIZE_SELF_SIMILAR_ARRAY);		double [] dTempArray = generator.generating();		/** Debug		 *//*		int len = dTempArray.length;		for(int i = 0; i < len; i ++)			log.debug(i + ":" + dTempArray[i]);   */		/** Debug		 */		outBufArray.setNetBandwidth(strDownstreamHandle, dTempArray, net_util);	}	public void setEnvironmentSimulatorStringHandle(String strHandle)	{		strEnvSimulatorHandle = strHandle;	}	public int setCurrentStage(int curStage, int curPlacement)	{		this.curStage  = curStage;		this.curPlacement = curPlacement;		return curStage;	}	public int getCurrentStage()	{		return curStage;	}	public void setLoadWorkClassFlag(boolean flag)	{		ifNeedLoadWorkClass = flag;	}		public void addUpstreamHandle(String handle)	{		curUpstreamHandle = handle;	}			public int getSocketPort()	{		return localServerSocketPort;	}			public void startJob()	{		setStartFlag();	}	public void finishJob()	{		setDoneFlag();		closeAllSock();	}	private synchronized void setStartFlag()	{		bJobStart = true;	}	private synchronized void setDoneFlag()	{		bJobDone = true;	}	private void closeAllSock()	{		return;	}	public boolean tryInstance()	{		return true;	}	    	public void postCreate(GridContext context) throws GridServiceException 	{        /*counterData = this.serviceData.create("CounterStatus");        counterData.setValue("initialized");        this.serviceData.add(counterData);*/		 //super.postCreate(context);//		myHandle = (String)(context.getServiceProperties()).getProperty(ServiceProperties.HANDLE);//		log.debug("in postCreate and myHandle is "+myHandle);/*		initServiceData();		specifyAccuracyData();		specifyPerformanceData(); */	}        public void IdleTransmission()	{	}	public void SocketConnected(ConnectionContext cc)	{	}		// Empty callback methods	public void preCreate(GridServiceBase base) throws GridServiceException	{		int i;	}	public void preDestroy(GridContext context) throws GridServiceException	{		int i;	}	public void activate(GridContext context) throws GridServiceException	{		int i;	}	public void deactivate(GridContext context) throws GridServiceException	{		int i;	}}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -