// DMWriterAndCheckerImpl.java
// (extraction note: from "Azureus is a powerful, full-featured, cr..." — Java source,
//  1,083 lines total, page 1 of 2; remainder of the file is not shown here)
/*
* Created on 31-Jul-2004
* Created by Paul Gardner
* Copyright (C) 2004 Aelitis, All Rights Reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* AELITIS, SARL au capital de 30,000 euros
* 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
*
*/
package org.gudy.azureus2.core3.disk.impl.access.impl;
import java.util.LinkedList;
import java.util.List;
import org.gudy.azureus2.core3.config.COConfigurationManager;
import org.gudy.azureus2.core3.config.ParameterListener;
import org.gudy.azureus2.core3.disk.*;
import org.gudy.azureus2.core3.disk.impl.DiskManagerFileInfoImpl;
import org.gudy.azureus2.core3.disk.impl.DiskManagerHelper;
import org.gudy.azureus2.core3.disk.impl.PieceList;
import org.gudy.azureus2.core3.disk.impl.PieceMapEntry;
import org.gudy.azureus2.core3.disk.impl.access.*;
import org.gudy.azureus2.core3.logging.LGLogger;
import org.gudy.azureus2.core3.util.*;
import com.aelitis.azureus.core.diskmanager.cache.*;
/**
* @author parg
*
*/
public class
DMWriterAndCheckerImpl
implements DMWriterAndChecker
{
	// When true, piece hash checking is handed off to the shared ConcurrentHasher
	// (see checkPiece) instead of being performed inline.
	protected static final boolean CONCURRENT_CHECKING = true;
	// Fallback queue capacities used when the corresponding config parameter is 0/unset.
	protected static final int DEFAULT_WRITE_QUEUE_MAX = 256;
	protected static final int DEFAULT_CHECK_QUEUE_MAX = 128;
	// Granularity at which queue-size threshold reporting advances.
	protected static final int QUEUE_REPORT_CHUNK = 32;

	// global limit on size of write queue
	private static int global_write_queue_block_sem_size;
	private static AESemaphore global_write_queue_block_sem;
	private static int global_write_queue_block_sem_next_report_size;

	// global limit on size of check queue
	private static int global_check_queue_block_sem_size;
	private static AESemaphore global_check_queue_block_sem;
	private static int global_check_queue_block_sem_next_report_size;

	// Cached config flag; kept up to date by param_listener below.
	private static boolean friendly_hashing = COConfigurationManager.getBooleanParameter( "diskmanager.friendly.hashchecking" );

	static{
		// Size the global write-queue semaphore from config, falling back to the default.
		int write_limit_blocks = COConfigurationManager.getIntParameter("DiskManager Write Queue Block Limit", 0);
		global_write_queue_block_sem_size = write_limit_blocks==0?DEFAULT_WRITE_QUEUE_MAX:write_limit_blocks;
		global_write_queue_block_sem_next_report_size = global_write_queue_block_sem_size - QUEUE_REPORT_CHUNK;
		global_write_queue_block_sem = new AESemaphore("DMW&C::writeQ", global_write_queue_block_sem_size);
		if ( global_write_queue_block_sem_size == 0 ){
			// A configured size of 0 means "unlimited": releaseForever disables blocking.
			// NOTE(review): with the ==0 -> DEFAULT fallback above this branch looks
			// unreachable unless the default itself is 0 — confirm against config semantics.
			global_write_queue_block_sem.releaseForever();
		}
		// Same pattern for the global check-queue semaphore (measured in pieces).
		int check_limit_pieces = COConfigurationManager.getIntParameter("DiskManager Check Queue Piece Limit", 0);
		global_check_queue_block_sem_size = check_limit_pieces==0?DEFAULT_CHECK_QUEUE_MAX:check_limit_pieces;
		global_check_queue_block_sem_next_report_size = global_check_queue_block_sem_size - QUEUE_REPORT_CHUNK;
		global_check_queue_block_sem = new AESemaphore("DMW&C::checkQ", global_check_queue_block_sem_size);
		if ( global_check_queue_block_sem_size == 0 ){
			global_check_queue_block_sem.releaseForever();
		}
	}

	// Re-reads the friendly-hashing flag whenever the config value changes;
	// registered in start() and unregistered in stop().
	private static final ParameterListener param_listener = new ParameterListener() {
		public void parameterChanged( String str ) {
			friendly_hashing = COConfigurationManager.getBooleanParameter( "diskmanager.friendly.hashchecking" );
		}
	};

	private DiskManagerHelper disk_manager;
	private DiskWriteThread writeThread;
	// Pending write and check requests (QueueElement entries); guarded by this_mon.
	private List writeQueue = new LinkedList();
	private List checkQueue = new LinkedList();
	// Released once per element added to either queue; the write thread waits on it.
	private AESemaphore writeCheckQueueSem = new AESemaphore("writeCheckQ");
	// Count of async check requests issued; stop() drains async_check_request_sem
	// this many times to wait for outstanding checks.
	protected long total_async_check_requests;
	protected AESemaphore async_check_request_sem = new AESemaphore("DMW&C::asyncReq");
	protected boolean started;
	// Cleared by stop(); loops and queue submission test it to abort early.
	protected boolean bOverallContinue = true;
	// Torrent geometry cached from the DiskManagerHelper in the constructor.
	protected int pieceLength;
	protected int lastPieceLength;
	protected long totalLength;
	protected int nbPieces;
	protected boolean complete_recheck_in_progress;
	protected AEMonitor this_mon = new AEMonitor( "DMW&C" );
	// Released by the write thread when it has fully stopped; stop() reserves it.
	protected AESemaphore stop_sem = new AESemaphore( "DMW&C::stop");
public
DMWriterAndCheckerImpl(
DiskManagerHelper _disk_manager )
{
disk_manager = _disk_manager;
pieceLength = disk_manager.getPieceLength();
lastPieceLength = disk_manager.getLastPieceLength();
totalLength = disk_manager.getTotalLength();
nbPieces = disk_manager.getNumberOfPieces();
// System.out.println( "DMW&C: write sem = " + global_write_queue_block_sem.getValue() + ", check = " + global_check_queue_block_sem.getValue());
}
	/**
	 * Starts the writer/checker: registers the config listener and launches the
	 * disk-write thread. Must be called at most once; cannot be called after stop().
	 *
	 * @throws RuntimeException if already started, or if stop() has already run
	 */
	public void
	start()
	{
		try{
			this_mon.enter();
			if ( started ){
				throw( new RuntimeException( "DMW&C: start while started"));
			}
			// bOverallContinue is cleared permanently by stop(); a stopped instance
			// cannot be restarted.
			if ( !bOverallContinue ){
				throw( new RuntimeException( "DMW&C: start after stopped"));
			}
			started = true;
			// Keep the friendly_hashing flag in sync with config while running.
			COConfigurationManager.addParameterListener( "diskmanager.friendly.hashchecking", param_listener );
			writeThread = new DiskWriteThread();
			writeThread.start();
		}finally{
			this_mon.exit();
		}
	}
	/**
	 * Stops the writer/checker and blocks until all outstanding file operations
	 * have completed. Safe to call when never started (no-op). Idempotent with
	 * respect to the started flag.
	 */
	public void
	stop()
	{
		try{
			this_mon.enter();
			if ( !started ){
				return;
			}
			started = false;
			// when we exit here we guarantee that all file usage operations have completed
			// i.e. writes and checks (checks being doubly async)
			bOverallContinue = false;
			COConfigurationManager.removeParameterListener( "diskmanager.friendly.hashchecking", param_listener );
		}finally{
			this_mon.exit();
		}
		// Stop the write thread outside the monitor to avoid blocking under the lock.
		writeThread.stopWriteThread();
		// wait until the thread has stopped
		stop_sem.reserve();
		// now wait until any outstanding piece checks have completed
		// (async_check_request_sem is presumably released once per completed async
		//  check elsewhere in this file — not visible in this chunk)
		for (int i=0;i<total_async_check_requests;i++){
			async_check_request_sem.reserve();
		}
	}
public boolean
isChecking()
{
return( complete_recheck_in_progress );
}
	/**
	 * Allocates a file by writing zero bytes over its whole length, updating the
	 * disk manager's allocation/percent-done counters as it goes.
	 *
	 * On success the cache file is left OPEN for subsequent use. On cancellation
	 * (bOverallContinue cleared) or error the file is closed and false is returned.
	 *
	 * @param file   file to allocate
	 * @param length target length in bytes; 0 creates an empty file
	 * @return true if fully zeroed (or zero-length created), false on stop/error
	 */
	public boolean
	zeroFile(
		DiskManagerFileInfoImpl file,
		long length )
	{
		CacheFile	cache_file = file.getCacheFile();
		long written = 0;
		try{
			if( length == 0 ){ //create a zero-length file if it is listed in the torrent
				cache_file.setLength( 0 );
			}else{
				// One piece-sized scratch buffer of zeros, reused for every chunk.
				DirectByteBuffer buffer = DirectByteBufferPool.getBuffer(DirectByteBuffer.AL_DM_ZERO,pieceLength);
				try{
					// Fill the buffer with zeros once up front.
					buffer.limit(DirectByteBuffer.SS_DW, pieceLength);
					for (int i = 0; i < buffer.limit(DirectByteBuffer.SS_DW); i++){
						buffer.put(DirectByteBuffer.SS_DW, (byte)0);
					}
					buffer.position(DirectByteBuffer.SS_DW, 0);
					// Write piece-sized chunks until the target length is reached,
					// shrinking the final chunk via the buffer limit.
					while (written < length && bOverallContinue){
						int write_size = buffer.capacity(DirectByteBuffer.SS_DW);
						if ((length - written) < write_size ){
							write_size = (int)(length - written);
						}
						buffer.limit(DirectByteBuffer.SS_DW, write_size);
						cache_file.write( buffer, written );
						// Rewind so the same zeros are written again next iteration.
						buffer.position(DirectByteBuffer.SS_DW, 0);
						written += write_size;
						// Publish allocation progress (percent in tenths: 0..1000).
						disk_manager.setAllocated( disk_manager.getAllocated() + write_size );
						disk_manager.setPercentDone((int) ((disk_manager.getAllocated() * 1000) / totalLength));
					}
				}finally{
					// Always return the pooled buffer, even on error.
					buffer.returnToPool();
				}
			}
			// Cancelled mid-allocation: close the file and report failure.
			if (!bOverallContinue){
				cache_file.close();
				return false;
			}
		}catch (Exception e){
			Debug.printStackTrace( e );
			try{
				cache_file.close();
			}
			catch( Throwable t ) { /*ignore*/ }
			return( false );
		}
		return true;
	}
	/**
	 * Kicks off a full-torrent recheck on a daemon thread: every piece is submitted
	 * through enqueueCheckRequest, and the listener is invoked once per piece with
	 * the result. complete_recheck_in_progress is true for the duration (see
	 * isChecking()). Returns immediately.
	 *
	 * @param listener  receives one pieceChecked callback per piece
	 * @param user_data opaque value passed through to each callback
	 */
	public void
	enqueueCompleteRecheckRequest(
		final DiskManagerCheckRequestListener	listener,
		final Object							user_data )
	{
		Thread t = new AEThread("DMW&C::checker")
			{
				public void
				runSupport()
				{
					try{
						complete_recheck_in_progress	= true;
						// Released once per completed piece check; used below to wait
						// for all submitted checks to finish.
						final AESemaphore	sem = new AESemaphore( "DMW&C::checker" );
						int	checks_submitted	= 0;
						int delay = 0;  //if friendly hashing is enabled, no need to delay even more here
						if( !friendly_hashing ) {
							//delay a bit normally anyway, as we don't want to kill the user's system
							//during the post-completion check (10k of piece = 1ms of sleep)
							// Clamped to the range [12, 409] ms per piece.
							delay = pieceLength /1024 /10;
							delay = Math.min( delay, 409 );
							delay = Math.max( delay, 12 );
						}
						for ( int i=0; i < nbPieces; i++ ){
							// Abort promptly if the disk manager is being stopped.
							if ( !bOverallContinue ){
								break;
							}
							enqueueCheckRequest(
								i,
								new DiskManagerCheckRequestListener()
								{
									public void
									pieceChecked(
										int 		_pieceNumber,
										boolean 	_result,
										Object		_user_data )
									{
										try{
											listener.pieceChecked( _pieceNumber, _result, _user_data );
										}finally{
											// Always count the completion, even if the
											// listener throws.
											sem.release();
										}
									}
								},
								user_data );
							checks_submitted++;
							// Throttle submission so hashing doesn't saturate the machine.
							if( delay > 0 )  Thread.sleep( delay );
						}
						if ( bOverallContinue ){
							// wait for all to complete
							for (int i=0;i<checks_submitted;i++){
								sem.reserve();
							}
						}
					}catch( Throwable e ){
						// we get here if the disk manager's stopped running
						// (e.g. enqueueCheckRequest throws "WriteChecker stopped",
						//  or the sleep is interrupted)
						Ignore.ignore(e);
					}finally{
						complete_recheck_in_progress	= false;
					}
				}
			};
		t.setDaemon(true);
		t.start();
	}
	/**
	 * Queues an asynchronous hash-check of one piece. Blocks on the global check
	 * semaphore when the check queue is at capacity (back-pressure), then adds the
	 * request to checkQueue under the monitor and signals the worker thread.
	 *
	 * @param pieceNumber piece to check
	 * @param listener    notified with the check result
	 * @param user_data   opaque value passed through to the listener
	 * @throws RuntimeException ("WriteChecker stopped") if this instance has been stopped
	 */
	public void
	enqueueCheckRequest(
		int 								pieceNumber,
		DiskManagerCheckRequestListener 	listener,
		Object								user_data )
	{
		// Back-pressure: blocks when the global number of queued checks hits the limit.
		global_check_queue_block_sem.reserve();
		// Bookkeeping for the (currently commented-out) high-water-mark report;
		// each step lowers the threshold by QUEUE_REPORT_CHUNK.
		if ( global_check_queue_block_sem.getValue() < global_check_queue_block_sem_next_report_size ){
			// Debug.out( "Disk Manager check queue size exceeds " + ( global_check_queue_block_sem_size - global_check_queue_block_sem_next_report_size ));
			global_check_queue_block_sem_next_report_size -= QUEUE_REPORT_CHUNK;
		}
		// System.out.println( "check queue size = " + ( global_check_queue_block_sem_size - global_check_queue_block_sem.getValue()));
		try{
			this_mon.enter();
			if ( !bOverallContinue ){
				// Give back the slot we reserved above before refusing the request.
				global_check_queue_block_sem.release();
				throw( new RuntimeException( "WriteChecker stopped" ));
			}
			checkQueue.add(new QueueElement(pieceNumber, 0, null, user_data, listener ));
		}finally{
			this_mon.exit();
		}
		// Wake the worker thread (outside the monitor).
		writeCheckQueueSem.release();
	}
public void
checkPiece(
final int pieceNumber,
final CheckPieceResultHandler _result_handler,
final Object user_data )
throws Exception
{
final int this_piece_length = pieceNumber < nbPieces - 1 ? pieceLength : lastPieceLength;
final byte[] required_hash = disk_manager.getPieceHash(pieceNumber);
final CheckPieceResultHandler result_handler =
new CheckPieceResultHandler()
{
public void
processResult(
int piece_number,
int result,
Object _user_data )
{
try{
disk_manager.getPieces()[piece_number].setDone( result == CheckPieceResultHandler.OP_SUCCESS );
}finally{
if ( _result_handler != null ){
_result_handler.processResult( pieceNumber, result, _user_data );
}
}
}
};
int check_result = CheckPieceResultHandler.OP_CANCELLED;
DirectByteBuffer buffer = null;
boolean async_request = false;
try{
buffer = DirectByteBufferPool.getBuffer(DirectByteBuffer.AL_DM_CHECK,this_piece_length);
if ( !bOverallContinue ){
return;
}
//get the piece list
PieceList pieceList = disk_manager.getPieceList(pieceNumber);
//for each piece-part-to-file mapping entry
for (int i = 0; i < pieceList.size(); i++) {
PieceMapEntry tempPiece = pieceList.get(i);
try {
//if the file is large enough
if ( tempPiece.getFile().getCacheFile().getLength() >= tempPiece.getOffset()){
//Make sure we only read in this entry-length's worth of data from the file
//NOTE: Without this limit the read op will
// a) fill the entire buffer with file data if the file length is big enough,
// i.e. the whole piece is contained within the file somewhere, or
// b) read the file into the buffer until it reaches EOF,
// i.e. the piece overlaps two different files
//Under normal conditions this works ok, because the assumption is that if a piece
//is contained within a single file, then there will only be one PieceMapEntry for
//that piece, so we can just fill the buffer. It also assumes that if a piece
//overlaps two (or more) files, then there will be multiple PieceMapEntrys, with
//each entry ending at the file EOF boundary. However, if for some reason one of
//these files is at least one byte too large, then the read op will read in too
//many bytes before hitting EOF, and our piece buffer data will be misaligned,
//causing hash failure (and a 99.9% bug). Better to set the buffer limit explicitly.
int entry_read_limit = buffer.position( DirectByteBuffer.SS_DW ) + tempPiece.getLength();
buffer.limit( DirectByteBuffer.SS_DW, entry_read_limit );
tempPiece.getFile().getCacheFile().read(buffer, tempPiece.getOffset()); //do read
buffer.limit( DirectByteBuffer.SS_DW, this_piece_length ); //restore limit
}else{
// file is too small, therefore required data hasn't been
// written yet -> check fails
return;
}
}catch (Exception e){
Debug.printStackTrace( e );
return;
}
}
try {
if ( !bOverallContinue ){
return;
}
buffer.position(DirectByteBuffer.SS_DW, 0);
if ( CONCURRENT_CHECKING ){
async_request = true;
final DirectByteBuffer f_buffer = buffer;
ConcurrentHasher.getSingleton().addRequest(
buffer.getBuffer(DirectByteBuffer.SS_DW),
// [extraction truncated here mid-statement inside checkPiece(...): the remaining
//  arguments of ConcurrentHasher.addRequest and the rest of the file (lines ~374-1083)
//  are on page 2 of the original source and are not shown in this chunk]