⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 lowhub.java

📁 本人历尽千辛万苦找的clustream中的jar包
💻 JAVA
字号:
package org.osu.ogsa.stream.special;import org.osu.ogsa.stream.services.*;import org.osu.ogsa.stream.util.*;import java.rmi.RemoteException;import java.io.*;import java.nio.*;import java.nio.channels.*;import java.nio.channels.spi.*;import java.nio.charset.*;import java.net.*;import java.util.*;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;public class LowHub{	private int maxBufSize;	private int maxSampleSize;	private StreamServiceProvider srvProvider;	private Random rand;	private int SET_SIZE;	private int ELEMENT_SIZE;	private byte [] endingSQ;	private byte []exitSQ;	private ByteBuffer exitBuf;		private static Log log = LogFactory.getLog(LowHub.class.getName());		public LowHub()	{		endingSQ = exitSQ = null;	}        public void init_1(StreamServiceProvider srvProvider)	{		this.srvProvider = srvProvider;	}	public void init_2(int set_size, int element_size)	{		SET_SIZE = set_size;		ELEMENT_SIZE = element_size;		maxBufSize = (SET_SIZE + 1) * 2 * ELEMENT_SIZE; 	}	public void init_3(byte [] exitSQ)	{		this.exitSQ = exitSQ;		exitBuf = ByteBuffer.allocate(ELEMENT_SIZE);		for(int i = 0 ; i < ELEMENT_SIZE; i ++)			exitBuf.put(i, exitSQ[i]);	}	public void init_4(byte [] endingSQ)	{		this.endingSQ = endingSQ;	}	public void work(AutoFillInputBufferArray inBufArray, AutoFillOutputBufferArray outBufArray)	{		if(endingSQ == null)			work_without_endingSQ(inBufArray, outBufArray);		else			work_with_endingSQ(inBufArray, outBufArray);	}	public void work_without_endingSQ(AutoFillInputBufferArray inBufArray, AutoFillOutputBufferArray outBufArray)	{		int numBuf, i,k, rntCount, rntGet;		int nExit;		boolean bExit[];		if(inBufArray.howmanyInputBuffers() == 0 || outBufArray.howmanyOutputBuffers()== 0)		{			log.fatal("["+srvProvider.myHandle + "]:"+"[work]: No input buffer or output buffer");			System.exit(-1);		}		AutoFillOutputBuffer outBuffer;		ByteBuffer tempBuf = ByteBuffer.allocate(maxBufSize);		//		//generate the sample set		log.debug("["+srvProvider.myHandle + "]:"+"generate the sample set");		bExit = new boolean[30];		//Loop		for(i = 0; i < 30; i ++)			bExit[i] = false;		nExit = 0;		int inputIndex;		//For debug only		IntBuffer intBuf = tempBuf.asIntBuffer();		while(true){			numBuf = inBufArray.howmanyInputBuffers();			for(i = 0; i < numBuf; i ++)			{ 				tempBuf.clear();				if((rntGet = (inBufArray.getInputBuffer(i)).readToByteBuffer(tempBuf, (SET_SIZE - 1)* ELEMENT_SIZE, false)) <= 0)					continue;				//Make sure the data read from socket can be changed to INT				if(rntGet % ELEMENT_SIZE != 0)					rntGet += (inBufArray.getInputBuffer(i)).readToByteBuffer(tempBuf, ELEMENT_SIZE - (rntGet%ELEMENT_SIZE), true);				//find if there is a exit sq in the buffer		/*		for(inputIndex = 0; inputIndex < rntGet; inputIndex += ELEMENT_SIZE)				{					for(k = 0; k < ELEMENT_SIZE; k ++)					{						if(tempBuf.get(inputIndex+k) != exitSQ[k])							break;					}					if(k == ELEMENT_SIZE)					{						bExit[i] = true;						//Don't send the exit sequence until all sequences are got		//				rntGet -= ELEMENT_SIZE;						log.info("["+srvProvider.myHandle + "]:"+"get all data from " + i); 						break;					}				} */				//Output to the next stage				if(rntGet <= 0)					continue;								tempBuf.clear();				//for debug only/*				for(int f = 0; f < rntGet*2/ELEMENT_SIZE;f += 2)					log.debug(intBuf.get(f)+":"+intBuf.get(f+1)); */				outBuffer = outBufArray.getOutputBuffer();				outBuffer.put(tempBuf, rntGet, true );//Block mode						} 		}/*		log.info("LowHub finishs and  getis all data from "); 		exitBuf.rewind();		outBuffer.put(exitBuf, ELEMENT_SIZE, true );//Block mode			*/		}	public void work_with_endingSQ(AutoFillInputBufferArray inBufArray, AutoFillOutputBufferArray outBufArray)	{		Date tStart, tEnd;		boolean bRemain[], bExit[];		ByteBuffer tempBuf[];		ByteBuffer fin_buf;		int i,j,k,nEndingSQ, nExitSQ,  nExit, numBuf;		int inputIndex[], outputIndex[] ,rntGet[], nRemain[];		AutoFillOutputBuffer outBuffer;// = outBufArray.getOutputBuffer(0);		if((inBufArray.howmanyInputBuffers()) == 0)		{			log.fatal("["+srvProvider.myHandle + "]:"+"[work]: No input buffer");			System.exit(-1);		}		bRemain = new boolean[30];		bExit = new boolean[30];		inputIndex = new int[30];		rntGet = new int[30];		nRemain = new int[30];		tempBuf = new ByteBuffer[30];		nEndingSQ = nExitSQ = 0;		for(i = 0; i < 30; i ++)		{			bRemain[i] = false;			bExit[i] = false;			tempBuf[i] = ByteBuffer.allocate(maxBufSize);			rntGet[i] = 0;			inputIndex[i] = 0;		}		fin_buf = ByteBuffer.allocate(maxBufSize);		//Loop		tStart = new Date();		int tempIndex = 0;		int elementIndex, dataIndex;		byte temp1 = 0;		elementIndex = dataIndex = 0;		while(true)//nExit < numBuf)		{			numBuf = inBufArray.howmanyInputBuffers();			log.debug(" " + numBuf);			for(i = 0; i < numBuf; i ++)			{				if((rntGet[i] += (inBufArray.getInputBuffer(i)).readToByteBuffer(tempBuf[i], SET_SIZE*ELEMENT_SIZE, false)) <= 0)					continue;				//Make sure the data read from socket can be changed to INT				log.debug("the number of bytes read is "+ rntGet[i]);				if(rntGet[i] % ELEMENT_SIZE != 0)					rntGet[i] += (inBufArray.getInputBuffer(i)).readToByteBuffer(tempBuf[i], ELEMENT_SIZE - rntGet[i]%ELEMENT_SIZE, true);				log.debug("the number of bytes read second time is " + rntGet[i]);								elementIndex = inputIndex[i]/ELEMENT_SIZE;				dataIndex = rntGet[i]/ELEMENT_SIZE;				for(; inputIndex[i] < rntGet[i]; inputIndex[i] += ELEMENT_SIZE)				{					for(nEndingSQ = 0,					    nExitSQ = 0,					    k = 0,					    tempIndex = inputIndex[i]; 					    k < ELEMENT_SIZE;					    k ++, tempIndex++)					{//						log.debug(i + ":" + tempBuf[i].get(tempIndex));						if(tempBuf[i].get(tempIndex) == endingSQ[k])							nEndingSQ ++;						if (tempBuf[i].get(tempIndex) == exitSQ[k])							nExitSQ ++;						if(tempBuf[i].get(tempIndex) != endingSQ[k] && tempBuf[i].get(tempIndex) != exitSQ[k])							break;					}					if(nEndingSQ == ELEMENT_SIZE)						break;					else if(nExitSQ == ELEMENT_SIZE)					{						log.info("["+srvProvider.myHandle + "]:"+"get all data from " + i); 						break;					}				}				if(nEndingSQ != ELEMENT_SIZE &&						nExitSQ != ELEMENT_SIZE)					continue;																				                                //ready to output																								//                                finIntBuf.put(intSampleSize, -1);																								//                                                                fin_buf.clear();																								//                                                                                                outBuffer = outBufArray.getOutputBuffer();																								//                                                                                                                                outBuffer.put(fin_buf, (intSampleSize + 1)*4, true );//Block mode				//The ending sequence of the element				inputIndex[i] += ELEMENT_SIZE;				nRemain[i] = rntGet[i] - inputIndex[i];				log.debug("the number of digits in the buffer is:"+inputIndex[i]+":"+rntGet[i] +":"+nRemain[i]);				try{				tempBuf[i].clear();				outBuffer = outBufArray.getOutputBuffer();	                        outBuffer.put(tempBuf[i], inputIndex[i], true );//Block mode				}				catch(Exception e)				{					log.error(e);				}				//DEBUG			/*	for(j = 0; j < inputIndex[i]; j ++)					log.debug(j+":"+intTempBuf[i].get(j));*/				//move the rest of the data to the head of the buffer				try{				for(j = 0; j < nRemain[i]; j ++)				{					temp1 = tempBuf[i].get(inputIndex[i]+j);					tempBuf[i].put(j, temp1);					log.debug(j+":"+temp1);				}				if(nRemain[i] == 0)				{					tempBuf[i].clear();					rntGet[i] = 0;				}				else				{					rntGet[i] = nRemain[i];					tempBuf[i].position(rntGet[i]);				}				}				catch(Exception e)				{					log.error(e);				}				log.debug("ok");				inputIndex[i] = 0;			} 		}		/*log.info("LowHub finishs and  getis all data from "); 		exitBuf.rewind();		outBuffer = outBufArray.getOutputBuffer();		outBuffer.put(exitBuf, ELEMENT_SIZE, true );//Block mode			*/		}}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -