⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 udpconnectionset.java

📁 这是一个基于java编写的torrent的P2P源码
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
/*
 * Created on 22 Jun 2006
 * Created by Paul Gardner
 * Copyright (C) 2006 Aelitis, All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 * 
 * AELITIS, SAS au capital de 46,603.30 euros
 * 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
 *
 */

package com.aelitis.azureus.core.networkmanager.impl.udp;

import java.util.*;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;

import javax.crypto.spec.SecretKeySpec;

import org.bouncycastle.crypto.CipherParameters;
import org.bouncycastle.crypto.engines.RC4Engine;
import org.bouncycastle.crypto.params.KeyParameter;
import org.gudy.azureus2.core3.logging.LogEvent;
import org.gudy.azureus2.core3.logging.LogIDs;
import org.gudy.azureus2.core3.logging.Logger;
import org.gudy.azureus2.core3.util.Debug;
import org.gudy.azureus2.core3.util.SHA1Hasher;
import org.gudy.azureus2.core3.util.SystemTime;

import com.aelitis.net.udp.uc.PRUDPPacketReply;

public class 
UDPConnectionSet 
{
	private static final LogIDs LOGID = LogIDs.NET;

	public static final int PROTOCOL_DATA_HEADER_SIZE	= 30;
	
	private static final boolean	DEBUG_SEQUENCES	= false;
	

	static{
		if ( DEBUG_SEQUENCES ){
			System.out.println( "**** UDPConnectionSet: debug sequences is on ****" );
		}
	}
	
	private static final byte[]	KEYA_IV	= "UDPDriverKeyA".getBytes();
	private static final byte[]	KEYB_IV	= "UDPDriverKeyB".getBytes();
	private static final byte[]	KEYC_IV	= "UDPDriverKeyC".getBytes();
	private static final byte[]	KEYD_IV	= "UDPDriverKeyD".getBytes();
	
	private static final int MIN_MSS	= 256;
	private static final int MAX_HEADER	= 128;
	
	public static final int MIN_WRITE_PAYLOAD	= MIN_MSS - MAX_HEADER;
		
	private UDPConnectionManager	manager;
	private UDPSelector				selector;
	private int						local_port;
	private InetSocketAddress		remote_address;
	private boolean					outgoing;
	private String					connection_key;
	
	private Random			random;
	
	private UDPConnection	lead_connection;
		
	private RC4Engine		header_cipher_out;
	private RC4Engine		header_cipher_in;
	
	private SequenceGenerator	in_seq_generator;
	private SequenceGenerator	out_seq_generator;

	
	private volatile boolean	crypto_done;
	
	private volatile boolean	failed;
	
	private Map	connections = new HashMap();
	
	private LinkedList	connection_writers = new LinkedList();
	
	private long	total_tick_count;
	
	private static final int STATS_LOG_TIMER	= 60*1000;
	private static final int STATS_LOG_TICKS	= Math.max( 1, STATS_LOG_TIMER / UDPConnectionManager.TIMER_TICK_MILLIS );
	private int stats_log_ticks = STATS_LOG_TICKS;

	
	
	private static final int IDLE_TIMER	= 10*1000;
	private static final int IDLE_TICKS	= Math.max( 1, IDLE_TIMER / UDPConnectionManager.TIMER_TICK_MILLIS );
	private int idle_ticks = 0;

	private static final int	TIMER_BASE_DEFAULT		= 300;
	private static final int	TIMER_BASE_MIN			= 100;
	private static final int	TIMER_BASE_MAX			= 15*1000;
	
	private int	current_timer_base	= TIMER_BASE_DEFAULT;
	private int old_timer_base		= current_timer_base;
	private boolean	timer_is_adjusting;


		// these stats_ values are reset periodically
	
	private int	stats_packets_unique_sent;			// unique packets sent
	private int stats_packets_resent_via_timer;		// total resent due to resend timer expiry
	private int	stats_packets_unique_received;		// unique packets received (not resends)
	private int stats_packets_duplicates;			// duplicates received
	
	private static final int STATS_RESET_TIMER = 30*1000;
	
	private long	stats_reset_time = SystemTime.getCurrentTime();
	
		// cumulative stats
	
	private int	total_packets_sent		= 0;
	private int	total_data_sent			= 0;
	private int	total_data_resent		= 0;
	private int	total_protocol_sent		= 0;
	private int	total_protocol_resent	= 0;
	
	private int total_packets_unique_sent		= 0;
	private int total_packets_received			= 0;
	private int total_packets_unique_received	= 0;
	private int total_packets_duplicates		= 0;
	private int total_packets_out_of_order		= 0;
	private int total_packets_resent_via_timer	= 0;
	private int total_packets_resent_via_ack	= 0;


	
		// transmit


	
	private int retransmit_ticks = 0;
	private UDPPacket	current_retransmit_target;
	
	private static final int RETRANSMIT_COUNT_LIMIT	= 5;

	
	private static final int MIN_RETRANSMIT_TIMER	= 100;
	private static final int MIN_RETRANSMIT_TICKS	= Math.max( 1, MIN_RETRANSMIT_TIMER / UDPConnectionManager.TIMER_TICK_MILLIS );
	private static final int MAX_RETRANSMIT_TIMER	= 20*1000;
	private static final int MAX_RETRANSMIT_TICKS	= Math.max( 1, MAX_RETRANSMIT_TIMER / UDPConnectionManager.TIMER_TICK_MILLIS );

	
	private static final int 	MAX_TRANSMIT_UNACK_DATA_PACKETS	= 10;
	private static final int 	MAX_TRANSMIT_UNACK_PACKETS		= MAX_TRANSMIT_UNACK_DATA_PACKETS + 4;	// + protocol packets
	private List				transmit_unack_packets = new ArrayList();
		
	private static final int	MAX_CONTIGUOUS_RETRANS_FOR_ACK	= 3;
	
	private static final int MIN_KEEPALIVE_TIMER	= 10*1000;
	private static final int MIN_KEEPALIVE_TICKS	= Math.max( 1, MIN_KEEPALIVE_TIMER / UDPConnectionManager.TIMER_TICK_MILLIS );
	private static final int MAX_KEEPALIVE_TIMER	= 20*1000;
	private static final int MAX_KEEPALIVE_TICKS	= Math.max( 1, MAX_KEEPALIVE_TIMER / UDPConnectionManager.TIMER_TICK_MILLIS );
	private int	keep_alive_ticks;
		
		// receive
		
	private int		receive_last_inorder_sequence		= -1;
	private int		receive_last_inorder_alt_sequence	= -1;
	
	private int		receive_their_last_inorder_sequence		= -1;

	private static final int RECEIVE_UNACK_IN_SEQUENCE_LIMIT	= 3;
	private long	current_receive_unack_in_sequence_count	= 0;
	private long	sent_receive_unack_in_sequence_count	= 0;
	
	private static final int RECEIVE_OUT_OF_ORDER_ACK_LIMIT		= 3;
	private long	current_receive_out_of_order_count		= 0;
	private long	sent_receive_out_of_order_count			= 0;
	
	
	private static final int RECEIVE_DONE_SEQ_MAX	= 128;
	private LinkedList	receive_done_sequences	= new LinkedList();
	
	private static final int RECEIVE_OUT_OF_ORDER_PACKETS_MAX	= 64;
	private List	receive_out_of_order_packets	= new LinkedList();
	
	private int explicitack_ticks = 0;

	private static final int MAX_SEQ_MEMORY = Math.max( RECEIVE_OUT_OF_ORDER_PACKETS_MAX, MAX_TRANSMIT_UNACK_PACKETS );
	
	protected
	UDPConnectionSet(
		UDPConnectionManager	_manager,
		String					_connection_key,
		UDPSelector				_selector,
		int						_local_port,
		InetSocketAddress		_remote_address )
	{
		manager			= _manager;
		connection_key	= _connection_key;
		selector		= _selector;
		local_port		= _local_port;
		remote_address	= _remote_address;
	}
	
	protected UDPSelector
	getSelector()
	{
		return( selector );
	}
	
	protected InetSocketAddress
	getRemoteAddress()
	{
		return( remote_address );
	}
	
	protected String
	getKey()
	{
		return( connection_key );
	}
	
	protected void
	add(
		UDPConnection	connection )
	
		throws IOException
	{
		UDPConnection	old_connection = null;
		
		synchronized( connections ){
			
			if ( failed ){
				
				throw( new IOException( "Connection set has failed" ));
			}
			
			old_connection =  (UDPConnection)connections.put( new Integer( connection.getID()), connection );
			
			if ( connections.size() == 1 && lead_connection == null ){
				
				lead_connection = connection;
				
				outgoing		= true;
			}
		}
		
		if ( old_connection != null ){
			
			Debug.out( "Duplicate connection" );
			
			old_connection.close( "Duplication connection" );
		}
	}
	
	protected boolean
	remove(
		UDPConnection	connection )
	{
		synchronized( connections ){
	
			connections.remove( new Integer( connection.getID()));
			
			return( connections.size() == 0 );
		}
	}
	
	protected void
	poll()
	{
		synchronized( connections ){

			Iterator	it = connections.values().iterator();
			
			while( it.hasNext()){
				
				((UDPConnection)it.next()).poll();
			}
		}
	}
	
	protected void
	setSecret(
		UDPConnection	connection,
		byte[]			session_secret )
	{
		try{
			if ( connection == lead_connection ){
					
				if ( manager.trace() ){
					trace( "crypto done" );
				}
				
			    SHA1Hasher	hasher = new SHA1Hasher();
			    
			    hasher.update( KEYA_IV );
			    hasher.update( session_secret );
			    	
			    byte[]	a_key = hasher.getDigest();
			    
			    hasher = new SHA1Hasher();
			    
			    hasher.update( KEYB_IV );
			    hasher.update( session_secret );
			    	
			    byte[]	b_key = hasher.getDigest();
			    
			    hasher = new SHA1Hasher();
			    
			    hasher.update( KEYC_IV );
			    hasher.update( session_secret );
			    	
			    byte[]	c_key = hasher.getDigest();
	
			    hasher = new SHA1Hasher();
			    
			    hasher.update( KEYD_IV );
			    hasher.update( session_secret );
			    	
			    byte[]	d_key = hasher.getDigest();
				    
			    	// for RC4 enc/dec is irrelevant
			    
			    RC4Engine rc4_engine_a	= getCipher( a_key );	
	    		RC4Engine rc4_engine_b	= getCipher( b_key );
	    		RC4Engine rc4_engine_c	= getCipher( c_key );	
	    		RC4Engine rc4_engine_d	= getCipher( d_key );	
	    		
	    		
		    	if ( lead_connection.isIncoming()){
	    			
	    			header_cipher_out	= rc4_engine_a;
	    			header_cipher_in	= rc4_engine_b;
	
		 			out_seq_generator = new SequenceGenerator( new Random( bytesToLong( d_key )), rc4_engine_c, false );
		 			in_seq_generator  = new SequenceGenerator( new Random( bytesToLong( c_key )), rc4_engine_d, true );
 
		 			random = new Random( bytesToLong( d_key, 8 ));
		 			
	    		}else{
	    			
	       			header_cipher_out	= rc4_engine_b;
	    			header_cipher_in	= rc4_engine_a;
		 			
		 			in_seq_generator  = new SequenceGenerator( new Random( bytesToLong( d_key )), rc4_engine_c, true );
		 			out_seq_generator = new SequenceGenerator( new Random( bytesToLong( c_key )), rc4_engine_d, false );
		 			
		 			random = new Random( bytesToLong( c_key, 8 ));
		    	}
	    		
		    		// as the first packet each way is crypto we skip a sequence number from each generator
		    		// to represent this and initialise the last-in-order seq appropriately so a sensible value is
		    		// spliced into the next packet
		    	
		    	out_seq_generator.getNextSequenceNumber();
		    	
		    	int[]	initial_in_seqs = in_seq_generator.getNextSequenceNumber();
		    	
		    	receive_last_inorder_alt_sequence = initial_in_seqs[3];
		    	
	    		crypto_done	= true;
	    		
			}else if ( !crypto_done ){
				
				Debug.out( "Secondary setSecret but crypto not done" );
			}
		}catch( Throwable e ){
			
			Debug.printStackTrace(e);
			
			connection.close( "Crypto problems: "+ Debug.getNestedExceptionMessage(e));
		}
	}
	
	protected RC4Engine
	getCipher(
		byte[]			key )
	{
	    SecretKeySpec	secret_key_spec = new SecretKeySpec( key, "RC4" );
	    
	    RC4Engine rc4_engine	= new RC4Engine();
		
		CipherParameters	params_a = new KeyParameter( secret_key_spec.getEncoded());
		
			// for RC4 enc/dec is irrelevant
		
		rc4_engine.init( true, params_a ); 
		
			// skip first 1024 bytes of stream to protected against a Fluhrer, Mantin and Shamir attack
    	
    	byte[]	temp = new byte[1024];
	
    	rc4_engine.processBytes( temp, 0, temp.length, temp, 0 );
    	
    	return( rc4_engine );
	}
	
	protected void
	sendTimerBase()
	{
			// only the outgoing side of a connection can initiate changes in timer base
		
		if ( !outgoing ){
			
			return;
		}
		
			// the thing that kills a connection is too many retransmits so we need to keep these minimised during timer changes
		
			// when we increase our base timer we can immediately increase our retransmit timer but need to wait until they have 
			// modified their timer before increasing the ack. Otherwise they won't be receiving acks as fast as they expect and therefore
			// trigger retransmits 
		
			// when we decrease our base timer it is the opposite way around - we can immediately decrease acks but delay retrans
		
		synchronized( this ){
			
			if ( timer_is_adjusting ){
				
				return;
			}
				// only consider the stats if we've sent at least a few this interval
			
			if ( stats_packets_unique_sent > 2 ){
				
				int	new_timer_base = current_timer_base;
								
				if ( stats_packets_resent_via_timer > 0 ){
					
					float	resend_ratio = (float)stats_packets_resent_via_timer / stats_packets_unique_sent;
					
					// System.out.println( "resend ratio: " + resend_ratio );
					
					if ( resend_ratio >= 0.25 ){
						
						new_timer_base = (int)( current_timer_base * ( resend_ratio + 1 ));
						
						// round to 100th sec as we send 100ths over the wire and expect to get it back
						
						new_timer_base = (new_timer_base/10)*10;
	
						new_timer_base = Math.min( TIMER_BASE_MAX, new_timer_base );					
						
						if ( new_timer_base != current_timer_base ){
							
							if ( manager.trace()){
								
								trace( "Increasing timer base from " + current_timer_base + " to " + new_timer_base + " due to resends (ratio=" + resend_ratio + ")" );
							}
						}
					}
				}
				
				if ( new_timer_base == current_timer_base && stats_packets_unique_received > 2 ){
					
					float	duplicate_ratio = (float)stats_packets_duplicates / stats_packets_unique_received;

						// we use duplicate packets sometimes to force sequence numbers through, so 
						// reduce our sensitivity this them
					
					duplicate_ratio = duplicate_ratio/2;
					
					// System.out.println( "duplicate ratio: " + duplicate_ratio );

					if ( duplicate_ratio >= 0.25 ){
						
						new_timer_base = (int)( current_timer_base * ( duplicate_ratio + 1 ));
						
						new_timer_base = (new_timer_base/10)*10;

						new_timer_base = Math.min( TIMER_BASE_MAX, new_timer_base );
						
						if ( new_timer_base != current_timer_base ){

							if ( manager.trace() ){
								
								trace( "Increasing timer base from " + current_timer_base + " to " + new_timer_base + " due to duplicates (ratio=" + duplicate_ratio + ")" );
							}
						}
					}
				}
				
				if ( new_timer_base == current_timer_base && stats_packets_unique_received > 2 ){
					
						// conservative approach - reduce by 10% if we've had no errors
					
					if ( stats_packets_resent_via_timer == 0 && stats_packets_duplicates == 0 ){
						
						new_timer_base = current_timer_base - (current_timer_base/10);
						
						new_timer_base = (new_timer_base/10)*10;

						new_timer_base = Math.max( new_timer_base, TIMER_BASE_MIN );
						
						if ( new_timer_base != current_timer_base ){

							if ( manager.trace()){
								
								trace( "Decreasing timer base from " + current_timer_base + " to " + new_timer_base  );
							}
						}
					}
				}				
				
				boolean	reset_stats	= false;
				
				long	now = SystemTime.getCurrentTime();

				if ( new_timer_base == current_timer_base ){
					
					if ( now < stats_reset_time || now - stats_reset_time > STATS_RESET_TIMER ){
												
						reset_stats	= true;
					}
					
				}else{
					
					timer_is_adjusting	= true;
					
					old_timer_base		= current_timer_base;
					current_timer_base	= new_timer_base;
					
					reset_stats = true;
				}
				
				if ( reset_stats ){
					
					resetTimerStats();
				}
			}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -