📄 diskaccesscontrollerinstance.java
字号:
/*
* Created on 04-Dec-2005
* Created by Paul Gardner
* Copyright (C) 2005, 2006 Aelitis, All Rights Reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* AELITIS, SAS au capital de 40,000 euros
* 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
*
*/
package com.aelitis.azureus.core.diskmanager.access.impl;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.gudy.azureus2.core3.torrent.TOTorrent;
import org.gudy.azureus2.core3.util.AESemaphore;
import org.gudy.azureus2.core3.util.AEThread;
import org.gudy.azureus2.core3.util.Debug;
import org.gudy.azureus2.core3.util.RandomUtils;
import org.gudy.azureus2.core3.util.SystemTime;
import com.aelitis.azureus.core.diskmanager.cache.CacheFile;
public class
DiskAccessControllerInstance
{
// Aggregation limits handed to the constructor; the code that consumes them is not in this part of the file.
private final int aggregation_request_limit;
private final int aggregation_byte_limit;
// Label supplied at construction; in the visible code it is only mentioned by commented-out log lines.
private String name;
// When true, requestDispatcher.queue also indexes each queued request by file and offset.
private boolean enable_aggregation;
// Current queued-megabyte allowance; getSpaceAllowance raises it when one request crosses more MB than this.
private int max_mb_queued;
// Semaphore created from max_mb_queued; reserved and released in whole-megabyte amounts.
private groupSemaphore max_mb_sem;
// Bytes and request count currently accounted as queued; both are modified under synchronized( request_map ).
private long request_bytes_queued;
private long requests_queued;
// Statistics counters. total_requests, total_bytes and the "single" counters are bumped in
// requestDispatcher.queue; updates to the "aggregated" counters and io_time are not visible here.
private long total_requests;
private long total_single_requests_made;
private long total_aggregated_requests_made;
private long total_bytes;
private long total_single_bytes;
private long total_aggregated_bytes;
private long io_time;
// One dispatcher per requested thread, created in the constructor.
private requestDispatcher[] dispatchers;
// System.currentTimeMillis() value of the last stale-mapping sweep performed by queueRequest.
private long last_check = 0;
// Maps a TOTorrent to the requestDispatcher serving it; also the monitor guarding the queue accounting above.
private Map request_map = new HashMap();
// Step sizes by which the next-log thresholds below advance.
private static final int REQUEST_NUM_LOG_CHUNK = 100;
private static final int REQUEST_BYTE_LOG_CHUNK = 1024*1024;
// Thresholds advanced in getSpaceAllowance; the log statements they gate are currently commented out.
private int next_request_num_log = REQUEST_NUM_LOG_CHUNK;
private long next_request_byte_log = REQUEST_BYTE_LOG_CHUNK;
// Per-thread marker read by requestDispatcher.queue: when the current thread's value is
// non-null the request is run immediately instead of being queued (the "recursive call" path).
// Where the value gets set is outside the visible part of this file.
private static ThreadLocal tls =
new ThreadLocal()
{
// Every thread starts with a null marker.
public Object
initialValue()
{
return( null );
}
};
/**
 * Creates a controller instance together with its request dispatchers.
 *
 * @param _name                       label stored for this instance
 * @param _enable_aggregation         whether dispatchers index queued requests by file and offset
 * @param _aggregation_request_limit  stored aggregation request limit
 * @param _aggregation_byte_limit     stored aggregation byte limit
 * @param _max_threads                number of dispatchers to create
 * @param _max_mb                     initial queued-megabyte allowance, also passed to the group semaphore
 */
public DiskAccessControllerInstance(
    String  _name,
    boolean _enable_aggregation,
    int     _aggregation_request_limit,
    int     _aggregation_byte_limit,
    int     _max_threads,
    int     _max_mb )
{
    this.name                      = _name;
    this.enable_aggregation        = _enable_aggregation;
    this.aggregation_request_limit = _aggregation_request_limit;
    this.aggregation_byte_limit    = _aggregation_byte_limit;

    // The semaphore starts out sized to the configured allowance.
    this.max_mb_queued = _max_mb;
    this.max_mb_sem    = new groupSemaphore( this.max_mb_queued );

    // Each dispatcher is told its own slot number.
    this.dispatchers = new requestDispatcher[ _max_threads ];

    for ( int slot = 0; slot < _max_threads; slot++ ){
        this.dispatchers[ slot ] = new requestDispatcher( slot );
    }
}
/**
 * Reports the block count of the semaphore that meters queued megabytes.
 *
 * @return the value of {@code max_mb_sem.getBlockCount()}
 */
protected long getBlockCount()
{
    return max_mb_sem.getBlockCount();
}
/**
 * Reports how many requests are currently accounted as queued.
 * The counter is read without taking the {@code request_map} monitor.
 *
 * @return the current value of {@code requests_queued}
 */
protected long getQueueSize()
{
    return requests_queued;
}
/**
 * Reports how many bytes are currently accounted as queued.
 * The counter is read without taking the {@code request_map} monitor.
 *
 * @return the current value of {@code request_bytes_queued}
 */
protected long getQueuedBytes()
{
    return request_bytes_queued;
}
/**
 * Reports the running count of requests handed to the dispatchers.
 *
 * @return the current value of {@code total_requests}
 */
protected long getTotalRequests()
{
    return total_requests;
}
/**
 * Reports the running count of requests performed individually.
 *
 * @return the current value of {@code total_single_requests_made}
 */
protected long getTotalSingleRequests()
{
    return total_single_requests_made;
}
/**
 * Reports the aggregated-request statistic.
 *
 * @return the current value of {@code total_aggregated_requests_made}
 */
protected long getTotalAggregatedRequests()
{
    return total_aggregated_requests_made;
}
/**
 * Reports the running byte total of requests handed to the dispatchers.
 *
 * @return the current value of {@code total_bytes}
 */
public long getTotalBytes()
{
    return total_bytes;
}
/**
 * Reports the running byte total of requests performed individually.
 *
 * @return the current value of {@code total_single_bytes}
 */
public long getTotalSingleBytes()
{
    return total_single_bytes;
}
/**
 * Reports the aggregated-bytes statistic.
 *
 * @return the current value of {@code total_aggregated_bytes}
 */
public long getTotalAggregatedBytes()
{
    return total_aggregated_bytes;
}
/**
 * Reports the I/O time statistic.
 *
 * @return the current value of {@code io_time}
 */
public long getIOTime()
{
    return io_time;
}
/**
 * Hands a request to the dispatcher associated with the request's torrent.
 *
 * Under the {@code request_map} monitor this method:
 * - at most once every 60 seconds (or immediately if the clock has gone backwards)
 *   drops torrent mappings whose dispatcher has had no request for over 60 seconds;
 * - looks up the torrent's dispatcher, or binds the torrent to the first empty
 *   dispatcher, falling back to the one with the smallest queue;
 * - stamps the chosen dispatcher with the current time.
 * The request is queued on the dispatcher after the monitor has been released.
 *
 * @param request the disk access request to queue
 */
protected void queueRequest(
    DiskAccessRequestImpl request )
{
    final TOTorrent owner = request.getFile().getTorrentFile().getTorrent();

    final requestDispatcher target;

    synchronized( request_map ){

        final long time_now = System.currentTimeMillis();

        if ( time_now < last_check || time_now - last_check > 60000 ){

            last_check = time_now;

            for ( Iterator sweep = request_map.values().iterator(); sweep.hasNext(); ){

                requestDispatcher candidate = (requestDispatcher)sweep.next();

                long last_seen = candidate.getLastRequestTime();

                if ( time_now - last_seen > 60000 ){

                    // idle for over a minute: forget the mapping
                    sweep.remove();

                }else if ( time_now < last_seen ){

                    // clock went backwards: re-base the stamp
                    candidate.setLastRequestTime( time_now );
                }
            }
        }

        requestDispatcher bound = (requestDispatcher)request_map.get( owner );

        if ( bound == null ){

            int chosen   = 0;
            int smallest = Integer.MAX_VALUE;

            for ( int i = 0; i < dispatchers.length; i++ ){

                final int pending = dispatchers[i].size();

                if ( pending != 0 && pending >= smallest ){

                    continue;
                }

                chosen = i;

                if ( pending == 0 ){

                    // an idle dispatcher wins outright
                    break;
                }

                smallest = pending;
            }

            bound = dispatchers[ chosen ];

            request_map.put( owner, bound );
        }

        bound.setLastRequestTime( time_now );

        target = bound;
    }

    target.queue( request );
}
/**
 * Accounts for a request joining the queue and reserves semaphore capacity
 * for any whole megabytes the queued byte total has grown by.
 *
 * If the request alone crosses more megabytes than the current allowance,
 * the allowance and the semaphore are enlarged to fit it. The reservation
 * itself is made after the {@code request_map} monitor has been released.
 *
 * @param request the request about to be queued
 */
protected void getSpaceAllowance(
    DiskAccessRequestImpl request )
{
    final int bytes_per_mb = 1024*1024;

    final int mb_gained;

    synchronized( request_map ){

        final int mb_before = (int)( request_bytes_queued / bytes_per_mb );

        request_bytes_queued += request.getSize();

        final int mb_after = (int)( request_bytes_queued / bytes_per_mb );

        mb_gained = mb_after - mb_before;

        if ( mb_gained > max_mb_queued ){

            // A single request larger than the whole allowance: grow the
            // allowance rather than refusing the request.
            max_mb_sem.releaseGroup( mb_gained - max_mb_queued );

            max_mb_queued = mb_gained;
        }

        requests_queued++;

        // Advance the logging thresholds; the log output itself is disabled.
        if ( next_request_num_log <= requests_queued ){

            next_request_num_log += REQUEST_NUM_LOG_CHUNK;
        }

        if ( next_request_byte_log <= request_bytes_queued ){

            next_request_byte_log += REQUEST_BYTE_LOG_CHUNK;
        }
    }

    if ( mb_gained > 0 ){

        max_mb_sem.reserveGroup( mb_gained );
    }
}
/**
 * Accounts for a request leaving the queue and gives back semaphore capacity
 * for any whole megabytes the queued byte total has shrunk by.
 *
 * The counters are adjusted under the {@code request_map} monitor; the
 * semaphore release happens after the monitor has been released.
 *
 * @param request the request that is no longer queued
 */
protected void releaseSpaceAllowance(
    DiskAccessRequestImpl request )
{
    final int bytes_per_mb = 1024*1024;

    final int mb_freed;

    synchronized( request_map ){

        final int mb_before = (int)( request_bytes_queued / bytes_per_mb );

        request_bytes_queued -= request.getSize();

        final int mb_after = (int)( request_bytes_queued / bytes_per_mb );

        mb_freed = mb_before - mb_after;

        requests_queued--;
    }

    if ( mb_freed > 0 ){

        max_mb_sem.releaseGroup( mb_freed );
    }
}
protected class
requestDispatcher
{
// Slot number passed to the constructor (the position in the outer dispatchers array).
private int index;
// Worker thread reference; its creation and use are outside the visible part of this file.
private AEThread thread;
// Pending requests; queue() inserts by priority and uses this list as its monitor.
private LinkedList requests = new LinkedList();
// NOTE: shadows the outer class's request_map. Here it maps a request's file to a Map of
// offset (Long) -> request, and is only filled when aggregation is enabled.
private Map request_map = new HashMap();
// Semaphore for this dispatcher; how it is signalled is not visible in this part of the file.
private AESemaphore request_sem = new AESemaphore("DiskAccessControllerInstance:requestDispatcher" );
// Time stamp that the outer queueRequest reads and writes through getLastRequestTime/setLastRequestTime.
private long last_request_time;
/**
 * Creates a dispatcher and remembers its slot number.
 *
 * @param _index position of this dispatcher in the owning instance's dispatcher array
 */
protected requestDispatcher(
    int _index )
{
    this.index = _index;
}
protected void
queue(
DiskAccessRequestImpl request )
{
if ( tls.get() != null ){
// let recursive calls straight through
synchronized( requests ){
// stats not synced on the right object, but they're only stats...
total_requests++;
total_single_requests_made++;
total_bytes += request.getSize();
total_single_bytes += request.getSize();
}
try{
request.runRequest();
}catch( Throwable e ){
Debug.printStackTrace(e);
}
}else{
getSpaceAllowance( request );
synchronized( requests ){
total_requests++;
total_bytes += request.getSize();
boolean added = false;
int priority = request.getPriority();
if ( priority >= 0 ){
int pos = 0;
for (Iterator it = requests.iterator();it.hasNext();){
DiskAccessRequestImpl r = (DiskAccessRequestImpl)it.next();
if ( r.getPriority() < priority ){
requests.add( pos, request );
added = true;
break;
}
pos++;
}
}
if ( !added ){
requests.add( request );
}
if ( enable_aggregation ){
Map m = (Map)request_map.get( request.getFile());
if ( m == null ){
m = new HashMap();
request_map.put( request.getFile(), m );
}
m.put( new Long( request.getOffset()), request );
}
// System.out.println( "request queue: req = " + requests.size() + ", bytes = " + request_bytes_queued );
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -