⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dhttransportudpimpl.java

📁 一个基于JAVA的多torrent下载程序
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
						
								// add the initial data for this write request
							
							new_queue.add( req );
							
								// set up the queue processor
												
							new AEThread( "DHTTransportUDP:writeQueueProcessor", true )
								{
									public void
									runSupport()
									{
										try{
											byte[] write_data = 
												runTransferQueue( 
													new_queue, 
													new DHTTransportProgressListener()
													{
														public void
														reportSize(
															long	size )
														{
															if ( XFER_TRACE ){
																System.out.println( "writeXfer: size=" + size );
															}
														}
														
														public void
														reportActivity(
															String	str )
														{
															if ( XFER_TRACE ){
																System.out.println( "writeXfer: act=" + str );
															}
														}
														
														public void
														reportCompleteness(
															int		percent )
														{
															if ( XFER_TRACE ){
																System.out.println( "writeXfer: %=" + percent );
															}
														}
													},
													originator,
													req.getTransferKey(),
													req.getRequestKey(),
													60000,
													false );
											
											if ( write_data != null ){
												
													// xfer complete, send ack if multi-packet xfer
													// (ack already sent below if single packet)
												
												if ( 	req.getStartPosition() != 0 ||
														req.getLength() != req.getTotalLength() ){
													
													sendWriteReply(
															req.getConnectionId(),
															originator,
															req.getTransferKey(),
															req.getRequestKey(),
															0,
															req.getTotalLength());
												}
												
												byte[]	reply_data = handler.handleWrite( originator, req.getRequestKey(), write_data );
												
												if ( reply_data != null ){
													
													writeTransfer(
															new DHTTransportProgressListener()
															{
																public void
																reportSize(
																	long	size )
																{
																	if ( XFER_TRACE ){
																		System.out.println( "writeXferReply: size=" + size );
																	}
																}
																
																public void
																reportActivity(
																	String	str )
																{
																	if ( XFER_TRACE ){
																		System.out.println( "writeXferReply: act=" + str );
																	}
																}
																
																public void
																reportCompleteness(
																	int		percent )
																{
																	if ( XFER_TRACE ){
																		System.out.println( "writeXferReply: %=" + percent );
																	}
																}
															},
															originator,
															req.getTransferKey(),
															req.getRequestKey(),
															reply_data,
															WRITE_REPLY_TIMEOUT );
															
												}
											}
											
										}catch( DHTTransportException e ){
											
											logger.log( "Failed to process transfer queue: " + Debug.getNestedExceptionMessage(e));
										}
									}
								}.start();
								
									// indicate that at least one packet has been received
								
							sendWriteReply(
								req.getConnectionId(),
								originator,
								req.getTransferKey(),
								req.getRequestKey(),
								req.getStartPosition(),
								req.getLength());
							
						}catch( DHTTransportException e ){
							
							logger.log( "Faild to create transfer queue" );
							
							logger.log( e );
						}
					}
				}
			}
		}
	}
		
	/**
	 * Reads a complete transfer from <code>target</code>.
	 * <p>
	 * Obtains a new connection id, creates a transfer queue for it over
	 * <code>read_transfers</code>, and hands that queue to
	 * {@link #runTransferQueue} in read mode, which issues the read request(s) and
	 * collects the reply packets.
	 *
	 * @param listener		receives size/activity/completeness notifications
	 * @param target		contact the data is requested from
	 * @param handler_key	passed through unchanged to <code>runTransferQueue</code>
	 * @param key			passed through unchanged to <code>runTransferQueue</code>
	 * @param timeout		overall time limit, compared against <code>SystemTime.getCurrentTime()</code> deltas
	 * @return the result of <code>runTransferQueue</code>: the transferred bytes, or
	 *         <code>null</code> if the transfer did not complete
	 * @throws DHTTransportException propagated from queue creation or from <code>runTransferQueue</code>
	 */
	public byte[]
	readTransfer(
		DHTTransportProgressListener	listener,
		DHTTransportContact				target,
		byte[]							handler_key,
		byte[]							key,
		long							timeout )
	
		throws DHTTransportException
	{
			// the connection id doubles as the identity of the queue
		
		final long	queue_id = getConnectionID();
		
		final transferQueue	queue = new transferQueue( read_transfers, queue_id );
		
			// read mode: runTransferQueue sends the initial request itself
		
		final boolean	is_read = true;
		
		return( runTransferQueue( queue, listener, target, handler_key, key, timeout, is_read ));
	}
	
	/**
	 * Drains <code>transfer_queue</code> until the received packets form one contiguous
	 * run from position 0 to the transfer's total length, then returns the reassembled
	 * bytes.
	 * <p>
	 * Behaviour visible in this method:
	 * <ul>
	 * <li>In read mode (<code>read_transfer == true</code>) an initial read request for the
	 *     entire transfer is sent to <code>target</code>; in write mode nothing is sent up
	 *     front because the data is already on its way.</li>
	 * <li>Packets that overlap an already-held packet are ignored.</li>
	 * <li>When a receive yields no packet and nothing has been received at all, the entire
	 *     transfer is re-requested; once the whole-transfer request count has reached 2 the
	 *     method gives up and returns <code>null</code>.</li>
	 * <li>When a receive yields no packet but some packets are held, each missing range
	 *     (gaps between packets and the tail) is re-requested individually.</li>
	 * <li>The queue is always destroyed before the method returns or throws.</li>
	 * </ul>
	 *
	 * @param transfer_queue	queue delivering the packets for this transfer; destroyed on exit
	 * @param listener			receives size, activity and completeness notifications
	 * @param target			remote contact; cast to <code>DHTTransportUDPContactImpl</code> for re-requests
	 * @param handler_key		passed to <code>sendReadRequest</code>
	 * @param key				passed to <code>sendReadRequest</code>
	 * @param timeout			overall time limit, in the units of <code>SystemTime.getCurrentTime()</code>
	 * @param read_transfer		<code>true</code> to send the initial read request
	 * @return the complete transfer data, or <code>null</code> on timeout
	 * @throws DHTTransportException propagated from the queue or from sending requests
	 */
	protected byte[]
	runTransferQueue(
		transferQueue					transfer_queue,
		DHTTransportProgressListener	listener,
		DHTTransportContact				target,
		byte[]							handler_key,
		byte[]							key,
		long							timeout,
		boolean							read_transfer )
	
		throws DHTTransportException
	{		
			// received packets, kept ordered by their start position
		
		SortedSet	packets = 
			new TreeSet(
				new Comparator()
				{
					public int
					compare(
						Object	o1,
						Object	o2 )
					{
						int	s1 = ((DHTUDPPacketData)o1).getStartPosition();
						int	s2 = ((DHTUDPPacketData)o2).getStartPosition();
						
							// compare explicitly rather than subtracting: the positions come
							// from received packets and a subtraction can overflow, which
							// would invert the ordering
						
						return( s1 < s2 ? -1 : ( s1 == s2 ? 0 : 1 ));
					}
				});
		
			// number of times the whole transfer has been requested (or, for a write,
			// implicitly started)
		
		int	entire_request_count = 0;
		
		int transfer_size 	= -1;	// unknown until the first packet arrives
		int	transferred		= 0;
		
		String	target_name = DHTLog.getString2(target.getID());
		
		try{
			long	start = SystemTime.getCurrentTime();
			
			if ( read_transfer ){
			
				listener.reportActivity( "Requesting entire transfer from " + target_name );

				entire_request_count++;
			
				sendReadRequest( transfer_queue.getID(), (DHTTransportUDPContactImpl)target, handler_key, key );

			}else{
				
					// write transfer - data already on its way, no need to request it
				
				entire_request_count++;
			}
			
			while( true ){
				
				long	now = SystemTime.getCurrentTime();
				
				if ( now < start ){
					
						// clock has gone backwards - restart the timeout window rather than
						// waiting for the clock to catch up (same approach as writeTransfer)
					
					start = now;
				}
				
				if ( now - start > timeout ){
					
					break;
				}
				
				DHTUDPPacketData	reply = transfer_queue.receive( READ_XFER_REREQUEST_DELAY );
				
				if ( reply != null ){
	
					if ( transfer_size == -1 ){
						
							// first packet tells us how big the whole transfer is
						
						transfer_size = reply.getTotalLength();
						
						listener.reportSize( transfer_size );
					}
					
					Iterator	it = packets.iterator();
					
					boolean	duplicate = false;
					
					while( it.hasNext()){
						
						DHTUDPPacketData	p = (DHTUDPPacketData)it.next();
						
							// ignore overlaps
						
						if (	p.getStartPosition() < reply.getStartPosition() + reply.getLength() &&
								p.getStartPosition() + p.getLength() > reply.getStartPosition()){
							
							duplicate	= true;
							
							break;
						}
					}
					
					if ( !duplicate ){
						
						listener.reportActivity( "Received " + reply.getStartPosition() + " to " + (reply.getStartPosition() + reply.getLength()) + " from " + target_name );

						transferred += reply.getLength();
						
							// percentage computed in long so 100 * transferred can't overflow
						
						listener.reportCompleteness( transfer_size==0?100: (int)(( 100L * transferred ) / transfer_size ));
						
						packets.add( reply );
						
							// see if we're done: walk the packets in order and check they are
							// contiguous from 0 up to the total length
					
						it = packets.iterator();
						
						int	pos			= 0;
						int	actual_end	= -1;
						
						while( it.hasNext()){
							
							DHTUDPPacketData	p = (DHTUDPPacketData)it.next();
						
							if ( actual_end == -1 ){
								
								actual_end = p.getTotalLength();
							}
							
							if ( p.getStartPosition() != pos ){
								
									// missing data, give up
								
								break;
							}
							
							pos += p.getLength();
							
							if ( pos == actual_end ){
							
									// huzzah, we got the lot
							
								listener.reportActivity( "Complete" );
								
								byte[]	result = new byte[actual_end];
								
								it =  packets.iterator();
								
								pos	= 0;
								
									// concatenate the packet payloads in position order
								
								while( it.hasNext()){
									
									p = (DHTUDPPacketData)it.next();

									System.arraycopy( p.getData(), 0, result, pos, p.getLength());
									
									pos	+= p.getLength();
								}
								
								return( result );
							}
						}
					}
				}else{
					
						// timeout, look for missing bits
					
					if ( packets.size() == 0 ){
						
							// nothing at all received: allow one re-request of the whole
							// transfer, then give up
						
						if ( entire_request_count == 2 ){
						
							listener.reportActivity( "Timeout, no replies received" );
							
							return( null );
						}
						
						entire_request_count++;
						
						listener.reportActivity( "Re-requesting entire transfer from " + target_name );
						
						sendReadRequest( transfer_queue.getID(), (DHTTransportUDPContactImpl)target, handler_key, key );
						
					}else{
						
						Iterator it = packets.iterator();
					
						int	pos			= 0;
						int	actual_end	= -1;
						
						while( it.hasNext()){
							
							DHTUDPPacketData	p = (DHTUDPPacketData)it.next();
						
							if ( actual_end == -1 ){
								
								actual_end = p.getTotalLength();
							}
							
							if ( p.getStartPosition() != pos ){
								
									// gap before this packet: ask for exactly the missing range
								
								listener.reportActivity( "Re-requesting " + pos + " to " + p.getStartPosition() +  " from " + target_name );
								
								sendReadRequest( 
										transfer_queue.getID(), 
										(DHTTransportUDPContactImpl)target, 
										handler_key, 
										key,
										pos,
										p.getStartPosition()-pos );
							
							}
							
							pos = p.getStartPosition() + p.getLength();
						}
						
						if ( pos != actual_end ){
							
								// tail of the transfer is missing
							
							listener.reportActivity( "Re-requesting " + pos + " to " + actual_end + " from " + target_name );

							sendReadRequest( 
									transfer_queue.getID(), 
									(DHTTransportUDPContactImpl)target, 
									handler_key, 
									key,
									pos,
									actual_end - pos );						
						}
					}
				}
			}
			
				// overall timeout expired without assembling the full transfer
			
			listener.reportActivity( 
					"Timeout, " + 
						(packets.size()==0?
							" no replies received":
							("" + packets.size() + " packets received but incomplete" )));
			
			return( null );
			
		}finally{
			
			transfer_queue.destroy();
		}
	}
	
	/**
	 * Sends <code>data</code> to <code>target</code> and waits for it to be acknowledged.
	 * <p>
	 * A transfer queue is created over <code>write_transfers</code> for a new connection id.
	 * The data is then (re)sent via <code>handleTransferRequest</code> whenever at least
	 * <code>WRITE_XFER_RESEND_DELAY</code> has elapsed since the last send or received packet,
	 * and the queue is polled for replies in between. The transfer succeeds when a reply
	 * arrives whose start position is 0 and whose length equals the value last returned by
	 * <code>handleTransferRequest</code>. The queue is destroyed on every exit path.
	 *
	 * @param listener		receives activity and completeness notifications
	 * @param target		contact to send to; cast to <code>DHTTransportUDPContactImpl</code>
	 * @param handler_key	passed to <code>handleTransferRequest</code>
	 * @param key			passed to <code>handleTransferRequest</code>
	 * @param data			bytes to transfer
	 * @param timeout		overall time limit, in the units of <code>SystemTime.getCurrentTime()</code>
	 * @throws DHTTransportException with message "Timeout" if no full acknowledgement arrives in
	 *         time, or propagated from the queue / send operations
	 */
	public void
	writeTransfer(
		DHTTransportProgressListener	listener,
		DHTTransportContact				target,
		byte[]							handler_key,
		byte[]							key,
		byte[]							data,
		long							timeout )
	
		throws DHTTransportException
	{
		transferQueue	reply_queue = null;
	
		try{
			final long	conn_id = getConnectionID();
		
			reply_queue = new transferQueue( write_transfers, conn_id );
		
			boolean	acknowledged	= false;
			boolean	heard_from_peer	= false;
			
			int		sends_so_far	= 0;
			int		expected_length	= data.length;
			
			long	began_at		= SystemTime.getCurrentTime();
			long	last_activity	= 0;
			
			while( !acknowledged ){
				
				final long	now = SystemTime.getCurrentTime();
				
				if ( now < began_at ){
					
						// clock went backwards: restart the timeout window and force a send
					
					began_at		= now;
					last_activity	= 0;
					
				}else if ( now - began_at > timeout ){
					
					break;
				}
				
				long	idle = now - last_activity;
				
				if ( idle >= WRITE_XFER_RESEND_DELAY ){
					
					if ( sends_so_far == 0 ){
						
						listener.reportActivity( "Sending data" );
						
					}else{
						
						listener.reportActivity( "Resending data" );
					}
				
					sends_so_far++;
				
						// last argument: send the first packet only once we've had a reply
					
					expected_length =
						handleTransferRequest(
							(DHTTransportUDPContactImpl)target,
							conn_id,
							handler_key,
							key,
							data,
							-1, -1,
							true,
							heard_from_peer );
				
					last_activity	= now;
					idle			= 0;
				}
				
					// wait for a reply for whatever remains of the resend interval
				
				DHTUDPPacketData	ack = reply_queue.receive( WRITE_XFER_RESEND_DELAY - idle );
			
				if ( ack == null ){
					
					continue;
				}
				
				last_activity	= now;
				
				heard_from_peer	= true;
				
					// only a reply covering the whole transfer ends the loop
				
				acknowledged = ack.getStartPosition() == 0 && ack.getLength() == expected_length;
			}
			
			if ( !acknowledged ){
				
				listener.reportActivity( "Failed, timeout" );
				
				throw( new DHTTransportException( "Timeout" ));
			}
			
			listener.reportCompleteness( 100 );
			
			listener.reportActivity( "Complete" );
			
		}finally{
			
			if ( reply_queue != null ){
				
				reply_queue.destroy();
			}
		}
	}
	
	public byte[]
	writeReadTransfer(
		DHTTransportProgressListener	listener,
		DHTTransportContact				target,
		byte[]							handler_key,
		byte[]							data,
		long							timeout )	
	
		throws DHTTransportException
	{
		byte[]	call_key = new byte[20];
		
		random.nextBytes( call_key );
		
		AESemaphore	call_sem = new AESemaphore( "DHTTransportUDP:calSem" );
		
		HashWrapper	wrapped_key = new HashWrapper( call_key );
		
		try{
			this_mon.enter();
			
			call_transfers.put( wrapped_key, call_sem );
		
		}finally{
			
			this_mon.exit();
		}
		
		writeTransfer( listener, target, handler_key, call_key, data, timeout );
		
		if ( call_sem.reserve( timeout )){
			
			try{
				this_mon.enter();
			
				Object	res = call_transfers.remove( wrapped_key );
				
				if ( res instanceof byte[] ){
	

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -