📄 kademlia.c
字号:
update_k_bucket(lr.rid, wkn, now() - before); // put all nodes that wkn told us in k-buckets // NB: it's questionable whether we can do a touch here. we haven't // actually talked to the node. NODES_ITER(&lr.results) { if(!flyweight[i->id] && i->id != _id) { insert(i->id, i->ip, 0, i->timeouts); touch(i->id); } } // get our ``successor'' and compute length of prefix we have in common vector<k_nodeinfo*> successors; _root->find_node(_id, &successors); closer::n = _id; sort(successors.begin(), successors.end(), closer()); NodeID succ_id = successors[0]->id; //KDEBUG(2) << "join: succ ip is " << successors[0]->ip << " id is " << printID(succ_id) << endl; unsigned cpl = common_prefix(_id, succ_id); // all entries further away than him need to be refreshed. this is similar to // stabilization. flip a bit and set the rest of the ID to something random. for(unsigned i=idsize-cpl+1; i<idsize; i++) { NodeID random_key = _id ^ (((Kademlia::NodeID) 1) << i); for(int j=i-1; j>=0; j--) random_key ^= (((NodeID) random() & 0x1) << j); lookup_args la(_id, ip(), random_key); la.stattype = Kademlia::STAT_JOIN; lookup_result lr; // if we now believe our successor died, then start again if(!flyweight[succ_id]) { clear(); goto join_restart; } // do a refresh on that bucket do_lookup(&la, &lr); if(!alive()) return; // fill our finger table NODES_ITER(&lr.results) if(!flyweight[i->id] && i->id != _id) insert(i->id, i->ip, 0, i->timeouts); // ... but not touch. we didn't actually talk to the node. } _joined = true; if(stabilize_timer) delaycb(stabilize_timer, &Kademlia::reschedule_stabilizer, (void *) 0);}// }}}// {{{ Kademlia::crashvoidKademlia::crash(Args *args){ // destroy k-buckets //KDEBUG(1) << "Kademlia::crash ip " << ip() << endl; // assert(alive()); //_root->collapse(); //jy delete _root; _root = NULL; for(HashMap<NodeID, k_nodeinfo*>::iterator i = flyweight.begin(); i; i++) Kademlia::pool->push(i.value()); flyweight.clear(); _joined = false;}// }}}// {{{ Kademlia::clearvoidKademlia::clear(){ // destroy k-buckets // KDEBUG(1) << "Kademlia::clear" << endl; for(HashMap<NodeID, k_nodeinfo*>::iterator i = flyweight.begin(); i; i++) Kademlia::pool->push(i.value()); flyweight.clear(); _root->collapse();}// }}}// {{{ Kademlia::lookup//// we assume lookups are only for node IDs, but we have to cheat to do this.voidKademlia::lookup(Args *args){ if(!_joined || !alive()) return; IPAddress key_ip = args->nget<NodeID>("key"); if(!Network::Instance()->getnode(key_ip)->alive()) return; // find node with this IP Kademlia *k = (Kademlia*) Network::Instance()->getnode(key_ip); NodeID key = k->id(); //KDEBUG(0) << "Kademlia::lookup: ip " << key_ip << " id " << printID(key) << endl; lookup_wrapper_args *lwa = New lookup_wrapper_args(); lwa->ipkey = key_ip; lwa->key = key; lwa->starttime = now(); lwa->attempts = 0; lookup_wrapper(lwa);}// }}}// {{{ Kademlia::lookup_wrappervoidKademlia::lookup_wrapper(lookup_wrapper_args *args){ static unsigned outcounter = 0; if(!alive()) { delete args; return; } find_value_args fa(_id, ip(), args->key); find_value_result fr; find_value(&fa, &fr); Time after = now(); args->attempts++; // if we found the node, ping it. if(args->key == fr.succ.id) { ping_args pa(fr.succ.id, fr.succ.ip); ping_result pr; Time pingbegin = now(); // assert(_nodeid2kademlia[fr.succ.id]->ip() == fr.succ.ip); if(!doRPC(fr.succ.ip, &Kademlia::do_ping, &pa, &pr, timeout(fr.succ.ip)) && alive()) { if(collect_stat()) _lookup_dead_node++; if(flyweight[fr.succ.id] && !Kademlia::learn_stabilize_only) erase(fr.succ.id); fr.timeouts++; } after = now(); record_lookup_stat(ip(), fr.succ.ip, after - args->starttime, true, true, fr.hops, fr.timeouts, 0); if(outcounter++ >= 1000) { KDEBUG(0) << pingbegin - args->starttime << "ms lookup (" << args->attempts << "a, " << fr.hops << "h, " << fr.rpcs << "r, " << fr.timeouts << "t), " << after - pingbegin << "ms ping, " << after - args->starttime << "ms total." << endl; outcounter = 0; } // cout << pingbegin - args->starttime << endl; if(collect_stat()) { _good_lookups++; _good_attempts += args->attempts; _good_total_latency += (after - args->starttime); _good_lookup_latency += (pingbegin - args->starttime); _good_ping_latency += (after - pingbegin); _good_hops += fr.hops; _good_timeouts += fr.timeouts; _good_rpcs += fr.rpcs; _good_hop_latency += fr.latency; } delete args; return; } //node crashed and some other nodes rejoined into its old master node, //deleting the crashed node's entry, this cannot be relied upon to //determine node liveliness //bool alive_and_joined = (*_nodeid2kademlia)[args->key]->alive() && // (*_nodeid2kademlia)[args->key]->_joined; bool alive_and_joined = Network::Instance()->alive(args->ipkey)&& ((Kademlia*)Network::Instance()->getnode(args->ipkey))->_joined; IPAddress target_ip = args->ipkey; // we're out of time. if(now() - args->starttime > Kademlia::max_lookup_time) { if(alive_and_joined) { if(collect_stat()) _bad_failures++; record_lookup_stat(ip(), target_ip, after - args->starttime, false, false, fr.hops, fr.timeouts, 0); } else { if(alive_and_joined) _ok_failures++; record_lookup_stat(ip(), target_ip, after - args->starttime, true, false, fr.hops, fr.timeouts, 0); } if(collect_stat()) { _bad_attempts += args->attempts; _bad_lookup_latency += (after - args->starttime); _bad_hops += fr.hops; _bad_timeouts += fr.timeouts; _bad_rpcs += fr.rpcs; _bad_hop_latency += fr.latency; } delete args; return; } // try again in a bit. delaycb(100, &Kademlia::lookup_wrapper, args);}// }}}// {{{ Kademlia::do_pingvoidKademlia::do_ping(ping_args *args, ping_result *result){ // put the caller in the tree, but never ourselves // KDEBUG(1) << "Kademlia::do_ping from " << printID(args->id) << endl; if(!Kademlia::learn_stabilize_only) update_k_bucket(args->id, args->ip);}// }}}// {{{ Kademlia::util for find_value and do_lookup#define SEND_RPC(x, ARGS, RESULT, HOPS, WHICH_ALPHA) { \ find_node_args *fa = New find_node_args(_id, ip(), ARGS->key); \ fa->stattype = ARGS->stattype; \ find_node_result *fr = New find_node_result; \ fr->hops = HOPS; \ fr->which_alpha = WHICH_ALPHA; \ record_stat(ARGS->stattype, 1, 0); \ unsigned rpc = asyncRPC(x.ip, &Kademlia::find_node, fa, fr, timeout(x.ip)); \ callinfo *ci = New callinfo(x, fa, fr); \ ci->before = now(); \ rpcset->insert(rpc); \ outstanding_rpcs->insert(rpc, ci); \}#define CREATE_REAPER(STAT) { \ if(!outstanding_rpcs->size() && !is_dead->size() ) { \ delete rpcset; \ delete outstanding_rpcs; \ if(Kademlia::death_notification) { \ for( HashMap<NodeID, vector<IPAddress> *>::iterator i=who_told_me->begin(); i != who_told_me->end(); ++i ) { \ vector<IPAddress> *v = i.value(); \ if( v != NULL ) delete v; \ } \ } \ delete who_told_me; \ delete is_dead; \ return; \ } \ reap_info *ri = New reap_info(); \ ri->k = this; \ ri->rpcset = rpcset; \ ri->outstanding_rpcs = outstanding_rpcs; \ ri->who_told_me = who_told_me; \ ri->is_dead = is_dead; \ ri->stat = STAT; \ ThreadManager::Instance()->create(Kademlia::reap, (void*) ri); \ return; \}// }}}// {{{ Kademlia::find_valuevoidKademlia::find_value(find_value_args *fargs, find_value_result *fresult){ //KDEBUG(0) << "Kademlia::find_value: node ip " << fargs->ip << " id " << printID(fargs->id) << " does find_value for " << printID(fargs->key) << endl; // assert(alive()); HashMap<NodeID, bool> asked; HashMap<NodeID, unsigned> hops; HashMap<unsigned, unsigned> timeouts; update_k_bucket(fargs->id, fargs->ip); fresult->rid = _id; fresult->hops = 0; // find alpha successors for this key closer::n = fargs->key; closerRTT::n = fargs->key; set<k_nodeinfo, closerRTT> successors; vector<k_nodeinfo*> tmp; _root->find_node(fargs->key, &tmp); for(unsigned i=0; i<tmp.size(); i++) { successors.insert((*tmp[i])); hops.insert(tmp[i]->id, 0); } // we can't do anything but return ourselves if(!successors.size()) { fresult->succ = _me; return; } // return if we're done already fresult->succ = *successors.begin(); if(fresult->succ.id == fargs->key) return; // send out the first alpha RPCs HashMap<unsigned, callinfo*> *outstanding_rpcs = New HashMap<unsigned, callinfo*>; RPCSet *rpcset = New RPCSet; { unsigned a = 0; for(set<k_nodeinfo, closer>::const_iterator i=successors.begin(); i != successors.end() && a < Kademlia::alpha; ++i, ++a) { k_nodeinfo ki = *i; SEND_RPC(ki, fargs, fresult, hops[ki.id], a); fresult->rpcs++; asked.insert(ki.id, true); successors.erase(ki); timeouts.insert(a, 0); } } // data structures for death notifications: HashMap<NodeID, vector<IPAddress> * > *who_told_me = New HashMap<NodeID, vector<IPAddress> * >; HashMap<NodeID, bool> *is_dead = New HashMap<NodeID, bool>; // now send out a new RPC for every single RPC that comes back unsigned useless_replies = 0; NodeID last_before_merge = ~fargs->key; unsigned last_returned_alpha; while(true) { bool ok; unsigned donerpc = rcvRPC(rpcset, ok); callinfo *ci = (*outstanding_rpcs)[donerpc]; outstanding_rpcs->remove(donerpc); bool improved = false; last_returned_alpha = ci->fr->which_alpha; k_nodeinfo ki; if(!alive()) { stat_type st = ci->fa->stattype; delete ci; CREATE_REAPER(st); // returns } // node was dead closer::n = fargs->key; closerRTT::n = fargs->key; if(!ok) { if(flyweight[ci->ki.id] && !Kademlia::learn_stabilize_only) erase(ci->ki.id); if(Kademlia::death_notification) { // inform whoever sent me this person that they are dead. is_dead->insert( ci->ki.id, true ); } timeouts.insert(last_returned_alpha, timeouts[last_returned_alpha]+1); delete ci; if(!successors.size() && outstanding_rpcs->size()==0) { CREATE_REAPER(ci->fa->stattype); // returns } goto next_candidate; } // node was ok assert(ci->fr->results.size() <= Kademlia::k_tell); record_stat(ci->fa->stattype, ci->fr->results.size(), 0); if(!Kademlia::learn_stabilize_only) update_k_bucket(ci->ki.id, ci->ki.ip, now() - ci->before); // put all the ones better than what we knew so far in successors, provided // we haven't asked them already. if(successors.size()) last_before_merge = successors.rbegin()->id; for(unsigned i=0; i<ci->fr->results.size(); i++) { k_nodeinfo ki = ci->fr->results[i]; if( Kademlia::death_notification ) { if( (*who_told_me)[ki.id] == NULL ) { who_told_me->insert( ki.id, New vector<IPAddress> ); } ((*who_told_me)[ki.id])->push_back( ci->ki.ip ); } if(asked.find_pair(ki.id)) continue; successors.insert(ki); if(!hops.find_pair(ki.id)) hops.insert(ki.id, ci->fr->hops+1); } // cut out elements beyond index k while(successors.size() > Kademlia::k) successors.erase(*successors.rbegin()); // found the key ki = *successors.begin(); if(ki.id == fargs->key) { fresult->hops = hops[ki.id]; fresult->succ = ki; fresult->timeouts = timeouts[last_returned_alpha]; stat_type st = ci->fa->stattype; delete ci; CREATE_REAPER(st); // returns } // if our standing hasn't improved, improved = (distance(successors.begin()->id, fargs->key) < distance(fresult->succ.id, fargs->key)); useless_replies = improved ? 0 : useless_replies++; // if the last alpha RPCs didn't yield anything better, give up. I don't // think this should ever happen. It means we went down a dead end alpha // times. // // XXX: Thomer completely pulled this out of his ass. if(!successors.size() || useless_replies > Kademlia::alpha) { stat_type st = ci->fa->stattype; delete ci; CREATE_REAPER(st); // returns } delete ci;next_candidate: // assert((unsigned) outstanding_rpcs->size() == Kademlia::alpha - 1); // XXX: wrong. messes up last_alpha_rpc while((outstanding_rpcs->size() < (int) Kademlia::alpha) && successors.size()) { k_nodeinfo front = *successors.begin(); if(Kademlia::distance(front.id, fargs->key) < Kademlia::distance(fresult->succ.id, fargs->key)) fresult->succ = front;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -