📄 lowhub.java
字号:
package org.osu.ogsa.stream.special;

import org.osu.ogsa.stream.services.*;
import org.osu.ogsa.stream.util.*;
import java.rmi.RemoteException;
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.channels.spi.*;
import java.nio.charset.*;
import java.net.*;
import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Relays bytes from every buffer of an {@code AutoFillInputBufferArray} to the
 * buffers handed out by an {@code AutoFillOutputBufferArray}.
 *
 * <p>Configuration is done through the {@code init_N} methods before calling
 * {@link #work}: {@link #init_2} fixes the element size and the working buffer
 * capacity, {@link #init_3} sets the exit sequence and {@link #init_4} the
 * ending sequence. When no ending sequence is set, data is forwarded as soon
 * as it has been read; otherwise data is accumulated per input and forwarded
 * in segments that end with an ending (or exit) sequence.
 *
 * <p>Both work loops run {@code while (true)} and therefore never return
 * normally; exit-sequence based termination was disabled in the original code.
 */
public class LowHub {
    private static final Log log = LogFactory.getLog(LowHub.class.getName());

    /** Result codes of {@link #classifyElement}. */
    private static final int MATCH_NONE = 0;
    private static final int MATCH_ENDING = 1;
    private static final int MATCH_EXIT = 2;

    /** Capacity in bytes of each working buffer: (setSize + 1) * 2 * elementSize. */
    private int maxBufSize;
    private StreamServiceProvider srvProvider;
    /** Number of elements requested per non-blocking read (see the work loops). */
    private int setSize;
    /** Size in bytes of one element; terminator sequences are compared over this many bytes. */
    private int elementSize;
    private byte[] endingSQ;
    private byte[] exitSQ;
    /** Copy of the first elementSize bytes of the exit sequence, positioned at 0. */
    private ByteBuffer exitBuf;

    /** Book-keeping for one input buffer in {@link #work_with_endingSQ}. */
    private static final class InputState {
        /** Accumulates bytes read from the input until a terminator is seen. */
        final ByteBuffer buf;
        /** Number of valid bytes currently held in {@link #buf}. */
        int received;
        /** Offset of the next element that has not been checked for a terminator. */
        int scanned;

        InputState(int capacity) {
            buf = ByteBuffer.allocate(capacity);
        }
    }

    /** Creates an unconfigured hub with neither ending nor exit sequence set. */
    public LowHub() {
        endingSQ = exitSQ = null;
    }

    /**
     * Stores the service provider whose {@code myHandle} is used as log prefix.
     *
     * @param srvProvider provider owning this hub
     */
    public void init_1(StreamServiceProvider srvProvider) {
        this.srvProvider = srvProvider;
    }

    /**
     * Sets the set size and element size and derives the working buffer capacity.
     *
     * @param set_size number of elements per set; must not be negative
     * @param element_size size of one element in bytes; must be positive
     * @throws IllegalArgumentException if a size is out of range or the derived
     *         buffer capacity does not fit in an {@code int}
     */
    public void init_2(int set_size, int element_size) {
        if (set_size < 0 || element_size <= 0) {
            throw new IllegalArgumentException(
                "set_size must be >= 0 and element_size must be > 0, got "
                + set_size + " and " + element_size);
        }
        // Computed in long so an overflow is reported instead of wrapping silently.
        long capacity = ((long) set_size + 1L) * 2L * (long) element_size;
        if (capacity > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("buffer capacity too large: " + capacity);
        }
        setSize = set_size;
        elementSize = element_size;
        maxBufSize = (int) capacity;
    }

    /**
     * Sets the exit sequence. Passing {@code null} clears it.
     *
     * @param exitSQ exit sequence; at least one element (see {@link #init_2}) long
     * @throws IllegalArgumentException if {@code exitSQ} is shorter than one element
     */
    public void init_3(byte[] exitSQ) {
        this.exitSQ = exitSQ;
        if (exitSQ == null) {
            exitBuf = null;
            return;
        }
        if (exitSQ.length < elementSize) {
            throw new IllegalArgumentException(
                "exit sequence has " + exitSQ.length + " bytes, element size is " + elementSize);
        }
        exitBuf = ByteBuffer.allocate(elementSize);
        exitBuf.put(exitSQ, 0, elementSize);
        // The original filled the buffer with absolute puts, leaving position at 0.
        exitBuf.rewind();
    }

    /**
     * Sets the ending sequence. {@code null} selects the pass-through mode of
     * {@link #work}.
     *
     * @param endingSQ ending sequence, or {@code null}
     */
    public void init_4(byte[] endingSQ) {
        this.endingSQ = endingSQ;
    }

    /**
     * Runs the relay loop, choosing the variant by whether an ending sequence is set.
     *
     * @param inBufArray source buffers
     * @param outBufArray destination buffers
     */
    public void work(AutoFillInputBufferArray inBufArray, AutoFillOutputBufferArray outBufArray) {
        if (endingSQ == null) {
            work_without_endingSQ(inBufArray, outBufArray);
        } else {
            work_with_endingSQ(inBufArray, outBufArray);
        }
    }

    /**
     * Forwards whatever each input delivers, padded to a whole number of
     * elements, to an output buffer. Loops forever.
     *
     * <p>Calls {@code System.exit(-1)} when there is no input or no output buffer.
     *
     * @param inBufArray source buffers; the count is re-read on every pass
     * @param outBufArray destination buffers
     * @throws IllegalStateException if {@link #init_2} has not set a positive element size
     */
    public void work_without_endingSQ(AutoFillInputBufferArray inBufArray, AutoFillOutputBufferArray outBufArray) {
        if (inBufArray.howmanyInputBuffers() == 0 || outBufArray.howmanyOutputBuffers() == 0) {
            log.fatal(logPrefix() + "[work]: No input buffer or output buffer");
            System.exit(-1);
        }
        requireElementSize();

        ByteBuffer tempBuf = ByteBuffer.allocate(maxBufSize);
        log.debug(logPrefix() + "generate the sample set");

        // NOTE: exit-sequence detection was commented out in the original, so
        // nothing ever breaks out of this loop.
        while (true) {
            int numBuf = inBufArray.howmanyInputBuffers();
            for (int i = 0; i < numBuf; i++) {
                tempBuf.clear();
                int received = inBufArray.getInputBuffer(i)
                    .readToByteBuffer(tempBuf, (setSize - 1) * elementSize, false);
                if (received <= 0) {
                    continue;
                }

                // Top up with a blocking read so the byte count is a whole number of elements.
                int partial = received % elementSize;
                if (partial != 0) {
                    int extra = inBufArray.getInputBuffer(i)
                        .readToByteBuffer(tempBuf, elementSize - partial, true);
                    // Only count bytes that were really delivered; a non-positive
                    // result must not shrink the amount already read.
                    if (extra > 0) {
                        received += extra;
                    }
                }

                tempBuf.clear();
                AutoFillOutputBuffer outBuffer = outBufArray.getOutputBuffer();
                outBuffer.put(tempBuf, received, true); // block mode
            }
        }
    }

    /**
     * Accumulates data per input and forwards it in segments that end with the
     * ending sequence or, if one is set, the exit sequence. Bytes after the
     * terminator are kept for the next segment. Loops forever.
     *
     * <p>Calls {@code System.exit(-1)} when there is no input buffer.
     *
     * @param inBufArray source buffers; the count is re-read on every pass and
     *        per-input state is allocated on demand for any number of inputs
     * @param outBufArray destination buffers
     * @throws IllegalStateException if the element size is not set, the ending
     *         sequence is missing, or a sequence is shorter than one element
     */
    public void work_with_endingSQ(AutoFillInputBufferArray inBufArray, AutoFillOutputBufferArray outBufArray) {
        if (inBufArray.howmanyInputBuffers() == 0) {
            log.fatal(logPrefix() + "[work]: No input buffer");
            System.exit(-1);
        }
        requireElementSize();
        if (endingSQ == null || endingSQ.length < elementSize) {
            throw new IllegalStateException("ending sequence is missing or shorter than one element");
        }
        if (exitSQ != null && exitSQ.length < elementSize) {
            throw new IllegalStateException("exit sequence is shorter than one element");
        }

        InputState[] states = new InputState[0];
        while (true) {
            int numBuf = inBufArray.howmanyInputBuffers();
            log.debug(" " + numBuf);
            // The original used fixed arrays of 30 slots and overflowed beyond that.
            states = growStates(states, numBuf);

            for (int i = 0; i < numBuf; i++) {
                InputState st = states[i];

                int got = inBufArray.getInputBuffer(i)
                    .readToByteBuffer(st.buf, setSize * elementSize, false);
                if (got > 0) {
                    st.received += got;
                }
                if (st.received <= 0) {
                    continue;
                }
                log.debug("the number of bytes read is " + st.received);

                // Top up with a blocking read so the byte count is a whole number of elements.
                int partial = st.received % elementSize;
                if (partial != 0) {
                    int extra = inBufArray.getInputBuffer(i)
                        .readToByteBuffer(st.buf, elementSize - partial, true);
                    if (extra > 0) {
                        st.received += extra;
                    }
                }
                log.debug("the number of bytes read second time is " + st.received);

                // The match result is local to this input: the original kept the
                // counters across inputs, so an input with nothing new to scan
                // inherited the previous input's match.
                int match = MATCH_NONE;
                while (st.scanned + elementSize <= st.received) {
                    match = classifyElement(st.buf, st.scanned);
                    if (match != MATCH_NONE) {
                        break;
                    }
                    st.scanned += elementSize;
                }
                if (match == MATCH_NONE) {
                    continue;
                }
                if (match == MATCH_EXIT) {
                    log.info(logPrefix() + "get all data from " + i);
                }

                // The segment includes the terminator element itself.
                int segmentLength = st.scanned + elementSize;
                int remaining = st.received - segmentLength;
                log.debug("the number of digits in the buffer is:"
                    + segmentLength + ":" + st.received + ":" + remaining);

                try {
                    st.buf.clear();
                    AutoFillOutputBuffer outBuffer = outBufArray.getOutputBuffer();
                    outBuffer.put(st.buf, segmentLength, true); // block mode
                } catch (RuntimeException e) {
                    // Best effort as in the original: log and carry on with the next segment.
                    log.error(e, e);
                }

                moveRemainderToFront(st, segmentLength, remaining);
                log.debug("ok");
                st.scanned = 0;
            }
        }
    }

    /** Returns the "[handle]:" log prefix, tolerating a hub without a provider. */
    private String logPrefix() {
        if (srvProvider == null) {
            return "[null]:";
        }
        return "[" + srvProvider.myHandle + "]:";
    }

    /** Fails fast when the element size was never configured through init_2. */
    private void requireElementSize() {
        if (elementSize <= 0) {
            throw new IllegalStateException("init_2 must set a positive element size before work");
        }
    }

    /** Returns an array with one InputState per input, keeping existing entries. */
    private InputState[] growStates(InputState[] states, int count) {
        if (count <= states.length) {
            return states;
        }
        InputState[] grown = new InputState[count];
        System.arraycopy(states, 0, grown, 0, states.length);
        for (int i = states.length; i < count; i++) {
            grown[i] = new InputState(maxBufSize);
        }
        return grown;
    }

    /** Classifies the element at offset as ending sequence, exit sequence or neither. */
    private int classifyElement(ByteBuffer buf, int offset) {
        // The ending sequence wins when both sequences are identical, as in the original.
        if (matchesAt(buf, offset, endingSQ)) {
            return MATCH_ENDING;
        }
        if (exitSQ != null && matchesAt(buf, offset, exitSQ)) {
            return MATCH_EXIT;
        }
        return MATCH_NONE;
    }

    /** Tells whether the elementSize bytes at offset equal the start of sequence. */
    private boolean matchesAt(ByteBuffer buf, int offset, byte[] sequence) {
        for (int k = 0; k < elementSize; k++) {
            if (buf.get(offset + k) != sequence[k]) {
                return false;
            }
        }
        return true;
    }

    /** Moves the bytes following a forwarded segment to the head of the buffer. */
    private void moveRemainderToFront(InputState st, int consumed, int remaining) {
        try {
            ByteBuffer buf = st.buf;
            // Reset position/limit so the absolute accesses below do not depend
            // on what the output put() did to this buffer.
            buf.clear();
            boolean debug = log.isDebugEnabled();
            for (int j = 0; j < remaining; j++) {
                byte b = buf.get(consumed + j);
                buf.put(j, b);
                if (debug) {
                    log.debug(j + ":" + b);
                }
            }
            st.received = remaining;
            // Later reads append after the bytes that were kept.
            buf.position(remaining);
        } catch (RuntimeException e) {
            log.error(e, e);
        }
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -