📄 dhtrouterimpl.java
字号:
/*
* Created on 11-Jan-2005
* Created by Paul Gardner
* Copyright (C) 2004, 2005, 2006 Aelitis, All Rights Reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* AELITIS, SAS au capital de 46,603.30 euros
* 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
*
*/
package com.aelitis.azureus.core.dht.router.impl;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import org.gudy.azureus2.core3.util.AEMonitor;
import org.gudy.azureus2.core3.util.Debug;
import org.gudy.azureus2.core3.util.SystemTime;
import com.aelitis.azureus.core.dht.DHTLogger;
import com.aelitis.azureus.core.dht.impl.DHTLog;
import com.aelitis.azureus.core.dht.router.DHTRouter;
import com.aelitis.azureus.core.dht.router.DHTRouterAdapter;
import com.aelitis.azureus.core.dht.router.DHTRouterContact;
import com.aelitis.azureus.core.dht.router.DHTRouterContactAttachment;
import com.aelitis.azureus.core.dht.router.DHTRouterObserver;
import com.aelitis.azureus.core.dht.router.DHTRouterStats;
import com.aelitis.azureus.core.util.CopyOnWriteList;
/**
* @author parg
*
*/
public class
DHTRouterImpl
implements DHTRouter
{
private static final int SMALLEST_SUBTREE_MAX_EXCESS = 10*1024;
private int K;
private int B;
private int max_rep_per_node;
private DHTLogger logger;
private int smallest_subtree_max;
private DHTRouterAdapter adapter;
private DHTRouterContactImpl local_contact;
private byte[] router_node_id;
private DHTRouterNodeImpl root;
private DHTRouterNodeImpl smallest_subtree;
private int consecutive_dead;
private static long random_seed = SystemTime.getCurrentTime();
private Random random;
private List outstanding_pings = new ArrayList();
private List outstanding_adds = new ArrayList();
private DHTRouterStatsImpl stats = new DHTRouterStatsImpl( this );
private AEMonitor this_mon = new AEMonitor( "DHTRouter" );
private static AEMonitor class_mon = new AEMonitor( "DHTRouter:class" );
private final CopyOnWriteList observers = new CopyOnWriteList();
public
DHTRouterImpl(
int _K,
int _B,
int _max_rep_per_node,
byte[] _router_node_id,
DHTRouterContactAttachment _attachment,
DHTLogger _logger )
{
try{
// only needed for in-process multi-router testing :P
class_mon.enter();
random = new Random( random_seed++ );
}finally{
class_mon.exit();
}
K = _K;
B = _B;
max_rep_per_node = _max_rep_per_node;
logger = _logger;
smallest_subtree_max = 1;
for (int i=0;i<B;i++){
smallest_subtree_max *= 2;
}
smallest_subtree_max += SMALLEST_SUBTREE_MAX_EXCESS;
router_node_id = _router_node_id;
List buckets = new ArrayList();
local_contact = new DHTRouterContactImpl( router_node_id, _attachment, true );
buckets.add( local_contact );
root = new DHTRouterNodeImpl( this, 0, true, buckets );
}
protected void notifyAdded(DHTRouterContact contact) {
for (Iterator i = observers.iterator(); i.hasNext(); ) {
DHTRouterObserver rto = (DHTRouterObserver) i.next();
try{
rto.added(contact);
}catch( Throwable e ){
Debug.printStackTrace(e);
}
}
}
protected void notifyRemoved(DHTRouterContact contact) {
for (Iterator i = observers.iterator(); i.hasNext(); ) {
DHTRouterObserver rto = (DHTRouterObserver) i.next();
try{
rto.removed(contact);
}catch( Throwable e ){
Debug.printStackTrace(e);
}
}
}
protected void notifyLocationChanged(DHTRouterContact contact) {
for (Iterator i = observers.iterator(); i.hasNext(); ) {
DHTRouterObserver rto = (DHTRouterObserver) i.next();
try{
rto.locationChanged(contact);
}catch( Throwable e ){
Debug.printStackTrace(e);
}
}
}
protected void notifyNowAlive(DHTRouterContact contact) {
for (Iterator i = observers.iterator(); i.hasNext(); ) {
DHTRouterObserver rto = (DHTRouterObserver) i.next();
try{
rto.nowAlive(contact);
}catch( Throwable e ){
Debug.printStackTrace(e);
}
}
}
protected void notifyNowFailing(DHTRouterContact contact) {
for (Iterator i = observers.iterator(); i.hasNext(); ) {
DHTRouterObserver rto = (DHTRouterObserver) i.next();
try{
rto.nowFailing(contact);
}catch( Throwable e ){
Debug.printStackTrace(e);
}
}
}
protected void notifyDead() {
for (Iterator i = observers.iterator(); i.hasNext(); ) {
DHTRouterObserver rto = (DHTRouterObserver) i.next();
try{
rto.destroyed(this);
}catch( Throwable e ){
Debug.printStackTrace(e);
}
}
}
public boolean addObserver(DHTRouterObserver rto) {
if ((rto != null) && !observers.contains(rto)) {
observers.add(rto);
return true;
}
return false;
}
public boolean containsObserver(DHTRouterObserver rto) {
return ((rto != null) && observers.contains(rto));
}
public boolean removeObserver(DHTRouterObserver rto) {
return ((rto != null) && observers.remove(rto));
}
public DHTRouterStats
getStats()
{
return( stats );
}
public int
getK()
{
return( K );
}
public byte[]
getID()
{
return( router_node_id );
}
public boolean
isID(
byte[] id )
{
return( Arrays.equals( id, router_node_id ));
}
public DHTRouterContact
getLocalContact()
{
return( local_contact );
}
public void
setAdapter(
DHTRouterAdapter _adapter )
{
adapter = _adapter;
}
public DHTRouterContact
contactKnown(
byte[] node_id,
DHTRouterContactAttachment attachment )
{
return( addContact( node_id, attachment, false ));
}
public DHTRouterContact
contactAlive(
byte[] node_id,
DHTRouterContactAttachment attachment )
{
return( addContact( node_id, attachment, true ));
}
// all incoming node actions come through either contactDead or addContact
// A side effect of processing
// the node is that either a ping can be requested (if a replacement node
// is available and the router wants to check the liveness of an existing node)
// or a new node can be added (either directly to a node or indirectly via
// a replacement becoming "live"
// To avoid requesting these actions while synchronised these are recorded
// in lists and then kicked off separately here
public DHTRouterContact
contactDead(
byte[] node_id,
boolean force )
{
if ( Arrays.equals( router_node_id, node_id )){
// we should never become dead ourselves as this screws up things like
// checking that stored values are close enough to the K livest nodes (as if we are
// dead we don't return ourselves and it all goes doo daa )
Debug.out( "DHTRouter: contactDead called on router node!" );
return( local_contact );
}
try{
try{
this_mon.enter();
consecutive_dead++;
/*
if ( consecutive_dead != 0 && consecutive_dead % 10 == 0 ){
System.out.println( "consecutive_dead: " + consecutive_dead );
}
*/
Object[] res = findContactSupport( node_id );
DHTRouterNodeImpl node = (DHTRouterNodeImpl)res[0];
DHTRouterContactImpl contact = (DHTRouterContactImpl)res[1];
if ( contact != null ){
// some protection against network drop outs - start ignoring dead
// notifications if we're getting significant continous fails
if ( consecutive_dead < 100 || force ){
node.dead( contact, force );
}
}
return( contact );
}finally{
this_mon.exit();
}
}finally{
dispatchPings();
dispatchNodeAdds();
}
}
public void
contactRemoved(
byte[] node_id )
{
}
public DHTRouterContact
addContact(
byte[] node_id,
DHTRouterContactAttachment attachment,
boolean known_to_be_alive )
{
try{
try{
this_mon.enter();
if ( known_to_be_alive ){
consecutive_dead = 0;
}
return( addContactSupport( node_id, attachment, known_to_be_alive ));
}finally{
this_mon.exit();
}
}finally{
dispatchPings();
dispatchNodeAdds();
}
}
protected DHTRouterContact
addContactSupport(
byte[] node_id,
DHTRouterContactAttachment attachment,
boolean known_to_be_alive )
{
DHTRouterNodeImpl current_node = root;
boolean part_of_smallest_subtree = false;
for (int i=0;i<node_id.length;i++){
byte b = node_id[i];
int j = 7;
while( j >= 0 ){
if ( current_node == smallest_subtree ){
part_of_smallest_subtree = true;
}
boolean bit = ((b>>j)&0x01)==1?true:false;
DHTRouterNodeImpl next_node;
if ( bit ){
next_node = current_node.getLeft();
}else{
next_node = current_node.getRight();
}
if ( next_node == null ){
DHTRouterContact existing_contact = current_node.updateExistingNode( node_id, attachment, known_to_be_alive );
if ( existing_contact != null ){
return( existing_contact );
}
List buckets = current_node.getBuckets();
if ( buckets.size() == K ){
// split if either
// 1) this list contains router_node_id or
// 2) depth % B is not 0
// 3) this is part of the smallest subtree
boolean contains_router_node_id = current_node.containsRouterNodeID();
int depth = current_node.getDepth();
boolean too_deep_to_split = depth % B == 0; // note this will be true for 0 but other
// conditions will allow the split
if ( contains_router_node_id ||
(!too_deep_to_split) ||
part_of_smallest_subtree ){
// the smallest-subtree bit is to ensure that we remember all of
// our closest neighbours as ultimately they are the ones responsible
// for returning our identity to queries (due to binary choppery in
// general the query will home in on our neighbours before
// hitting us. It is therefore important that we keep ourselves live
// in their tree by refreshing. If we blindly chopped at K entries
// (down to B levels) then a highly unbalanced tree would result in
// us dropping some of them and therefore not refreshing them and
// therefore dropping out of their trees. There are also other benefits
// of maintaining this tree regarding stored value refresh
// Note that it is rare for such an unbalanced tree.
// However, a possible DOS here would be for a rogue node to
// deliberately try and create such a tree with a large number
// of entries.
if ( part_of_smallest_subtree &&
too_deep_to_split &&
( !contains_router_node_id ) &&
getContactCount( smallest_subtree ) > smallest_subtree_max ){
Debug.out( "DHTRouter: smallest subtree max size violation" );
return( null );
}
// split!!!!
List left_buckets = new ArrayList();
List right_buckets = new ArrayList();
for (int k=0;k<buckets.size();k++){
DHTRouterContactImpl contact = (DHTRouterContactImpl)buckets.get(k);
byte[] bucket_id = contact.getID();
if (((bucket_id[depth/8]>>(7-(depth%8)))&0x01 ) == 0 ){
right_buckets.add( contact );
}else{
left_buckets.add( contact );
}
}
boolean right_contains_rid = false;
boolean left_contains_rid = false;
if ( contains_router_node_id ){
right_contains_rid =
((router_node_id[depth/8]>>(7-(depth%8)))&0x01 ) == 0;
left_contains_rid = !right_contains_rid;
}
DHTRouterNodeImpl new_left = new DHTRouterNodeImpl( this, depth+1, left_contains_rid, left_buckets );
DHTRouterNodeImpl new_right = new DHTRouterNodeImpl( this, depth+1, right_contains_rid, right_buckets );
current_node.split( new_left, new_right );
if ( right_contains_rid ){
// we've created a new smallest subtree
// TODO: tidy up old smallest subtree - remember to factor in B...
smallest_subtree = new_left;
}else if ( left_contains_rid ){
// TODO: tidy up old smallest subtree - remember to factor in B...
smallest_subtree = new_right;
}
// not complete, retry addition
}else{
// split not appropriate, add as a replacemnet
DHTRouterContactImpl new_contact = new DHTRouterContactImpl( node_id, attachment, known_to_be_alive );
return( current_node.addReplacement( new_contact, max_rep_per_node ));
}
}else{
// bucket space free, just add it
DHTRouterContactImpl new_contact = new DHTRouterContactImpl( node_id, attachment, known_to_be_alive );
current_node.addNode( new_contact ); // complete - added to bucket
return( new_contact );
}
}else{
current_node = next_node;
j--;
}
}
}
Debug.out( "DHTRouter inconsistency" );
return( null );
}
public List
findClosestContacts(
byte[] node_id,
boolean live_only )
{
// find the K-ish closest nodes - consider all buckets, not just the closest
try{
this_mon.enter();
List res = new ArrayList();
findClosestContacts( node_id, 0, root, live_only, res );
return( res );
}finally{
this_mon.exit();
}
}
protected void
findClosestContacts(
byte[] node_id,
int depth,
DHTRouterNodeImpl current_node,
boolean live_only,
List res )
{
List buckets = current_node.getBuckets();
if ( buckets != null ){
// add everything from the buckets - caller will sort and select
// the best ones as required
for (int i=0;i<buckets.size();i++){
DHTRouterContactImpl contact = (DHTRouterContactImpl)buckets.get(i);
// use !failing at the moment to include unknown ones
if ( ! ( live_only && contact.isFailing())){
res.add( contact );
}
}
}else{
boolean bit = ((node_id[depth/8]>>(7-(depth%8)))&0x01 ) == 1;
DHTRouterNodeImpl best_node;
DHTRouterNodeImpl worse_node;
if ( bit ){
best_node = current_node.getLeft();
worse_node = current_node.getRight();
}else{
best_node = current_node.getRight();
worse_node = current_node.getLeft();
}
findClosestContacts( node_id, depth+1, best_node, live_only, res );
if ( res.size() < K ){
findClosestContacts( node_id, depth+1, worse_node, live_only, res );
}
}
}
public DHTRouterContact
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -