📄 dmwriterimpl.java
字号:
DMPieceList pieceList = disk_manager.getPieceList(pieceNumber);
DMPieceMapEntry current_piece = pieceList.get(currentFile);
long fileOffset = current_piece.getOffset();
while ((previousFilesLength + current_piece.getLength()) < offset) {
previousFilesLength += current_piece.getLength();
currentFile++;
fileOffset = 0;
current_piece = pieceList.get(currentFile);
}
List chunks = new ArrayList();
// Now current_piece points to the first file that contains data for this block
while ( buffer_position < buffer_limit ){
current_piece = pieceList.get(currentFile);
long file_limit = buffer_position +
((current_piece.getFile().getLength() - current_piece.getOffset()) -
(offset - previousFilesLength));
if ( file_limit > buffer_limit ){
file_limit = buffer_limit;
}
// could be a zero-length file
if ( file_limit > buffer_position ){
long file_pos = fileOffset + (offset - previousFilesLength);
chunks.add(
new Object[]{ current_piece.getFile(),
new Long( file_pos ),
new Integer((int)file_limit )});
buffer_position = (int)file_limit;
}
currentFile++;
fileOffset = 0;
previousFilesLength = offset;
}
DiskManagerWriteRequestListener l =
new DiskManagerWriteRequestListener()
{
public void
writeCompleted(
DiskManagerWriteRequest request )
{
complete();
listener.writeCompleted( request );
}
public void
writeFailed(
DiskManagerWriteRequest request,
Throwable cause )
{
complete();
if ( dmPiece.isDone()){
// There's a small chance of us ending up writing the same block twice around
// the time that a file completes and gets toggled to read-only which then
// fails with a non-writeable-channel exception
// Debug.out( "writeFailed: piece already done (" + request.getPieceNumber() + "/" + request.getOffset() + "/" + write_length );
if ( Logger.isEnabled()){
Logger.log(new LogEvent(disk_manager, LOGID, "Piece " + dmPiece.getPieceNumber() + " write failed but already marked as done" ));
}
listener.writeCompleted( request );
}else{
disk_manager.setFailed( "Disk write error - " + Debug.getNestedExceptionMessage(cause));
Debug.printStackTrace( cause );
listener.writeFailed( request, cause );
}
}
protected void
complete()
{
try{
this_mon.enter();
async_writes--;
if ( !write_requests.remove( request )){
Debug.out( "request not found" );
}
if ( stopped ){
async_write_sem.release();
}
}finally{
this_mon.exit();
}
}
};
try{
this_mon.enter();
if ( stopped ){
buffer.returnToPool();
listener.writeFailed( request, new Exception( "Disk writer has been stopped" ));
return;
}else{
async_writes++;
write_requests.add( request );
}
}finally{
this_mon.exit();
}
new requestDispatcher( request, l, buffer, chunks );
}
}catch( Throwable e ){
request.getBuffer().returnToPool();
disk_manager.setFailed( "Disk write error - " + Debug.getNestedExceptionMessage(e));
Debug.printStackTrace( e );
listener.writeFailed( request, e );
}
}
protected class
requestDispatcher
implements DiskAccessRequestListener
{
private DiskManagerWriteRequest request;
private DiskManagerWriteRequestListener listener;
private DirectByteBuffer buffer;
private List chunks;
private int chunk_index;
protected
requestDispatcher(
DiskManagerWriteRequest _request,
DiskManagerWriteRequestListener _listener,
DirectByteBuffer _buffer,
List _chunks )
{
request = _request;
listener = _listener;
buffer = _buffer;
chunks = _chunks;
/*
String str = "Write: " + request.getPieceNumber() + "/" + request.getOffset() + ":";
for (int i=0;i<chunks.size();i++){
Object[] entry = (Object[])chunks.get(i);
String str2 = entry[0] + "/" + entry[1] +"/" + entry[2];
str += (i==0?"":",") + str2;
}
System.out.println( str );
*/
dispatch();
}
protected void
dispatch()
{
try{
if ( chunk_index == chunks.size()){
listener.writeCompleted( request );
}else{
if ( chunk_index == 1 && chunks.size() > 32 ){
// for large numbers of chunks drop the recursion approach and
// do it linearly (but on the async thread)
for (int i=1;i<chunks.size();i++){
final AESemaphore sem = new AESemaphore( "DMW&C:dispatch:asyncReq" );
final Throwable[] error = {null};
doRequest(
new DiskAccessRequestListener()
{
public void
requestComplete(
DiskAccessRequest request )
{
sem.release();
}
public void
requestCancelled(
DiskAccessRequest request )
{
Debug.out( "shouldn't get here" );
}
public void
requestFailed(
DiskAccessRequest request,
Throwable cause )
{
error[0] = cause;
sem.release();
}
public int
getPriority()
{
return( -1 );
}
});
sem.reserve();
if ( error[0] != null ){
throw( error[0] );
}
}
listener.writeCompleted( request );
}else{
doRequest( this );
}
}
}catch( Throwable e ){
failed( e );
}
}
protected void
doRequest(
final DiskAccessRequestListener l )
throws CacheFileManagerException
{
Object[] stuff = (Object[])chunks.get( chunk_index++ );
final DiskManagerFileInfoImpl file = (DiskManagerFileInfoImpl)stuff[0];
buffer.limit( DirectByteBuffer.SS_DR, ((Integer)stuff[2]).intValue());
if ( file.getAccessMode() == DiskManagerFileInfo.READ ){
if (Logger.isEnabled())
Logger.log(new LogEvent(disk_manager, LOGID, "Changing "
+ file.getFile(true).getName()
+ " to read/write"));
file.setAccessMode( DiskManagerFileInfo.WRITE );
}
boolean handover_buffer = chunk_index == chunks.size();
DiskAccessRequestListener delegate_listener =
new DiskAccessRequestListener()
{
public void
requestComplete(
DiskAccessRequest request )
{
l.requestComplete( request );
file.dataWritten( request.getOffset(), request.getSize());
}
public void
requestCancelled(
DiskAccessRequest request )
{
l.requestCancelled( request );
}
public void
requestFailed(
DiskAccessRequest request,
Throwable cause )
{
l.requestFailed( request, cause );
}
public int
getPriority()
{
return( -1 );
}
};
disk_access.queueWriteRequest(
file.getCacheFile(),
((Long)stuff[1]).longValue(),
buffer,
handover_buffer,
delegate_listener );
}
public void
requestComplete(
DiskAccessRequest request )
{
dispatch();
}
public void
requestCancelled(
DiskAccessRequest request )
{
// we never cancel so nothing to do here
Debug.out( "shouldn't get here" );
}
public void
requestFailed(
DiskAccessRequest request,
Throwable cause )
{
failed( cause );
}
public int
getPriority()
{
return( -1 );
}
protected void
failed(
Throwable cause )
{
buffer.returnToPool();
listener.writeFailed( request, cause );
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -