📄 dhtcontrolimpl.java
字号:
DHTLog.log( "get for " + DHTLog.getString( encoded_key ));
getSupport( encoded_key, description, flags, max_values, timeout, exhaustive, new DHTOperationListenerDemuxer( get_listener ));
}
/**
 * Starts a value lookup for every key the adapter returns for
 * {@code initial_encoded_key}, following at most one further
 * diversification per lookup.
 *
 * @param initial_encoded_key encoded key the get was requested for
 * @param description         description attached to the lookup task; wrapped as
 *                            "Diversification of [...]" for keys other than the initial one
 * @param flags               passed through to {@code lookup}
 * @param max_values          passed through to {@code lookup}; when it is 0 the
 *                            remaining-count check in {@code diversify} is skipped
 *                            (presumably 0 means "no limit" - TODO confirm)
 * @param timeout             passed through to {@code lookup}
 * @param exhaustive          passed through to {@code adapter.diversify}
 * @param get_listener        demuxer wrapped by each lookup's result handler
 */
public void
getSupport(
    final byte[]                        initial_encoded_key,
    final String                        description,
    final byte                          flags,
    final int                           max_values,
    final long                          timeout,
    final boolean                       exhaustive,
    final DHTOperationListenerDemuxer   get_listener )
{
        // the key may already have been diversified, so ask the adapter where to start

    final byte[][] start_keys = adapter.diversify( null, false, true, initial_encoded_key, DHT.DT_NONE, exhaustive );

    for ( int key_index = 0; key_index < start_keys.length; key_index++ ){

        final byte[] encoded_key = start_keys[key_index];

            // one-element array so the anonymous handler below can flip the flag

        final boolean[] diversified = { false };

        final String this_description;

        if ( Arrays.equals( encoded_key, initial_encoded_key )){

            this_description = description;

        }else{

            this_description = "Diversification of [" + description + "]";
        }

        lookup(
            external_lookup_pool,
            encoded_key,
            this_description,
            flags,
            true,
            timeout,
            search_concurrency,
            max_values,
            router.getK(),
            new lookupResultHandler( get_listener )
            {
                    // values seen so far by this lookup; only its size is consulted

                private List found_values = new ArrayList();

                public void
                read(
                    DHTTransportContact contact,
                    DHTTransportValue   value )
                {
                    found_values.add( value );

                    super.read( contact, value );
                }

                public void
                diversify(
                    DHTTransportContact cause,
                    byte                diversification_type )
                {
                        // only the first diversification request is followed

                    if ( diversified[0] ){

                        return;
                    }

                    diversified[0] = true;

                    int rem;

                    if ( max_values == 0 ){

                        rem = 0;

                    }else{

                        rem = max_values - found_values.size();

                        if ( rem <= 0 ){

                                // already have as many values as were asked for

                            return;
                        }
                    }

                    byte[][] diversified_keys = adapter.diversify( cause, false, false, encoded_key, diversification_type, exhaustive );

                        // expected to hold at most one key (none if the diversification
                        // was refused); the loop copes should that ever change

                    for ( int j = 0; j < diversified_keys.length; j++ ){

                        getSupport( diversified_keys[j], "Diversification of [" + this_description + "]", flags, rem, timeout, exhaustive, get_listener );
                    }
                }

                public void
                closest(
                    List    closest )
                {
                        // Intentionally empty: the Kademlia cache-at-closest feature is not used.
                        // The disabled implementation stored the found values at the first
                        // cache_at_closest_n entries of 'closest' via contact.sendStore().
                }
            });
    }
}
/**
 * Removes the value stored locally under a key and, if there was one,
 * pushes the key back out through {@code put}.
 *
 * @param unencoded_key key in unencoded form; run through {@code encodeKey} first
 * @param description   description handed to {@code put}
 * @param listener      wrapped in a {@code DHTOperationListenerDemuxer} for the put;
 *                      not used when nothing was stored locally
 * @return the removed value's bytes, or {@code null} if the local database
 *         held nothing for the key
 */
public byte[]
remove(
    byte[]                  unencoded_key,
    String                  description,
    DHTOperationListener    listener )
{
    final byte[] encoded_key = encodeKey( unencoded_key );

    DHTLog.log( "remove for " + DHTLog.getString( encoded_key ));

    final DHTDBValue removed = database.remove( local_contact, new HashWrapper( encoded_key ));

    if ( removed == null ){

            // nothing stored locally, so there is nothing to push out

        return( null );
    }

        // a key is removed by publishing it again with a zero length value
        // (NOTE(review): presumably 'removed' is that zero length marker - confirm
        // against database.remove)

    put(    external_put_pool,
            encoded_key,
            description,
            removed,
            0,
            true,
            new HashSet(),
            new DHTOperationListenerDemuxer( listener ));

    return( removed.getValue());
}
/**
 * Hands a lookup to the supplied thread pool.
 *
 * The work is wrapped in a {@code task} whose {@code runSupport()} calls
 * {@code lookupSupportSync} with the arguments given here. Nothing is
 * returned; the lookup reports through {@code handler}. Any
 * {@code Throwable} escaping the lookup is caught and passed to
 * {@code Debug.printStackTrace}.
 *
 * @param thread_pool     pool that runs the task
 * @param lookup_id       id being looked up; also exposed as the task's target
 * @param description     exposed as the task's description
 * @param flags           passed through to {@code lookupSupportSync}
 * @param value_search    passed through to {@code lookupSupportSync}
 * @param timeout         passed through to {@code lookupSupportSync}
 * @param concurrency     passed through to {@code lookupSupportSync}
 * @param max_values      passed through to {@code lookupSupportSync}
 * @param search_accuracy passed through to {@code lookupSupportSync}
 * @param handler         receives the lookup's callbacks
 */
protected void
lookup(
    ThreadPool                  thread_pool,
    final byte[]                lookup_id,
    final String                description,
    final byte                  flags,
    final boolean               value_search,
    final long                  timeout,
    final int                   concurrency,
    final int                   max_values,
    final int                   search_accuracy,
    final lookupResultHandler   handler )
{
    final task lookup_task =
        new task( thread_pool )
        {
            public String
            getDescription()
            {
                return( description );
            }

            public byte[]
            getTarget()
            {
                return( lookup_id );
            }

            public void
            runSupport()
            {
                try{
                    lookupSupportSync( lookup_id, flags, value_search, timeout, concurrency, max_values, search_accuracy, handler );

                }catch( Throwable e ){

                        // report the failure here; it is not propagated to the pool

                    Debug.printStackTrace(e);
                }
            }
        };

    thread_pool.run( lookup_task );
}
protected void
lookupSupportSync(
final byte[] lookup_id,
byte flags,
boolean value_search,
long timeout,
int concurrency,
int max_values,
final int search_accuracy,
final lookupResultHandler result_handler )
{
boolean timeout_occurred = false;
last_lookup = SystemTime.getCurrentTime();
result_handler.incrementCompletes();
try{
DHTLog.log( "lookup for " + DHTLog.getString( lookup_id ));
// keep querying successively closer nodes until we have got responses from the K
// closest nodes that we've seen. We might get a bunch of closer nodes that then
// fail to respond, which means we have to reconsider further away nodes
// we keep a list of nodes that we have queried to avoid re-querying them
// we keep a list of nodes discovered that we have yet to query
// we have a parallel search limit of A. For each A we effectively loop grabbing
// the currently closest unqueried node, querying it and adding the results to the
// yet-to-query-set (unless already queried)
// we terminate when we have received responses from the K closest nodes we know
// about (excluding failed ones)
// Note that we never widen the root of our search beyond the initial K closest
// that we know about - this could be relaxed
// contacts remaining to query
// closest at front
final Set contacts_to_query = getClosestContactsSet( lookup_id, false );
final AEMonitor contacts_to_query_mon = new AEMonitor( "DHTControl:ctq" );
final Map level_map = new HashMap();
Iterator it = contacts_to_query.iterator();
while( it.hasNext()){
DHTTransportContact contact = (DHTTransportContact)it.next();
result_handler.found( contact );
level_map.put( contact , new Integer(0));
}
// record the set of contacts we've queried to avoid re-queries
final Map contacts_queried = new HashMap();
// record the set of contacts that we've had a reply from
// furthest away at front
final Set ok_contacts = new sortedTransportContactSet( lookup_id, false ).getSet();
// this handles the search concurrency
final AESemaphore search_sem = new AESemaphore( "DHTControl:search", concurrency );
final int[] idle_searches = { 0 };
final int[] active_searches = { 0 };
final int[] values_found = { 0 };
final int[] value_replies = { 0 };
final Set values_found_set = new HashSet();
long start = SystemTime.getCurrentTime();
while( true ){
if ( timeout > 0 ){
long now = SystemTime.getCurrentTime();
// check for clock being set back
if ( now < start ){
start = now;
}
long remaining = timeout - ( now - start );
if ( remaining <= 0 ){
DHTLog.log( "lookup: terminates - timeout" );
timeout_occurred = true;
break;
}
// get permission to kick off another search
if ( !search_sem.reserve( remaining )){
DHTLog.log( "lookup: terminates - timeout" );
timeout_occurred = true;
break;
}
}else{
search_sem.reserve();
}
try{
contacts_to_query_mon.enter();
if ( values_found[0] >= max_values ||
value_replies[0]>= 2 ){ // all hits should have the same values anyway...
break;
}
// if nothing pending then we need to wait for the results of a previous
// search to arrive. Of course, if there are no searches active then
// we've run out of things to do
if ( contacts_to_query.size() == 0 ){
if ( active_searches[0] == 0 ){
DHTLog.log( "lookup: terminates - no contacts left to query" );
break;
}
idle_searches[0]++;
continue;
}
// select the next contact to search
DHTTransportContact closest = (DHTTransportContact)contacts_to_query.iterator().next();
// if the next closest is further away than the furthest successful hit so
// far and we have K hits, we're done
if ( ok_contacts.size() == search_accuracy ){
DHTTransportContact furthest_ok = (DHTTransportContact)ok_contacts.iterator().next();
int distance = computeAndCompareDistances( furthest_ok.getID(), closest.getID(), lookup_id );
if ( distance <= 0 ){
DHTLog.log( "lookup: terminates - we've searched the closest " + search_accuracy + " contacts" );
break;
}
}
// we optimise the first few entries based on their Vivaldi distance. Only a few
// however as we don't want to start too far away from the target.
if ( contacts_queried.size() < concurrency ){
VivaldiPosition loc_vp = local_contact.getVivaldiPosition();
if ( !loc_vp.getCoordinates().atOrigin()){
DHTTransportContact vp_closest = null;
Iterator vp_it = contacts_to_query.iterator();
int vp_count_limit = (concurrency*2) - contacts_queried.size();
int vp_count = 0;
float best_dist = Float.MAX_VALUE;
while( vp_it.hasNext() && vp_count < vp_count_limit ){
vp_count++;
DHTTransportContact entry = (DHTTransportContact)vp_it.next();
VivaldiPosition vp = entry.getVivaldiPosition();
Coordinates coords = vp.getCoordinates();
if ( !coords.atOrigin()){
float dist = loc_vp.estimateRTT( coords );
if ( dist < best_dist ){
best_dist = dist;
vp_closest = entry;
// System.out.println( start + ": lookup for " + DHTLog.getString2( lookup_id ) + ": vp override (dist = " + dist + ")");
}
}
}
if ( vp_closest != null ){
// override ID closest with VP closest
closest = vp_closest;
}
}
}
contacts_to_query.remove( closest );
contacts_queried.put( new HashWrapper( closest.getID()), closest );
// never search ourselves!
if ( router.isID( closest.getID())){
search_sem.release();
continue;
}
final int search_level = ((Integer)level_map.get(closest)).intValue();
active_searches[0]++;
result_handler.searching( closest, search_level, active_searches[0] );
DHTTransportReplyHandlerAdapter handler =
new DHTTransportReplyHandlerAdapter()
{
private boolean value_reply_received = false;
public void
findNodeReply(
DHTTransportContact target_contact,
DHTTransportContact[] reply_contacts )
{
try{
DHTLog.log( "findNodeReply: " + DHTLog.getString( reply_contacts ));
router.contactAlive( target_contact.getID(), new DHTControlContactImpl(target_contact));
for (int i=0;i<reply_contacts.length;i++){
DHTTransportContact contact = reply_contacts[i];
// ignore responses that are ourselves
if ( compareDistances( router.getID(), contact.getID()) == 0 ){
continue;
}
// don't know whether it's alive or not; record its existence anyway
router.contactKnown( contact.getID(), new DHTControlContactImpl(contact));
}
try{
contacts_to_query_mon.enter();
ok_contacts.add( target_contact );
if ( ok_contacts.size() > search_accuracy ){
// delete the furthest away
Iterator ok_it = ok_contacts.iterator();
ok_it.next();
ok_it.remove();
}
for (int i=0;i<reply_contacts.length;i++){
DHTTransportContact contact = reply_contacts[i];
// ignore responses that are ourselves
if ( compareDistances( router.getID(), contact.getID()) == 0 ){
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -