datareaderspeedlimiter.java
来自「Azureus is a powerful, full-featured, cr」· Java 代码 · 共 331 行
JAVA
331 行
/*
* Created on 30-Apr-2004
* Created by Paul Gardner
* Copyright (C) 2004 Aelitis, All Rights Reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* AELITIS, SARL au capital de 30,000 euros
* 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
*
*/
package org.gudy.azureus2.core3.peer.impl.transport.base;
/**
* @author parg
*
*/
import java.io.*;
import java.nio.channels.*;
import org.gudy.azureus2.core3.peer.impl.PEPeerTransportDataReader;
import org.gudy.azureus2.core3.util.*;
import org.gudy.azureus2.core3.config.*;
public class
DataReaderSpeedLimiter
{
protected final int slot_period_millis = (int)(SystemTime.TIME_GRANULARITY_MILLIS+5);
protected final int slot_count = 1000/slot_period_millis;
protected int bytes_per_second = 0; // global bytes-per-second limit
protected int bytes_per_slot; // current global bytes-per-slot
protected long current_slot; // current (last) slot used
protected int bytes_available; // bytes available unused by current slot
protected AEMonitor this_mon = new AEMonitor( "DataReaderSpeedLimiter");
protected static DataReaderSpeedLimiter singleton = new DataReaderSpeedLimiter();
public static DataReaderSpeedLimiter
getSingleton()
{
return( singleton );
}
protected
DataReaderSpeedLimiter()
{
COConfigurationManager.addParameterListener(
"Max Download Speed KBs",
new ParameterListener()
{
public void
parameterChanged(
String str )
{
bytes_per_second = COConfigurationManager.getIntParameter( "Max Download Speed KBs", 0 ) * 1024;
bytes_per_slot = bytes_per_second/slot_count;
}
});
bytes_per_second = COConfigurationManager.getIntParameter( "Max Download Speed KBs", 0 ) * 1024;
bytes_per_slot = bytes_per_second/slot_count;
}
public PEPeerTransportDataReader
getDataReader(
DataReaderOwner owner )
{
return( new limitedDataReader( owner ));
}
protected class
unlimitedDataReader
implements PEPeerTransportDataReader
{
public int
read(
SocketChannel channel,
DirectByteBuffer direct_buffer )
throws IOException
{
return( direct_buffer.read(DirectByteBuffer.SS_PEER,channel));
}
}
protected class
limitedDataReader
implements PEPeerTransportDataReader
{
protected DataReaderOwner owner;
protected long my_current_slot;
protected int my_bytes_available;
protected
limitedDataReader(
DataReaderOwner _owner )
{
owner = _owner;
}
public int
read(
SocketChannel channel,
DirectByteBuffer direct_buffer )
throws IOException
{
int my_bytes_per_second = owner.getMaximumBytesPerSecond();
if ( bytes_per_second == 0 && my_bytes_per_second == 0 ){
// unlimited
return( direct_buffer.read( DirectByteBuffer.SS_PEER, channel ));
}
int position = direct_buffer.position(DirectByteBuffer.SS_PEER);
int limit = direct_buffer.limit(DirectByteBuffer.SS_PEER);
int bytes_allocated = 0;
int debug_max_bytes = -1;
int debug_limit = -1;
try{
try{
this_mon.enter();
long now = SystemTime.getCurrentTime();
long new_slot = now/slot_period_millis;
// do the global limit first
if ( bytes_per_second > 0 ){
long slots = new_slot - current_slot;
current_slot = new_slot;
if ( slots < 0 ){
// someone must have changed the clock, reset our position in time
return( 0 );
}
if ( slots > slot_count ){
slots = slot_count;
}
bytes_available += slots*bytes_per_slot;
// give a bit of slack for bursty transfers
if ( bytes_available > (3*bytes_per_second )){
bytes_available = 3*bytes_per_second;
}
if ( bytes_available == 0 ){
return( 0 );
}
}else{
bytes_available = 0;
}
// we've got access to a "global" amount of bytes we can read
// now apply specific policy if required
if ( my_bytes_per_second > 0 ){
long my_slots = new_slot - my_current_slot;
my_current_slot = new_slot;
if ( my_slots < 0 ){
// someone must have changed the clock, reset our position in time
return( 0 );
}
if ( my_slots > slot_count ){
my_slots = slot_count;
}
int my_bytes_per_slot = my_bytes_per_second/slot_count;
my_bytes_available += my_slots*my_bytes_per_slot;
// give a bit of slack for bursty transfers
if ( my_bytes_available > (3*my_bytes_per_second )){
my_bytes_available = 3*my_bytes_per_second;
}
if ( my_bytes_available == 0 ){
return( 0 );
}
}else{
my_bytes_available = 0;
}
// bytes_available: 0 -> unlimited
// my_bytes_available: 0 -> unlimited
int max_bytes;
if ( bytes_available == 0 && my_bytes_available == 0 ){
max_bytes = 0;
}else if ( bytes_available == 0 ){
max_bytes = my_bytes_available;
}else if ( my_bytes_available == 0 ){
max_bytes = bytes_available;
}else{
max_bytes = bytes_available < my_bytes_available?bytes_available:my_bytes_available;
}
int request_read_size = limit - position;
// now limit the read based on any restrictions
debug_max_bytes = max_bytes;
if ( max_bytes != 0 && request_read_size > max_bytes ){
debug_limit = position + max_bytes;
direct_buffer.limit( DirectByteBuffer.SS_PEER, position + max_bytes );
bytes_allocated = max_bytes;
}else{
bytes_allocated = request_read_size;
}
my_bytes_available -= bytes_allocated;
if ( my_bytes_available < 0 ){
my_bytes_available = 0;
}
bytes_available -= bytes_allocated;
if ( bytes_available < 0 ){
bytes_available = 0;
}
}finally{
this_mon.exit();
}
int bytes_read = 0;
try{
bytes_read = direct_buffer.read(DirectByteBuffer.SS_PEER,channel);
return( bytes_read );
}finally{
if ( bytes_read < bytes_allocated ){
try{
this_mon.enter();
bytes_available += ( bytes_allocated - bytes_read );
my_bytes_available += ( bytes_allocated - bytes_read );
}finally{
this_mon.exit();
}
}
direct_buffer.limit( DirectByteBuffer.SS_PEER, limit );
}
}catch( IllegalArgumentException e ){
System.out.println( "Illegal arg exception" );
System.out.println( "buffer: " + direct_buffer.position(DirectByteBuffer.SS_PEER) + "/" + direct_buffer.limit(DirectByteBuffer.SS_PEER));
System.out.println( " start values:" + position + "/" + limit );
System.out.println( " alloc = " + bytes_allocated + ", ba = " + bytes_available + ", mba = " + my_bytes_available );
System.out.println( " max_bytes = " + debug_max_bytes + ", limit = " + debug_limit );
throw( e );
}
}
}
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?