📄 dhttransportudpimpl.java
字号:
/*
* Created on 21-Jan-2005
* Created by Paul Gardner
* Copyright (C) 2004, 2005, 2006 Aelitis, All Rights Reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* AELITIS, SAS au capital de 46,603.30 euros
* 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
*
*/
package com.aelitis.azureus.core.dht.transport.udp.impl;
import java.io.*;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.security.SecureRandom;
import java.util.*;
import org.gudy.azureus2.core3.internat.MessageText;
import org.gudy.azureus2.core3.ipfilter.IpFilter;
import org.gudy.azureus2.core3.ipfilter.IpFilterManagerFactory;
import org.gudy.azureus2.core3.util.AEMonitor;
import org.gudy.azureus2.core3.util.AESemaphore;
import org.gudy.azureus2.core3.util.AEThread;
import org.gudy.azureus2.core3.util.Average;
import org.gudy.azureus2.core3.util.ByteFormatter;
import org.gudy.azureus2.core3.util.Debug;
import org.gudy.azureus2.core3.util.HashWrapper;
import org.gudy.azureus2.core3.util.SimpleTimer;
import org.gudy.azureus2.core3.util.SystemTime;
import org.gudy.azureus2.core3.util.TimerEvent;
import org.gudy.azureus2.core3.util.TimerEventPerformer;
import com.aelitis.azureus.core.dht.DHT;
import com.aelitis.azureus.core.dht.DHTLogger;
import com.aelitis.azureus.core.dht.impl.DHTLog;
import com.aelitis.azureus.core.dht.netcoords.DHTNetworkPosition;
import com.aelitis.azureus.core.dht.netcoords.DHTNetworkPositionManager;
import com.aelitis.azureus.core.dht.netcoords.DHTNetworkPositionProvider;
import com.aelitis.azureus.core.dht.transport.*;
import com.aelitis.azureus.core.dht.transport.udp.*;
import com.aelitis.azureus.core.dht.transport.udp.impl.packethandler.DHTUDPPacketHandler;
import com.aelitis.azureus.core.dht.transport.udp.impl.packethandler.DHTUDPPacketHandlerException;
import com.aelitis.azureus.core.dht.transport.udp.impl.packethandler.DHTUDPPacketHandlerFactory;
import com.aelitis.azureus.core.dht.transport.udp.impl.packethandler.DHTUDPPacketReceiver;
import com.aelitis.azureus.core.dht.transport.udp.impl.packethandler.DHTUDPRequestHandler;
import com.aelitis.azureus.core.dht.transport.util.DHTTransportRequestCounter;
import com.aelitis.azureus.core.util.bloom.BloomFilter;
import com.aelitis.azureus.core.util.bloom.BloomFilterFactory;
import com.aelitis.net.udp.uc.PRUDPPacketHandler;
/**
* @author parg
*
*/
public class
DHTTransportUDPImpl
implements DHTTransportUDP, DHTUDPRequestHandler
{
public static boolean TEST_EXTERNAL_IP = false;
public static final int TRANSFER_QUEUE_MAX = 64;
public static final long WRITE_XFER_RESEND_DELAY = 12500;
public static final long READ_XFER_REREQUEST_DELAY = 5000;
public static final long WRITE_REPLY_TIMEOUT = 60000;
private static boolean XFER_TRACE = false;
static{
if ( XFER_TRACE ){
System.out.println( "**** DHTTransportUDPImpl xfer trace on ****" );
}
}
private String external_address;
private byte protocol_version;
private int network;
private String ip_override;
private int port;
private int max_fails_for_live;
private int max_fails_for_unknown;
private long request_timeout;
private long store_timeout;
private boolean reachable;
private boolean reachable_accurate;
private int dht_send_delay;
private int dht_receive_delay;
private DHTLogger logger;
private DHTUDPPacketHandler packet_handler;
private DHTTransportRequestHandler request_handler;
private DHTTransportUDPContactImpl local_contact;
private Map transfer_handlers = new HashMap();
private Map read_transfers = new HashMap();
private Map write_transfers = new HashMap();
private Map call_transfers = new HashMap();
private long last_address_change;
private List listeners = new ArrayList();
private IpFilter ip_filter = IpFilterManagerFactory.getSingleton().getIPFilter();
private DHTTransportUDPStatsImpl stats;
private boolean bootstrap_node = false;
private static final int CONTACT_HISTORY_MAX = 32;
private static final int CONTACT_HISTORY_PING_SIZE = 16;
private Map contact_history =
new LinkedHashMap(CONTACT_HISTORY_MAX,0.75f,true)
{
protected boolean
removeEldestEntry(
Map.Entry eldest)
{
return size() > CONTACT_HISTORY_MAX;
}
};
private static final int ROUTABLE_CONTACT_HISTORY_MAX = 32;
private Map routable_contact_history =
new LinkedHashMap(ROUTABLE_CONTACT_HISTORY_MAX,0.75f,true)
{
protected boolean
removeEldestEntry(
Map.Entry eldest)
{
return size() > ROUTABLE_CONTACT_HISTORY_MAX;
}
};
private long other_routable_total;
private long other_non_routable_total;
private static final int RECENT_REPORTS_HISTORY_MAX = 32;
private Map recent_reports =
new LinkedHashMap(RECENT_REPORTS_HISTORY_MAX,0.75f,true)
{
protected boolean
removeEldestEntry(
Map.Entry eldest)
{
return size() > RECENT_REPORTS_HISTORY_MAX;
}
};
private static final int STATS_PERIOD = 60*1000;
private static final int STATS_DURATION_SECS = 600; // 10 minute average
private static final long STATS_INIT_PERIOD = 15*60*1000; // bit more than 10 mins to allow average to establish
private long stats_start_time = SystemTime.getCurrentTime();
private long last_alien_count;
private long last_alien_fv_count;
private Average alien_average = Average.getInstance(STATS_PERIOD,STATS_DURATION_SECS);
private Average alien_fv_average = Average.getInstance(STATS_PERIOD,STATS_DURATION_SECS);
private Random random;
private static final int BAD_IP_BLOOM_FILTER_SIZE = 32000;
private BloomFilter bad_ip_bloom_filter;
private static AEMonitor class_mon = new AEMonitor( "DHTTransportUDP:class" );
private AEMonitor this_mon = new AEMonitor( "DHTTransportUDP" );
public
DHTTransportUDPImpl(
byte _protocol_version,
int _network,
String _ip,
String _default_ip,
int _port,
int _max_fails_for_live,
int _max_fails_for_unknown,
long _timeout,
int _dht_send_delay,
int _dht_receive_delay,
boolean _bootstrap_node,
boolean _initial_reachability,
DHTLogger _logger )
throws DHTTransportException
{
protocol_version = _protocol_version;
network = _network;
ip_override = _ip;
port = _port;
max_fails_for_live = _max_fails_for_live;
max_fails_for_unknown = _max_fails_for_unknown;
request_timeout = _timeout;
dht_send_delay = _dht_send_delay;
dht_receive_delay = _dht_receive_delay;
bootstrap_node = _bootstrap_node;
reachable = _initial_reachability;
logger = _logger;
store_timeout = request_timeout * 2;
try{
random = new SecureRandom();
}catch( Throwable e ){
random = new Random();
logger.log( e );
}
createPacketHandler();
SimpleTimer.addPeriodicEvent(
"DHTUDP:stats",
STATS_PERIOD,
new TimerEventPerformer()
{
public void
perform(
TimerEvent event )
{
updateStats();
}
});
String default_ip = _default_ip==null?"127.0.0.1":_default_ip;
getExternalAddress( default_ip, logger );
InetSocketAddress address = new InetSocketAddress( external_address, port );
logger.log( "Initial external address: " + address );
local_contact = new DHTTransportUDPContactImpl( true, this, address, address, protocol_version, random.nextInt(), 0 );
}
protected void
createPacketHandler()
throws DHTTransportException
{
DHTUDPPacketHelper.registerCodecs();
// DHTPRUDPPacket relies on the request-handler being an instanceof THIS so watch out
// if you change it :)
try{
if ( packet_handler != null ){
packet_handler.destroy();
}
packet_handler = DHTUDPPacketHandlerFactory.getHandler( this, this );
}catch( Throwable e ){
throw( new DHTTransportException( "Failed to get packet handler", e ));
}
// limit send and receive rates. Receive rate is lower as we want a stricter limit
// on the max speed we generate packets than those we're willing to process.
// logger.log( "send delay = " + _dht_send_delay + ", recv = " + _dht_receive_delay );
packet_handler.setDelays( dht_send_delay, dht_receive_delay, (int)request_timeout );
stats_start_time = SystemTime.getCurrentTime();
if ( stats == null ){
stats = new DHTTransportUDPStatsImpl( protocol_version, packet_handler.getStats());
}else{
stats.setStats( packet_handler.getStats());
}
}
protected void
updateStats()
{
long alien_count = 0;
long[] aliens = stats.getAliens();
for (int i=0;i<aliens.length;i++){
alien_count += aliens[i];
}
long alien_fv_count = aliens[ DHTTransportStats.AT_FIND_VALUE ];
alien_average.addValue( (alien_count-last_alien_count)*STATS_PERIOD/1000);
alien_fv_average.addValue( (alien_fv_count-last_alien_fv_count)*STATS_PERIOD/1000);
last_alien_count = alien_count;
last_alien_fv_count = alien_fv_count;
long now = SystemTime.getCurrentTime();
if ( now < stats_start_time ){
stats_start_time = now;
}else{
// only fiddle with the initial view of reachability when things have had
// time to stabilise
if ( now - stats_start_time > STATS_INIT_PERIOD ){
reachable_accurate = true;
boolean old_reachable = reachable;
if ( alien_fv_average.getAverage() > 0 ){
reachable = true;
}else if ( alien_average.getAverage() > 3 ){
reachable = true;
}else{
reachable = false;
}
if ( old_reachable != reachable ){
for (int i=0;i<listeners.size();i++){
try{
((DHTTransportListener)listeners.get(i)).reachabilityChanged( reachable );
}catch( Throwable e ){
Debug.printStackTrace(e);
}
}
}
}
}
// System.out.println( "routables=" + other_routable_total + ", non=" + other_non_routable_total );
// System.out.println( "net " + network + ": aliens = " + alien_average.getAverage() + ", alien fv = " + alien_fv_average.getAverage());
}
protected int
getNodeStatus()
{
if ( bootstrap_node ){
// bootstrap node is special case and not generally routable
return( 0 );
}
if ( reachable_accurate ){
int status = reachable?DHTTransportUDPContactImpl.NODE_STATUS_ROUTABLE:0;
return( status );
}else{
return( DHTTransportUDPContactImpl.NODE_STATUS_UNKNOWN );
}
}
public boolean
isReachable()
{
return( reachable );
}
public byte
getProtocolVersion()
{
return( protocol_version );
}
public int
getPort()
{
return( port );
}
public void
setPort(
int new_port )
throws DHTTransportException
{
if ( new_port == port ){
return;
}
port = new_port;
createPacketHandler();
setLocalContact();
}
public int
getNetwork()
{
return( network );
}
public void
testInstanceIDChange()
throws DHTTransportException
{
local_contact = new DHTTransportUDPContactImpl( true, this, local_contact.getTransportAddress(), local_contact.getExternalAddress(), protocol_version, random.nextInt(), 0);
}
public void
testTransportIDChange()
throws DHTTransportException
{
if ( external_address.equals("127.0.0.1")){
external_address = "192.168.0.2";
}else{
external_address = "127.0.0.1";
}
InetSocketAddress address = new InetSocketAddress( external_address, port );
local_contact = new DHTTransportUDPContactImpl( true, this, address, address, protocol_version, local_contact.getInstanceID(), 0 );
for (int i=0;i<listeners.size();i++){
try{
((DHTTransportListener)listeners.get(i)).localContactChanged( local_contact );
}catch( Throwable e ){
Debug.printStackTrace(e);
}
}
}
public void
testExternalAddressChange()
{
try{
Iterator it = contact_history.values().iterator();
DHTTransportUDPContactImpl c1 = (DHTTransportUDPContactImpl)it.next();
DHTTransportUDPContactImpl c2 = (DHTTransportUDPContactImpl)it.next();
externalAddressChange( c1, c2.getExternalAddress());
//externalAddressChange( c, new InetSocketAddress( "192.168.0.7", 6881 ));
}catch( Throwable e ){
Debug.printStackTrace(e);
}
}
public void
testNetworkAlive(
boolean alive )
{
packet_handler.testNetworkAlive( alive );
}
protected void
getExternalAddress(
String default_address,
final DHTLogger log )
{
// class level synchronisation is for testing purposes when running multiple UDP instances
// in the same VM
try{
class_mon.enter();
String new_external_address = null;
try{
log.log( "Obtaining external address" );
if ( TEST_EXTERNAL_IP ){
new_external_address = "127.0.0.1";
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -