📄 dhtcontrolimpl.java
字号:
diversified[j] = true;
byte[][] diversified_keys =
adapter.diversify( _contact, true, false, encoded_keys[j], _diversifications[j], false );
for (int k=0;k<diversified_keys.length;k++){
put( thread_pool,
diversified_keys[k],
"Diversification of [" + description + "]",
value_sets[j],
timeout,
false,
keys_written,
listener );
}
}
}
}
}finally{
listener.complete( false );
}
}
public void
failed(
DHTTransportContact _contact,
Throwable _error )
{
try{
DHTLog.log( "Store failed " + DHTLog.getString( _contact ) + " -> failed: " + _error.getMessage());
router.contactDead( _contact.getID(), false );
}finally{
listener.complete( true );
}
}
public void
keyBlockRequest(
DHTTransportContact contact,
byte[] request,
byte[] key_signature )
{
DHTStorageBlock key_block = database.keyBlockRequest( null, request, key_signature );
if ( key_block != null ){
// remove this key for any subsequent publishes. Quickest hack
// is to change it into a random key value - this will be rejected
// by the recipient as not being close enough anyway
for (int i=0;i<encoded_keys.length;i++){
if ( Arrays.equals( encoded_keys[i], key_block.getKey())){
byte[] dummy = new byte[encoded_keys[i].length];
new Random().nextBytes( dummy );
encoded_keys[i] = dummy;
}
}
}
}
},
encoded_keys,
value_sets );
}catch( Throwable e ){
Debug.printStackTrace(e);
}
}
}
}
/**
 * Fetches the value held in the local database for a key, without any
 * network activity being started from this method.
 *
 * @param unencoded_key caller-supplied key; it is run through encodeKey before the database is consulted
 * @return the database entry for the encoded key, or null when the database has none
 */
public DHTTransportValue
getLocalValue(
	byte[]		unencoded_key )
{
	final byte[]	key = encodeKey( unencoded_key );

	DHTLog.log( "getLocalValue for " + DHTLog.getString( key ));

	// a miss comes back from the database as null and is handed straight to the caller

	return( database.get( new HashWrapper( key )));
}
/**
 * Starts a get for the given key. The key is encoded, the caller's listener is
 * wrapped in a DHTOperationListenerDemuxer and the work is handed to getSupport.
 *
 * @param unencoded_key caller-supplied key; encoded with encodeKey before use
 * @param description   text describing the operation, passed on unchanged
 * @param flags         passed on unchanged to getSupport
 * @param max_values    passed on unchanged to getSupport
 * @param timeout       passed on unchanged to getSupport
 * @param exhaustive    passed on unchanged to getSupport
 * @param high_priority passed on unchanged to getSupport
 * @param get_listener  listener that the demuxer is built around
 */
public void
get(
	byte[]						unencoded_key,
	String						description,
	byte						flags,
	int							max_values,
	long						timeout,
	boolean						exhaustive,
	boolean						high_priority,
	final DHTOperationListener	get_listener )
{
	final byte[]	target = encodeKey( unencoded_key );

	DHTLog.log( "get for " + DHTLog.getString( target ));

	// getSupport works against a demuxer rather than the raw listener

	DHTOperationListenerDemuxer	demuxer = new DHTOperationListenerDemuxer( get_listener );

	getSupport( target, description, flags, max_values, timeout, exhaustive, high_priority, demuxer );
}
/**
 * Asks the adapter whether the given key is diversified.
 *
 * @param unencoded_key caller-supplied key; encoded with encodeKey before the adapter is asked
 * @return the adapter's answer for the encoded key
 */
public boolean
isDiversified(
	byte[]		unencoded_key )
{
	return( adapter.isDiversified( encodeKey( unencoded_key )));
}
/**
 * Runs a lookup (with value searching switched off) for the given key on the
 * external lookup pool and waits on a semaphore until the lookup reports
 * completion.
 *
 * The caller's listener receives searching, diversified and complete events as
 * they occur. Its found method is invoked once for every contact in the list
 * handed to the result handler's closest callback. Read and wrote events are
 * not forwarded.
 *
 * @param unencoded_key   caller-supplied key; encoded with encodeKey before use
 * @param timeout         passed on unchanged to the underlying lookup
 * @param lookup_listener receives the events described above
 * @return true if the result handler's diversify callback fired during the lookup
 */
public boolean
lookup(
	byte[]							unencoded_key,
	long							timeout,
	final DHTOperationListener		lookup_listener )
{
	final byte[]	encoded_key = encodeKey( unencoded_key );

	DHTLog.log( "lookup for " + DHTLog.getString( encoded_key ));

	final AESemaphore	sem = new AESemaphore( "DHTControl:lookup" );

	// single-element array so the anonymous handler below can record the outcome

	final boolean[]	diversified = { false };

	DHTOperationListener	delegate =
		new DHTOperationListener()
		{
			public void
			searching(
				DHTTransportContact	contact,
				int					level,
				int					active_searches )
			{
				lookup_listener.searching( contact, level, active_searches );
			}

			public void
			found(
				DHTTransportContact	contact )
			{
				// not forwarded here - contacts are reported from closest() below
			}

			public void
			diversified()
			{
				lookup_listener.diversified();
			}

			public void
			read(
				DHTTransportContact	contact,
				DHTTransportValue	value )
			{
			}

			public void
			wrote(
				DHTTransportContact	contact,
				DHTTransportValue	value )
			{
			}

			public void
			complete(
				boolean				timeout )
			{
				try{
					lookup_listener.complete( timeout );

				}finally{

					// the calling thread is waiting on this semaphore: release it even if
					// the caller's listener throws, otherwise that thread is never woken

					sem.release();
				}
			}
		};

	lookup( external_lookup_pool, false,
			encoded_key,
			"lookup",
			(byte)0,
			false,
			timeout,
			search_concurrency,
			1,
			router.getK(),
			new lookupResultHandler( delegate )
			{
				public void
				diversify(
					DHTTransportContact	cause,
					byte				diversification_type )
				{
					diversified();

					diversified[0] = true;
				}

				public void
				closest(
					List		closest )
				{
					for (int i=0;i<closest.size();i++){

						lookup_listener.found((DHTTransportContact)closest.get(i));
					}
				}
			});

	// wait for complete() above to release the semaphore

	sem.reserve();

	return( diversified[0] );
}
/**
 * Starts one value lookup for each starting key that the adapter returns for
 * initial_encoded_key. Each lookup follows at most one diversification, which
 * is done by calling this method again for the diversified key(s).
 *
 * @param initial_encoded_key already-encoded key to get
 * @param description         text describing the operation; wrapped as "Diversification of [...]" for diversified keys
 * @param flags               passed on unchanged to the lookup
 * @param max_values          value limit passed to the lookup; 0 is treated as "no limit" when deciding whether to follow a diversification
 * @param timeout             passed on unchanged to the lookup
 * @param exhaustive          passed on unchanged to the adapter's diversify calls
 * @param high_priority       passed on unchanged to the lookup
 * @param get_listener        demuxer that receives the lookup events
 */
protected void
getSupport(
	final byte[]						initial_encoded_key,
	final String						description,
	final byte							flags,
	final int							max_values,
	final long							timeout,
	final boolean						exhaustive,
	final boolean						high_priority,
	final DHTOperationListenerDemuxer	get_listener )
{
	// the key may have been diversified previously, so ask the adapter where to start from

	final byte[][]	start_keys = adapter.diversify( null, false, true, initial_encoded_key, DHT.DT_NONE, exhaustive );

	for (int idx=0;idx<start_keys.length;idx++){

		final byte[]	encoded_key = start_keys[idx];

		// per-lookup latch, set once this lookup has followed a diversification

		final boolean[]	followed = { false };

		final String	this_description;

		if ( Arrays.equals( encoded_key, initial_encoded_key )){

			this_description = description;

		}else{

			// starting key differs from the requested one, so report the diversification

			get_listener.diversified();

			this_description = "Diversification of [" + description + "]";
		}

		lookup( external_lookup_pool,
				high_priority,
				encoded_key,
				this_description,
				flags,
				true,
				timeout,
				search_concurrency * 2,		// gets run at twice the normal search concurrency
				max_values,
				router.getK(),
				new lookupResultHandler( get_listener )
				{
					private List	found_values = new ArrayList();

					public void
					diversify(
						DHTTransportContact	cause,
						byte				diversification_type )
					{
						diversified();

						// only one diversification is followed per lookup

						if ( followed[0] ){

							return;
						}

						followed[0] = true;

						int	remaining;

						if ( max_values == 0 ){

							remaining = 0;

						}else{

							remaining = max_values - found_values.size();

							if ( remaining <= 0 ){

								// limit already reached, nothing more to fetch

								return;
							}
						}

						// expected to give at most one key (none if the diversification is
						// refused), but cope with more in case that changes one day

						byte[][]	next_keys = adapter.diversify( cause, false, false, encoded_key, diversification_type, exhaustive );

						String	nested_description = "Diversification of [" + this_description + "]";

						for (int n=0;n<next_keys.length;n++){

							getSupport( next_keys[n], nested_description, flags, remaining, timeout, exhaustive, high_priority, get_listener );
						}
					}

					public void
					read(
						DHTTransportContact	contact,
						DHTTransportValue	value )
					{
						// remember the value so diversify() can work out how many are still wanted

						found_values.add( value );

						super.read( contact, value );
					}

					public void
					closest(
						List		closest )
					{
						// deliberately empty: the Kademlia cache-at-closest feature (storing
						// the values found at the closest contacts seen) is not used
					}
				});
	}
}
/**
 * Removes the local contact's entry for a key from the local database and, if
 * one was present, publishes the removal through the external put pool.
 *
 * @param unencoded_key caller-supplied key; encoded with encodeKey before use
 * @param description   text describing the operation, passed on to the put
 * @param listener      wrapped in a DHTOperationListenerDemuxer and passed on to the put
 * @return getValue() of the entry the database removed, or null if the database had nothing to remove
 */
public byte[]
remove(
	byte[]					unencoded_key,
	String					description,
	DHTOperationListener	listener )
{
	final byte[]	encoded_key = encodeKey( unencoded_key );

	DHTLog.log( "remove for " + DHTLog.getString( encoded_key ));

	DHTDBValue	removed = database.remove( local_contact, new HashWrapper( encoded_key ));

	if ( removed == null ){

		// nothing held locally, so there is nothing to push out

		return( null );
	}

	// a key is removed from the DHT by pushing it back out again with a zero length value

	put( 	external_put_pool,
			encoded_key,
			description,
			removed,
			0,
			true,
			new HashSet(),
			new DHTOperationListenerDemuxer( listener ));

	return( removed.getValue());
}
/**
 * Queues a lookup on the given thread pool. The queued task calls
 * lookupSupportSync with the supplied arguments; anything thrown by that call
 * is caught and printed via Debug.printStackTrace rather than propagated.
 * Nothing is returned from this method - outcomes are delivered through handler.
 * (The original note on this method says the lookup yields up to K closest
 * nodes to the target.)
 *
 * @param thread_pool     pool the task is created for and run on
 * @param high_priority   passed as the second argument to thread_pool.run
 * @param lookup_id       key being looked up; also what the task reports as its target
 * @param description     what the task reports as its description
 * @param flags           passed on unchanged to lookupSupportSync
 * @param value_search    passed on unchanged to lookupSupportSync
 * @param timeout         passed on unchanged to lookupSupportSync
 * @param concurrency     passed on unchanged to lookupSupportSync
 * @param max_values      passed on unchanged to lookupSupportSync
 * @param search_accuracy passed on unchanged to lookupSupportSync
 * @param handler         passed on unchanged to lookupSupportSync
 */
protected void
lookup(
	ThreadPool					thread_pool,
	boolean						high_priority,
	final byte[]				lookup_id,
	final String				description,
	final byte					flags,
	final boolean				value_search,
	final long					timeout,
	final int					concurrency,
	final int					max_values,
	final int					search_accuracy,
	final lookupResultHandler	handler )
{
	task	lookup_task =
		new task( thread_pool )
		{
			public byte[]
			getTarget()
			{
				return( lookup_id );
			}

			public String
			getDescription()
			{
				return( description );
			}

			public void
			runSupport()
			{
				try{
					lookupSupportSync( lookup_id, flags, value_search, timeout, concurrency, max_values, search_accuracy, handler );

				}catch( Throwable e ){

					// report and carry on so a failed lookup does not escape into the pool

					Debug.printStackTrace(e);
				}
			}
		};

	thread_pool.run( lookup_task, high_priority );
}
protected void
lookupSupportSync(
final byte[] lookup_id,
byte flags,
boolean value_search,
long timeout,
int concurrency,
int max_values,
final int search_accuracy,
final lookupResultHandler result_handler )
{
boolean timeout_occurred = false;
last_lookup = SystemTime.getCurrentTime();
result_handler.incrementCompletes();
try{
DHTLog.log( "lookup for " + DHTLog.getString( lookup_id ));
if ( value_search ){
if ( database.isKeyBlocked( lookup_id )){
DHTLog.log( "lookup: terminates - key blocked" );
// bail out and pretend everything worked with zero results
return;
}
}
// keep querying successively closer nodes until we have got responses from the K
// closest nodes that we've seen. We might get a bunch of closer nodes that then
// fail to respond, which means we have to reconsider further away nodes
// we keep a list of nodes that we have queried to avoid re-querying them
// we keep a list of nodes discovered that we have yet to query
// we have a parallel search limit of A. For each A we effectively loop grabbing
// the currently closest unqueried node, querying it and adding the results to the
// yet-to-query-set (unless already queried)
// we terminate when we have received responses from the K closest nodes we know
// about (excluding failed ones)
// Note that we never widen the root of our search beyond the initial K closest
// that we know about - this could be relaxed
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -