📄 kademlia.c
字号:
SEND_RPC(front, fargs, fresult, hops[front.id], timeouts[last_returned_alpha]); fresult->rpcs++; asked.insert(front.id, true); successors.erase(front); } }}// }}}// {{{ Kademlia::do_lookup//// only called for stabilization, so always learnvoidKademlia::do_lookup(lookup_args *largs, lookup_result *lresult){ //KDEBUG(1) << "Kademlia::do_lookup: node " << printID(largs->id) << " does lookup for " << printID(largs->key) << ", flyweight.size() = " << endl; // assert(alive()); lresult->rid = _id; // find successors of this key closer::n = largs->key; set<k_nodeinfo, closer> successors; vector<k_nodeinfo*> tmp; _root->find_node(largs->key, &tmp); for(unsigned i=0; i<tmp.size(); i++) { successors.insert(tmp[i]); lresult->results.insert(tmp[i]); } // unsigned j = 0; // NODES_ITER(&lresult->results) { // KDEBUG(0) << "start: lresults[" << j++ << "] = " << printID(i->id) << endl; // } // we can't do anything but return ourselves if(!successors.size()) { // KDEBUG(0) << "no successors, exiting" << endl; lresult->results.insert(_me); return; } // keep track of worst-of-best NodeID worst = lresult->results.rbegin()->id; // KDEBUG(0) << "worst = " << printID(worst) << endl; // send out the first alpha RPCs HashMap<unsigned, callinfo*> *outstanding_rpcs = New HashMap<unsigned, callinfo*>; RPCSet *rpcset = New RPCSet; { unsigned a = 0; for(set<k_nodeinfo, closer>::const_iterator i=successors.begin(); i != successors.end() && a < Kademlia::alpha; ++i, ++a) { k_nodeinfo ki = *i; SEND_RPC(ki, largs, lresult, 0, 0); // KDEBUG(0) << "SEND_RPC (initial) to " << printID(ki.id) << endl; successors.erase(ki); } } // j = 0; // NODES_ITER(&successors) { // KDEBUG(0) << "after initial SEND_RPC successors[" << j++ << "] = " << printID(i->id) << endl; // } // data structures for death notifications: HashMap<NodeID, vector<IPAddress> * > *who_told_me = New HashMap<NodeID, vector<IPAddress> * >; HashMap<NodeID, bool> *is_dead = New HashMap<NodeID, bool>; // send an RPC back for each incoming reply. HashMap<NodeID, bool> replied; while(true) { bool ok; unsigned donerpc = rcvRPC(rpcset, ok); callinfo *ci = (*outstanding_rpcs)[donerpc]; outstanding_rpcs->remove(donerpc); replied.insert(ci->ki.id, true); if(!alive()) { delete ci; // KDEBUG(0) << "rcvRPC, not alive, bye" << endl; CREATE_REAPER(largs->stattype); // returns } // node was dead closer::n = largs->key; if(!ok) { // KDEBUG(0) << "!ok" << endl; if(flyweight[ci->ki.id] && (!Kademlia::learn_stabilize_only || largs->stattype == STAT_STABILIZE || largs->stattype == STAT_LOOKUP)) erase(ci->ki.id); if(Kademlia::death_notification) { // inform whoever sent me this person that they are dead. is_dead->insert( ci->ki.id, true ); } delete ci; // no more RPCs to send, but more RPCs to wait for if(!successors.size() && outstanding_rpcs->size()) { // KDEBUG(0) << "!ok, no more successors, but more outstanding rpcs" << endl; continue; } // no more RPCs to send and no outstanding RPCs. if(!successors.size() && !outstanding_rpcs->size()) { // KDEBUG(0) << "!ok, no more successors and no outstanding_rpcs" << endl; // truncate to right size while(lresult->results.size() > Kademlia::k) lresult->results.erase(*lresult->results.rbegin()); // unsigned j = 0; // NODES_ITER(&lresult->results) { // KDEBUG(0) << "CREATE_REAPER lresults[" << j++ << "] = " << printID(i->id) << endl; // } CREATE_REAPER(largs->stattype); // returns } if(successors.size()) goto next_candidate; // we've handled all the cases, right? assert(false); } // node was ok // KDEBUG(0) << "good reply" << endl; record_stat(largs->stattype, ci->fr->results.size(), 0); if(!Kademlia::learn_stabilize_only || largs->stattype == STAT_STABILIZE || largs->stattype == STAT_LOOKUP) update_k_bucket(ci->ki.id, ci->ki.ip, now() - ci->before); // j = 0; // NODES_ITER(&lresult->results) { // KDEBUG(0) << "before merge lresults[" << j++ << "] = " << printID(i->id) << endl; // } // j = 0; // NODES_ITER(&successors) { // KDEBUG(0) << "before merge successors[" << j++ << "] = " << printID(i->id) << endl; // } // put in successors list if: // - it's better than the worst-of-best we have so far AND // - it's not in our results set already closer::n = largs->key; for(unsigned i=0; i<ci->fr->results.size(); i++) { k_nodeinfo ki = ci->fr->results[i]; if( Kademlia::death_notification ) { if( (*who_told_me)[ki.id] == NULL ) { who_told_me->insert( ki.id, New vector<IPAddress> ); } ((*who_told_me)[ki.id])->push_back( ci->ki.ip ); } if(distance(ki.id, largs->key) >= distance(worst, largs->key) || lresult->results.find(ki) != lresult->results.end()) continue; successors.insert(ki); lresult->results.insert(ki); } delete ci; // j = 0; // NODES_ITER(&lresult->results) { // KDEBUG(0) << "after merge lresults[" << j++ << "] = " << printID(i->id) << endl; // } // j = 0; // NODES_ITER(&successors) { // KDEBUG(0) << "after merge successors[" << j++ << "] = " << printID(i->id) << endl; // } // we're done if we've had a reply from everyone in the results set. if(!successors.size()) { bool we_are_done = true; NODES_ITER(&lresult->results) { if(!replied[i->id]) { we_are_done = false; break; } } if(we_are_done) { // KDEBUG(0) << "returning size = " << lresult->results.size() << endl; while(lresult->results.size() > Kademlia::k) lresult->results.erase(*lresult->results.rbegin()); CREATE_REAPER(largs->stattype); // returns } } // last alpha RPCs did not change our view of the world. send parallel RPCs // to all remaining ones. if(successors.size() <= (Kademlia::k - Kademlia::alpha)) { // KDEBUG(0) << "nothing changed. flushing" << endl; NODES_ITER(&successors) { k_nodeinfo ki = *i; SEND_RPC(ki, largs, lresult, 0, 0); successors.erase(ki); } continue; } // resize back to Kademlia::k while(successors.size() > Kademlia::k) successors.erase(*successors.rbegin()); while(lresult->results.size() > Kademlia::k) lresult->results.erase(*lresult->results.rbegin()); // j = 0; // NODES_ITER(&successors) { // KDEBUG(0) << "after truncate successors[" << j++ << "] = " << printID(i->id) << endl; // } // j = 0; // NODES_ITER(&lresult->results) { // KDEBUG(0) << "after truncate lresults[" << j++ << "] = " << printID(i->id) << endl; // } // need to update our worst-of-best? if(successors.size() && (Kademlia::distance(lresult->results.rbegin()->id, largs->key) < Kademlia::distance(worst, largs->key))) { worst = lresult->results.rbegin()->id; }next_candidate: k_nodeinfo front = *successors.begin(); SEND_RPC(front, largs, lresult, 0, 0); successors.erase(front); // j = 0; // NODES_ITER(&successors) { // KDEBUG(0) << "after next_candidate successors[" << j++ << "] = " << printID(i->id) << endl; // } }}// }}}// {{{ Kademlia::find_node// Kademlia's FIND_NODE. Returns the best k from its own k-bucketsvoidKademlia::find_node(find_node_args *largs, find_node_result *lresult){ // assert(alive()); if(!Kademlia::learn_stabilize_only || largs->stattype == STAT_STABILIZE || largs->stattype == STAT_LOOKUP) update_k_bucket(largs->id, largs->ip); lresult->rid = _id; // deal with the empty case if(!flyweight.size()) { // assert(p); KDEBUG(2) << "find_node: key " << printID(largs->key) << " tree is empty. returning myself" << endl; lresult->results.push_back(_me); return; } vector<k_nodeinfo*> tmpset; _root->find_node(largs->key, &tmpset); //KDEBUG(2) << "find_node: key "<< printID(largs->key) << " from ip " << largs->ip << " returning: ip " <<tmpset[0]->ip << " id " << printID(tmpset[0]->id) << endl; for(unsigned i=0; i<tmpset.size(); i++) lresult->results.push_back(tmpset[i]);}// }}}// {{{ Kademlia::reschedule_stabilizervoidKademlia::reschedule_stabilizer(void *x){ // KDEBUG(1) << "Kademlia::reschedule_stabilizer" << endl; if(!alive()) { // KDEBUG(2) << "Kademlia::reschedule_stabilizer returning because I'm dead." << endl; return; } stabilize(); if(stabilize_timer) delaycb(stabilize_timer, &Kademlia::reschedule_stabilizer, (void *) 0);}// }}}// {{{ Kademlia::stabilizevoidKademlia::stabilize(){ // KDEBUG(0) << "Kademlia::stabilize" << endl; // assert(alive()); if(Kademlia::docheckrep) { k_check check; _root->traverse(&check, this); } // stabilize k_stabilizer stab; _root->traverse(&stab, this); // tell the observers. maybe they're happy now. notifyObservers();}// }}}// {{{ Kademlia::stabilizedboolKademlia::stabilized(vector<NodeID> *lid){ // remove ourselves from lid vector<NodeID> copylid; for(vector<NodeID>::const_iterator i = lid->begin(); i != lid->end(); ++i) if(*i != id()) copylid.push_back(*i); k_stabilized stab(©lid); _root->traverse(&stab, this); return stab.stabilized();}// }}}// {{{ Kademlia::getbit//// Returns the i-th bit in n. 0 is the most significant bit.//inlineunsignedKademlia::getbit(NodeID n, unsigned i){ return (n & (((NodeID) 1)<<((sizeof(NodeID)*8)-i-1))) ? 1 : 0;}// }}}// {{{ Kademlia::touch// pre: id is in the flyweight and id is in the tree// post: lastts for id is updated, update propagated to k-bucket.voidKademlia::touch(NodeID id){ // KDEBUG(1) << "Kademlia::touch " << Kademlia::printID(id) << endl; // assert(alive()); // assert(id); // assert(flyweight.find(id, 0)); _root->insert(id, true);}// }}}// {{{ Kademlia::insert// pre: id and ip are valid, id is not yet in this flyweight// post: id->ip mapping in flyweight, and k-bucket//voidKademlia::insert(NodeID id, IPAddress ip, Time RTT, char timeouts, bool init_state){ // KDEBUG(1) << "Kademlia::insert " << Kademlia::printID(id) << ", ip = " << ip << endl; static unsigned counter = 0; // assert(alive()); // assert(id && ip); // assert(!flyweight.find(id, 0)); k_nodeinfo *ni = Kademlia::pool->pop(id, ip, RTT, timeouts); assert(ni); if(init_state) ni->lastts = counter++; flyweight.insert(id, ni); _root->insert(id, false, init_state);}// }}}// {{{ Kademlia::erasevoidKademlia::erase(NodeID id){ // KDEBUG(1) << "Kademlia::erase " << Kademlia::printID(id) << endl; // assert(flyweight.find(id, 0)); // KDEBUG(2) << "Kademlia::erase deleting id = " << printID(id) << ", ip = " << flyweight[id]->ip << endl; k_nodeinfo *ki = flyweight[id]; // 5, taken from Kademlia paper if(++ki->timeouts >= Kademlia::erase_count) { _root->erase(id); flyweight.remove(id); Kademlia::pool->push(ki); return; }}// }}}// {{{ Kademlia::erasevoidKademlia::do_erase(erase_args *a, erase_result *r){ for( unsigned i = 0; i < a->ids->size(); i++ ) { NodeID n = (*(a->ids))[i]; if( flyweight.find(n, 0) ) { erase(n); } }}// }}}// {{{ Kademlia::update_k_bucketinline voidKademlia::update_k_bucket(NodeID id, IPAddress ip, Time RTT){ // KDEBUG(1) << "Kademlia::update_k_bucket" << endl; // update k-bucket if(id == _id) return; if(!flyweight[id]) { // KDEBUG(2) << "Kademlia::update_k_bucket says " << printID(id) << " doesn't exist yet" << endl; insert(id, ip, RTT); } else if(RTT) { flyweight[id]->RTT = RTT; } touch(id);}// }}}// {{{ Kademlia::common_prefixunsigned
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -