⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 diskaccesscontrollerinstance.java

📁 这是一个基于java编写的torrent的P2P源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*
 * Created on 04-Dec-2005
 * Created by Paul Gardner
 * Copyright (C) 2005, 2006 Aelitis, All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 * 
 * AELITIS, SAS au capital de 40,000 euros
 * 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
 *
 */

package com.aelitis.azureus.core.diskmanager.access.impl;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.gudy.azureus2.core3.torrent.TOTorrent;
import org.gudy.azureus2.core3.util.AESemaphore;
import org.gudy.azureus2.core3.util.AEThread;
import org.gudy.azureus2.core3.util.Debug;
import org.gudy.azureus2.core3.util.RandomUtils;
import org.gudy.azureus2.core3.util.SystemTime;

import com.aelitis.azureus.core.diskmanager.cache.CacheFile;

public class 
DiskAccessControllerInstance 
{
		// Aggregation limits supplied to the constructor. Their use is not visible in this
		// part of the file - presumably they cap how many requests/bytes are merged into one
		// aggregated operation (confirm against the dispatcher's run loop).
	private final int aggregation_request_limit;
	private final int aggregation_byte_limit;
	
		// Instance name as passed to the constructor; in the visible code it is only
		// referenced from commented-out log statements.
	private String		name;
		// When true, requestDispatcher.queue() additionally indexes each queued request
		// by file and offset.
	private boolean		enable_aggregation;
	
		// Current limit, in MB, on queued request data. Used as the initial size of max_mb_sem
		// and raised by getSpaceAllowance() when a single request needs more than the limit.
	private int	max_mb_queued;
		
		// Semaphore holding the MB allowance: reserved in getSpaceAllowance(), released in
		// releaseSpaceAllowance().
	private groupSemaphore	max_mb_sem;
	
		// Queue accounting; both are modified only inside synchronized( request_map ) blocks
		// in getSpaceAllowance()/releaseSpaceAllowance().
	private long			request_bytes_queued;
	private long			requests_queued;
	
		// Request-count statistics. total_requests and total_single_requests_made are
		// incremented in requestDispatcher.queue(); updates to the aggregated counter are
		// not visible in this part of the file.
	private long			total_requests;
	private long			total_single_requests_made;
	private long			total_aggregated_requests_made;
	
		// Byte-count statistics, maintained alongside the request counters above.
	private long			total_bytes;
	private long			total_single_bytes;
	private long			total_aggregated_bytes;
	
		// Exposed via getIOTime(); where it is accumulated is not visible here.
	private long			io_time;

		// Fixed set of dispatchers created by the constructor, one per configured thread slot.
	private requestDispatcher[]	dispatchers;
	
		// System.currentTimeMillis() value of the last stale-mapping sweep in queueRequest().
	private long		last_check		= 0;	
	
		// Maps a TOTorrent to the requestDispatcher handling it. This object is also the
		// monitor guarding the queue accounting fields above.
	private Map			request_map	= new HashMap();	
	
		// Step sizes for the (currently commented-out) queue-growth logging thresholds.
	private static final int REQUEST_NUM_LOG_CHUNK 		= 100;
	private static final int REQUEST_BYTE_LOG_CHUNK 	= 1024*1024;
	
		// Next thresholds at which the queue-growth logging would fire; advanced in
		// getSpaceAllowance().
	private int			next_request_num_log	= REQUEST_NUM_LOG_CHUNK;
	private long		next_request_byte_log	= REQUEST_BYTE_LOG_CHUNK;
	
		// Per-thread marker, initially null. requestDispatcher.queue() treats a non-null
		// value as "this is a recursive call" and runs the request inline. Where the value
		// is set is not visible in this part of the file.
	private static ThreadLocal		tls	= 
		new ThreadLocal()
		{
			public Object
			initialValue()
			{
				return( null );
			}
		};
		
	public
	DiskAccessControllerInstance(
		String	_name,
		boolean	_enable_aggregation,
		int		_aggregation_request_limit,
		int		_aggregation_byte_limit,
		int		_max_threads,
		int		_max_mb )
	{		
		name				= _name;
		
		enable_aggregation			= _enable_aggregation;
		aggregation_request_limit	= _aggregation_request_limit;
		aggregation_byte_limit		= _aggregation_byte_limit;
		
		max_mb_queued		= _max_mb;
				
		max_mb_sem 			= new groupSemaphore( max_mb_queued );

		dispatchers	= new requestDispatcher[_max_threads];
		
		for (int i=0;i<_max_threads;i++){
			dispatchers[i]	= new requestDispatcher(i);
		}
	}
	
	/**
	 * Reports the block count of the MB-allowance semaphore.
	 *
	 * @return the value of {@code max_mb_sem.getBlockCount()}
	 */
	protected long
	getBlockCount()
	{
		return this.max_mb_sem.getBlockCount();
	}
	
	/**
	 * Reports the number of requests currently accounted as queued. The counter is
	 * incremented by getSpaceAllowance() and decremented by releaseSpaceAllowance();
	 * this read is not synchronized.
	 *
	 * @return the current value of {@code requests_queued}
	 */
	protected long
	getQueueSize()
	{
		return this.requests_queued;
	}
	
	/**
	 * Reports the number of bytes currently accounted as queued. The counter grows in
	 * getSpaceAllowance() and shrinks in releaseSpaceAllowance(); this read is not
	 * synchronized.
	 *
	 * @return the current value of {@code request_bytes_queued}
	 */
	protected long
	getQueuedBytes()
	{
		return this.request_bytes_queued;
	}
	
	/**
	 * Reports the running count of requests passed to a dispatcher's queue().
	 *
	 * @return the current value of {@code total_requests}
	 */
	protected long
	getTotalRequests()
	{
		return this.total_requests;
	}
	
	/**
	 * Reports the running count of requests executed individually.
	 *
	 * @return the current value of {@code total_single_requests_made}
	 */
	protected long
	getTotalSingleRequests()
	{
		return this.total_single_requests_made;
	}
	
	/**
	 * Reports the aggregated-request statistic.
	 *
	 * @return the current value of {@code total_aggregated_requests_made}
	 */
	protected long
	getTotalAggregatedRequests()
	{
		return this.total_aggregated_requests_made;
	}
	
	/**
	 * Reports the running total of bytes across all requests passed to a dispatcher's
	 * queue().
	 *
	 * @return the current value of {@code total_bytes}
	 */
	public long
	getTotalBytes()
	{
		return this.total_bytes;
	}
	
	/**
	 * Reports the running total of bytes handled by individually executed requests.
	 *
	 * @return the current value of {@code total_single_bytes}
	 */
	public long
	getTotalSingleBytes()
	{
		return this.total_single_bytes;
	}
	
	/**
	 * Reports the aggregated-bytes statistic.
	 *
	 * @return the current value of {@code total_aggregated_bytes}
	 */
	public long
	getTotalAggregatedBytes()
	{
		return this.total_aggregated_bytes;
	}
	
	/**
	 * Reports the accumulated I/O time statistic.
	 *
	 * @return the current value of {@code io_time}
	 */
	public long
	getIOTime()
	{
		return this.io_time;
	}
	
	/**
	 * Hands a request to the dispatcher associated with the request's torrent.
	 *
	 * A torrent keeps the same dispatcher for as long as its mapping exists. A torrent
	 * without a mapping is assigned the first idle dispatcher, or failing that the one
	 * with the shortest queue. At most once every 60 seconds (or whenever the clock is
	 * seen to have gone backwards) mappings whose dispatcher has been inactive for more
	 * than 60 seconds are dropped.
	 *
	 * The selection runs under the request_map monitor; the actual queue() call is made
	 * after the monitor has been released.
	 *
	 * @param request	the request to route and queue
	 */
	protected void
	queueRequest(
		DiskAccessRequestImpl	request )
	{
		final TOTorrent	torrent = request.getFile().getTorrentFile().getTorrent();

		final requestDispatcher	target;

		synchronized( request_map ){

			final long	now = System.currentTimeMillis();

				// periodic sweep; "now < last_check" copes with the clock being set back

			if ( now < last_check || now - last_check > 60000 ){

				last_check = now;

				for ( Iterator entries = request_map.values().iterator(); entries.hasNext(); ){

					requestDispatcher	candidate = (requestDispatcher)entries.next();

					long	last_active = candidate.getLastRequestTime();

					if ( now - last_active > 60000 ){

							// inactive for over a minute - forget the mapping

						entries.remove();

					}else if ( now < last_active ){

							// timestamp lies in the future - pull it back to now

						candidate.setLastRequestTime( now );
					}
				}
			}

			requestDispatcher	assigned = (requestDispatcher)request_map.get( torrent );

			if ( assigned == null ){

				int	best_index	= 0;
				int	best_size	= Integer.MAX_VALUE;

				for ( int i = 0; i < dispatchers.length; i++ ){

					int	queued = dispatchers[i].size();

					if ( queued == 0 ){

							// an idle dispatcher cannot be beaten, stop looking

						best_index = i;

						break;
					}

					if ( queued < best_size ){

						best_size	= queued;
						best_index	= i;
					}
				}

				assigned = dispatchers[best_index];

				request_map.put( torrent, assigned );
			}

			assigned.setLastRequestTime( now );

			target = assigned;
		}

		target.queue( request );
	}
	
	/**
	 * Accounts for a request entering the queue and reserves the MB allowance it needs.
	 *
	 * Under the request_map monitor the queued byte and request counters are increased.
	 * The number of whole-MB boundaries crossed by the byte counter is the amount later
	 * reserved from max_mb_sem. If that amount exceeds the current limit, the limit and
	 * the semaphore are enlarged first so that the reservation can be satisfied. The
	 * reservation itself is made outside the monitor.
	 *
	 * @param request	the request about to be queued
	 */
	protected void
	getSpaceAllowance(
		DiskAccessRequestImpl	request )
	{
		final int	bytes_per_mb = 1024*1024;

		final int	extra_mb;

		synchronized( request_map ){

			final int	mb_before = (int)( request_bytes_queued / bytes_per_mb );

			request_bytes_queued += request.getSize();

			final int	mb_after = (int)( request_bytes_queued / bytes_per_mb );

			extra_mb = mb_after - mb_before;

			if ( extra_mb > max_mb_queued ){

					// a single request larger than the whole allowance: grow the
					// allowance to fit rather than block forever

				final int	shortfall = extra_mb - max_mb_queued;

				max_mb_sem.releaseGroup( shortfall );

				max_mb_queued = extra_mb;
			}

			requests_queued++;

				// advance the logging thresholds (the log output itself is disabled)

			if ( next_request_num_log <= requests_queued ){

				next_request_num_log += REQUEST_NUM_LOG_CHUNK;
			}

			if ( next_request_byte_log <= request_bytes_queued ){

				next_request_byte_log += REQUEST_BYTE_LOG_CHUNK;
			}
		}

		if ( extra_mb <= 0 ){

			return;
		}

		max_mb_sem.reserveGroup( extra_mb );
	}
	
	/**
	 * Accounts for a request leaving the queue and returns its MB allowance.
	 *
	 * Under the request_map monitor the queued byte and request counters are decreased.
	 * The number of whole-MB boundaries the byte counter dropped below is then released
	 * back to max_mb_sem, outside the monitor.
	 *
	 * @param request	the request that is no longer queued
	 */
	protected void
	releaseSpaceAllowance(
		DiskAccessRequestImpl	request )
	{
		final int	bytes_per_mb = 1024*1024;

		final int	freed_mb;

		synchronized( request_map ){

			final int	mb_before = (int)( request_bytes_queued / bytes_per_mb );

			request_bytes_queued -= request.getSize();

			final int	mb_after = (int)( request_bytes_queued / bytes_per_mb );

			freed_mb = mb_before - mb_after;

			requests_queued--;
		}

		if ( freed_mb <= 0 ){

			return;
		}

		max_mb_sem.releaseGroup( freed_mb );
	}
	
	protected class
	requestDispatcher
	{
			// Position of this dispatcher, as passed to the constructor.
		private int			index;
			// Worker thread for this dispatcher; its creation is not visible in this part
			// of the file.
		private AEThread	thread;
			// Pending requests. queue() inserts by priority and synchronizes on this list.
		private LinkedList	requests 	= new LinkedList();
		
			// Aggregation index, filled only when enable_aggregation is set:
			// file -> ( Long offset -> request ). Distinct from the outer class's request_map.
		private Map			request_map	= new HashMap();
		
			// NOTE(review): presumably signalled once per queued request to wake the worker -
			// its use is not visible here, confirm against the rest of queue().
		private AESemaphore	request_sem	= new AESemaphore("DiskAccessControllerInstance:requestDispatcher" );
		
			// Timestamp of the most recent request routed here; queueRequest() reads and
			// writes it through getLastRequestTime()/setLastRequestTime().
		private long	last_request_time;
				
		/**
		 * Creates a dispatcher and records its position.
		 *
		 * @param _index	index assigned to this dispatcher by the enclosing instance
		 */
		protected
		requestDispatcher(
			int	_index )
		{
			this.index = _index;
		}
		
		protected void
		queue(
			DiskAccessRequestImpl			request )
		{
			if ( tls.get() != null ){
				
					// let recursive calls straight through
				
				synchronized( requests ){

						// stats not synced on the right object, but they're only stats...
					
					total_requests++;
					
					total_single_requests_made++;
					
					total_bytes	+= request.getSize();
					
					total_single_bytes += request.getSize();
				}
				
				try{
					request.runRequest();
					
				}catch( Throwable e ){
					
					Debug.printStackTrace(e);
				}
				
			}else{
												
				getSpaceAllowance( request );
				
				synchronized( requests ){
					
					total_requests++;
					
					total_bytes	+= request.getSize();
					
					boolean	added = false;
					
					int	priority = request.getPriority();
										
					if ( priority >= 0 ){
						
						int	pos = 0;
						
						for (Iterator it = requests.iterator();it.hasNext();){
							
							DiskAccessRequestImpl	r = (DiskAccessRequestImpl)it.next();
						
							if ( r.getPriority() < priority ){
								
								requests.add( pos, request );
								
								added = true;
								
								break;
							}
							
							pos++;
						}
					}
					
					if ( !added ){
						
						requests.add( request );
					}
					
					if ( enable_aggregation ){
						
						Map	m = (Map)request_map.get( request.getFile());
						
						if ( m == null ){
							
							m = new HashMap();
							
							request_map.put( request.getFile(), m ); 
						}
						
						m.put( new Long( request.getOffset()), request );
					}
					
					// System.out.println( "request queue: req = " + requests.size() + ", bytes = " + request_bytes_queued );

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -