// DHTDBImpl.java
/*
* Created on 28-Jan-2005
* Created by Paul Gardner
* Copyright (C) 2004, 2005, 2006 Aelitis, All Rights Reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* AELITIS, SAS au capital de 46,603.30 euros
* 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
*
*/
package com.aelitis.azureus.core.dht.db.impl;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.*;
import org.gudy.azureus2.core3.ipfilter.IpFilter;
import org.gudy.azureus2.core3.ipfilter.IpFilterManagerFactory;
import org.gudy.azureus2.core3.util.AEMonitor;
import org.gudy.azureus2.core3.util.AESemaphore;
import org.gudy.azureus2.core3.util.AEThread;
import org.gudy.azureus2.core3.util.HashWrapper;
import org.gudy.azureus2.core3.util.SimpleTimer;
import org.gudy.azureus2.core3.util.SystemTime;
import org.gudy.azureus2.core3.util.Timer;
import org.gudy.azureus2.core3.util.TimerEvent;
import org.gudy.azureus2.core3.util.TimerEventPerformer;
import com.aelitis.azureus.core.dht.DHT;
import com.aelitis.azureus.core.dht.DHTLogger;
import com.aelitis.azureus.core.dht.DHTStorageAdapter;
import com.aelitis.azureus.core.dht.DHTStorageBlock;
import com.aelitis.azureus.core.dht.DHTStorageKey;
import com.aelitis.azureus.core.dht.DHTStorageKeyStats;
import com.aelitis.azureus.core.dht.db.*;
import com.aelitis.azureus.core.dht.impl.DHTLog;
import com.aelitis.azureus.core.dht.router.DHTRouter;
import com.aelitis.azureus.core.dht.control.DHTControl;
import com.aelitis.azureus.core.dht.transport.DHTTransportContact;
import com.aelitis.azureus.core.dht.transport.DHTTransportReplyHandlerAdapter;
import com.aelitis.azureus.core.dht.transport.DHTTransportValue;
import com.aelitis.azureus.core.dht.transport.udp.DHTTransportUDP;
import com.aelitis.azureus.core.util.bloom.BloomFilter;
import com.aelitis.azureus.core.util.bloom.BloomFilterFactory;
/**
* @author parg
*
*/
public class
DHTDBImpl
implements DHTDB, DHTDBStats
{
private int original_republish_interval;
// the grace period gives the originator time to republish their data as this could involve
// some work on their behalf to find closest nodes etc. There's no real urgency here anyway
private int ORIGINAL_REPUBLISH_INTERVAL_GRACE = 60*60*1000;
private int cache_republish_interval;
private long MIN_CACHE_EXPIRY_CHECK_INTERVAL = 60000;
private long last_cache_expiry_check;
private static final long IP_BLOOM_FILTER_REBUILD_PERIOD = 15*60*1000;
private static final int IP_COUNT_BLOOM_SIZE_INCREASE_CHUNK = 1000;
private BloomFilter ip_count_bloom_filter = BloomFilterFactory.createAddRemove8Bit( IP_COUNT_BLOOM_SIZE_INCREASE_CHUNK );
private static final int VALUE_VERSION_CHUNK = 128;
private int next_value_version;
private int next_value_version_left;
private Map stored_values = new HashMap();
private DHTControl control;
private DHTStorageAdapter adapter;
private DHTRouter router;
private DHTTransportContact local_contact;
private DHTLogger logger;
private static final long MAX_TOTAL_SIZE = 4*1024*1024;
private long total_size;
private long total_values;
private long total_keys;
private boolean force_original_republish;
private IpFilter ip_filter = IpFilterManagerFactory.getSingleton().getIPFilter();
private AEMonitor this_mon = new AEMonitor( "DHTDB" );
public
DHTDBImpl(
DHTStorageAdapter _adapter,
int _original_republish_interval,
int _cache_republish_interval,
DHTLogger _logger )
{
adapter = _adapter==null?null:new adapterFacade( _adapter );
original_republish_interval = _original_republish_interval;
cache_republish_interval = _cache_republish_interval;
logger = _logger;
SimpleTimer.addPeriodicEvent(
"DHTDB:op",
original_republish_interval,
new TimerEventPerformer()
{
public void
perform(
TimerEvent event )
{
logger.log( "Republish of original mappings starts" );
long start = SystemTime.getCurrentTime();
int stats = republishOriginalMappings();
long end = SystemTime.getCurrentTime();
logger.log( "Republish of original mappings completed in " + (end-start) + ": " +
"values = " + stats );
}
});
// random skew here so that cache refresh isn't very synchronised, as the optimisations
// regarding non-republising benefit from this
SimpleTimer.addPeriodicEvent(
"DHTDB:cp",
cache_republish_interval + 10000 - (int)(Math.random()*20000),
new TimerEventPerformer()
{
public void
perform(
TimerEvent event )
{
logger.log( "Republish of cached mappings starts" );
long start = SystemTime.getCurrentTime();
int[] stats = republishCachedMappings();
long end = SystemTime.getCurrentTime();
logger.log( "Republish of cached mappings completed in " + (end-start) + ": " +
"values = " + stats[0] + ", keys = " + stats[1] + ", ops = " + stats[2]);
if ( force_original_republish ){
force_original_republish = false;
logger.log( "Force republish of original mappings due to router change starts" );
start = SystemTime.getCurrentTime();
int stats2 = republishOriginalMappings();
end = SystemTime.getCurrentTime();
logger.log( "Force republish of original mappings due to router change completed in " + (end-start) + ": " +
"values = " + stats2 );
}
}
});
SimpleTimer.addPeriodicEvent(
"DHTDB:bloom",
IP_BLOOM_FILTER_REBUILD_PERIOD,
new TimerEventPerformer()
{
public void
perform(
TimerEvent event )
{
try{
this_mon.enter();
rebuildIPBloomFilter( false );
}finally{
this_mon.exit();
}
}
});
}
public void
setControl(
DHTControl _control )
{
control = _control;
// trigger an "original value republish" if router has changed
force_original_republish = router != null;
router = control.getRouter();
local_contact = control.getTransport().getLocalContact();
// our ID has changed - amend the originator of all our values
try{
this_mon.enter();
Iterator it = stored_values.values().iterator();
while( it.hasNext()){
DHTDBMapping mapping = (DHTDBMapping)it.next();
mapping.updateLocalContact( local_contact );
}
}finally{
this_mon.exit();
}
}
public DHTDBValue
store(
HashWrapper key,
byte[] value,
byte flags )
{
// local store
try{
this_mon.enter();
// don't police max check for locally stored data
// only that received
DHTDBMapping mapping = (DHTDBMapping)stored_values.get( key );
if ( mapping == null ){
mapping = new DHTDBMapping( this, key, true );
stored_values.put( key, mapping );
}
DHTDBValueImpl res =
new DHTDBValueImpl(
SystemTime.getCurrentTime(),
value,
getNextValueVersion(),
local_contact,
local_contact,
true,
flags );
mapping.add( res );
return( res );
}finally{
this_mon.exit();
}
}
public byte
store(
DHTTransportContact sender,
HashWrapper key,
DHTTransportValue[] values )
{
// allow 4 bytes per value entry to deal with overhead (prolly should be more but we're really
// trying to deal with 0-length value stores)
if ( total_size + ( total_values*4 ) > MAX_TOTAL_SIZE ){
DHTLog.log( "Not storing " + DHTLog.getString2(key.getHash()) + " as maximum storage limit exceeded" );
return( DHT.DT_SIZE );
}
// remote store for cache values
// Make sure that we only accept values for storing that are reasonable.
// Assumption is that the caller has made a reasonable effort to ascertain
// the correct place to store a value. Part of this will in general have
// needed them to query us for example. Therefore, limit values to those
// that are at least as close to us
List closest_contacts = control.getClosestKContactsList( key.getHash(), true );
boolean store_it = false;
for (int i=0;i<closest_contacts.size();i++){
if ( router.isID(((DHTTransportContact)closest_contacts.get(i)).getID())){
store_it = true;
break;
}
}
if ( !store_it ){
DHTLog.log( "Not storing " + DHTLog.getString2(key.getHash()) + " as key too far away" );
return( DHT.DT_NONE );
}
// next, for cache forwards (rather then values coming directly from
// originators) we ensure that the contact sending the values to us is
// close enough. If any values are coming indirect then we can safely assume
// that they all are
boolean cache_forward = false;
for (int i=0;i<values.length;i++){
if (!Arrays.equals( sender.getID(), values[i].getOriginator().getID())){
cache_forward = true;
break;
}
}
if ( cache_forward ){
// get the closest contacts to me
byte[] my_id = local_contact.getID();
closest_contacts = control.getClosestKContactsList( my_id, true );
DHTTransportContact furthest = (DHTTransportContact)closest_contacts.get( closest_contacts.size()-1);
if ( control.computeAndCompareDistances( furthest.getID(), sender.getID(), my_id ) < 0 ){
store_it = false;
}
}
if ( !store_it ){
DHTLog.log( "Not storing " + DHTLog.getString2(key.getHash()) + " as cache forward and sender too far away" );
return( DHT.DT_NONE );
}
try{
this_mon.enter();
checkCacheExpiration( false );
DHTDBMapping mapping = (DHTDBMapping)stored_values.get( key );
if ( mapping == null ){
mapping = new DHTDBMapping( this, key, false );
stored_values.put( key, mapping );
}
boolean contact_checked = false;
boolean contact_ok = false;
// we carry on an update as its ok to replace existing entries
// even if diversified
for (int i=0;i<values.length;i++){
DHTTransportValue t_value = values[i];
// last check, verify that the contact is who they say they are, only for non-forwards
// as cache forwards are only accepted if they are "close enough" and we can't
// rely on their identify due to the way that cache republish works (it doesn't
// guarantee a "lookup_node" prior to "store".
DHTTransportValue value = values[i];
boolean ok_to_store = false;
boolean direct =Arrays.equals( sender.getID(), value.getOriginator().getID());
if ( !contact_checked ){
contact_ok = control.verifyContact( sender, direct );
if ( !contact_ok ){
logger.log( "DB: verification of contact '" + sender.getName() + "' failed for store operation" );
}
contact_checked = true;
}
ok_to_store = contact_ok;
if ( ok_to_store ){
DHTDBValueImpl mapping_value = new DHTDBValueImpl( sender, value, false );
mapping.add( mapping_value );
}
}
return( mapping.getDiversificationType());
}finally{
this_mon.exit();
}
}
public DHTDBLookupResult
get(
DHTTransportContact reader,
HashWrapper key,
int max_values, // 0 -> all
byte flags,
boolean external_request )
{
try{
this_mon.enter();
checkCacheExpiration( false );
final DHTDBMapping mapping = (DHTDBMapping)stored_values.get(key);
if ( mapping == null ){
return( null );
}
if ( external_request ){
mapping.addHit();
}
final DHTDBValue[] values = mapping.get( reader, max_values, flags );
return(
// NOTE(review): the capture of this file is truncated here - the remainder of
// get(...) and the rest of the class are missing; trailing code-viewer UI text
// has been removed.