📄 dhtdbimpl.java
字号:
Object[] data = (Object[])count.get( key );
if ( data == null ){
data = new Object[2];
data[0] = new Integer(1);
data[1] = "";
count.put( key, data );
}else{
data[0] = new Integer(((Integer)data[0]).intValue() + 1 );
}
String s = (String)data[1];
s += (s.length()==0?"":", ") + "key=" + DHTLog.getString2(value_key.getHash()) + ",val=" + value.getString();
data[1] = s;
}
}
it = count.keySet().iterator();
while( it.hasNext()){
Integer k = (Integer)it.next();
Object[] data = (Object[])count.get(k);
logger.log( " " + k + " -> " + data[0] + " entries" ); // ": " + data[1]);
}
it = stored_values.entrySet().iterator();
String str = " ";
int str_entries = 0;
while( it.hasNext()){
Map.Entry entry = (Map.Entry)it.next();
HashWrapper value_key = (HashWrapper)entry.getKey();
DHTDBMapping mapping = (DHTDBMapping)entry.getValue();
if ( str_entries == 16 ){
logger.log( str );
str = " ";
str_entries = 0;
}
str_entries++;
str += (str_entries==1?"":", ") + DHTLog.getString2(value_key.getHash()) + " -> " + mapping.getValueCount() + "/" + mapping.getHits()+"["+mapping.getLocalSize()+","+mapping.getDirectSize()+","+mapping.getIndirectSize() + "]";
}
if ( str_entries > 0 ){
logger.log( str );
}
}finally{
this_mon.exit();
}
}
protected void
banContact(
final DHTTransportContact contact,
final String reason )
{
new AEThread( "DHTDBImpl:delayed flood delete", true )
{
public void
runSupport()
{
// delete their data on a separate thread so as not to
// interfere with the current action
try{
this_mon.enter();
Iterator it = stored_values.values().iterator();
while( it.hasNext()){
DHTDBMapping mapping = (DHTDBMapping)it.next();
Iterator it2 = mapping.getDirectValues();
while( it2.hasNext()){
DHTDBValueImpl val = (DHTDBValueImpl)it2.next();
if ( !val.isLocal()){
if ( Arrays.equals( val.getOriginator().getID(), contact.getID())){
it.remove();
}
}
}
}
}finally{
this_mon.exit();
}
}
}.start();
logger.log( "Banning " + contact.getString() + " due to store flooding (" + reason + ")" );
ip_filter.ban(
contact.getAddress().getAddress().getHostAddress(),
"DHT: Sender stored excessive entries at this node (" + reason + ")" );
}
protected void
incrementValueAdds(
DHTTransportContact contact )
{
// assume a node stores 1000 values at 20 (K) locations -> 20,000 values
// assume a DHT size of 100,000 nodes
// that is, on average, 1 value per 5 nodes
// assume NAT of up to 30 ports per address
// this gives 6 values per address
// with a factor of 10 error this is still only 60 per address
int hit_count = ip_count_bloom_filter.add( contact.getAddress().getAddress().getAddress());
if ( DHTLog.GLOBAL_BLOOM_TRACE ){
System.out.println( "direct add from " + contact.getAddress() + ", hit count = " + hit_count );
}
// allow up to 10% bloom filter utilisation
if ( ip_count_bloom_filter.getSize() / ip_count_bloom_filter.getEntryCount() < 10 ){
rebuildIPBloomFilter( true );
}
if ( hit_count > 64 ){
// obviously being spammed, drop all data originated by this IP and ban it
banContact( contact, "global flood" );
}
}
protected void
decrementValueAdds(
DHTTransportContact contact )
{
int hit_count = ip_count_bloom_filter.remove( contact.getAddress().getAddress().getAddress());
if ( DHTLog.GLOBAL_BLOOM_TRACE ){
System.out.println( "direct remove from " + contact.getAddress() + ", hit count = " + hit_count );
}
}
protected void
rebuildIPBloomFilter(
boolean increase_size )
{
BloomFilter new_filter;
if ( increase_size ){
new_filter = BloomFilterFactory.createAddRemove8Bit( ip_count_bloom_filter.getSize() + IP_COUNT_BLOOM_SIZE_INCREASE_CHUNK );
}else{
new_filter = BloomFilterFactory.createAddRemove8Bit( ip_count_bloom_filter.getSize());
}
try{
//Map sender_map = new HashMap();
//List senders = new ArrayList();
Iterator it = stored_values.values().iterator();
int max_hits = 0;
while( it.hasNext()){
DHTDBMapping mapping = (DHTDBMapping)it.next();
mapping.rebuildIPBloomFilter( false );
Iterator it2 = mapping.getDirectValues();
while( it2.hasNext()){
DHTDBValueImpl val = (DHTDBValueImpl)it2.next();
if ( !val.isLocal()){
// logger.log( " adding " + val.getOriginator().getAddress());
int hits = new_filter.add( val.getOriginator().getAddress().getAddress().getAddress());
if ( hits > max_hits ){
max_hits = hits;
}
}
}
// survey our neighbourhood
/*
* its is non-trivial to do anything about nodes that get "close" to us and then
* spam us with crap. Ultimately, of course, to take a key out you "just" create
* the 20 closest nodes to the key and then run nodes that swallow all registrations
* and return nothing.
* Protecting against one or two such nodes that flood crap requires crap to be
* identified. Tracing shows a large disparity between number of values registered
* per neighbour (factors of 100), so an approach based on number of registrations
* is non-trivial (assuming future scaling of the DHT, what do we consider crap?)
* A further approach would be to query the claimed originators of values (obviously
* a low bandwith approach, e.g. query 3 values from the contact with highest number
* of forwarded values). This requires originators to support long term knowledge of
* what they've published (we don't want to blacklist a neighbour because an originator
* has deleted a value/been restarted). We also then have to consider how to deal with
* non-responses to queries (assuming an affirmative Yes -> value has been forwarded
* correnctly, No -> probably crap). We can't treat non-replies as No. Thus a bad
* neighbour only has to forward crap with originators that aren't AZ nodes (very
* easy to do!) to break this aproach.
*
*
it2 = mapping.getIndirectValues();
while( it2.hasNext()){
DHTDBValueImpl val = (DHTDBValueImpl)it2.next();
DHTTransportContact sender = val.getSender();
HashWrapper hw = new HashWrapper( sender.getID());
Integer sender_count = (Integer)sender_map.get( hw );
if ( sender_count == null ){
sender_count = new Integer(1);
senders.add( sender );
}else{
sender_count = new Integer( sender_count.intValue() + 1 );
}
sender_map.put( hw, sender_count );
}
*/
}
logger.log( "Rebuilt global IP bloom filter, size = " + new_filter.getSize() + ", entries =" + new_filter.getEntryCount()+", max hits = " + max_hits );
/*
senders = control.sortContactsByDistance( senders );
for (int i=0;i<senders.size();i++){
DHTTransportContact sender = (DHTTransportContact)senders.get(i);
System.out.println( i + ":" + sender.getString() + " -> " + sender_map.get(new HashWrapper(sender.getID())));
}
*/
}finally{
ip_count_bloom_filter = new_filter;
}
}
protected void
reportSizes(
String op )
{
/*
if ( !this_mon.isHeld()){
Debug.out( "Monitor not held" );
}
int actual_keys = stored_values.size();
int actual_values = 0;
int actual_size = 0;
Iterator it = stored_values.values().iterator();
while( it.hasNext()){
DHTDBMapping mapping = (DHTDBMapping)it.next();
int reported_size = mapping.getLocalSize() + mapping.getDirectSize() + mapping.getIndirectSize();
actual_values += mapping.getValueCount();
Iterator it2 = mapping.getValues();
int sz = 0;
while( it2.hasNext()){
DHTDBValue val = (DHTDBValue)it2.next();
sz += val.getValue().length;
}
if ( sz != reported_size ){
Debug.out( "Reported mapping size != actual: " + reported_size + "/" + sz );
}
actual_size += sz;
}
if ( actual_keys != total_keys ){
Debug.out( "Actual keys != total: " + actual_keys + "/" + total_keys );
}
if ( actual_values != total_values ){
Debug.out( "Actual values != total: " + actual_values + "/" + total_values );
}
if ( actual_size != total_size ){
Debug.out( "Actual size != total: " + actual_size + "/" + total_size );
}
System.out.println( "DHTDB: " + op + " - keys=" + total_keys + ", values=" + total_values + ", size=" + total_size );
*/
}
protected int
getNextValueVersion()
{
try{
this_mon.enter();
if ( next_value_version_left == 0 ){
next_value_version_left = VALUE_VERSION_CHUNK;
if ( adapter == null ){
// no persistent manager, just carry on incrementing
}else{
next_value_version = adapter.getNextValueVersions( VALUE_VERSION_CHUNK );
}
//System.out.println( "next chunk:" + next_value_version );
}
next_value_version_left--;
int res = next_value_version++;
//System.out.println( "next value version = " + res );
return( res );
}finally{
this_mon.exit();
}
}
protected class
adapterFacade
implements DHTStorageAdapter
{
private DHTStorageAdapter delegate;
protected
adapterFacade(
DHTStorageAdapter _delegate )
{
delegate = _delegate;
}
public DHTStorageKey
keyCreated(
HashWrapper key,
boolean local )
{
// report *before* incrementing as this occurs before the key is locally added
reportSizes( "keyAdded" );
total_keys++;
return( delegate.keyCreated( key, local ));
}
public void
keyDeleted(
DHTStorageKey adapter_key )
{
total_keys--;
reportSizes( "keyDeleted" );
delegate.keyDeleted( adapter_key );
}
public void
keyRead(
DHTStorageKey adapter_key,
DHTTransportContact contact )
{
reportSizes( "keyRead" );
delegate.keyRead( adapter_key, contact );
}
public void
valueAdded(
DHTStorageKey key,
DHTTransportValue value )
{
total_values++;
total_size += value.getValue().length;
reportSizes( "valueAdded");
if ( !value.isLocal() ){
DHTDBValueImpl val = (DHTDBValueImpl)value;
boolean direct = Arrays.equals( value.getOriginator().getID(), val.getSender().getID());
if ( direct ){
incrementValueAdds( value.getOriginator());
}
}
delegate.valueAdded( key, value );
}
public void
valueUpdated(
DHTStorageKey key,
DHTTransportValue old_value,
DHTTransportValue new_value )
{
total_size += (new_value.getValue().length - old_value.getValue().length );
reportSizes("valueUpdated");
delegate.valueUpdated( key, old_value, new_value );
}
public void
valueDeleted(
DHTStorageKey key,
DHTTransportValue value )
{
total_values--;
total_size -= value.getValue().length;
reportSizes("valueDeleted");
if ( !value.isLocal() ){
DHTDBValueImpl val = (DHTDBValueImpl)value;
boolean direct = Arrays.equals( value.getOriginator().getID(), val.getSender().getID());
if ( direct ){
decrementValueAdds( value.getOriginator());
}
}
delegate.valueDeleted( key, value );
}
// local lookup/put operations
public byte[][]
getExistingDiversification(
byte[] key,
boolean put_operation,
boolean exhaustive_get )
{
return( delegate.getExistingDiversification( key, put_operation, exhaustive_get ));
}
public byte[][]
createNewDiversification(
DHTTransportContact cause,
byte[] key,
boolean put_operation,
byte diversification_type,
boolean exhaustive_get )
{
return( delegate.createNewDiversification( cause, key, put_operation, diversification_type, exhaustive_get ));
}
public int
getNextValueVersions(
int num )
{
return( delegate.getNextValueVersions(num));
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -