📄 autofilloutputbuffer.java
字号:
package org.osu.ogsa.stream.util;import java.io.*;import java.nio.*;import java.nio.channels.*;import java.nio.channels.spi.*;import java.nio.charset.*;import java.net.*;import java.util.*;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import javax.swing.Timer;import java.awt.event.*;public class AutoFillOutputBuffer extends Thread implements SocketNotification, ActionListener{ private final static int BUF_SIZE = 1024;// private final static float OVER_LOADED_THRESHOLD = (float)0.80;// private final static float LIGHT_LOADED_THRESHOLD = (float)0.2; private CircularBuffer cirBuffer;// private byte[] tempBuf = new byte[BUF_SIZE]; private ByteBuffer tempBuf = ByteBuffer.allocate(BUF_SIZE); private NonBlockingClient clientSocket; private BufferNotification bufNotification = null; private ConnectionContext context = null; private double tempLoadedFactor; private int n = 0; //the times the idle() was called private int point = 0; private double load_factors[] = new double[DefConstants.LD_FACTOR_WINDOW]; private double temp_load; private double delay_para = 0; private long millis_delay; private int nanos_delay; private int NET_BANDWIDTH = -1; private int sign = -1; private double [] seqBgTraffic; private int nIndexOfTrafficArray = 0; private double nAccumulatedPkg = 0; private int nTimes = 0; private boolean bValid = true; private double net_util = 0.2; private int occupiedBandwidth = 0; private int newNetBandwidth = 0; private javax.swing.Timer timerMonBuf, timerBgTraffic, timerNetworkBandwidthVaring; private static Log log = LogFactory.getLog(AutoFillOutputBuffer.class.getName()); public AutoFillOutputBuffer() { try{ cirBuffer = new CircularBuffer(); timerMonBuf = new javax.swing.Timer(DefConstants.DELAY_MONITOR_BUF, this); timerBgTraffic = new javax.swing.Timer(DefConstants.PACKAGE_INTERAL, this); timerNetworkBandwidthVaring = null; }catch(Exception e) { log.error(e); } } public AutoFillOutputBuffer(int bufSize) { try{ cirBuffer = new CircularBuffer(bufSize); timerMonBuf = new javax.swing.Timer(DefConstants.DELAY_MONITOR_BUF, this); timerBgTraffic = new javax.swing.Timer(DefConstants.PACKAGE_INTERAL, this); timerNetworkBandwidthVaring = null; }catch(Exception e) { log.error(e); } } public void setConnectionContext(ConnectionContext cc) { context = cc; if(context.net_bandwidth > 0) setNetworkBandwidth((int)context.net_bandwidth); else if(DefConstants.NET_BANDWIDTH > 0) setNetworkBandwidth((int)DefConstants.NET_BANDWIDTH); else if(context.net_bandwidth == 0 || DefConstants.NET_BANDWIDTH == 0) { if(DefConstants.timeIntervalForVaringBandwidth > 0 && DefConstants.minProducingRate > 0 && DefConstants.maxProducingRate > 0 && DefConstants.minProducingRate != DefConstants.maxProducingRate && DefConstants.nDivide > 0) { try{ timerNetworkBandwidthVaring = new javax.swing.Timer(DefConstants.timeIntervalForVaringBandwidth, this); }catch(Exception e) { log.error(e); } setNetworkBandwidth(DefConstants.maxProducingRate); sign = -1; } else timerNetworkBandwidthVaring = null; } else setNetworkBandwidth(-1); if(context.net_util < 1.0) setNetUtil(context.net_util); } public ConnectionContext getConnectionContext() { return context; } public int put(byte[] buf, int length) { int i = 0; //make sure the data in the buffer //can put into cirbuffer int capacity = cirBuffer.getCapacity(); int size; if(capacity < length) size = capacity; else size = length; try { while( i < size ) { cirBuffer.put(buf[i]);/* if(((tempLoadedFactor = cirBuffer.getLoadedFactor()) > DefConstants.OVER_LOADED_THRESHOLD) && context.bufNotification != null) { context.loaded = tempLoadedFactor; context.bufNotification.over_loaded(context); }*/ i = i + 1; } } catch(BufferDataException e) { log.error(e); return -1; } catch(InterruptedException e) { log.error(e); return -1; } return size; } public int put(ByteBuffer buf, int length, boolean bBlock) { int i = 0; if(bBlock == false) //make sure the data in the buffer //can put into cirbuffer { int size; try { int capacity = cirBuffer.getCapacity(); if(capacity < length) size = capacity; else size = length; while( i < size ) { cirBuffer.put(buf.get(i));/* if(((tempLoadedFactor = cirBuffer.getLoadedFactor()) > DefConstants.OVER_LOADED_THRESHOLD) && context.bufNotification != null) { context.loaded = tempLoadedFactor; context.bufNotification.over_loaded(context); }*/ i = i + 1; } } catch(BufferDataException e) { log.error(e); return -1; } catch(InterruptedException e) { log.error(e); return -1; } return size; } else //blocking mode { try { while( i < length) { //log.debug("The value gotten from cirBuffer is"+buf.get(i)+"and i is "+ i); cirBuffer.put(buf.get(i), true/*block mode*/);/* if(((tempLoadedFactor = cirBuffer.getLoadedFactor()) > DefConstants.OVER_LOADED_THRESHOLD) && context.bufNotification != null) { context.loaded = tempLoadedFactor; context.bufNotification.over_loaded(context); } */ i = i + 1; } } catch(BufferDataException e) { log.error(e); return -1; } catch(InterruptedException e) { log.error(e); return -1; } return length; } } public void run() { setName("buffer thread"); try { if(context == null) { log.error("initialize the ConnectionContext object first"); return; } log.debug(context.rcverHostName+context.rcverPort); clientSocket = new NonBlockingClient(context.rcverHostName, context.rcverPort); clientSocket.addCallback(this); SocketChannel sChannel = clientSocket.initialize(); //System.out.println(sChannel); //InetSocketAddress saddr = (InetSocketAddress)sChannel.socket().getLocalSocketAddress(); //InetSocketAddress saddr_remote = (InetSocketAddress)sChannel.socket().getRemoteSocketAddress(); //context.rcverPort = saddr.getPort(); //context.rcverAddr = saddr.getAddress(); //context.senderPort = saddr_remote.getPort(); //context.senderAddr = saddr_remote.getAddress(); context.bufNotification.SocketConnected(context); } catch ( IOException e ) { e.printStackTrace(); System.exit( -1 ); } timerMonBuf.start(); timerBgTraffic.start(); if(timerNetworkBandwidthVaring != null) timerNetworkBandwidthVaring.start(); while(true){ try { clientSocket.handleEvents(); idle(); handleDefEvents(); } catch ( IOException e ) { e.printStackTrace(); log.error( e ); } catch ( InterruptedException e ) { e.printStackTrace(); log.error( e ); } } } public synchronized void setBufferStatus(boolean bValid) { this.bValid = bValid; } public synchronized boolean getBufferStatus(boolean bValid) { return bValid; } public void readBytesEvent(SocketChannel channel) {} //Keep it for dynamic one/* public void writeBytesEvent(SocketChannel channel) { tempBuf.clear(); int numBytesWrite; try { if(NET_BANDWIDTH > 0) { //(1e9 (ns) * 8 (bits)) / bandwidth double dTempPkg = getNumAccumulatedPkg(); int timeConsumed = DefConstants.PACKAGE_INTERAL * getNTimes(); if(timeConsumed != 0) { occupiedBandwidth = (int)((double)(dTempPkg*DefConstants.PACKAGE_SIZE*1000)/(double)timeConsumed); System.out.println("the occupied netbandwidth is " + occupiedBandwidth); } else { occupiedBandwidth = 0; System.out.println("the occupied netbandwidth is 0 "); }// delay_para += 1e9*(numBytesWrite * 8.0 + getNumAccumulatedPkg()*DefConstants.PACKAGE_SIZE)/(double)(NET_BANDWIDTH); // delay_para += 1e9*(numBytesWrite * 8.0 + dTempPkg*DefConstants.PACKAGE_SIZE)/(double)(NET_BANDWIDTH); newNetBandwidth = NET_BANDWIDTH - occupiedBandwidth; log.debug("new network bandwidth is :" + newNetBandwidth); if(newNetBandwidth <= 0 ) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -