📄 autofillinputbuffer.java
字号:
package org.osu.ogsa.stream.util;import java.io.*;import java.nio.*;import java.nio.channels.*;import java.nio.channels.spi.*;import java.nio.charset.*;import java.net.*;import java.util.*;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import java.awt.event.*;import javax.swing.Timer;public class AutoFillInputBuffer extends Thread implements SocketNotification, ActionListener{// private final static float OVER_LOADED_THRESHOLD = (float)0.40;// private final static float LIGHT_LOADED_THRESHOLD = (float)0.10;// private final static float CAPABLE = (float)0.20; private int BUF_SIZE = DefConstants.DEFAULT_BUFFER_SIZE; private final static int STATE_NULL = 0; private final static int STATE_CONNECTED = STATE_NULL + 1; private final static int STATE_TRANSMISSION = STATE_CONNECTED + 1; private ConnectionContext context = null;/* private ServerSocketChannel selectableChannel = null; private Selector selector = null; */ private CircularBuffer cirBuffer; private ByteBuffer tempBuf = ByteBuffer.allocate(BUF_SIZE); private NonBlockingChannel sockChannel; private int state = STATE_NULL; //The tuning para. private int rtnBufSizeTimes = 1; private double tempLoadedFactor; private int n = 0; //the times the idle() was called private int point = 0; private double load_factors[] = new double[DefConstants.LD_FACTOR_WINDOW]; private double temp_load; private double delay_para = 0; private long millis_delay; private int nanos_delay; private javax.swing.Timer timerMonBuf; private boolean bValid = true; private static Log log = LogFactory.getLog(AutoFillInputBuffer.class.getName()); public AutoFillInputBuffer() { try{ timerMonBuf = new javax.swing.Timer(DefConstants.DELAY_MONITOR_BUF, this); cirBuffer = new CircularBuffer(); }catch(Exception e) { log.error(e); } } public AutoFillInputBuffer(int bufSize) { try{ cirBuffer = new CircularBuffer(bufSize); timerMonBuf = new javax.swing.Timer(DefConstants.DELAY_MONITOR_BUF, this); }catch(Exception e) { log.error(e); } } public void setConnectionContext(ConnectionContext cc) { context = cc; } public int getBufferSize() { return cirBuffer.getSize(); } public ConnectionContext getConnectionContext() { return context; } public boolean init_sock() throws InterruptedException, IOException { if(context == null) { log.error("initialize the ConnectionContext object first"); return false; } Selector selector = SelectorProvider.provider().openSelector(); ServerSocketChannel selectableChannel = ServerSocketChannel.open(); selectableChannel.configureBlocking(true); log.info("the port socket server is listening to is "+context.rcverPort); if(context.rcverPort > 0) { InetSocketAddress isa = new InetSocketAddress(context.rcverPort ); selectableChannel.socket().bind(isa); } else selectableChannel.socket().bind(null); log.info("the socket has successfully been bound:"+selectableChannel.socket().isBound()); SocketChannel sChannel = selectableChannel.accept(); if(sChannel == null) { log.error("accept client connection error"); throw new InterruptedException(); } if(!sChannel.isConnected()) { log.error("accept client connection error"); return false; } //selectableChannel.close(); sockChannel = new NonBlockingChannel(sChannel); sockChannel.addCallback(this); sockChannel.initialize(); //Set the conext object InetSocketAddress saddr = (InetSocketAddress)sChannel.socket().getLocalSocketAddress(); InetSocketAddress saddr_remote = (InetSocketAddress)sChannel.socket().getRemoteSocketAddress();/* log.debug(saddr); log.debug(saddr_remote); */ context.bufNotification.SocketConnected(context); return true; }/* public void setupChannel(SocketChannel s, Hashtable ht) throws IOException, ClosedChannelException { streamNameTable = ht; sockChannel = new NonBlockingChannel(s); sockChannel.addCallback(this); sockChannel.initialize(); } public void setBufferNotificationObject(BufferNotification bufNot, Object attachment) { bufNotification = bufNot; Object temp = attached_object; attached_object = (BufferEventHandler)attachment; } public void setBufferNotificationObject(BufferNotification bufNot) { bufNotification = bufNot; }*/ public synchronized void setBufferStatus(boolean bValid) { this.bValid = bValid; } public synchronized boolean getBufferStatus(boolean bValid) { return bValid; } public int get(byte[] buf, int length) { int i = 0; if(buf.length < length) return -1; try { while( i < length) { if(cirBuffer.BufferIsEmpty() == true) return i; buf[i] = cirBuffer.get(); /*if((cirBuffer.getLoadedFactor() < LIGHT_LOADED_THRESHOLD) && bufNotification != null && state == STATE_TRANSMISSION)*//* if(((tempLoadedFactor = cirBuffer.getLoadedFactor()) < DefConstants.LIGHT_LOADED_THRESHOLD) && context.bufNotification != null) { context.loaded = tempLoadedFactor; context.bufNotification.light_loaded(context); } */ i = i + 1; } } catch(BufferDataException e) { log.error(e); return -1; } catch(InterruptedException e) { log.error(e); return -1; } return i; } public int get(ByteBuffer buf, int length) { int i = 0; if(buf.limit()< length) return -1; buf.clear(); try { while( i < length) { if(cirBuffer.BufferIsEmpty() == true) return i; buf.put(cirBuffer.get()); /*if((cirBuffer.getLoadedFactor() < LIGHT_LOADED_THRESHOLD) && bufNotification != null && state == STATE_TRANSMISSION)*//* if(((tempLoadedFactor = cirBuffer.getLoadedFactor()) < DefConstants.LIGHT_LOADED_THRESHOLD) && context.bufNotification != null) { context.loaded = tempLoadedFactor; context.bufNotification.light_loaded(context); } */ i = i + 1; }// buf.rewind(); } catch(BufferDataException e) { log.error(e); return -1; } catch(InterruptedException e) { log.error(e); return -1; } return i; } public int readToByteBuffer(ByteBuffer buf, int length, boolean bBlock) { int lenRet; try { /* if(cirBuffer.BufferIsEmpty() == true) return -1;*/ lenRet = cirBuffer.readToByteBuffer(buf, length,true,bBlock); /*if((cirBuffer.getLoadedFactor() < LIGHT_LOADED_THRESHOLD) && bufNotification != null && state == STATE_TRANSMISSION)*//* if(((tempLoadedFactor = cirBuffer.getLoadedFactor()) < DefConstants.LIGHT_LOADED_THRESHOLD) && context.bufNotification != null) { context.loaded = tempLoadedFactor; context.bufNotification.light_loaded(context); } */ } catch(BufferDataException e) { log.error(e); return -1; } catch(InterruptedException e) { log.error(e); return -1; } return lenRet; } public void run() { setName("buffer thread"); try { init_sock(); } catch ( IOException e ) { e.printStackTrace(); return; } catch ( InterruptedException e ) { e.printStackTrace(); log.error( e ); return; } log.fatal("start timer...."); timerMonBuf.start(); while(true){ try { sockChannel.handleEvents(); idle(); handleDefEvents(); } catch ( IOException e ) { e.printStackTrace(); log.error( e ); } catch ( InterruptedException e ) { e.printStackTrace(); log.error( e ); } } } public void handleDefEvents() { } //Check if the buffer is over-loaded public double getLoadedFactor() { return cirBuffer.getLoadedFactor(); } //Read data from the socket channel and //put them to circular object public void readBytesEvent(SocketChannel channel) { tempBuf.clear(); int numBytesRead, toCir; int nRemaining = cirBuffer.getCapacity(); if(nRemaining == 0) return; else { tempBuf.position(BUF_SIZE - nRemaining); tempBuf.mark(); }// log.info("readByteEvent"); try { // Clear the buffer and read bytes from socket if((numBytesRead = channel.read(tempBuf)) > 0) { // To read the bytes, flip the buffer log.debug("the number of read from socket is" + numBytesRead); //simulate the possible network delay. //the bandwidth is DefConstants.NET_BANDWIDTH bps/* if(DefConstants.NET_BANDWIDTH > 0) { //(1e9 (ns) * 8 (bits)) / bandwidth delay_para += (1e9 * numBytesRead * 8.0 ) / (double)(DefConstants.NET_BANDWIDTH * 2); millis_delay = (long)(delay_para / 1e6); nanos_delay = (int)(delay_para - (double)millis_delay * 1e6); log.debug(delay_para + ":" + numBytesRead + "the network delay is " + millis_delay + ":" + nanos_delay); if(millis_delay >= (long)DefConstants.DELAY_THRESHOLD) { sleep(millis_delay, nanos_delay); delay_para = 0.0; } }*/ tempBuf.reset(); toCir = numBytesRead; while(toCir > 0) { // Read the bytes from the buffer ...; toCir -= cirBuffer.getFromByteBuffer(tempBuf, toCir ,false/*blocking mode*/); /*if(cirBuffer.getLoadedFactor() > OVER_LOADED_THRESHOLD && bufNotification != null && state == STATE_TRANSMISSION)*//* if((tempLoadedFactor = cirBuffer.getLoadedFactor()) > DefConstants.OVER_LOADED_THRESHOLD && context.bufNotification != null ) { log.warn("buffer OVERLOADED" + tempLoadedFactor); context.loaded = tempLoadedFactor; context.bufNotification.over_loaded(context); //attached_object.over_loaded(attached_object); } */ } } } catch (Exception e) { log.info("hi, I do't why I am here"); log.error(e); } } public boolean isBufferOverloaded() { return (cirBuffer.getLoadedFactor() > DefConstants.OVER_LOADED_THRESHOLD); } public boolean isBufferLightloaded() { return (cirBuffer.getLoadedFactor() < DefConstants.LIGHT_LOADED_THRESHOLD); } public void initAdaptationPara() { n = 0; context.initAdaptationPara(); } public void writeBytesEvent(SocketChannel channel ){} public void socketConnectedEvent(SelectionKey readyKey, SocketChannel channel) { // state = STATE_CONNECTED; } //50 ms elapse public void actionPerformed(ActionEvent e) { calculateLoad(); } public void idle() { if(cirBuffer.getSize() > 0) return; try{ sleep(DefConstants.IDLE_TIME); }catch(InterruptedException e) { } // calculateLoad(); } private void calculateLoad() { if(!context.bufNotification.isWorkClassRunning()) return; tempLoadedFactor = cirBuffer.getLoadedFactor(); log.info("tempLoadedFactor:" + tempLoadedFactor + " average_loaded:" + context.average_loaded); if(n < DefConstants.LD_FACTOR_WINDOW) { context.average_loaded = (context.average_loaded * (double)n + tempLoadedFactor)/ (double)(n+1); load_factors[point] = tempLoadedFactor; point ++; n++; } else { if(point == DefConstants.LD_FACTOR_WINDOW) point = 0; temp_load = load_factors[point]; context.average_loaded += (tempLoadedFactor - temp_load)/(double)DefConstants.LD_FACTOR_WINDOW; load_factors[point] = tempLoadedFactor; point ++; } for(int i = 0; i < DefConstants.LD_FACTOR_WINDOW; i++) { log.info(i + ":" + load_factors[i]); } //log.info(" average_loaded:" + context.average_loaded + "_________" + point); log.warn(" average_loaded:" + context.average_loaded); if(tempLoadedFactor > DefConstants.OVER_LOADED_THRESHOLD && context.bufNotification != null && bValid) { log.debug("buffer OVERLOADED" + tempLoadedFactor); context.loaded = tempLoadedFactor; context.bufNotification.over_loaded(context); //attached_object.over_loaded(attached_object); } else if(tempLoadedFactor < DefConstants.LIGHT_LOADED_THRESHOLD && context.bufNotification != null && bValid) { context.loaded = tempLoadedFactor; context.bufNotification.light_loaded(context); } else //this else statement is added on Sep. 14th to make the update of //severity more realicitic { context.loaded = tempLoadedFactor; context.setLongTermLoadedFactor(); } context.changingTrafficInfo(); log.warn(" longtermfactor:" + context.longTermLoadedFactor); if(bValid) context.bufNotification.IdleTransmission();/* if(state == STATE_CONNECTED) { state = STATE_TRANSMISSION; //Read the handle and id from the buffer*//* cirBuffer.readToByteBuffer(specifiedStreamName, 100); if(streamNameTable.containsKey(specifiedStreamName)) { String strHandleId = streamNameTable.get(specifiedStreamName); //attached_object = new BufferEventHandler(strHandleId); bufNotification.setConnectedHandle(strHandleId); if(bufNotification.sendAck()) state = STATE_TRANSMISSION; }*//* } if(state == STATE_TRANSMISSION) { bufNotification.idleTransmission(); } */ } public synchronized void setRntBufSizeTimes(int times) { if(times < 1) { log.info("the times of returning buf size must be larger than 1"); return; } rtnBufSizeTimes = times; } public synchronized int getRntBufSizeTimes(int times) { return rtnBufSizeTimes; } //The following function is added for tuning //the get() function put the data to internal //buffer and pass the reference of the buf //to the invoker. The reture value is the number //of bytes in the buffer //************************************** //This number of bytes in the buffer has //the following possible values: //1. any value less than 1Kb // if the availbe bytes in the circularbuf // is less than 1k, all the bytes will be // returned //2. nKb n is a integer( 0<n) // if the available bytes in the circularbuf // is more than 1Kb, the bytes passed to the // invoker depends on the current tuning para. // The current tuning para. is set by whatever // Grid service instance via the AutoFillBufferArray public int get(byte[] buf) { int i = 0; //How many bytes in the cirBuffer are available int availLen = cirBuffer.getSize(); if(availLen <= 0) return -1; int times = availLen / BUF_SIZE; if(times >= rtnBufSizeTimes) availLen = BUF_SIZE * rtnBufSizeTimes; else if (times > 0) availLen = BUF_SIZE* times; buf = new byte[availLen]; try { while( i < availLen) { buf[i] = cirBuffer.get(); /*if((cirBuffer.getLoadedFactor() < LIGHT_LOADED_THRESHOLD) && bufNotification != null && state == STATE_TRANSMISSION)*//* if(((tempLoadedFactor = cirBuffer.getLoadedFactor()) < DefConstants.LIGHT_LOADED_THRESHOLD) && context.bufNotification != null) { context.loaded = tempLoadedFactor; context.bufNotification.light_loaded(context); } */ i = i + 1; if(cirBuffer.BufferIsEmpty() == true) return i; } } catch(BufferDataException e) { log.error(e); return -1; } catch(InterruptedException e) { log.error(e); return -1; } return i; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -