📄 dhtpluginimpl.java
字号:
/*
* Created on 24-Jan-2005
* Created by Paul Gardner
* Copyright (C) 2004, 2005, 2006 Aelitis, All Rights Reserved
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* AELITIS, SAS au capital de 46,603.30 euros
* 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
*
*/
package com.aelitis.azureus.plugins.dht.impl;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Properties;
import org.gudy.azureus2.core3.util.Debug;
import org.gudy.azureus2.core3.util.FileUtil;
import org.gudy.azureus2.core3.util.SystemTime;
import org.gudy.azureus2.plugins.*;
import org.gudy.azureus2.plugins.download.Download;
import org.gudy.azureus2.plugins.logging.LoggerChannel;
import org.gudy.azureus2.plugins.peers.Peer;
import org.gudy.azureus2.plugins.peers.PeerManager;
import org.gudy.azureus2.plugins.ui.config.ActionParameter;
import org.gudy.azureus2.plugins.utils.UTTimerEvent;
import org.gudy.azureus2.plugins.utils.UTTimerEventPerformer;
import com.aelitis.azureus.core.dht.DHT;
import com.aelitis.azureus.core.dht.DHTFactory;
import com.aelitis.azureus.core.dht.DHTLogger;
import com.aelitis.azureus.core.dht.DHTOperationListener;
import com.aelitis.azureus.core.dht.DHTStorageKeyStats;
import com.aelitis.azureus.core.dht.control.DHTControlStats;
import com.aelitis.azureus.core.dht.db.DHTDBStats;
import com.aelitis.azureus.core.dht.nat.DHTNATPuncherAdapter;
import com.aelitis.azureus.core.dht.router.DHTRouterStats;
import com.aelitis.azureus.core.dht.transport.DHTTransportContact;
import com.aelitis.azureus.core.dht.transport.DHTTransportException;
import com.aelitis.azureus.core.dht.transport.DHTTransportFactory;
import com.aelitis.azureus.core.dht.transport.DHTTransportListener;
import com.aelitis.azureus.core.dht.transport.DHTTransportProgressListener;
import com.aelitis.azureus.core.dht.transport.DHTTransportStats;
import com.aelitis.azureus.core.dht.transport.DHTTransportTransferHandler;
import com.aelitis.azureus.core.dht.transport.DHTTransportValue;
import com.aelitis.azureus.core.dht.transport.udp.DHTTransportUDP;
import com.aelitis.azureus.plugins.dht.DHTPlugin;
import com.aelitis.azureus.plugins.dht.DHTPluginContact;
import com.aelitis.azureus.plugins.dht.DHTPluginKeyStats;
import com.aelitis.azureus.plugins.dht.DHTPluginOperationListener;
import com.aelitis.azureus.plugins.dht.DHTPluginProgressListener;
import com.aelitis.azureus.plugins.dht.DHTPluginTransferHandler;
import com.aelitis.azureus.plugins.dht.DHTPluginValue;
import com.aelitis.azureus.plugins.dht.impl.DHTPluginStorageManager;
/**
* @author parg
*
*/
public class
DHTPluginImpl
{
// Host name, fallback IP and port of the root seed node.
// NOTE(review): presumably consumed by importRootSeed(), which is outside this excerpt - confirm.
private static final String SEED_ADDRESS = "dht.aelitis.com";
private static final String SEED_IP = "85.31.105.2"; // fallback in case DNS resolution fails
private static final int SEED_PORT = 6881;
// 8*60*60*1000; NOTE(review): reads as 8 hours if the unit is milliseconds - confirm at the point of use.
private static final long MIN_ROOT_SEED_IMPORT_PERIOD = 8*60*60*1000;
// Host plugin interface: source of plugin config, timers, plugin events and downloads.
private PluginInterface plugin_interface;
// Outcome of construction: DHTPlugin.STATUS_RUNNING or DHTPlugin.STATUS_FAILED, plus a readable text.
private int status;
private String status_text;
// UI action that is disabled while integrateDHT() runs and re-enabled when it finishes.
private ActionParameter reseed;
// The DHT built by the constructor; stays null if construction fails before DHTFactory.create returns.
private DHT dht;
// UDP port handed to the transport; changed through setPort().
private int port;
private byte protocol_version;
// Network identifier (compared against DHT.NW_CVS / DHT.NW_MAIN); also part of the "dht.reachable." config key.
private int network;
// UDP transport built by the constructor; stays null if construction fails before it is created.
private DHTTransportUDP transport;
// Time at which the most recent integrateDHT() call completed its integration.
private long integrated_time;
// Storage manager created in the constructor; used there for address tracking and contact import/export.
private DHTPluginStorageManager storage_manager;
// NOTE(review): not referenced in the visible part of the file - presumably maintained by importRootSeed().
private long last_root_seed_import_time;
// Plugin log channel and the DHT-level logger passed down to storage, transport and DHT.
private LoggerChannel log;
private DHTLogger dht_log;
// Number of updateStats() calls so far; used to decide when to emit statistics.
private int stats_ticks;
/**
 * Builds and starts the DHT for a single network.
 *
 * The sequence is: create the storage manager, create the UDP transport, create the DHT,
 * announce it with a DHTPlugin.EVENT_DHT_AVAILABLE plugin event, import the root seed and
 * the stored contacts, schedule a 10 minute periodic re-seed check plus contact export and
 * finally call integrateDHT().
 *
 * Any Throwable raised by that sequence is caught, logged and reported through
 * getStatus() (DHTPlugin.STATUS_FAILED) and getStatusText(); in that case "transport"
 * and/or "dht" may be left null, depending on where the failure happened.
 *
 * @param _plugin_interface host plugin interface (config, timers, events)
 * @param _nat_adapter      NAT puncher adapter handed to DHTFactory.create
 * @param _protocol_version protocol version handed to the transport
 * @param _network          network identifier (e.g. DHT.NW_CVS)
 * @param _ip               IP handed to the transport factory
 * @param _port             UDP port handed to the transport factory
 * @param _reseed           UI action toggled while integration is running
 * @param _logging          initial DHT logging flag
 * @param _log              plugin log channel
 * @param _dht_log          logger used by the storage manager, transport and DHT
 */
public
DHTPluginImpl(
    PluginInterface _plugin_interface,
    DHTNATPuncherAdapter _nat_adapter,
    byte _protocol_version,
    int _network,
    String _ip,
    int _port,
    ActionParameter _reseed,
    boolean _logging,
    LoggerChannel _log,
    DHTLogger _dht_log )
{
    plugin_interface = _plugin_interface;
    protocol_version = _protocol_version;
    network = _network;
    port = _port;
    reseed = _reseed;
    log = _log;
    dht_log = _dht_log;

    try{
        storage_manager = new DHTPluginStorageManager( network, dht_log, getDataDir( _network ));

        final PluginConfig conf = plugin_interface.getPluginconfig();

        int send_delay = conf.getPluginIntParameter( "dht.senddelay", 50 );
        int recv_delay = conf.getPluginIntParameter( "dht.recvdelay", 25 );
        boolean bootstrap = conf.getPluginBooleanParameter( "dht.bootstrapnode", false );

        // start off optimistic with reachable = true
        boolean initial_reachable = conf.getPluginBooleanParameter( "dht.reachable." + network, true );

        // NOTE(review): the meaning of the literals 4 and 2 is defined by
        // DHTTransportFactory.createUDP and is not visible here - confirm before changing
        transport =
            DHTTransportFactory.createUDP(
                _protocol_version,
                _network,
                _ip,
                storage_manager.getMostRecentAddress(),
                _port,
                4,
                2,
                20000, // udp timeout - tried less but a significant number of
                       // premature timeouts occurred
                send_delay, recv_delay,
                bootstrap,
                initial_reachable,
                dht_log );

        // keep the storage manager informed about contact/address changes
        transport.addListener(
            new DHTTransportListener()
            {
                public void
                localContactChanged(
                    DHTTransportContact local_contact )
                {
                    storage_manager.localContactChanged( local_contact );
                }

                public void
                currentAddress(
                    String address )
                {
                    storage_manager.recordCurrentAddress( address );
                }

                public void
                reachabilityChanged(
                    boolean reacheable )
                {
                    // reachability changes are picked up by updateStats() instead
                }
            });

        Properties props = new Properties();

        /*
        System.out.println( "FRIGGED REFRESH PERIOD" );
        props.put( DHT.PR_CACHE_REPUBLISH_INTERVAL, new Integer( 5*60*1000 ));
        */

        if ( _network == DHT.NW_CVS ){
            // reduce network usage
            //System.out.println( "CVS DHT cache republish interval modified" );
            props.put( DHT.PR_CACHE_REPUBLISH_INTERVAL, new Integer( 1*60*60*1000 ));
        }

        dht = DHTFactory.create(
                transport,
                props,
                storage_manager,
                _nat_adapter,
                dht_log );

        // tell other plugins that the DHT instance now exists
        plugin_interface.firePluginEvent(
            new PluginEvent()
            {
                public int
                getType()
                {
                    return( DHTPlugin.EVENT_DHT_AVAILABLE );
                }

                public Object
                getValue()
                {
                    return( dht );
                }
            });

        dht.setLogging( _logging );

        // the root seed is only needed for bootstrapping; integrateDHT() removes it afterwards
        DHTTransportContact root_seed = importRootSeed();

        storage_manager.importContacts( dht );

        plugin_interface.getUtilities().createTimer( "DHTExport", true ).addPeriodicEvent(
            10*60*1000,
            new UTTimerEventPerformer()
            {
                public void
                perform(
                    UTTimerEvent event )
                {
                    checkForReSeed(false);
                    storage_manager.exportContacts( dht );
                }
            });

        integrateDHT( true, root_seed );

        status = DHTPlugin.STATUS_RUNNING;
        status_text = "Running";

    }catch( Throwable e ){

        Debug.printStackTrace(e);

        // message previously read "DHT integrtion fails"
        log.log( "DHT integration fails", e );

        status_text = "DHT Integration fails: " + Debug.getNestedExceptionMessage( e );
        status = DHTPlugin.STATUS_FAILED;
    }
}
/**
 * Periodic housekeeping: counts the tick, persists and reports changes in UDP reachability
 * and, every sample_stats_ticks calls, writes the statistics via logStats().
 *
 * Nothing other than counting the tick happens while no transport exists.
 *
 * @param sample_stats_ticks statistics are logged when the tick count is a multiple of this
 *                           value; 0 disables statistics logging instead of raising an
 *                           ArithmeticException from the remainder operation
 */
public void
updateStats(
    int sample_stats_ticks )
{
    stats_ticks++;

    if ( transport == null ){
        return;
    }

    // fetch the config once and reuse it for every read/write below
    PluginConfig conf = plugin_interface.getPluginconfig();

    String reachable_key = "dht.reachable." + network;

    boolean current_reachable = transport.isReachable();

    if ( current_reachable != conf.getPluginBooleanParameter( reachable_key, true )){

        // reachability has changed - remember the new state
        conf.setPluginParameter( reachable_key, current_reachable );

        if ( current_reachable ){

            log.log( "Reachability changed for the better" );

        }else{

            String msg = "If you have a router/firewall, please check that you have port " + port +
                            " UDP open.\nDecentralised tracking requires this." ;

            int warned_port = conf.getPluginIntParameter( "udp_warned_port", 0 );

            if ( warned_port == port ){

                // an alert was already raised for this port, so only write a log line
                log.log( msg );

            }else{

                conf.setPluginParameter( "udp_warned_port", port );

                log.logAlert( LoggerChannel.LT_WARNING, msg );
            }
        }
    }

    // guard the divisor: "% 0" would throw ArithmeticException
    if ( sample_stats_ticks != 0 && stats_ticks % sample_stats_ticks == 0 ){

        logStats();
    }
}
/**
 * Reports the outcome of construction.
 *
 * @return the status value set by the constructor (DHTPlugin.STATUS_RUNNING on success,
 *         DHTPlugin.STATUS_FAILED when initialisation threw)
 */
public int getStatus() {
    return status;
}
/**
 * Human readable companion to getStatus().
 *
 * @return "Running" after a successful start, or a failure description that includes the
 *         nested exception message
 */
public String getStatusText() {
    return status_text;
}
/**
 * Asks the UDP transport whether it currently considers itself reachable.
 *
 * @return the transport's reachability flag
 */
public boolean isReachable() {
    final boolean reachable = transport.isReachable();
    return reachable;
}
/**
 * Forwards the logging switch to the underlying DHT.
 *
 * @param l value passed straight through to DHT.setLogging
 */
public void setLogging(boolean l) {
    dht.setLogging(l);
}
/**
 * Tick hook; this implementation has nothing to do.
 */
public void tick() {
    // intentionally empty
}
/**
 * @return the UDP port most recently supplied via the constructor or setPort()
 */
public int getPort() {
    return port;
}
/**
 * Records a new UDP port and asks the transport to switch to it.
 *
 * The port field is updated first; any Throwable raised by the transport is written to
 * the plugin log and not propagated.
 *
 * @param new_port the port to use from now on
 */
public void setPort(int new_port) {
    this.port = new_port;

    try {
        transport.setPort(this.port);
    } catch (Throwable failure) {
        log.log(failure);
    }
}
/**
 * Writes a snapshot of the DHT state to the plugin log as four lines: local contact and
 * transport settings, router counters, transport statistics, and control/database figures.
 *
 * Does nothing when the DHT or the transport is missing. The constructor swallows
 * initialisation failures, so a failure after the transport was created but before the
 * DHT was built leaves "dht" null while updateStats() (which only checks the transport)
 * still calls this method.
 */
public void
logStats()
{
    if ( dht == null || transport == null ){
        // initialisation did not complete - there is nothing to report
        return;
    }

    DHTDBStats d_stats = dht.getDataBase().getStats();
    DHTControlStats c_stats = dht.getControl().getStats();
    DHTRouterStats r_stats = dht.getRouter().getStats();
    DHTTransportStats t_stats = transport.getStats();

    // router counters, indexed by the DHTRouterStats.ST_* constants
    long[] rs = r_stats.getStats();

    log.log( "DHT:ip=" + transport.getLocalContact().getAddress() +
                ",net=" + transport.getNetwork() +
                ",prot=V" + transport.getProtocolVersion()+
                ",reach=" + transport.isReachable());

    log.log( "Router" +
                ":nodes=" + rs[DHTRouterStats.ST_NODES] +
                ",leaves=" + rs[DHTRouterStats.ST_LEAVES] +
                ",contacts=" + rs[DHTRouterStats.ST_CONTACTS] +
                ",replacement=" + rs[DHTRouterStats.ST_REPLACEMENTS] +
                ",live=" + rs[DHTRouterStats.ST_CONTACTS_LIVE] +
                ",unknown=" + rs[DHTRouterStats.ST_CONTACTS_UNKNOWN] +
                ",failing=" + rs[DHTRouterStats.ST_CONTACTS_DEAD]);

    log.log( "Transport" +
                ":" + t_stats.getString());

    // database value details, indexed by the DHTDBStats.VD_* constants
    int[] dbv_details = d_stats.getValueDetails();

    log.log( "Control:dht=" + c_stats.getEstimatedDHTSize() +
                ", Database:keys=" + d_stats.getKeyCount() +
                ",vals=" + dbv_details[DHTDBStats.VD_VALUE_COUNT]+
                ",loc=" + dbv_details[DHTDBStats.VD_LOCAL_SIZE]+
                ",dir=" + dbv_details[DHTDBStats.VD_DIRECT_SIZE]+
                ",ind=" + dbv_details[DHTDBStats.VD_INDIRECT_SIZE]+
                ",div_f=" + dbv_details[DHTDBStats.VD_DIV_FREQ]+
                ",div_s=" + dbv_details[DHTDBStats.VD_DIV_SIZE] );
}
/**
 * Resolves (and creates via FileUtil.mkdirs) the directory holding this network's DHT data.
 *
 * Network 0 uses the "dht" folder below the Azureus user directory; every other network
 * uses a "net" + network sub-folder of it.
 *
 * @param network network identifier selecting the folder
 * @return the data directory for that network
 */
protected File getDataDir(int network) {
    final File base = new File(plugin_interface.getUtilities().getAzureusUserDir(), "dht");

    final File target = (network == 0) ? base : new File(base, "net" + network);

    FileUtil.mkdirs(target);

    return target;
}
/**
 * Runs a DHT integration, logging start, completion and elapsed time.
 *
 * The reseed action is disabled for the duration and re-enabled in a finally block, so it
 * is switched back on even when integration throws. On success the completion time is
 * stored in integrated_time and the DHT is asked to print itself.
 *
 * @param first             true for the initial integration; false makes the log lines
 *                          say "re-integration"
 * @param remove_afterwards optional seed contact that is removed once integration has
 *                          finished; may be null
 */
public void integrateDHT(boolean first, DHTTransportContact remove_afterwards) {
    final String label = "DHT " + (first ? "" : "re-") + "integration";

    try {
        reseed.setEnabled(false);

        log.log(label + " starts");

        final long began = SystemTime.getCurrentTime();

        dht.integrate(false);

        if (remove_afterwards != null) {
            log.log("Removing seed " + remove_afterwards.getString());
            remove_afterwards.remove();
        }

        final long finished = SystemTime.getCurrentTime();
        integrated_time = finished;

        log.log(label + " complete: elapsed = " + (finished - began));

        dht.print();
    } finally {
        reseed.setEnabled(true);
    }
}
public void
checkForReSeed(
boolean force )
{
int seed_limit = 32;
try{
long[] router_stats = dht.getRouter().getStats().getStats();
if ( router_stats[ DHTRouterStats.ST_CONTACTS_LIVE] < seed_limit || force ){
if ( force ){
log.log( "Reseeding" );
}else{
log.log( "Less than 32 live contacts, reseeding" );
}
int peers_imported = 0;
// only try boostrapping off connected peers on the main network as it is unlikely
// any of them are running CVS and hence the boostrap will fail
if ( network == DHT.NW_MAIN ){
// first look for peers to directly import
Download[] downloads = plugin_interface.getDownloadManager().getDownloads();
outer:
for (int i=0;i<downloads.length;i++){
Download download = downloads[i];
PeerManager pm = download.getPeerManager();
if ( pm == null ){
continue;
}
Peer[] peers = pm.getPeers();
for (int j=0;j<peers.length;j++){
Peer p = peers[j];
int peer_udp_port = p.getUDPNonDataListenPort();
if ( peer_udp_port != 0 ){
if ( importSeed( p.getIp(), peer_udp_port ) != null ){
peers_imported++;
if ( peers_imported > seed_limit ){
break outer;
}
}
}
}
}
}
DHTTransportContact root_to_remove = null;
if ( peers_imported == 0 ){
root_to_remove = importRootSeed();
if ( root_to_remove != null ){
peers_imported++;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -