dhtdbimpl.java
来自「这是一个基于java编写的torrent的P2P源码」· Java 代码 · 共 1,866 行 · 第 1/4 页
JAVA
1,866 行
/**
 * Registers a direct value add against the originating contact's address in the
 * global IP count bloom filter, growing the filter when it becomes too densely
 * populated and banning the contact when its hit count indicates flooding.
 *
 * @param contact the contact whose address is counted
 */
protected void incrementValueAdds( DHTTransportContact contact )
{
    // Sizing rationale:
    //   - a node stores 1000 values at 20 (K) locations -> 20,000 values
    //   - with a DHT of 100,000 nodes that is, on average, 1 value per 5 nodes
    //   - allowing for NAT with up to 30 ports per address gives 6 values per address
    //   - even with a factor of 10 error that is still only 60 per address

    final int hits = ip_count_bloom_filter.add( contact.getAddress().getAddress().getAddress());

    if ( DHTLog.GLOBAL_BLOOM_TRACE ){

        System.out.println( "direct add from " + contact.getAddress() + ", hit count = " + hits );
    }

    // Allow up to 10% utilisation: once there are fewer than 10 filter slots per
    // entry the filter is rebuilt with a larger size.

    final boolean over_utilised =
        ip_count_bloom_filter.getSize() / ip_count_bloom_filter.getEntryCount() < 10;

    if ( over_utilised ){

        rebuildIPBloomFilter( true );
    }

    // More than 64 hits from one address means we are being spammed: ban the
    // contact (which is expected to drop the data it originated).

    final boolean flooding = hits > 64;

    if ( flooding ){

        banContact( contact, "global flood" );
    }
}
/**
 * Removes one direct value add for the contact's address from the global IP
 * count bloom filter, tracing the resulting hit count when bloom tracing is on.
 *
 * @param contact the contact whose address count is reduced
 */
protected void decrementValueAdds( DHTTransportContact contact )
{
    final int remaining_hits =
        ip_count_bloom_filter.remove( contact.getAddress().getAddress().getAddress());

    if ( !DHTLog.GLOBAL_BLOOM_TRACE ){

        return;
    }

    System.out.println( "direct remove from " + contact.getAddress() + ", hit count = " + remaining_hits );
}
/**
 * Builds a replacement for the global IP count bloom filter and installs it.
 *
 * Every stored mapping is asked to rebuild its own filter (without growing it),
 * and the originator address of each non-local direct value is added to the
 * replacement. The replacement is installed even if population fails part way.
 *
 * @param increase_size when true the replacement is larger than the current
 *                      filter by IP_COUNT_BLOOM_SIZE_INCREASE_CHUNK, otherwise
 *                      it has the current filter's size
 */
protected void rebuildIPBloomFilter( boolean increase_size )
{
    BloomFilter replacement =
        BloomFilterFactory.createAddRemove8Bit(
            increase_size
                ? ip_count_bloom_filter.getSize() + IP_COUNT_BLOOM_SIZE_INCREASE_CHUNK
                : ip_count_bloom_filter.getSize());

    try{
        int highest_hits = 0;

        for ( Iterator mappings = stored_values.values().iterator(); mappings.hasNext(); ){

            DHTDBMapping mapping = (DHTDBMapping)mappings.next();

            mapping.rebuildIPBloomFilter( false );

            for ( Iterator directs = mapping.getDirectValues(); directs.hasNext(); ){

                DHTDBValueImpl value = (DHTDBValueImpl)directs.next();

                if ( value.isLocal()){

                    // locally originated values are not counted
                    continue;
                }

                int hits = replacement.add( value.getOriginator().getAddress().getAddress().getAddress());

                if ( hits > highest_hits ){

                    highest_hits = hits;
                }
            }

            // On surveying our neighbourhood (indirect values):
            //
            // It is non-trivial to do anything about nodes that get "close" to us
            // and then spam us with crap. Ultimately, of course, to take a key out
            // you "just" create the 20 closest nodes to the key and then run nodes
            // that swallow all registrations and return nothing.
            //
            // Protecting against one or two such flooding nodes requires the crap
            // to be identified. Tracing shows a large disparity between the number
            // of values registered per neighbour (factors of 100), so an approach
            // based on the number of registrations is non-trivial (assuming future
            // scaling of the DHT, what do we consider crap?).
            //
            // A further approach would be to query the claimed originators of
            // values (obviously in a low bandwidth way, e.g. query 3 values from
            // the contact with the highest number of forwarded values). This
            // requires originators to keep long term knowledge of what they have
            // published (we don't want to blacklist a neighbour because an
            // originator has deleted a value or been restarted). We then also have
            // to decide how to treat non-responses (an affirmative Yes means the
            // value was forwarded correctly, a No means probably crap). We can't
            // treat non-replies as No, so a bad neighbour only has to forward crap
            // with originators that aren't AZ nodes (very easy to do!) to break
            // this approach.
            //
            // An experimental per-sender tally of indirect values, printed sorted
            // by distance after the rebuild, used to live here and is disabled.
        }

        logger.log( "Rebuilt global IP bloom filter, size = " + replacement.getSize() + ", entries =" + replacement.getEntryCount()+", max hits = " + highest_hits );

    }finally{

        // always switch to the replacement, whatever happened above
        ip_count_bloom_filter = replacement;
    }
}
/**
 * Diagnostic hook invoked by the storage adapter facade around key and value
 * changes. It currently does nothing: the consistency check it used to perform
 * is disabled and retained below for reference only.
 *
 * @param op label of the operation being reported (e.g. "valueAdded")
 */
protected void reportSizes( String op )
{
    /*
     * Disabled check: recomputes key/value/size totals from stored_values and
     * compares them against the running counters.
     *
     *  if ( !this_mon.isHeld()){
     *      Debug.out( "Monitor not held" );
     *  }
     *
     *  int actual_keys   = stored_values.size();
     *  int actual_values = 0;
     *  int actual_size   = 0;
     *
     *  Iterator it = stored_values.values().iterator();
     *
     *  while( it.hasNext()){
     *      DHTDBMapping mapping = (DHTDBMapping)it.next();
     *
     *      int reported_size = mapping.getLocalSize() + mapping.getDirectSize() + mapping.getIndirectSize();
     *
     *      actual_values += mapping.getValueCount();
     *
     *      Iterator it2 = mapping.getValues();
     *      int sz = 0;
     *
     *      while( it2.hasNext()){
     *          DHTDBValue val = (DHTDBValue)it2.next();
     *          sz += val.getValue().length;
     *      }
     *
     *      if ( sz != reported_size ){
     *          Debug.out( "Reported mapping size != actual: " + reported_size + "/" + sz );
     *      }
     *
     *      actual_size += sz;
     *  }
     *
     *  if ( actual_keys != total_keys ){
     *      Debug.out( "Actual keys != total: " + actual_keys + "/" + total_keys );
     *  }
     *  if ( actual_values != total_values ){
     *      Debug.out( "Actual values != total: " + actual_values + "/" + total_values );
     *  }
     *  if ( actual_size != total_size ){
     *      Debug.out( "Actual size != total: " + actual_size + "/" + total_size );
     *  }
     *
     *  System.out.println( "DHTDB: " + op + " - keys=" + total_keys + ", values=" + total_values + ", size=" + total_size );
     */
}
/**
 * Hands out the next value version number while holding this_mon.
 *
 * Versions are consumed from a chunk of VALUE_VERSION_CHUNK numbers. When the
 * chunk is exhausted a new one is started; if a storage adapter is present the
 * chunk's starting version is obtained from it, otherwise numbering simply
 * continues from the current counter.
 *
 * @return the version number allocated to the caller
 */
protected int getNextValueVersion()
{
    try{
        this_mon.enter();

        if ( next_value_version_left == 0 ){

            next_value_version_left = VALUE_VERSION_CHUNK;

            if ( adapter != null ){

                // a persistent manager exists: let it reserve the next chunk
                next_value_version = adapter.getNextValueVersions( VALUE_VERSION_CHUNK );
            }
            // otherwise there is no persistent manager, so just carry on incrementing
        }

        next_value_version_left--;

        // post-increment: return the current version and advance the counter
        return( next_value_version++ );

    }finally{

        this_mon.exit();
    }
}
/**
 * Wrapper around a DHTStorageAdapter that keeps the enclosing database's
 * running totals (total_keys, total_values, total_size) and the global IP
 * count bloom filter up to date, then forwards every call to the wrapped
 * adapter.
 */
protected class adapterFacade implements DHTStorageAdapter
{
    private DHTStorageAdapter delegate;

    /**
     * @param _delegate the adapter that all calls are forwarded to
     */
    protected adapterFacade( DHTStorageAdapter _delegate )
    {
        delegate = _delegate;
    }

    // ---- key lifecycle ----

    /** Reports sizes, counts the new key, then forwards to the delegate. */
    public DHTStorageKey keyCreated( HashWrapper key, boolean local )
    {
        // the report is made *before* the increment because this callback
        // occurs before the key is locally added

        reportSizes( "keyAdded" );

        total_keys++;

        return( delegate.keyCreated( key, local ));
    }

    /** Uncounts the key, reports sizes, then forwards to the delegate. */
    public void keyDeleted( DHTStorageKey adapter_key )
    {
        total_keys--;

        reportSizes( "keyDeleted" );

        delegate.keyDeleted( adapter_key );
    }

    /** Reports sizes and forwards the read notification to the delegate. */
    public void keyRead( DHTStorageKey adapter_key, DHTTransportContact contact )
    {
        reportSizes( "keyRead" );

        delegate.keyRead( adapter_key, contact );
    }

    /** Pure pass-through to the delegate. */
    public DHTStorageKeyStats deserialiseStats( DataInputStream is )
        throws IOException
    {
        return( delegate.deserialiseStats( is ));
    }

    // ---- value lifecycle ----

    /**
     * Counts the value and its byte length, records a bloom filter add for
     * non-local values whose sender is their originator, then forwards.
     */
    public void valueAdded( DHTStorageKey key, DHTTransportValue value )
    {
        total_values++;

        total_size += value.getValue().length;

        reportSizes( "valueAdded");

        if ( isRemoteDirect( value )){

            incrementValueAdds( value.getOriginator());
        }

        delegate.valueAdded( key, value );
    }

    /** Adjusts total_size by the change in value length, then forwards. */
    public void valueUpdated(
        DHTStorageKey     key,
        DHTTransportValue old_value,
        DHTTransportValue new_value )
    {
        total_size += (new_value.getValue().length - old_value.getValue().length );

        reportSizes("valueUpdated");

        delegate.valueUpdated( key, old_value, new_value );
    }

    /**
     * Uncounts the value and its byte length, records a bloom filter remove
     * for non-local values whose sender is their originator, then forwards.
     */
    public void valueDeleted( DHTStorageKey key, DHTTransportValue value )
    {
        total_values--;

        total_size -= value.getValue().length;

        reportSizes("valueDeleted");

        if ( isRemoteDirect( value )){

            decrementValueAdds( value.getOriginator());
        }

        delegate.valueDeleted( key, value );
    }

    /**
     * Tells whether a value is non-local and was sent by its own originator
     * (originator ID equals sender ID). Non-local values are cast to
     * DHTDBValueImpl to obtain the sender.
     */
    private boolean isRemoteDirect( DHTTransportValue value )
    {
        if ( value.isLocal()){

            return( false );
        }

        DHTDBValueImpl impl = (DHTDBValueImpl)value;

        return( Arrays.equals( value.getOriginator().getID(), impl.getSender().getID()));
    }

    // ---- local lookup/put operations (all pure pass-through) ----

    public boolean isDiversified( byte[] key )
    {
        return( delegate.isDiversified( key ));
    }

    public byte[][] getExistingDiversification(
        byte[]  key,
        boolean put_operation,
        boolean exhaustive_get )
    {
        return( delegate.getExistingDiversification( key, put_operation, exhaustive_get ));
    }

    public byte[][] createNewDiversification(
        DHTTransportContact cause,
        byte[]              key,
        boolean             put_operation,
        byte                diversification_type,
        boolean             exhaustive_get )
    {
        return( delegate.createNewDiversification( cause, key, put_operation, diversification_type, exhaustive_get ));
    }

    public int getNextValueVersions( int num )
    {
        return( delegate.getNextValueVersions(num));
    }

    // ---- key blocks (all pure pass-through) ----

    public DHTStorageBlock keyBlockRequest(
        DHTTransportContact direct_sender,
        byte[]              request,
        byte[]              signature )
    {
        return( delegate.keyBlockRequest( direct_sender, request, signature ));
    }

    public DHTStorageBlock getKeyBlockDetails( byte[] key )
    {
        return( delegate.getKeyBlockDetails(key));
    }

    public DHTStorageBlock[] getDirectKeyBlocks()
    {
        return( delegate.getDirectKeyBlocks());
    }

    public byte[] getKeyForKeyBlock( byte[] request )
    {
        return( delegate.getKeyForKeyBlock( request ));
    }

    // ---- keyed storage (all pure pass-through) ----

    public void setStorageForKey( String key, byte[] data )
    {
        delegate.setStorageForKey( key, data );
    }

    public byte[] getStorageForKey( String key )
    {
        return( delegate.getStorageForKey(key));
    }
}
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?