📄 dhtdbimpl.java
字号:
}
((List)data[1]).add( key );
}
}
it = contact_map.values().iterator();
while( it.hasNext()){
final Object[] data = (Object[])it.next();
final DHTTransportContact contact = (DHTTransportContact)data[0];
// move to anti-spoof on cache forwards - gotta do a find-node first
// to get the random id
final AESemaphore sem = new AESemaphore( "DHTDB:cacheForward" );
contact.sendFindNode(
new DHTTransportReplyHandlerAdapter()
{
public void
findNodeReply(
DHTTransportContact _contact,
DHTTransportContact[] _contacts )
{
anti_spoof_done.add( _contact );
try{
// System.out.println( "cacheForward: pre-store findNode OK" );
List keys = (List)data[1];
byte[][] store_keys = new byte[keys.size()][];
DHTTransportValue[][] store_values = new DHTTransportValue[store_keys.length][];
keys_published[0] += store_keys.length;
for (int i=0;i<store_keys.length;i++){
HashWrapper wrapper = (HashWrapper)keys.get(i);
store_keys[i] = wrapper.getHash();
List values = (List)republish.get( wrapper );
store_values[i] = new DHTTransportValue[values.size()];
values_published[0] += store_values[i].length;
for (int j=0;j<values.size();j++){
DHTDBValueImpl value = (DHTDBValueImpl)values.get(j);
// we reduce the cache distance by 1 here as it is incremented by the
// recipients
store_values[i][j] = value.getValueForRelay(local_contact);
}
}
List contacts = new ArrayList();
contacts.add( contact );
republish_ops[0]++;
control.putDirectEncodedKeys(
store_keys,
"Republish cache",
store_values,
contacts );
}finally{
sem.release();
}
}
public void
failed(
DHTTransportContact _contact,
Throwable _error )
{
try{
// System.out.println( "cacheForward: pre-store findNode Failed" );
DHTLog.log( "cacheForward: pre-store findNode failed " + DHTLog.getString( _contact ) + " -> failed: " + _error.getMessage());
router.contactDead( _contact.getID(), false);
}finally{
sem.release();
}
}
},
contact.getProtocolVersion() >= DHTTransportUDP.PROTOCOL_VERSION_ANTI_SPOOF2?new byte[0]:new byte[20] );
sem.reserve();
}
try{
this_mon.enter();
for (int i=0;i<stop_caching.size();i++){
DHTDBMapping mapping = (DHTDBMapping)stored_values.remove( stop_caching.get(i));
if ( mapping != null ){
mapping.destroy();
}
}
}finally{
this_mon.exit();
}
}
DHTStorageBlock[] direct_key_blocks = getDirectKeyBlocks();
if ( direct_key_blocks.length > 0 ){
for (int i=0;i<direct_key_blocks.length;i++){
final DHTStorageBlock key_block = direct_key_blocks[i];
List contacts = control.getClosestKContactsList( key_block.getKey(), false );
boolean forward_it = false;
// ensure that the key is close enough to us
for (int j=0;j<contacts.size();j++){
final DHTTransportContact contact = (DHTTransportContact)contacts.get(j);
if ( router.isID( contact.getID())){
forward_it = true;
break;
}
}
for (int j=0; forward_it && j<contacts.size();j++){
final DHTTransportContact contact = (DHTTransportContact)contacts.get(j);
if ( key_block.hasBeenSentTo( contact )){
continue;
}
if ( contact.getProtocolVersion() >= DHTTransportUDP.PROTOCOL_VERSION_BLOCK_KEYS ){
final Runnable task =
new Runnable()
{
public void
run()
{
contact.sendKeyBlock(
new DHTTransportReplyHandlerAdapter()
{
public void
keyBlockReply(
DHTTransportContact _contact )
{
DHTLog.log( "key block forward ok " + DHTLog.getString( _contact ));
key_block.sentTo( _contact );
}
public void
failed(
DHTTransportContact _contact,
Throwable _error )
{
DHTLog.log( "key block forward failed " + DHTLog.getString( _contact ) + " -> failed: " + _error.getMessage());
}
},
key_block.getRequest(),
key_block.getCertificate());
}
};
if ( anti_spoof_done.contains( contact )){
task.run();
}else{
contact.sendFindNode(
new DHTTransportReplyHandlerAdapter()
{
public void
findNodeReply(
DHTTransportContact contact,
DHTTransportContact[] contacts )
{
task.run();
}
public void
failed(
DHTTransportContact _contact,
Throwable _error )
{
// System.out.println( "nodeAdded: pre-store findNode Failed" );
DHTLog.log( "pre-kb findNode failed " + DHTLog.getString( _contact ) + " -> failed: " + _error.getMessage());
router.contactDead( _contact.getID(), false);
}
},
contact.getProtocolVersion() >= DHTTransportUDP.PROTOCOL_VERSION_ANTI_SPOOF2?new byte[0]:new byte[20] );
}
}
}
}
}
return( new int[]{ values_published[0], keys_published[0], republish_ops[0] });
}
protected void
checkCacheExpiration(
boolean force )
{
long now = SystemTime.getCurrentTime();
if ( !force ){
long elapsed = now - last_cache_expiry_check;
if ( elapsed > 0 && elapsed < MIN_CACHE_EXPIRY_CHECK_INTERVAL ){
return;
}
}
try{
this_mon.enter();
last_cache_expiry_check = now;
Iterator it = stored_values.values().iterator();
while( it.hasNext()){
DHTDBMapping mapping = (DHTDBMapping)it.next();
if ( mapping.getValueCount() == 0 ){
mapping.destroy();
it.remove();
}else{
Iterator it2 = mapping.getValues();
while( it2.hasNext()){
DHTDBValueImpl value = (DHTDBValueImpl)it2.next();
if ( !value.isLocal()){
// distance 1 = initial store location. We use the initial creation date
// when deciding whether or not to remove this, plus a bit, as the
// original publisher is supposed to republish these
if ( now - value.getCreationTime() > original_republish_interval + ORIGINAL_REPUBLISH_INTERVAL_GRACE ){
DHTLog.log( "removing cache entry (" + value.getString() + ")" );
it2.remove();
}
}
}
}
}
}finally{
this_mon.exit();
}
}
protected DHTTransportContact
getLocalContact()
{
return( local_contact );
}
protected DHTStorageAdapter
getAdapter()
{
return( adapter );
}
protected void
log(
String str )
{
logger.log( str );
}
public DHTDBStats
getStats()
{
return( this );
}
public void
print()
{
Map count = new TreeMap();
try{
this_mon.enter();
logger.log( "Stored keys = " + stored_values.size() + ", values = " + getValueDetails()[DHTDBStats.VD_VALUE_COUNT]);
Iterator it = stored_values.entrySet().iterator();
while( it.hasNext()){
Map.Entry entry = (Map.Entry)it.next();
HashWrapper value_key = (HashWrapper)entry.getKey();
DHTDBMapping mapping = (DHTDBMapping)entry.getValue();
DHTDBValue[] values = mapping.get(null,0,(byte)0);
for (int i=0;i<values.length;i++){
DHTDBValue value = values[i];
Integer key = new Integer( value.isLocal()?0:1);
Object[] data = (Object[])count.get( key );
if ( data == null ){
data = new Object[2];
data[0] = new Integer(1);
data[1] = "";
count.put( key, data );
}else{
data[0] = new Integer(((Integer)data[0]).intValue() + 1 );
}
String s = (String)data[1];
s += (s.length()==0?"":", ") + "key=" + DHTLog.getString2(value_key.getHash()) + ",val=" + value.getString();
data[1] = s;
}
}
it = count.keySet().iterator();
while( it.hasNext()){
Integer k = (Integer)it.next();
Object[] data = (Object[])count.get(k);
logger.log( " " + k + " -> " + data[0] + " entries" ); // ": " + data[1]);
}
it = stored_values.entrySet().iterator();
String str = " ";
int str_entries = 0;
while( it.hasNext()){
Map.Entry entry = (Map.Entry)it.next();
HashWrapper value_key = (HashWrapper)entry.getKey();
DHTDBMapping mapping = (DHTDBMapping)entry.getValue();
if ( str_entries == 16 ){
logger.log( str );
str = " ";
str_entries = 0;
}
str_entries++;
str += (str_entries==1?"":", ") + DHTLog.getString2(value_key.getHash()) + " -> " + mapping.getValueCount() + "/" + mapping.getHits()+"["+mapping.getLocalSize()+","+mapping.getDirectSize()+","+mapping.getIndirectSize() + "]";
}
if ( str_entries > 0 ){
logger.log( str );
}
}finally{
this_mon.exit();
}
}
protected void
banContact(
final DHTTransportContact contact,
final String reason )
{
new AEThread( "DHTDBImpl:delayed flood delete", true )
{
public void
runSupport()
{
// delete their data on a separate thread so as not to
// interfere with the current action
try{
this_mon.enter();
Iterator it = stored_values.values().iterator();
while( it.hasNext()){
DHTDBMapping mapping = (DHTDBMapping)it.next();
Iterator it2 = mapping.getDirectValues();
while( it2.hasNext()){
DHTDBValueImpl val = (DHTDBValueImpl)it2.next();
if ( !val.isLocal()){
if ( Arrays.equals( val.getOriginator().getID(), contact.getID())){
it.remove();
}
}
}
}
}finally{
this_mon.exit();
}
}
}.start();
logger.log( "Banning " + contact.getString() + " due to store flooding (" + reason + ")" );
ip_filter.ban(
contact.getAddress().getAddress().getHostAddress(),
"DHT: Sender stored excessive entries at this node (" + reason + ")" );
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -