📄 dhtdbmapping.java
字号:
// its important to ignore old versions as a peer's increasing version sequence may
// have been reset and if this is the case we want the "future" values to timeout
if ( TRACE_ADDS ){
System.out.println( "addDirect[ignore]:" + old.getString() + "/" + value.getString());
}
}
// put the old value back!
direct_originator_map.put( value_key, old );
return;
}
if ( TRACE_ADDS ){
System.out.println( "addDirect:" + old.getString() + "/" + value.getString());
}
direct_data_size -= old.getValue().length;
if ( old.isLocal()){
local_size -= old.getValue().length;
}
}else{
if ( TRACE_ADDS ){
System.out.println( "addDirect:[new]" + value.getString());
}
}
direct_data_size += value.getValue().length;
if ( value.isLocal()){
local_size += value.getValue().length;
}
if ( old == null ){
informAdded( value );
}else{
informUpdated( old, value );
}
}
protected DHTDBValueImpl
removeDirectValue(
HashWrapper value_key )
{
DHTDBValueImpl old = (DHTDBValueImpl)direct_originator_map.remove( value_key );
if ( old != null ){
direct_data_size -= old.getValue().length;
if ( old.isLocal()){
local_size -= old.getValue().length;
}
informDeleted( old );
}
return( old );
}
protected void
addIndirectValue(
HashWrapper value_key,
DHTDBValueImpl value )
{
DHTDBValueImpl old = (DHTDBValueImpl)indirect_originator_value_map.put( value_key, value );
if ( old != null ){
// discard updates that are older than current value
int old_version = old.getVersion();
int new_version = value.getVersion();
if ( old_version != -1 && new_version != -1 && old_version >= new_version ){
if ( old_version == new_version ){
if ( TRACE_ADDS ){
System.out.println( "addIndirect[reset]:" + old.getString() + "/" + value.getString());
}
old.reset(); // update store time as this means we don't need to republish
// as someone else has just done it
}else{
if ( TRACE_ADDS ){
System.out.println( "addIndirect[ignore]:" + old.getString() + "/" + value.getString());
}
}
// put the old value back!
indirect_originator_value_map.put( value_key, old );
return;
}
// vague backwards compatability - if the creation date of the "new" value is significantly
// less than the old then we ignore it (given that creation date is adjusted for time-skew you can
// see the problem with this approach...)
if ( old_version == -1 || new_version == -1 ){
if ( old.getCreationTime() > value.getCreationTime() + 30000 ){
if ( TRACE_ADDS ){
System.out.println( "backward compat: ignoring store: " + old.getString() + "/" + value.getString());
}
// put the old value back!
indirect_originator_value_map.put( value_key, old );
return;
}
}
if ( TRACE_ADDS ){
System.out.println( "addIndirect:" + old.getString() + "/" + value.getString());
}
indirect_data_size -= old.getValue().length;
if ( old.isLocal()){
local_size -= old.getValue().length;
}
}else{
if ( TRACE_ADDS ){
System.out.println( "addIndirect:[new]" + value.getString());
}
}
indirect_data_size += value.getValue().length;
if ( value.isLocal()){
local_size += value.getValue().length;
}
if ( old == null ){
informAdded( value );
}else{
informUpdated( old, value );
}
}
protected void
removeIndirectValue(
HashWrapper value_key )
{
DHTDBValueImpl old = (DHTDBValueImpl)indirect_originator_value_map.remove( value_key );
if ( old != null ){
indirect_data_size -= old.getValue().length;
if ( old.isLocal()){
local_size -= old.getValue().length;
}
informDeleted( old );
}
}
protected void
destroy()
{
try{
if ( adapter_key != null ){
Iterator it = getValues();
while( it.hasNext()){
it.next();
it.remove();
}
db.getAdapter().keyDeleted( adapter_key );
}
}catch( Throwable e ){
Debug.printStackTrace(e);
}
}
private void
informDeleted(
DHTDBValueImpl value )
{
boolean direct =
(!value.isLocal())&&
Arrays.equals( value.getOriginator().getID(), value.getSender().getID());
if ( direct ){
removeFromBloom( value );
}
try{
if ( adapter_key != null ){
db.getAdapter().valueDeleted( adapter_key, value );
diversification_state = adapter_key.getDiversificationType();
}
}catch( Throwable e ){
Debug.printStackTrace(e);
}
}
private void
informAdded(
DHTDBValueImpl value )
{
boolean direct =
(!value.isLocal()) &&
Arrays.equals( value.getOriginator().getID(), value.getSender().getID());
if ( direct ){
addToBloom( value );
}
try{
if ( adapter_key != null ){
db.getAdapter().valueAdded( adapter_key, value );
diversification_state = adapter_key.getDiversificationType();
}
}catch( Throwable e ){
Debug.printStackTrace(e);
}
}
private void
informUpdated(
DHTDBValueImpl old_value,
DHTDBValueImpl new_value)
{
boolean old_direct =
(!old_value.isLocal()) &&
Arrays.equals( old_value.getOriginator().getID(), old_value.getSender().getID());
boolean new_direct =
(!new_value.isLocal()) &&
Arrays.equals( new_value.getOriginator().getID(), new_value.getSender().getID());
if ( new_direct && !old_direct ){
addToBloom( new_value );
}
try{
if ( adapter_key != null ){
db.getAdapter().valueUpdated( adapter_key, old_value, new_value );
diversification_state = adapter_key.getDiversificationType();
}
}catch( Throwable e ){
Debug.printStackTrace(e);
}
}
private void
informRead(
DHTTransportContact contact ){
try{
if ( adapter_key != null && contact != null ){
db.getAdapter().keyRead( adapter_key, contact );
diversification_state = adapter_key.getDiversificationType();
}
}catch( Throwable e ){
Debug.printStackTrace(e);
}
}
protected void
addToBloom(
DHTDBValueImpl value )
{
// we don't check for flooding on indirect stores as this could be used to force a
// direct store to be bounced (flood a node with indirect stores before the direct
// store occurs)
DHTTransportContact originator = value.getOriginator();
int hit_count = ip_count_bloom_filter.add( originator.getAddress().getAddress().getAddress());
if ( DHTLog.LOCAL_BLOOM_TRACE ){
System.out.println( "direct local add from " + originator.getAddress() + ", hit count = " + hit_count );
}
// allow up to 10% bloom filter utilisation
if ( ip_count_bloom_filter.getSize() / ip_count_bloom_filter.getEntryCount() < 10 ){
rebuildIPBloomFilter( true );
}
if ( hit_count >= 15 ){
db.banContact( originator, "local flood on '" + DHTLog.getFullString( key.getBytes()) + "'" );
}
}
protected void
removeFromBloom(
DHTDBValueImpl value )
{
DHTTransportContact originator = value.getOriginator();
int hit_count = ip_count_bloom_filter.remove( originator.getAddress().getAddress().getAddress());
if ( DHTLog.LOCAL_BLOOM_TRACE ){
System.out.println( "direct local remove from " + originator.getAddress() + ", hit count = " + hit_count );
}
}
protected void
rebuildIPBloomFilter(
boolean increase_size )
{
BloomFilter new_filter;
if ( increase_size ){
new_filter = BloomFilterFactory.createAddRemove4Bit( ip_count_bloom_filter.getSize() + IP_COUNT_BLOOM_SIZE_INCREASE_CHUNK );
}else{
new_filter = BloomFilterFactory.createAddRemove4Bit( ip_count_bloom_filter.getSize());
}
try{
// only do flood prevention on direct stores as we can't trust the originator
// details for indirect and this can be used to DOS a direct store later
Iterator it = getDirectValues();
int max_hits = 0;
while( it.hasNext()){
DHTDBValueImpl val = (DHTDBValueImpl)it.next();
if ( !val.isLocal()){
// logger.log( " adding " + val.getOriginator().getAddress());
int hits = new_filter.add( val.getOriginator().getAddress().getAddress().getAddress());
if ( hits > max_hits ){
max_hits = hits;
}
}
}
if ( DHTLog.LOCAL_BLOOM_TRACE ){
db.log( "Rebuilt local IP bloom filter, size = " + new_filter.getSize() + ", entries =" + new_filter.getEntryCount()+", max hits = " + max_hits );
}
}finally{
ip_count_bloom_filter = new_filter;
}
}
protected class
valueIterator
implements Iterator
{
private List maps = new ArrayList(2);
private int map_index = 0;
private Map map;
private Iterator it;
private DHTDBValueImpl value;
protected
valueIterator(
boolean direct,
boolean indirect )
{
if ( direct ){
maps.add( direct_originator_map );
}
if ( indirect ){
maps.add( indirect_originator_value_map );
}
}
public boolean
hasNext()
{
if ( it != null && it.hasNext()){
return( true );
}
while( map_index < maps.size() ){
map = (Map)maps.get(map_index++);
it = map.values().iterator();
if ( it.hasNext()){
return( true );
}
}
return( false );
}
public Object
next()
{
if ( hasNext()){
value = (DHTDBValueImpl)it.next();
return( value );
}
throw( new NoSuchElementException());
}
public void
remove()
{
if ( it == null ){
throw( new IllegalStateException());
}
if ( value != null ){
if( value.isLocal()){
local_size -= value.getValue().length;
}
if ( map == indirect_originator_value_map ){
indirect_data_size -= value.getValue().length;
}else{
direct_data_size -= value.getValue().length;
}
// remove before informing
it.remove();
informDeleted( value );
}else{
it.remove();
}
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -