⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dhttransportudpimpl.java

📁 这是一个基于java编写的torrent的P2P源码
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
/*
 * Created on 21-Jan-2005
 * Created by Paul Gardner
 * Copyright (C) 2004, 2005, 2006 Aelitis, All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 * 
 * AELITIS, SAS au capital de 46,603.30 euros
 * 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
 *
 */

package com.aelitis.azureus.core.dht.transport.udp.impl;

import java.io.*;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.security.SecureRandom;
import java.util.*;

import org.gudy.azureus2.core3.internat.MessageText;
import org.gudy.azureus2.core3.ipfilter.IpFilter;
import org.gudy.azureus2.core3.ipfilter.IpFilterManagerFactory;
import org.gudy.azureus2.core3.util.AEMonitor;
import org.gudy.azureus2.core3.util.AESemaphore;
import org.gudy.azureus2.core3.util.AEThread;
import org.gudy.azureus2.core3.util.Average;
import org.gudy.azureus2.core3.util.ByteFormatter;
import org.gudy.azureus2.core3.util.Debug;
import org.gudy.azureus2.core3.util.HashWrapper;
import org.gudy.azureus2.core3.util.SimpleTimer;
import org.gudy.azureus2.core3.util.SystemTime;
import org.gudy.azureus2.core3.util.TimerEvent;
import org.gudy.azureus2.core3.util.TimerEventPerformer;

import com.aelitis.azureus.core.dht.DHT;
import com.aelitis.azureus.core.dht.DHTLogger;
import com.aelitis.azureus.core.dht.impl.DHTLog;
import com.aelitis.azureus.core.dht.netcoords.DHTNetworkPosition;
import com.aelitis.azureus.core.dht.netcoords.DHTNetworkPositionManager;
import com.aelitis.azureus.core.dht.netcoords.DHTNetworkPositionProvider;
import com.aelitis.azureus.core.dht.transport.*;
import com.aelitis.azureus.core.dht.transport.udp.*;
import com.aelitis.azureus.core.dht.transport.udp.impl.packethandler.DHTUDPPacketHandler;
import com.aelitis.azureus.core.dht.transport.udp.impl.packethandler.DHTUDPPacketHandlerException;
import com.aelitis.azureus.core.dht.transport.udp.impl.packethandler.DHTUDPPacketHandlerFactory;
import com.aelitis.azureus.core.dht.transport.udp.impl.packethandler.DHTUDPPacketReceiver;
import com.aelitis.azureus.core.dht.transport.udp.impl.packethandler.DHTUDPRequestHandler;
import com.aelitis.azureus.core.dht.transport.util.DHTTransportRequestCounter;
import com.aelitis.azureus.core.util.bloom.BloomFilter;
import com.aelitis.azureus.core.util.bloom.BloomFilterFactory;
import com.aelitis.net.udp.uc.PRUDPPacketHandler;

/**
 * @author parg
 *
 */

public class 
DHTTransportUDPImpl 
	implements DHTTransportUDP, DHTUDPRequestHandler
{
	// Public, mutable test switch. In the visible part of getExternalAddress() a true
	// value makes "127.0.0.1" the new external address.
	public static boolean TEST_EXTERNAL_IP	= false;
	
	// NOTE(review): presumably an upper bound on queued transfer items; the code that
	// reads it is not in the visible part of the file - confirm.
	public static final int	TRANSFER_QUEUE_MAX	= 64;
	
	// NOTE(review): these look like millisecond delays/timeouts for the read/write
	// transfer logic; their consumers are not visible here - confirm units and usage.
	public static final long	WRITE_XFER_RESEND_DELAY		= 12500;
	public static final long	READ_XFER_REREQUEST_DELAY	= 5000;
	public static final long	WRITE_REPLY_TIMEOUT			= 60000;		
	
	// Private switch for transfer tracing. It is only read here by the static
	// initialiser below; other readers (if any) are outside the visible code.
	private static boolean	XFER_TRACE	= false;
	
	// Prints a banner to stdout at class initialisation when tracing is switched on.
	static{
		if ( XFER_TRACE ){
			System.out.println( "**** DHTTransportUDPImpl xfer trace on ****" );
		}
	}	
	
	
	// Host string used (together with 'port') to build the local contact's address.
	// NOTE(review): presumably assigned by getExternalAddress(); the assigning code is
	// beyond the visible part of the file - confirm.
	private String				external_address;
	
	// The following are copied from the constructor arguments, except where noted.
	private byte				protocol_version;
	private int					network;
	private String				ip_override;			// constructor argument _ip
	private int					port;					// also changed by setPort()
	private int					max_fails_for_live;
	private int					max_fails_for_unknown;
	private long				request_timeout;		// constructor argument _timeout
	private long				store_timeout;			// derived: request_timeout * 2
	private boolean				reachable;				// initial value from constructor, then maintained by updateStats()
	private boolean				reachable_accurate;		// set to true by updateStats() once the initial stats period has passed
	private int					dht_send_delay;
	private int					dht_receive_delay;

	private DHTLogger			logger;
		
	// Created (and re-created) by createPacketHandler().
	private DHTUDPPacketHandler			packet_handler;
	
	// Not assigned in the visible part of the file.
	private DHTTransportRequestHandler	request_handler;
	
	// Contact describing this node; built in the constructor and replaced by the test hooks.
	private DHTTransportUDPContactImpl		local_contact;
	
	// Transfer bookkeeping maps; key/value types are not shown in the visible code.
	private Map transfer_handlers 	= new HashMap();
	private Map	read_transfers		= new HashMap();
	private Map write_transfers		= new HashMap();
	
	private Map	call_transfers		= new HashMap();
	
	private long last_address_change;
	
	// Elements are cast to DHTTransportListener when notifications are dispatched.
	private List listeners	= new ArrayList();
	
	private IpFilter	ip_filter	= IpFilterManagerFactory.getSingleton().getIPFilter();

	
	// Created on the first createPacketHandler() call; later calls re-point it at the
	// new packet handler's stats.
	private DHTTransportUDPStatsImpl	stats;

	// When true getNodeStatus() always reports 0.
	private boolean		bootstrap_node	= false;
	
	
	private static final int CONTACT_HISTORY_MAX 		= 32;
	// NOTE(review): not referenced in the visible part of the file - confirm usage.
	private static final int CONTACT_HISTORY_PING_SIZE	= 16;
	
	// Bounded history of contacts. The third constructor argument (true) selects
	// access ordering, and removeEldestEntry() evicts the eldest entry whenever an
	// insertion takes the size above CONTACT_HISTORY_MAX. Values are cast to
	// DHTTransportUDPContactImpl by testExternalAddressChange().
	private Map	contact_history = 
		new LinkedHashMap(CONTACT_HISTORY_MAX,0.75f,true)
		{
			protected boolean 
			removeEldestEntry(
		   		Map.Entry eldest) 
			{
				return size() > CONTACT_HISTORY_MAX;
			}
		};
		
	private static final int ROUTABLE_CONTACT_HISTORY_MAX 		= 32;

	// Same access-ordered, size-bounded construction as contact_history, capped at
	// ROUTABLE_CONTACT_HISTORY_MAX entries. Contents are not shown in the visible code.
	private Map	routable_contact_history = 
		new LinkedHashMap(ROUTABLE_CONTACT_HISTORY_MAX,0.75f,true)
		{
			protected boolean 
			removeEldestEntry(
		   		Map.Entry eldest) 
			{
				return size() > ROUTABLE_CONTACT_HISTORY_MAX;
			}
		};
			
		
	// NOTE(review): presumably running counts of routable / non-routable remote
	// contacts; they are only mentioned in a commented-out trace line here - confirm.
	private long	other_routable_total;
	private long	other_non_routable_total;
	
	private static final int RECENT_REPORTS_HISTORY_MAX = 32;

	// Access-ordered map capped at RECENT_REPORTS_HISTORY_MAX entries; what is
	// stored in it is not shown in the visible code.
	private Map	recent_reports = 
			new LinkedHashMap(RECENT_REPORTS_HISTORY_MAX,0.75f,true)
			{
				protected boolean 
				removeEldestEntry(
			   		Map.Entry eldest) 
				{
					return size() > RECENT_REPORTS_HISTORY_MAX;
				}
			};
		
			
	// Period of the "DHTUDP:stats" timer registered in the constructor; also used as
	// the first argument to Average.getInstance() and as a scaling factor in updateStats().
	private static final int	STATS_PERIOD		= 60*1000;
	private static final int 	STATS_DURATION_SECS	= 600;			// 10 minute average
	// updateStats() leaves the reachability view untouched until this much time has
	// elapsed since stats_start_time.
	private static final long	STATS_INIT_PERIOD	= 15*60*1000;	// bit more than 10 mins to allow average to establish
	
	// Reset whenever createPacketHandler() runs, and by updateStats() when the
	// current time is found to be earlier than the recorded value.
	private long	stats_start_time	= SystemTime.getCurrentTime();
	// Totals seen on the previous updateStats() call, used to compute per-period deltas.
	private long	last_alien_count;
	private long	last_alien_fv_count;
	
	// Rolling averages fed by updateStats(): all alien requests, and alien
	// find-value requests only.
	private Average	alien_average 		= Average.getInstance(STATS_PERIOD,STATS_DURATION_SECS);
	private Average	alien_fv_average 	= Average.getInstance(STATS_PERIOD,STATS_DURATION_SECS);
		
	// A SecureRandom when one can be constructed, otherwise a plain Random (see constructor).
	private Random				random;
	
	// NOTE(review): bloom filter and its size are not used in the visible part of
	// the file - confirm purpose against the rest of the class.
	private static final int	BAD_IP_BLOOM_FILTER_SIZE	= 32000;
	private BloomFilter			bad_ip_bloom_filter;
	
	// Class-wide monitor; entered by getExternalAddress().
	private static AEMonitor	class_mon	= new AEMonitor( "DHTTransportUDP:class" );
	
	// Per-instance monitor; its users are outside the visible part of the file.
	private AEMonitor	this_mon	= new AEMonitor( "DHTTransportUDP" );

	/**
	 * Builds the transport: stores the configuration, picks a random source,
	 * creates the packet handler, registers the periodic statistics timer,
	 * resolves the external address and finally creates the local contact.
	 *
	 * @param _protocol_version		protocol version stored for this transport and given to the local contact
	 * @param _network				network identifier, returned by getNetwork()
	 * @param _ip					stored as the IP override
	 * @param _default_ip			default address handed to getExternalAddress(); "127.0.0.1" is used when null
	 * @param _port					port used for the local contact's address
	 * @param _max_fails_for_live	stored as-is
	 * @param _max_fails_for_unknown	stored as-is
	 * @param _timeout				request timeout; the store timeout is set to twice this value
	 * @param _dht_send_delay		send delay handed to the packet handler
	 * @param _dht_receive_delay	receive delay handed to the packet handler
	 * @param _bootstrap_node		whether this node is a bootstrap node
	 * @param _initial_reachability	initial value of the reachable flag
	 * @param _logger				logger used by this transport
	 * @throws DHTTransportException if the packet handler cannot be created
	 */
	public
	DHTTransportUDPImpl(
		byte			_protocol_version,
		int				_network,
		String			_ip,
		String			_default_ip,
		int				_port,
		int				_max_fails_for_live,
		int				_max_fails_for_unknown,
		long			_timeout,
		int				_dht_send_delay,
		int				_dht_receive_delay,
		boolean			_bootstrap_node,
		boolean			_initial_reachability,
		DHTLogger		_logger )
	
		throws DHTTransportException
	{
			// plain configuration copied from the arguments
		
		this.logger					= _logger;
		this.protocol_version		= _protocol_version;
		this.network				= _network;
		this.ip_override			= _ip;
		this.port					= _port;
		this.bootstrap_node			= _bootstrap_node;
		this.reachable				= _initial_reachability;
		this.max_fails_for_live		= _max_fails_for_live;
		this.max_fails_for_unknown	= _max_fails_for_unknown;
		this.dht_send_delay			= _dht_send_delay;
		this.dht_receive_delay		= _dht_receive_delay;
		this.request_timeout		= _timeout;
		this.store_timeout			= _timeout * 2;
		
			// prefer a secure random source, fall back to a plain one if it can't be built
		
		try{
			this.random = new SecureRandom();
			
		}catch( Throwable e ){
			
			this.random = new Random();
			
			logger.log( e );
		}
		
		createPacketHandler();
		
		TimerEventPerformer	stats_task = 
			new TimerEventPerformer()
			{
				public void
				perform(
					TimerEvent	ev )
				{
					updateStats();
				}
			};
			
		SimpleTimer.addPeriodicEvent( "DHTUDP:stats", STATS_PERIOD, stats_task );
		
		String	fallback_ip = _default_ip;
		
		if ( fallback_ip == null ){
			
			fallback_ip = "127.0.0.1";
		}
		
		getExternalAddress( fallback_ip, logger );
		
		InetSocketAddress	initial_address = new InetSocketAddress( external_address, port );

		logger.log( "Initial external address: " + initial_address );
		
		local_contact = 
			new DHTTransportUDPContactImpl( 
				true, this, initial_address, initial_address, protocol_version, random.nextInt(), 0 );
	}
	
	/**
	 * (Re)creates the packet handler: registers the packet codecs, destroys any
	 * existing handler, obtains a fresh one from the factory, applies the
	 * configured delays, restarts the stats clock and attaches the new
	 * handler's stats to this transport's stats object.
	 *
	 * @throws DHTTransportException if destroying the old handler or obtaining
	 *         the new one fails
	 */
	protected void
	createPacketHandler()
	
		throws DHTTransportException
	{
		DHTUDPPacketHelper.registerCodecs();

			// DHTPRUDPPacket relies on the request-handler being an instanceof THIS so watch out
			// if you change it :)
	
		try{
			DHTUDPPacketHandler	previous = packet_handler;
			
			if ( previous != null ){
				
				previous.destroy();
			}
			
			packet_handler = DHTUDPPacketHandlerFactory.getHandler( this, this );
			
		}catch( Throwable e ){
			
			throw( new DHTTransportException( "Failed to get packet handler", e ));
		}
	
			// limit send and receive rates. Receive rate is lower as we want a stricter limit
			// on the max speed we generate packets than those we're willing to process.
		
		packet_handler.setDelays( dht_send_delay, dht_receive_delay, (int)request_timeout );
		
		stats_start_time = SystemTime.getCurrentTime();
		
			// the stats object survives handler re-creation; only its source is swapped
		
		if ( stats != null ){
			
			stats.setStats( packet_handler.getStats());
			
		}else{
			
			stats = new DHTTransportUDPStatsImpl( protocol_version, packet_handler.getStats());
		}
	}
	
	/**
	 * Statistics tick, run from the "DHTUDP:stats" periodic timer.
	 *
	 * Feeds the change in alien request counts since the previous tick into the
	 * two rolling averages. Once more than STATS_INIT_PERIOD has elapsed since
	 * stats_start_time, the reachable flag is recomputed from those averages,
	 * and listeners are told when it flips.
	 */
	protected void
	updateStats()
	{
		long[]	per_type_aliens = stats.getAliens();
		
		long	total_aliens = 0;
		
		for (int idx=0;idx<per_type_aliens.length;idx++){
			
			total_aliens += per_type_aliens[idx];
		}
		
		long	find_value_aliens = per_type_aliens[ DHTTransportStats.AT_FIND_VALUE ];
		
		long	total_delta			= total_aliens - last_alien_count;
		long	find_value_delta	= find_value_aliens - last_alien_fv_count;
		
		alien_average.addValue( total_delta*STATS_PERIOD/1000 );
		alien_fv_average.addValue( find_value_delta*STATS_PERIOD/1000 );

		last_alien_count	= total_aliens;
		last_alien_fv_count	= find_value_aliens;
		
		long	now = SystemTime.getCurrentTime();
		
		if ( now < stats_start_time ){
			
				// current time is earlier than the recorded start: restart the clock
			
			stats_start_time = now;
			
			return;
		}
		
			// only fiddle with the initial view of reachability when things have had
			// time to stabilise
		
		if ( now - stats_start_time <= STATS_INIT_PERIOD ){
			
			return;
		}
		
		reachable_accurate = true;
		
		boolean	was_reachable = reachable;
		
			// any alien find-value traffic counts as reachable, otherwise require an
			// overall alien average above 3
		
		reachable = alien_fv_average.getAverage() > 0 || alien_average.getAverage() > 3;
		
		if ( was_reachable == reachable ){
			
			return;
		}
		
		for (int idx=0;idx<listeners.size();idx++){
			
			try{
				DHTTransportListener	listener = (DHTTransportListener)listeners.get(idx);
				
				listener.reachabilityChanged( reachable );
				
			}catch( Throwable e ){
				
				Debug.printStackTrace(e);
			}
		}
	}
	
	/**
	 * Reports this node's status value.
	 *
	 * @return 0 for a bootstrap node; NODE_STATUS_UNKNOWN while the reachability
	 *         view has not yet been marked accurate; otherwise NODE_STATUS_ROUTABLE
	 *         when reachable and 0 when not
	 */
	protected int
	getNodeStatus()
	{
			// bootstrap node is special case and not generally routable
		
		if ( bootstrap_node ){
			
			return 0;
		}
		
		if ( !reachable_accurate ){
			
			return DHTTransportUDPContactImpl.NODE_STATUS_UNKNOWN;
		}
		
		if ( reachable ){
			
			return DHTTransportUDPContactImpl.NODE_STATUS_ROUTABLE;
		}
		
		return 0;
	}
	
	/**
	 * @return the current reachable flag (initialised from the constructor and
	 *         later maintained by updateStats())
	 */
	public boolean
	isReachable()
	{
		return reachable;
	}
	
	/**
	 * @return the protocol version this transport was constructed with
	 */
	public byte
	getProtocolVersion()
	{
		return protocol_version;
	}
	
	/**
	 * @return the port currently configured for this transport
	 */
	public int
	getPort()
	{
		return port;
	}
	
	/**
	 * Switches the transport to a different port. Does nothing when the port is
	 * unchanged; otherwise records the new port, re-creates the packet handler
	 * and then calls setLocalContact().
	 *
	 * @param new_port the port to switch to
	 * @throws DHTTransportException if the packet handler cannot be re-created
	 */
	public void
	setPort(
		int	new_port )
	
		throws DHTTransportException
	{
		if ( new_port != port ){
			
			port = new_port;
			
			createPacketHandler();
			
			setLocalContact();
		}
	}
	
	/**
	 * @return the network identifier this transport was constructed with
	 */
	public int
	getNetwork()
	{
		return network;
	}
	
	/**
	 * Test hook: replaces the local contact with a new one that keeps the
	 * current transport and external addresses but carries a freshly drawn
	 * random instance id.
	 *
	 * @throws DHTTransportException declared by the signature; not thrown by the visible code
	 */
	public void
	testInstanceIDChange()
	
		throws DHTTransportException
	{
		DHTTransportUDPContactImpl	current = local_contact;
		
		DHTTransportUDPContactImpl	replacement = 
			new DHTTransportUDPContactImpl( 
				true, 
				this, 
				current.getTransportAddress(), 
				current.getExternalAddress(), 
				protocol_version, 
				random.nextInt(), 
				0 );
		
		local_contact = replacement;
	}
	
	/**
	 * Test hook: toggles the external address between "127.0.0.1" and
	 * "192.168.0.2", rebuilds the local contact on the new address (keeping the
	 * existing instance id) and tells every listener that the local contact
	 * changed. A failure in one listener is printed and does not stop the others.
	 *
	 * @throws DHTTransportException declared by the signature; not thrown by the visible code
	 */
	public void
	testTransportIDChange()
	
		throws DHTTransportException
	{
		external_address = external_address.equals("127.0.0.1") ? "192.168.0.2" : "127.0.0.1";
		
		InetSocketAddress	toggled_address = new InetSocketAddress( external_address, port );
		
		int	instance_id = local_contact.getInstanceID();
		
		local_contact = 
			new DHTTransportUDPContactImpl( 
				true, this, toggled_address, toggled_address, protocol_version, instance_id, 0 );

		for (int idx=0;idx<listeners.size();idx++){
			
			try{
				DHTTransportListener	listener = (DHTTransportListener)listeners.get(idx);
				
				listener.localContactChanged( local_contact );
				
			}catch( Throwable e ){
				
				Debug.printStackTrace(e);
			}
		}
	}
	
	/**
	 * Test hook: takes the first two entries of the contact history and calls
	 * externalAddressChange() with the first contact and the second contact's
	 * external address.
	 *
	 * At least two history entries are required. With fewer, the call is logged
	 * and ignored rather than relying on a NoSuchElementException from the
	 * iterator being caught and dumped as a stack trace. Any other failure is
	 * still caught and printed.
	 */
	public void
	testExternalAddressChange()
	{
		try{
				// two entries are consumed below, so bail out early when they aren't there
			
			int	history_size = contact_history.size();
			
			if ( history_size < 2 ){
				
				logger.log( "testExternalAddressChange: need at least 2 contacts in history, have " + history_size );
				
				return;
			}
			
			Iterator	it = contact_history.values().iterator();
			
			DHTTransportUDPContactImpl c1 = (DHTTransportUDPContactImpl)it.next();
			DHTTransportUDPContactImpl c2 = (DHTTransportUDPContactImpl)it.next();
			
			externalAddressChange( c1, c2.getExternalAddress());
			
		}catch( Throwable e ){
			
			Debug.printStackTrace(e);
		}
	}
	
	/**
	 * Test hook: forwards the given flag to the current packet handler's
	 * testNetworkAlive().
	 *
	 * @param alive value passed through unchanged to the packet handler
	 */
	public void
	testNetworkAlive(
		boolean		alive )
	{
		packet_handler.testNetworkAlive( alive );
	}
	
	protected void
	getExternalAddress(
		String				default_address,
		final DHTLogger		log )
	{
			// class level synchronisation is for testing purposes when running multiple UDP instances
			// in the same VM
		
		try{
			class_mon.enter();
			
			String new_external_address = null;
			
			try{				
				log.log( "Obtaining external address" );
				
				if ( TEST_EXTERNAL_IP ){
					
					new_external_address	= "127.0.0.1";

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -