⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dmwriterimpl.java

📁 这是一个基于java编写的torrent的P2P源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
				DMPieceList pieceList = disk_manager.getPieceList(pieceNumber);
				
				DMPieceMapEntry current_piece = pieceList.get(currentFile);
				
				long fileOffset = current_piece.getOffset();
				
				while ((previousFilesLength + current_piece.getLength()) < offset) {
					
					previousFilesLength += current_piece.getLength();
					
					currentFile++;
					
					fileOffset = 0;
					
					current_piece = pieceList.get(currentFile);
				}
		
				List	chunks = new ArrayList();
				
					// Now current_piece points to the first file that contains data for this block
				
				while ( buffer_position < buffer_limit ){
						
					current_piece = pieceList.get(currentFile);
	
					long file_limit = buffer_position + 
										((current_piece.getFile().getLength() - current_piece.getOffset()) - 
											(offset - previousFilesLength));
		       
					if ( file_limit > buffer_limit ){
						
						file_limit	= buffer_limit;
					}
						
						// could be a zero-length file
					
					if ( file_limit > buffer_position ){
	
						long	file_pos = fileOffset + (offset - previousFilesLength);
						
						chunks.add( 
								new Object[]{ current_piece.getFile(),
								new Long( file_pos ),
								new Integer((int)file_limit )});
											
						buffer_position = (int)file_limit;
					}
					
					currentFile++;
					
					fileOffset = 0;
					
					previousFilesLength = offset;
				}
				
				
				DiskManagerWriteRequestListener	l = 
					new DiskManagerWriteRequestListener()
					{
						public void 
						writeCompleted( 
							DiskManagerWriteRequest 	request ) 
						{
							complete();
							 
							listener.writeCompleted( request );
						}
						  
						public void 
						writeFailed( 
							DiskManagerWriteRequest 	request, 
							Throwable		 			cause )
						{
							complete();
							  
							if ( dmPiece.isDone()){

									// There's a small chance of us ending up writing the same block twice around
									// the time that a file completes and gets toggled to read-only which then
									// fails with a non-writeable-channel exception
								
								// Debug.out( "writeFailed: piece already done (" + request.getPieceNumber() + "/" + request.getOffset() + "/" + write_length );
								
								if ( Logger.isEnabled()){
									
									Logger.log(new LogEvent(disk_manager, LOGID, "Piece " + dmPiece.getPieceNumber() + " write failed but already marked as done" ));
								}
								
								listener.writeCompleted( request );
								
							}else{
								
								disk_manager.setFailed( "Disk write error - " + Debug.getNestedExceptionMessage(cause));
								
								Debug.printStackTrace( cause );
								
								listener.writeFailed( request, cause );
							}
						}
						  
						protected void
						complete()
						{
							try{
								this_mon.enter();
								
								async_writes--;
								  
								if ( !write_requests.remove( request )){
									
									Debug.out( "request not found" );
								}
								
								if ( stopped ){
									  
									async_write_sem.release();
								}
							}finally{
								  
								this_mon.exit();
							}
						}
					};
	
				try{
					this_mon.enter();
					
					if ( stopped ){
						
						buffer.returnToPool();
						
						listener.writeFailed( request, new Exception( "Disk writer has been stopped" ));
						
						return;
						
					}else{
					
						async_writes++;
						
						write_requests.add( request );
					}
					
				}finally{
					
					this_mon.exit();
				}
									
				new requestDispatcher( request, l, buffer, chunks );
			}
		}catch( Throwable e ){
						
			request.getBuffer().returnToPool();
			
			disk_manager.setFailed( "Disk write error - " + Debug.getNestedExceptionMessage(e));
			
			Debug.printStackTrace( e );
			
			listener.writeFailed( request, e );
		}
	}
	
	protected class
	requestDispatcher
		implements DiskAccessRequestListener
	{
		private DiskManagerWriteRequest			request;
		private DiskManagerWriteRequestListener	listener;
		private DirectByteBuffer				buffer;
		private List							chunks;
				
		private int	chunk_index;
		
		protected
		requestDispatcher(
			DiskManagerWriteRequest			_request,
			DiskManagerWriteRequestListener	_listener,
			DirectByteBuffer				_buffer,
			List							_chunks )
		{	
			request		= _request;
			listener	= _listener;
			buffer		= _buffer;
			chunks		= _chunks;
			
			/*
			String	str = "Write: " + request.getPieceNumber() + "/" + request.getOffset() + ":";

			for (int i=0;i<chunks.size();i++){
			
				Object[]	entry = (Object[])chunks.get(i);
				
				String	str2 = entry[0] + "/" + entry[1] +"/" + entry[2];
				
				str += (i==0?"":",") + str2;
			}
			
			System.out.println( str );
			*/
						
			dispatch();
		}	
		
		protected void
		dispatch()
		{
			try{
				if ( chunk_index == chunks.size()){
					
					listener.writeCompleted( request );
					
				}else{
												
					if ( chunk_index == 1 && chunks.size() > 32 ){
						
							// for large numbers of chunks drop the recursion approach and
							// do it linearly (but on the async thread)
						
						for (int i=1;i<chunks.size();i++){
							
							final AESemaphore	sem 	= new AESemaphore( "DMW&C:dispatch:asyncReq" );
							final Throwable[]	error	= {null};
							
							doRequest( 
								new DiskAccessRequestListener()
								{
									public void
									requestComplete(
										DiskAccessRequest	request )
									{
										sem.release();
									}
									
									public void
									requestCancelled(
										DiskAccessRequest	request )
									{
										Debug.out( "shouldn't get here" );
									}
									
									public void
									requestFailed(
										DiskAccessRequest	request,
										Throwable			cause )
									{
										error[0]	= cause;
										
										sem.release();
									}
									
									public int
									getPriority()
									{
										return( -1 );
									}
								});
							
							sem.reserve();
							
							if ( error[0] != null ){
								
								throw( error[0] );
							}
						}
						
						listener.writeCompleted( request );
						
					}else{
						
						doRequest( this );
					}
				}
			}catch( Throwable e ){
				
				failed( e );
			}
		}
		
		protected void
		doRequest(
			final DiskAccessRequestListener	l )
		
			throws CacheFileManagerException
		{
			Object[]	stuff = (Object[])chunks.get( chunk_index++ );

			final DiskManagerFileInfoImpl	file = (DiskManagerFileInfoImpl)stuff[0];
			
			buffer.limit( DirectByteBuffer.SS_DR, ((Integer)stuff[2]).intValue());
						
			if ( file.getAccessMode() == DiskManagerFileInfo.READ ){
				
				if (Logger.isEnabled())
					Logger.log(new LogEvent(disk_manager, LOGID, "Changing "
							+ file.getFile(true).getName()
							+ " to read/write"));
					
				file.setAccessMode( DiskManagerFileInfo.WRITE );
			}
			
			boolean	handover_buffer	= chunk_index == chunks.size(); 
			
			DiskAccessRequestListener	delegate_listener = 
				new DiskAccessRequestListener()
				{
					public void
					requestComplete(
						DiskAccessRequest	request )
					{
						l.requestComplete( request );
						
						file.dataWritten( request.getOffset(), request.getSize());
					}
					
					public void
					requestCancelled(
						DiskAccessRequest	request )
					{
						l.requestCancelled( request );
					}
					
					public void
					requestFailed(
						DiskAccessRequest	request,
						Throwable			cause )
					{
						l.requestFailed( request, cause );
					}
					
					public int
					getPriority()
					{
						return( -1 );
					}
				};
				
			disk_access.queueWriteRequest(
				file.getCacheFile(),
				((Long)stuff[1]).longValue(),
				buffer,
				handover_buffer,
				delegate_listener );
		}
		
		public void
		requestComplete(
			DiskAccessRequest	request )
		{
			dispatch();
		}
		
		public void
		requestCancelled(
			DiskAccessRequest	request )
		{
				// we never cancel so nothing to do here
			
			Debug.out( "shouldn't get here" );
		}
		
		public void
		requestFailed(
			DiskAccessRequest	request,
			Throwable			cause )
		{
			failed( cause );
		}
		
		
		public int
		getPriority()
		{
			return( -1 );
		}
		
		protected void
		failed(
			Throwable			cause )
		{
			buffer.returnToPool();
			
			listener.writeFailed( request, cause );
		}	
	}	
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -