📄 dhtcontrolimpl.java
字号:
// contacts remaining to query
// closest at front
final Set contacts_to_query = getClosestContactsSet( lookup_id, false );
final AEMonitor contacts_to_query_mon = new AEMonitor( "DHTControl:ctq" );
final Map level_map = new HashMap();
Iterator it = contacts_to_query.iterator();
while( it.hasNext()){
DHTTransportContact contact = (DHTTransportContact)it.next();
result_handler.found( contact );
level_map.put( contact , new Integer(0));
}
// record the set of contacts we've queried to avoid re-queries
final Map contacts_queried = new HashMap();
// record the set of contacts that we've had a reply from
// furthest away at front
final Set ok_contacts = new sortedTransportContactSet( lookup_id, false ).getSet();
// this handles the search concurrency
final AESemaphore search_sem = new AESemaphore( "DHTControl:search", concurrency );
final int[] idle_searches = { 0 };
final int[] active_searches = { 0 };
final int[] values_found = { 0 };
final int[] value_replies = { 0 };
final Set values_found_set = new HashSet();
final boolean[] key_blocked = { false };
long start = SystemTime.getCurrentTime();
while( true ){
if ( timeout > 0 ){
long now = SystemTime.getCurrentTime();
// check for clock being set back
if ( now < start ){
start = now;
}
long remaining = timeout - ( now - start );
if ( remaining <= 0 ){
DHTLog.log( "lookup: terminates - timeout" );
timeout_occurred = true;
break;
}
// get permission to kick off another search
if ( !search_sem.reserve( remaining )){
DHTLog.log( "lookup: terminates - timeout" );
timeout_occurred = true;
break;
}
}else{
search_sem.reserve();
}
try{
contacts_to_query_mon.enter();
if ( values_found[0] >= max_values ||
value_replies[0]>= 2 ){ // all hits should have the same values anyway...
break;
}
// if we've received a key block then easiest way to terminate the query is to
// dump any outstanding targets
if ( key_blocked[0] ){
contacts_to_query.clear();
}
// if nothing pending then we need to wait for the results of a previous
// search to arrive. Of course, if there are no searches active then
// we've run out of things to do
if ( contacts_to_query.size() == 0 ){
if ( active_searches[0] == 0 ){
DHTLog.log( "lookup: terminates - no contacts left to query" );
break;
}
idle_searches[0]++;
continue;
}
// select the next contact to search
DHTTransportContact closest = (DHTTransportContact)contacts_to_query.iterator().next();
// if the next closest is further away than the furthest successful hit so
// far and we have K hits, we're done
if ( ok_contacts.size() == search_accuracy ){
DHTTransportContact furthest_ok = (DHTTransportContact)ok_contacts.iterator().next();
int distance = computeAndCompareDistances( furthest_ok.getID(), closest.getID(), lookup_id );
if ( distance <= 0 ){
DHTLog.log( "lookup: terminates - we've searched the closest " + search_accuracy + " contacts" );
break;
}
}
// we optimise the first few entries based on their Vivaldi distance. Only a few
// however as we don't want to start too far away from the target.
if ( contacts_queried.size() < concurrency ){
DHTNetworkPosition[] loc_nps = local_contact.getNetworkPositions();
DHTTransportContact vp_closest = null;
Iterator vp_it = contacts_to_query.iterator();
int vp_count_limit = (concurrency*2) - contacts_queried.size();
int vp_count = 0;
float best_dist = Float.MAX_VALUE;
while( vp_it.hasNext() && vp_count < vp_count_limit ){
vp_count++;
DHTTransportContact entry = (DHTTransportContact)vp_it.next();
DHTNetworkPosition[] rem_nps = entry.getNetworkPositions();
float dist = DHTNetworkPositionManager.estimateRTT( loc_nps, rem_nps );
if ( (!Float.isNaN(dist)) && dist < best_dist ){
best_dist = dist;
vp_closest = entry;
// System.out.println( start + ": lookup for " + DHTLog.getString2( lookup_id ) + ": vp override (dist = " + dist + ")");
}
if ( vp_closest != null ){
// override ID closest with VP closest
closest = vp_closest;
}
}
}
contacts_to_query.remove( closest );
contacts_queried.put( new HashWrapper( closest.getID()), closest );
// never search ourselves!
if ( router.isID( closest.getID())){
search_sem.release();
continue;
}
final int search_level = ((Integer)level_map.get(closest)).intValue();
active_searches[0]++;
result_handler.searching( closest, search_level, active_searches[0] );
DHTTransportReplyHandlerAdapter handler =
new DHTTransportReplyHandlerAdapter()
{
private boolean value_reply_received = false;
/**
 * Handles a find-node reply from a contact that was queried by the lookup.
 *
 * Marks the replying contact as alive in the router, records every returned
 * contact (other than ourselves) as known, adds the replier to the set of
 * successful contacts and queues any returned contact that has neither been
 * queried nor is already pending. Whatever happens, the search is counted as
 * finished and its semaphore permit is released.
 *
 * @param target_contact the contact that sent the reply
 * @param reply_contacts the contacts it returned
 */
public void
findNodeReply(
DHTTransportContact target_contact,
DHTTransportContact[] reply_contacts )
{
try{
DHTLog.log( "findNodeReply: " + DHTLog.getString( reply_contacts ));
// a reply is proof the queried contact is reachable
router.contactAlive( target_contact.getID(), new DHTControlContactImpl(target_contact));
// first pass, done before taking the lookup monitor: router bookkeeping only
for (int i=0;i<reply_contacts.length;i++){
DHTTransportContact contact = reply_contacts[i];
// ignore responses that are ourselves
if ( compareDistances( router.getID(), contact.getID()) == 0 ){
continue;
}
// dunno if its alive or not, however record its existence
router.contactKnown( contact.getID(), new DHTControlContactImpl(contact));
}
try{
// second pass: update the lookup state shared with the main lookup loop,
// which reads it under this same monitor
contacts_to_query_mon.enter();
ok_contacts.add( target_contact );
// keep at most search_accuracy successful contacts
if ( ok_contacts.size() > search_accuracy ){
// delete the furthest away
// (the first element in iteration order is the one dropped)
Iterator ok_it = ok_contacts.iterator();
ok_it.next();
ok_it.remove();
}
for (int i=0;i<reply_contacts.length;i++){
DHTTransportContact contact = reply_contacts[i];
// ignore responses that are ourselves
if ( compareDistances( router.getID(), contact.getID()) == 0 ){
continue;
}
// only queue contacts that are neither already queried nor already pending
if ( contacts_queried.get( new HashWrapper( contact.getID())) == null &&
(!contacts_to_query.contains( contact ))){
DHTLog.log( " new contact for query: " + DHTLog.getString( contact ));
contacts_to_query.add( contact );
result_handler.found( contact );
// contacts learned from this reply sit one level deeper than the replier
level_map.put( contact, new Integer( search_level+1));
// the lookup loop bumps idle_searches when it holds a permit but has
// nothing to query; hand one permit back per idle slot so that (presumably)
// the loop can pick up the newly queued contact
if ( idle_searches[0] > 0 ){
idle_searches[0]--;
search_sem.release();
}
}else{
// DHTLog.log( " already queried: " + DHTLog.getString( contact ));
}
}
}finally{
contacts_to_query_mon.exit();
}
}finally{
// always account for the end of this search, even if processing above threw
try{
contacts_to_query_mon.enter();
active_searches[0]--;
}finally{
contacts_to_query_mon.exit();
}
// give back the permit taken by the lookup loop for this search
search_sem.release();
}
}
/**
 * Handles a find-value reply that carries values.
 *
 * Unless the key has been blocked, forwards any diversification instruction
 * and reports each value not seen before to the result handler. The value
 * counters read by the lookup loop are updated under the lookup monitor. The
 * search is only counted as finished, and its semaphore permit released, when
 * more_to_come is false.
 *
 * @param contact              the contact that sent the reply
 * @param values               the values carried by this reply
 * @param diversification_type diversification instruction, DHT.DT_NONE if none
 * @param more_to_come         true if further replies for this request are expected
 */
public void
findValueReply(
DHTTransportContact contact,
DHTTransportValue[] values,
byte diversification_type,
boolean more_to_come )
{
DHTLog.log( "findValueReply: " + DHTLog.getString( values ) + ",mtc=" + more_to_come + ", dt=" + diversification_type );
try{
if ( !key_blocked[0] ){
if ( diversification_type != DHT.DT_NONE ){
// diversification instruction
result_handler.diversify( contact, diversification_type );
}
}
// remembered so that failed() does not mark this contact dead after a reply
value_reply_received = true;
router.contactAlive( contact.getID(), new DHTControlContactImpl(contact));
int new_values = 0;
// values are ignored entirely once a key block has been accepted
if ( !key_blocked[0] ){
for (int i=0;i<values.length;i++){
DHTTransportValue value = values[i];
DHTTransportContact originator = value.getOriginator();
// can't just use originator id as this value can be DOSed (see DB code)
// so the de-duplication key is originator id followed by the value bytes
byte[] originator_id = originator.getID();
byte[] value_bytes = value.getValue();
byte[] value_id = new byte[originator_id.length + value_bytes.length];
System.arraycopy( originator_id, 0, value_id, 0, originator_id.length );
System.arraycopy( value_bytes, 0, value_id, originator_id.length, value_bytes.length );
HashWrapper x = new HashWrapper( value_id );
// NOTE(review): values_found_set is read and written here without holding
// contacts_to_query_mon - confirm replies cannot be delivered concurrently
if ( !values_found_set.contains( x )){
new_values++;
values_found_set.add( x );
result_handler.read( contact, values[i] );
}
}
}
try{
// counters below are tested by the lookup loop under this monitor
contacts_to_query_mon.enter();
// a reply only counts as complete once its last part has arrived
if ( !more_to_come ){
value_replies[0]++;
}
values_found[0] += new_values;
}finally{
contacts_to_query_mon.exit();
}
}finally{
// with more parts still to come the search stays active and keeps its permit
if ( !more_to_come ){
try{
contacts_to_query_mon.enter();
active_searches[0]--;
}finally{
contacts_to_query_mon.exit();
}
search_sem.release();
}
}
}
/**
 * Handles a find-value reply that carries contacts instead of values.
 * Processing is identical to a find-node reply, so it is delegated to
 * findNodeReply.
 *
 * @param contact  the contact that sent the reply
 * @param contacts the contacts it returned
 */
public void
findValueReply(
DHTTransportContact contact,
DHTTransportContact[] contacts )
{
findNodeReply( contact, contacts );
}
public void
failed(
DHTTransportContact target_contact,
Throwable error )
{
try{
// if at least one reply has been received then we
// don't treat subsequent failure as indication of
// a contact failure (just packet loss)
if ( !value_reply_received ){
DHTLog.log( "findNode/findValue " + DHTLog.getString( target_contact ) + " -> failed: " + error.getMessage());
router.contactDead( target_contact.getID(), false );
}
}finally{
try{
contacts_to_query_mon.enter();
active_searches[0]--;
}finally{
contacts_to_query_mon.exit();
}
search_sem.release();
}
}
/**
 * Called when a contact answers with a key-block request.
 *
 * The request is passed to the local database; if the database returns a
 * non-null result the lookup is flagged as key-blocked, which makes the
 * lookup loop drop its outstanding targets and stops further values from
 * being reported.
 *
 * @param contact       the contact that sent the request (not used here)
 * @param request       the key-block request payload
 * @param key_signature the signature accompanying the request
 */
public void keyBlockRequest(
    DHTTransportContact contact,
    byte[] request,
    byte[] key_signature )
{
    // The contact must not be penalised for sending this; all that happens
    // is that the shared flag is raised when the database accepts the block.
    final boolean block_accepted =
        database.keyBlockRequest( null, request, key_signature ) != null;

    if ( block_accepted ){
        key_blocked[0] = true;
    }
}
};
router.recordLookup( lookup_id );
if ( value_search ){
int rem = max_values - values_found[0];
if ( rem <= 0 ){
Debug.out( "eh?" );
rem = 1;
}
closest.sendFindValue( handler, lookup_id, rem, flags );
}else{
closest.sendFindNode( handler, lookup_id );
}
}finally{
contacts_to_query_mon.exit();
}
}
// maybe unterminated searches still going on so protect ourselves
// against concurrent modification of result set
List closest_res = null;
try{
contacts_to_query_mon.enter();
if ( DHTLog.isOn()){
DHTLog.log( "lookup complete for " + DHTLog.getString( lookup_id ));
DHTLog.log( " queried = " + DHTLog.getString( contacts_queried ));
DHTLog.log( " to query = " + DHTLog.getString( contacts_to_query ));
DHTLog.log( " ok = " + DHTLog.getString( ok_contacts ));
}
closest_res = new ArrayList( ok_contacts );
// we need to reverse the list as currently closest is at
// the end
Collections.reverse( closest_res );
if ( timeout <= 0 && !value_search ){
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -