⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dhttransportudpimpl.java

📁 这是一个基于java编写的torrent的P2P源码
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
							0 );
				}
			}else{
								
				if ( start < 0 ){
					
					start	= 0;
					
				}else if ( start >= data.length ){
					
					logger.log( "dataRequest: invalid start position" );
					
					return( data.length );
				}
								
				if ( length <= 0 ){
					
					length = data.length;
					
				}else if ( start + length > data.length ){
					
					logger.log( "dataRequest: invalid length" );
					
					return( data.length );
				}
				
				int	end = start+length;
				
				while( start < end ){
					
					int	chunk = end - start;
					
					if ( chunk > DHTUDPPacketData.MAX_DATA_SIZE ){
						
						chunk = DHTUDPPacketData.MAX_DATA_SIZE;								
					}
					
					if ( write_request ){
						
						sendWriteRequest(
								connection_id,
								target,
								transfer_key,
								request_key,
								data,
								start,
								chunk,
								data.length );
												
						if ( first_packet_only ){
							
							break;
						}
					}else{
						
						sendReadReply( 
								connection_id,
								target,
								transfer_key,
								request_key,
								data,
								start,
								chunk,
								data.length );
					}
					
					start += chunk;
				}
			}
			
			return( data.length );
		}
	}
	
	protected void
	dataRequest(
		final DHTTransportUDPContactImpl	originator,
		final DHTUDPPacketData				req )
	{
		/*
		if ((int)(Math.random() * 4 )== 0 ){
			
			System.out.println("dropping request packet:" + req.getString());
			
			return;
		}
		*/
		
		stats.dataReceived();
		
			// both requests and replies come through here. Currently we only support read
			// requests so we can safely use the data.length == 0 test to discriminate between
			// a request and a reply to an existing transfer
		
		byte	packet_type = req.getPacketType();
		
		if ( packet_type == DHTUDPPacketData.PT_READ_REPLY ){
			
			transferQueue	queue = lookupTransferQueue( read_transfers, req.getConnectionId());
			
				// unmatched -> drop it
			
			if ( queue != null ){
			
				queue.add( req );
			}
			
		}else if ( packet_type == DHTUDPPacketData.PT_WRITE_REPLY ){
				
			transferQueue	queue = lookupTransferQueue( write_transfers, req.getConnectionId());
				
				// unmatched -> drop it
				
			if ( queue != null ){
				
				queue.add( req );
			}
		}else{
			
			byte[]	transfer_key = req.getTransferKey();
									
			if ( packet_type == DHTUDPPacketData.PT_READ_REQUEST ){

				try{
					handleTransferRequest( 
							originator,
							req.getConnectionId(),
							transfer_key,
							req.getRequestKey(),
							null,
							req.getStartPosition(),
							req.getLength(),
							false, false );
					
				}catch( DHTTransportException e ){
					
					logger.log(e);
				}

			}else{
				
					// 	write request 
					
				transferQueue	old_queue = lookupTransferQueue( read_transfers, req.getConnectionId());
				
				if ( old_queue != null ){
					
					old_queue.add( req );
					
				}else{
				
					final DHTTransportTransferHandler	handler = (DHTTransportTransferHandler)transfer_handlers.get(new HashWrapper( transfer_key ));
					
					if ( handler == null ){
						
						logger.log( "No transfer handler registered for key '" + ByteFormatter.encodeString(transfer_key) + "'" );
						
					}else{
					
						try{
							final transferQueue new_queue = new transferQueue( read_transfers, req.getConnectionId());
						
								// add the initial data for this write request
							
							new_queue.add( req );
							
								// set up the queue processor
												
							new AEThread( "DHTTransportUDP:writeQueueProcessor", true )
								{
									public void
									runSupport()
									{
										try{
											byte[] write_data = 
												runTransferQueue( 
													new_queue, 
													new DHTTransportProgressListener()
													{
														public void
														reportSize(
															long	size )
														{
															if ( XFER_TRACE ){
																System.out.println( "writeXfer: size=" + size );
															}
														}
														
														public void
														reportActivity(
															String	str )
														{
															if ( XFER_TRACE ){
																System.out.println( "writeXfer: act=" + str );
															}
														}
														
														public void
														reportCompleteness(
															int		percent )
														{
															if ( XFER_TRACE ){
																System.out.println( "writeXfer: %=" + percent );
															}
														}
													},
													originator,
													req.getTransferKey(),
													req.getRequestKey(),
													60000,
													false );
											
											if ( write_data != null ){
												
													// xfer complete, send ack if multi-packet xfer
													// (ack already sent below if single packet)
												
												if ( 	req.getStartPosition() != 0 ||
														req.getLength() != req.getTotalLength() ){
													
													sendWriteReply(
															req.getConnectionId(),
															originator,
															req.getTransferKey(),
															req.getRequestKey(),
															0,
															req.getTotalLength());
												}
												
												byte[]	reply_data = handler.handleWrite( originator, req.getRequestKey(), write_data );
												
												if ( reply_data != null ){
													
													writeTransfer(
															new DHTTransportProgressListener()
															{
																public void
																reportSize(
																	long	size )
																{
																	if ( XFER_TRACE ){
																		System.out.println( "writeXferReply: size=" + size );
																	}
																}
																
																public void
																reportActivity(
																	String	str )
																{
																	if ( XFER_TRACE ){
																		System.out.println( "writeXferReply: act=" + str );
																	}
																}
																
																public void
																reportCompleteness(
																	int		percent )
																{
																	if ( XFER_TRACE ){
																		System.out.println( "writeXferReply: %=" + percent );
																	}
																}
															},
															originator,
															req.getTransferKey(),
															req.getRequestKey(),
															reply_data,
															WRITE_REPLY_TIMEOUT );
															
												}
											}
											
										}catch( DHTTransportException e ){
											
											logger.log( "Failed to process transfer queue: " + Debug.getNestedExceptionMessage(e));
										}
									}
								}.start();
								
									// indicate that at least one packet has been received
								
							sendWriteReply(
								req.getConnectionId(),
								originator,
								req.getTransferKey(),
								req.getRequestKey(),
								req.getStartPosition(),
								req.getLength());
							
						}catch( DHTTransportException e ){
							
							logger.log( "Faild to create transfer queue" );
							
							logger.log( e );
						}
					}
				}
			}
		}
	}
		
	public byte[]
	readTransfer(
		DHTTransportProgressListener	listener,
		DHTTransportContact				target,
		byte[]							handler_key,
		byte[]							key,
		long							timeout )
	
		throws DHTTransportException
	{
		long	connection_id 	= getConnectionID();
		
		transferQueue	transfer_queue = new transferQueue( read_transfers, connection_id );
		
		return( runTransferQueue( transfer_queue, listener, target, handler_key, key, timeout, true ));
	}
	
	protected byte[]
	runTransferQueue(
		transferQueue					transfer_queue,
		DHTTransportProgressListener	listener,
		DHTTransportContact				target,
		byte[]							handler_key,
		byte[]							key,
		long							timeout,
		boolean							read_transfer )
	
		throws DHTTransportException
	{		
		SortedSet	packets = 
			new TreeSet(
				new Comparator()
				{
					public int
					compare(
						Object	o1,
						Object	o2 )
					{
						DHTUDPPacketData	p1 = (DHTUDPPacketData)o1;
						DHTUDPPacketData	p2 = (DHTUDPPacketData)o2;
						
						return( p1.getStartPosition() - p2.getStartPosition());
					}
				});
		
		int	entire_request_count = 0;
		
		int transfer_size 	= -1;
		int	transferred		= 0;
		
		String	target_name = DHTLog.getString2(target.getID());
		
		try{
			long	start = SystemTime.getCurrentTime();
			
			if ( read_transfer ){
			
				listener.reportActivity( getMessageText( "request_all", target_name ));

				entire_request_count++;
			
				sendReadRequest( transfer_queue.getID(), (DHTTransportUDPContactImpl)target, handler_key, key );

			}else{
				
					// write transfer - data already on its way, no need to request it
				
				entire_request_count++;
			}
			
			while( SystemTime.getCurrentTime() - start <= timeout ){					
				
				DHTUDPPacketData	reply = transfer_queue.receive( READ_XFER_REREQUEST_DELAY );
				
				if ( reply != null ){
	
					if ( transfer_size == -1 ){
						
						transfer_size = reply.getTotalLength();
						
						listener.reportSize( transfer_size );
					}
					
					Iterator	it = packets.iterator();
					
					boolean	duplicate = false;
					
					while( it.hasNext()){
						
						DHTUDPPacketData	p = (DHTUDPPacketData)it.next();
						
							// ignore overlaps
						
						if (	p.getStartPosition() < reply.getStartPosition() + reply.getLength() &&
								p.getStartPosition() + p.getLength() > reply.getStartPosition()){
							
							duplicate	= true;
							
							break;
						}
					}
					
					if ( !duplicate ){
						
						listener.reportActivity( 
								getMessageText( "received_bit", 
								new String[]{ 
										String.valueOf( reply.getStartPosition()),
										String.valueOf(reply.getStartPosition() + reply.getLength()),
										target_name }));

						transferred += reply.getLength();
						
						listener.reportCompleteness( transfer_size==0?100: ( 100 * transferred / transfer_size ));
						
						packets.add( reply );
						
							// see if we're done				
					
						it = packets.iterator();
						
						int	pos			= 0;
						int	actual_end	= -1;
						
						while( it.hasNext()){
							
							DHTUDPPacketData	p = (DHTUDPPacketData)it.next();
						
							if ( actual_end == -1 ){
								
								actual_end = p.getTotalLength();
							}
							
							if ( p.getStartPosition() != pos ){
								
									// missing data, give up
								
								break;
							}
							
							pos += p.getLength();
							
							if ( pos == actual_end ){
							
									// huzzah, we got the lot
							
								listener.reportActivity( getMessageText( "complete" ));
								
								byte[]	result = new byte[actual_end];
								
								it =  packets.iterator();
								
								pos	= 0;
								
								while( it.hasNext()){
									
									p = (DHTUDPPacketData)it.next();

									System.arraycopy( p.getData(), 0, result, pos, p.getLength());
									
									pos	+= p.getLength();
								}
								
								return( result );
							}
						}
					}
				}else{
					
						// timeout, look for missing bits
					
					if ( packets.size() == 0 ){
						
						if ( entire_request_count == 2 ){
						
							listener.reportActivity( getMessageText( "timeout", target_name ));
							
							return( null );
						}
						
						entire_request_count++;
						
						listener.reportActivity( getMessageText( "rerequest_all", target_name ));
						
						sendReadRequest( transfer_queue.getID(), (DHTTransportUDPContactImpl)target, handler_key, key );
						
					}else{
						
						Iterator it = packets.iterator();
					
						int	pos			= 0;
						int	actual_end	= -1;
						
						while( it.hasNext()){
							
							DHTUDPPacketData	p = (DHTUDPPacketData)it.next();
						
							if ( actual_end == -1 ){
								
								actual_end = p.getTotalLength();
							}
							
							if ( p.getStartPosition() != pos ){
								
								listener.reportActivity( 
										getMessageText( "rerequest_bit",
												new String[]{
													String.valueOf( pos ),
													String.valueOf( p.getStartPosition()),
													target_name }));
								
								sendReadRequest( 
										transfer_queue.getID(), 
										(DHTTransportUDPContactImpl)target, 
										handler_key, 
										key,
										pos,
										p.getStartPositio

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -