📄 dhtcontrolimpl.java
字号:
/*
* Created on 12-Jan-2005
* Created by Paul Gardner
* Copyright (C) 2004, 2005, 2006 Aelitis, All Rights Reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* AELITIS, SAS au capital de 46,603.30 euros
* 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
*
*/
package com.aelitis.azureus.core.dht.control.impl;
import java.io.*;
import java.math.BigInteger;
import java.util.*;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import org.gudy.azureus2.core3.util.AEMonitor;
import org.gudy.azureus2.core3.util.AESemaphore;
import org.gudy.azureus2.core3.util.Debug;
import org.gudy.azureus2.core3.util.HashWrapper;
import org.gudy.azureus2.core3.util.ListenerManager;
import org.gudy.azureus2.core3.util.ListenerManagerDispatcher;
import org.gudy.azureus2.core3.util.SHA1Simple;
import org.gudy.azureus2.core3.util.SystemTime;
import org.gudy.azureus2.core3.util.ThreadPool;
import org.gudy.azureus2.core3.util.ThreadPoolTask;
import com.aelitis.azureus.core.dht.DHT;
import com.aelitis.azureus.core.dht.DHTLogger;
import com.aelitis.azureus.core.dht.DHTOperationAdapter;
import com.aelitis.azureus.core.dht.DHTOperationListener;
import com.aelitis.azureus.core.dht.DHTStorageBlock;
import com.aelitis.azureus.core.dht.impl.*;
import com.aelitis.azureus.core.dht.control.*;
import com.aelitis.azureus.core.dht.db.*;
import com.aelitis.azureus.core.dht.netcoords.DHTNetworkPosition;
import com.aelitis.azureus.core.dht.netcoords.DHTNetworkPositionManager;
import com.aelitis.azureus.core.dht.router.*;
import com.aelitis.azureus.core.dht.transport.*;
import com.aelitis.azureus.core.dht.transport.udp.DHTTransportUDP;
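/**
* Control layer of the DHT: implements DHTControl and also acts as the request
* handler that is installed on the DHT transport.
*
* @author parg
*
*/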
public class
DHTControlImpl
implements DHTControl, DHTTransportRequestHandler
{
// Thread count given to the external lookup pool when it is created in the constructor
private static final int EXTERNAL_LOOKUP_CONCURRENCY = 32;
// Thread count given to the external put pool when it is created in the constructor
private static final int EXTERNAL_PUT_CONCURRENCY = 16;
// 5*60*1000 - presumably five minutes in milliseconds between random queries;
// NOTE(review): not referenced in the visible part of the file, confirm at the usage site
private static final int RANDOM_QUERY_PERIOD = 5*60*1000;
// 15*1000 - presumably an upper bound of fifteen seconds (in milliseconds) on integration;
// NOTE(review): not referenced in the visible part of the file, confirm at the usage site
private static final int INTEGRATION_TIME_MAX = 15*1000;
// Supplied to the constructor; provides the storage adapter handed to the database
private DHTControlAdapter adapter;
// Supplied to the constructor; this object is installed on it as the request handler
private DHTTransport transport;
// Contact the current router was built around; assigned in createRouter
private DHTTransportContact local_contact;
// Current router; destroyed and replaced each time createRouter runs
private DHTRouter router;
// Created once in the constructor via DHTDBFactory
private DHTDB database;
// Created in the constructor after the initial router exists
private DHTControlStatsImpl stats;
private DHTLogger logger;
// Length in bytes of the router id, captured once in the constructor
private int node_id_byte_count;
// Concurrency value passed to lookups that the router requests
private int search_concurrency;
// Thread count for the internal lookup and internal put pools
private int lookup_concurrency;
// Recorded from the constructor argument; NOTE(review): usage is outside the visible part of the file
private int cache_at_closest_n;
// K, B and max_rep_per_node are forwarded unchanged to DHTRouterFactory.create
private int K;
private int B;
private int max_rep_per_node;
// SystemTime reading taken when the current router was created
private long router_start_time;
// Incremented once per createRouter call
private int router_count;
private ThreadPool internal_lookup_pool;
private ThreadPool external_lookup_pool;
private ThreadPool internal_put_pool;
private ThreadPool external_put_pool;
// Looked up in exportState by HashWrapper of a contact id, values are cast to Object[];
// NOTE(review): the code that populates this map is outside the visible part of the file
private Map imported_state = new HashMap();
// NOTE(review): not referenced in the visible part of the file - presumably the time of the most recent lookup
private long last_lookup;
// Listener manager obtained from ListenerManager.createAsyncManager. Every dispatched
// event is delivered to the listener (a DHTControlListener) as an activityChanged
// call carrying the event value (a DHTControlActivity) and the event type.
private ListenerManager listeners =
    ListenerManager.createAsyncManager(
        "DHTControl:listenDispatcher",
        new ListenerManagerDispatcher()
        {
            public void
            dispatch(
                Object _listener,
                int type,
                Object value )
            {
                ((DHTControlListener)_listener).activityChanged((DHTControlActivity)value, type );
            }
        });
// NOTE(review): activities and activity_mon are not used in the visible part of the file;
// presumably the monitor guards the list - confirm against the code that uses them
private List activities = new ArrayList();
private AEMonitor activity_mon = new AEMonitor( "DHTControl:activities" );
// NOTE(review): presumably guards the estimate fields below; usage is outside the visible part of the file
protected AEMonitor estimate_mon = new AEMonitor( "DHTControl:estimate" );
// Set to the current SystemTime in the constructor so the initial calculation is deferred
private long last_dht_estimate_time;
private long local_dht_estimate;
private long combined_dht_estimate;
// Maximum number of entries kept in local_estimate_values
private static final int LOCAL_ESTIMATE_HISTORY = 32;
// Access-ordered LinkedHashMap that drops its eldest entry as soon as an insertion
// takes it above LOCAL_ESTIMATE_HISTORY entries
private Map local_estimate_values =
    new LinkedHashMap( LOCAL_ESTIMATE_HISTORY, 0.75f, true )
    {
        protected boolean
        removeEldestEntry(
            Map.Entry eldest )
        {
            boolean over_capacity = size() > LOCAL_ESTIMATE_HISTORY;

            return( over_capacity );
        }
    };
// NOTE(review): presumably the cap on remote_estimate_values; the code enforcing it is outside the visible part of the file
private static final int REMOTE_ESTIMATE_HISTORY = 128;
private List remote_estimate_values = new LinkedList();
// NOTE(review): presumably guards the anti-spoof cipher/key below; usage is outside the visible part of the file
protected AEMonitor spoof_mon = new AEMonitor( "DHTControl:spoof" );
// Anti-spoof cipher and key: only initialised by the constructor when the transport
// supports storage, and left null if that initialisation throws
private Cipher spoof_cipher;
private SecretKey spoof_key;
/**
 * Creates the control layer on top of the supplied transport.
 *
 * In order: the arguments are recorded, the database and the four thread pools are
 * created, the initial router is built around the transport's local contact, the
 * stats object is created and - only if the transport supports storage - the
 * anti-spoof cipher and key are generated. Finally this object is installed as the
 * transport's request handler and a transport listener is registered that rebuilds
 * the router whenever the local contact changes.
 *
 * @param _adapter                     source of the storage adapter given to the database
 * @param _transport                   transport to control
 * @param _K                           forwarded to DHTRouterFactory.create
 * @param _B                           forwarded to DHTRouterFactory.create
 * @param _max_rep_per_node            forwarded to DHTRouterFactory.create
 * @param _search_concurrency          concurrency used for lookups requested by the router
 * @param _lookup_concurrency          thread count of the internal lookup and put pools
 * @param _original_republish_interval forwarded to DHTDBFactory.create
 * @param _cache_republish_interval    forwarded to DHTDBFactory.create
 * @param _cache_at_closest_n          recorded for later use
 * @param _logger                      logger used by this object, the database and the router
 */
public
DHTControlImpl(
    DHTControlAdapter _adapter,
    DHTTransport _transport,
    int _K,
    int _B,
    int _max_rep_per_node,
    int _search_concurrency,
    int _lookup_concurrency,
    int _original_republish_interval,
    int _cache_republish_interval,
    int _cache_at_closest_n,
    DHTLogger _logger )
{
    // collaborators

    adapter = _adapter;
    transport = _transport;
    logger = _logger;

    // tuning values

    K = _K;
    B = _B;
    max_rep_per_node = _max_rep_per_node;
    search_concurrency = _search_concurrency;
    lookup_concurrency = _lookup_concurrency;
    cache_at_closest_n = _cache_at_closest_n;

    // start the estimate clock now so that the initial calculation is put off
    // until things are reasonably populated

    last_dht_estimate_time = SystemTime.getCurrentTime();

    database =
        DHTDBFactory.create(
            adapter.getStorageAdapter(),
            _original_republish_interval,
            _cache_republish_interval,
            logger );

    internal_lookup_pool = new ThreadPool( "DHTControl:internallookups", lookup_concurrency );
    internal_put_pool = new ThreadPool( "DHTControl:internalputs", lookup_concurrency );

    // the external pools queue work when full rather than blocking the caller

    external_lookup_pool = new ThreadPool( "DHTControl:externallookups", EXTERNAL_LOOKUP_CONCURRENCY, true );
    external_put_pool = new ThreadPool( "DHTControl:puts", EXTERNAL_PUT_CONCURRENCY, true );

    createRouter( transport.getLocalContact());

    node_id_byte_count = router.getID().length;

    stats = new DHTControlStatsImpl( this );

    // anti-spoof material is only needed when values can actually be stored

    if ( transport.supportsStorage()){

        try{
            spoof_cipher = Cipher.getInstance("DESede/ECB/PKCS5Padding");

            spoof_key = KeyGenerator.getInstance("DESede").generateKey();

        }catch( Throwable e ){

            Debug.printStackTrace( e );

            logger.log( e );
        }
    }

    transport.setRequestHandler( this );

    transport.addListener(
        new DHTTransportListener()
        {
            public void
            localContactChanged(
                DHTTransportContact new_local_contact )
            {
                logger.log( "Transport ID changed, recreating router" );

                // snapshot the old router before it is replaced

                List previous_contacts = router.findBestContacts( 0 );
                byte[] previous_id = router.getID();

                createRouter( new_local_contact );

                // candidates ordered by closeness to the new router id

                Set candidates = new sortedTransportContactSet( router.getID(), true ).getSet();

                // pass 1: every live contact other than our old self

                for (int i=0;i<previous_contacts.size();i++){

                    DHTRouterContact candidate = (DHTRouterContact)previous_contacts.get(i);

                    if ( Arrays.equals( previous_id, candidate.getID()) || !candidate.isAlive()){

                        continue;
                    }

                    candidates.add(((DHTControlContactImpl)candidate.getAttachment()).getTransportContact());
                }

                // pass 2: top up with non-alive ones to a lower limit, in case this is a
                // start-of-day router change where only imported (limbo) contacts exist

                for (int i=0;candidates.size() < 32 && i<previous_contacts.size();i++){

                    DHTRouterContact candidate = (DHTRouterContact)previous_contacts.get(i);

                    if ( Arrays.equals( previous_id, candidate.getID()) || candidate.isAlive()){

                        continue;
                    }

                    candidates.add(((DHTControlContactImpl)candidate.getAttachment()).getTransportContact());
                }

                // only seed the new router with the closest few: adding everything can
                // skew the smallest-subtree, the normal seeding process fills in the rest

                Iterator candidate_it = candidates.iterator();

                for (int added=0;candidate_it.hasNext() && added < 128;added++){

                    DHTTransportContact seed_contact = (DHTTransportContact)candidate_it.next();

                    router.contactAlive( seed_contact.getID(), new DHTControlContactImpl( seed_contact ));
                }

                seed( false );
            }

            public void
            currentAddress(
                String address )
            {
                // not of interest here
            }

            public void
            reachabilityChanged(
                boolean reacheable )
            {
                // not of interest here
            }
        });
}
/**
 * Replaces the current router with a fresh one built around the given contact.
 *
 * Records the creation time, bumps the router count, destroys any existing router,
 * creates the new router and wires it back to this object through an adapter, then
 * (re)attaches this object to the database.
 *
 * @param _local_contact contact whose id the new router is created for
 */
protected void
createRouter(
    DHTTransportContact _local_contact)
{
    router_start_time = SystemTime.getCurrentTime();

    router_count++;

    local_contact = _local_contact;

    // dispose of the previous router, if there is one, before replacing it

    if ( router != null ){

        router.destroy();
    }

    router =
        DHTRouterFactory.create(
            K,
            B,
            max_rep_per_node,
            local_contact.getID(),
            new DHTControlContactImpl( local_contact ),
            logger );

    // the router calls back into the control through this adapter

    DHTRouterAdapter router_adapter =
        new DHTRouterAdapter()
        {
            public void
            requestPing(
                DHTRouterContact contact )
            {
                DHTControlImpl.this.requestPing( contact );
            }

            public void
            requestLookup(
                byte[] id,
                String description )
            {
                // run on the internal lookup pool; the handler's callbacks are no-ops so
                // the results themselves are discarded

                lookup(
                    internal_lookup_pool,
                    false,
                    id,
                    description,
                    (byte)0,
                    false,
                    0,
                    search_concurrency,
                    1,
                    router.getK(), // (parg - removed this) decrease search accuracy for refreshes
                    new lookupResultHandler( new DHTOperationAdapter())
                    {
                        public void
                        diversify(
                            DHTTransportContact cause,
                            byte diversification_type )
                        {
                        }

                        public void
                        closest(
                            List res )
                        {
                        }
                    });
            }

            public void
            requestAdd(
                DHTRouterContact contact )
            {
                nodeAddedToRouter( contact );
            }
        };

    router.setAdapter( router_adapter );

    database.setControl( this );
}
/**
 * Reports how long the current router has existed, measured with SystemTime.
 *
 * If the clock now reads earlier than the recorded creation time, the creation
 * time is reset to the current reading and zero is returned.
 *
 * @return difference between the current time and the router creation time
 */
public long
getRouterUptime()
{
    final long current_time = SystemTime.getCurrentTime();

    if ( router_start_time > current_time ){

        // clock has gone backwards: restart the measurement from here

        router_start_time = current_time;
    }

    final long uptime = current_time - router_start_time;

    return uptime;
}
/**
 * @return number of routers created so far (incremented once per createRouter call)
 */
public int
getRouterCount()
{
    return router_count;
}
/**
 * @return the statistics object created for this control in the constructor
 */
public DHTControlStats
getStats()
{
    return stats;
}
/**
 * @return the transport this control was constructed with
 */
public DHTTransport
getTransport()
{
    return transport;
}
/**
 * @return the current router; a different instance is returned after createRouter has run again
 */
public DHTRouter
getRouter()
{
    return router;
}
/**
 * @return the database created in the constructor
 */
public DHTDB
getDataBase()
{
    return database;
}
/**
 * Tells the router about an imported contact by reporting it as "known",
 * attaching a new DHTControlContactImpl wrapper for it.
 *
 * @param contact the imported transport contact
 */
public void
contactImported(
    DHTTransportContact contact )
{
    byte[] imported_id = contact.getID();

    router.contactKnown( imported_id, new DHTControlContactImpl( contact ));
}
/**
 * Reports a removed contact to the router as dead, unless the contact's id is
 * the router's own id.
 *
 * @param contact the transport contact that has been removed
 */
public void
contactRemoved(
    DHTTransportContact contact )
{
    if ( router.isID( contact.getID())){

        // that's us - we never remove ourselves

        return;
    }

    router.contactDead( contact.getID(), true );
}
public void
exportState(
DataOutputStream daos,
int max )
throws IOException
{
/*
* We need to be a bit smart about exporting state to deal with the situation where a
* DHT is started (with good import state) and then stopped before the goodness of the
* state can be re-established. So we remember what we imported and take account of this
* on a re-export
*/
// get all the contacts
List contacts = router.findBestContacts( 0 );
// give priority to any that were alive before and are alive now
List to_save = new ArrayList();
List reserves = new ArrayList();
//System.out.println( "Exporting" );
for (int i=0;i<contacts.size();i++){
DHTRouterContact contact = (DHTRouterContact)contacts.get(i);
Object[] imported = (Object[])imported_state.get( new HashWrapper( contact.getID()));
if ( imported != null ){
if ( contact.isAlive()){
// definitely want to keep this one
to_save.add( contact );
}else if ( !contact.isFailing()){
// dunno if its still good or not, however its got to be better than any
// new ones that we didn't import who aren't known to be alive
reserves.add( contact );
}
}
}
//System.out.println( " initial to_save = " + to_save.size() + ", reserves = " + reserves.size());
// now pull out any live ones
for (int i=0;i<contacts.size();i++){
DHTRouterContact contact = (DHTRouterContact)contacts.get(i);
if ( contact.isAlive() && !to_save.contains( contact )){
to_save.add( contact );
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -