⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 kademlia.c

📁 这是一个P2P协议仿真软件
💻 C
📖 第 1 页 / 共 5 页
字号:
      SEND_RPC(front, fargs, fresult, hops[front.id], timeouts[last_returned_alpha]);      fresult->rpcs++;      asked.insert(front.id, true);      successors.erase(front);    }  }}// }}}// {{{ Kademlia::do_lookup//// only called for stabilization, so always learnvoidKademlia::do_lookup(lookup_args *largs, lookup_result *lresult){  //KDEBUG(1) << "Kademlia::do_lookup: node " << printID(largs->id) << " does lookup for " << printID(largs->key) << ", flyweight.size() = " << endl;  // assert(alive());  lresult->rid = _id;  // find successors of this key  closer::n = largs->key;  set<k_nodeinfo, closer> successors;  vector<k_nodeinfo*> tmp;  _root->find_node(largs->key, &tmp);  for(unsigned i=0; i<tmp.size(); i++) {    successors.insert(tmp[i]);    lresult->results.insert(tmp[i]);  }  // unsigned j = 0;  // NODES_ITER(&lresult->results) {  //   KDEBUG(0) << "start: lresults[" << j++ << "] = " << printID(i->id) << endl;  // }  // we can't do anything but return ourselves  if(!successors.size()) {    // KDEBUG(0) << "no successors, exiting" << endl;    lresult->results.insert(_me);    return;  }  // keep track of worst-of-best  NodeID worst = lresult->results.rbegin()->id;  // KDEBUG(0) << "worst = " << printID(worst) << endl;  // send out the first alpha RPCs  HashMap<unsigned, callinfo*> *outstanding_rpcs = New HashMap<unsigned, callinfo*>;  RPCSet *rpcset = New RPCSet;  {    unsigned a = 0;    for(set<k_nodeinfo, closer>::const_iterator i=successors.begin(); i != successors.end() && a < Kademlia::alpha; ++i, ++a) {      k_nodeinfo ki = *i;      SEND_RPC(ki, largs, lresult, 0, 0);      // KDEBUG(0) << "SEND_RPC (initial) to " << printID(ki.id) << endl;      successors.erase(ki);    }  }  // j = 0;  // NODES_ITER(&successors) {  //   KDEBUG(0) << "after initial SEND_RPC successors[" << j++ << "] = " << printID(i->id) << endl;  // }  // data structures for death notifications:  HashMap<NodeID, vector<IPAddress> * > *who_told_me =     New HashMap<NodeID, vector<IPAddress> * >;  HashMap<NodeID, bool> *is_dead = New HashMap<NodeID, bool>;  // send an RPC back for each incoming reply.  HashMap<NodeID, bool> replied;  while(true) {    bool ok;    unsigned donerpc = rcvRPC(rpcset, ok);    callinfo *ci = (*outstanding_rpcs)[donerpc];    outstanding_rpcs->remove(donerpc);    replied.insert(ci->ki.id, true);    if(!alive()) {      delete ci;      // KDEBUG(0) << "rcvRPC, not alive, bye" << endl;      CREATE_REAPER(largs->stattype); // returns    }    // node was dead    closer::n = largs->key;    if(!ok) {      // KDEBUG(0) << "!ok" << endl;      if(flyweight[ci->ki.id] && (!Kademlia::learn_stabilize_only ||                                   largs->stattype == STAT_STABILIZE ||                                   largs->stattype == STAT_LOOKUP))        erase(ci->ki.id);      if(Kademlia::death_notification) {	// inform whoever sent me this person that they are dead.	is_dead->insert( ci->ki.id, true );      }      delete ci;      // no more RPCs to send, but more RPCs to wait for      if(!successors.size() && outstanding_rpcs->size()) {        // KDEBUG(0) << "!ok, no more successors, but more outstanding rpcs" << endl;        continue;      }      // no more RPCs to send and no outstanding RPCs.      if(!successors.size() && !outstanding_rpcs->size()) {        // KDEBUG(0) << "!ok, no more successors and no outstanding_rpcs" << endl;        // truncate to right size        while(lresult->results.size() > Kademlia::k)          lresult->results.erase(*lresult->results.rbegin());        // unsigned j = 0;        // NODES_ITER(&lresult->results) {        //   KDEBUG(0) << "CREATE_REAPER lresults[" << j++ << "] = " << printID(i->id) << endl;        // }        CREATE_REAPER(largs->stattype); // returns      }      if(successors.size())        goto next_candidate;      // we've handled all the cases, right?      assert(false);    }    // node was ok    // KDEBUG(0) << "good reply" << endl;    record_stat(largs->stattype, ci->fr->results.size(), 0);    if(!Kademlia::learn_stabilize_only || largs->stattype == STAT_STABILIZE ||                                          largs->stattype == STAT_LOOKUP)      update_k_bucket(ci->ki.id, ci->ki.ip, now() - ci->before);    // j = 0;    // NODES_ITER(&lresult->results) {    //   KDEBUG(0) << "before merge lresults[" << j++ << "] = " << printID(i->id) << endl;    // }    // j = 0;    // NODES_ITER(&successors) {    //   KDEBUG(0) << "before merge successors[" << j++ << "] = " << printID(i->id) << endl;    // }    // put in successors list if:    //   - it's better than the worst-of-best we have so far AND    //   - it's not in our results set already    closer::n = largs->key;    for(unsigned i=0; i<ci->fr->results.size(); i++) {      k_nodeinfo ki = ci->fr->results[i];      if( Kademlia::death_notification ) {	if( (*who_told_me)[ki.id] == NULL ) {	  who_told_me->insert( ki.id, New vector<IPAddress> );	}	((*who_told_me)[ki.id])->push_back( ci->ki.ip );      }      if(distance(ki.id, largs->key) >= distance(worst, largs->key) ||         lresult->results.find(ki) != lresult->results.end())        continue;      successors.insert(ki);      lresult->results.insert(ki);    }    delete ci;    // j = 0;    // NODES_ITER(&lresult->results) {    //   KDEBUG(0) << "after merge lresults[" << j++ << "] = " << printID(i->id) << endl;    // }    // j = 0;    // NODES_ITER(&successors) {    //   KDEBUG(0) << "after merge successors[" << j++ << "] = " << printID(i->id) << endl;    // }    // we're done if we've had a reply from everyone in the results set.    if(!successors.size()) {      bool we_are_done = true;      NODES_ITER(&lresult->results) {        if(!replied[i->id]) {          we_are_done = false;          break;        }      }      if(we_are_done) {        // KDEBUG(0) << "returning size = " << lresult->results.size() << endl;        while(lresult->results.size() > Kademlia::k)          lresult->results.erase(*lresult->results.rbegin());        CREATE_REAPER(largs->stattype); // returns      }    }    // last alpha RPCs did not change our view of the world.  send parallel RPCs    // to all remaining ones.    if(successors.size() <= (Kademlia::k - Kademlia::alpha)) {      // KDEBUG(0) << "nothing changed. flushing" << endl;      NODES_ITER(&successors) {        k_nodeinfo ki = *i;        SEND_RPC(ki, largs, lresult, 0, 0);        successors.erase(ki);      }      continue;    }    // resize back to Kademlia::k    while(successors.size() > Kademlia::k)      successors.erase(*successors.rbegin());    while(lresult->results.size() > Kademlia::k)      lresult->results.erase(*lresult->results.rbegin());    // j = 0;    // NODES_ITER(&successors) {    //   KDEBUG(0) << "after truncate successors[" << j++ << "] = " << printID(i->id) << endl;    // }    // j = 0;    // NODES_ITER(&lresult->results) {    //   KDEBUG(0) << "after truncate lresults[" << j++ << "] = " << printID(i->id) << endl;    // }    // need to update our worst-of-best?    if(successors.size() &&      (Kademlia::distance(lresult->results.rbegin()->id, largs->key) <       Kademlia::distance(worst, largs->key)))    {      worst = lresult->results.rbegin()->id;    }next_candidate:    k_nodeinfo front = *successors.begin();    SEND_RPC(front, largs, lresult, 0, 0);    successors.erase(front);    // j = 0;    // NODES_ITER(&successors) {    //   KDEBUG(0) << "after next_candidate successors[" << j++ << "] = " << printID(i->id) << endl;    // }  }}// }}}// {{{ Kademlia::find_node// Kademlia's FIND_NODE.  Returns the best k from its own k-bucketsvoidKademlia::find_node(find_node_args *largs, find_node_result *lresult){  // assert(alive());  if(!Kademlia::learn_stabilize_only || largs->stattype == STAT_STABILIZE ||                                        largs->stattype == STAT_LOOKUP)    update_k_bucket(largs->id, largs->ip);  lresult->rid = _id;  // deal with the empty case  if(!flyweight.size()) {    // assert(p);    KDEBUG(2) << "find_node: key " << printID(largs->key) << " tree is empty. returning myself" << endl;    lresult->results.push_back(_me);    return;  }  vector<k_nodeinfo*> tmpset;  _root->find_node(largs->key, &tmpset);  //KDEBUG(2) << "find_node: key "<< printID(largs->key) << " from ip " << largs->ip << " returning: ip " <<tmpset[0]->ip << " id " << printID(tmpset[0]->id) << endl;  for(unsigned i=0; i<tmpset.size(); i++)    lresult->results.push_back(tmpset[i]);}// }}}// {{{ Kademlia::reschedule_stabilizervoidKademlia::reschedule_stabilizer(void *x){  // KDEBUG(1) << "Kademlia::reschedule_stabilizer" << endl;  if(!alive()) {    // KDEBUG(2) << "Kademlia::reschedule_stabilizer returning because I'm dead." << endl;    return;  }  stabilize();  if(stabilize_timer)    delaycb(stabilize_timer, &Kademlia::reschedule_stabilizer, (void *) 0);}// }}}// {{{ Kademlia::stabilizevoidKademlia::stabilize(){  // KDEBUG(0) << "Kademlia::stabilize" << endl;  // assert(alive());  if(Kademlia::docheckrep) {    k_check check;    _root->traverse(&check, this);  }  // stabilize  k_stabilizer stab;  _root->traverse(&stab, this);  // tell the observers.  maybe they're happy now.  notifyObservers();}// }}}// {{{ Kademlia::stabilizedboolKademlia::stabilized(vector<NodeID> *lid){  // remove ourselves from lid  vector<NodeID> copylid;  for(vector<NodeID>::const_iterator i = lid->begin(); i != lid->end(); ++i)    if(*i != id())      copylid.push_back(*i);  k_stabilized stab(&copylid);  _root->traverse(&stab, this);  return stab.stabilized();}// }}}// {{{ Kademlia::getbit//// Returns the i-th bit in n.  0 is the most significant bit.//inlineunsignedKademlia::getbit(NodeID n, unsigned i){  return (n & (((NodeID) 1)<<((sizeof(NodeID)*8)-i-1))) ? 1 : 0;}// }}}// {{{ Kademlia::touch// pre: id is in the flyweight and id is in the tree// post: lastts for id is updated, update propagated to k-bucket.voidKademlia::touch(NodeID id){  // KDEBUG(1) << "Kademlia::touch " << Kademlia::printID(id) << endl;  // assert(alive());  // assert(id);  // assert(flyweight.find(id, 0));  _root->insert(id, true);}// }}}// {{{ Kademlia::insert// pre: id and ip are valid, id is not yet in this flyweight// post: id->ip mapping in flyweight, and k-bucket//voidKademlia::insert(NodeID id, IPAddress ip, Time RTT, char timeouts, bool init_state){  // KDEBUG(1) << "Kademlia::insert " << Kademlia::printID(id) << ", ip = " << ip << endl;  static unsigned counter = 0;  // assert(alive());  // assert(id && ip);  // assert(!flyweight.find(id, 0));  k_nodeinfo *ni = Kademlia::pool->pop(id, ip, RTT, timeouts);  assert(ni);  if(init_state)    ni->lastts = counter++;  flyweight.insert(id, ni);  _root->insert(id, false, init_state);}// }}}// {{{ Kademlia::erasevoidKademlia::erase(NodeID id){  // KDEBUG(1) << "Kademlia::erase " << Kademlia::printID(id) << endl;  // assert(flyweight.find(id, 0));  // KDEBUG(2) << "Kademlia::erase deleting id = " << printID(id) << ", ip = " << flyweight[id]->ip << endl;  k_nodeinfo *ki = flyweight[id];  // 5, taken from Kademlia paper  if(++ki->timeouts >= Kademlia::erase_count) {    _root->erase(id);    flyweight.remove(id);    Kademlia::pool->push(ki);    return;  }}// }}}// {{{ Kademlia::erasevoidKademlia::do_erase(erase_args *a, erase_result *r){  for( unsigned i = 0; i < a->ids->size(); i++ ) {    NodeID n = (*(a->ids))[i];    if( flyweight.find(n, 0) ) {      erase(n);    }  }}// }}}// {{{ Kademlia::update_k_bucketinline voidKademlia::update_k_bucket(NodeID id, IPAddress ip, Time RTT){  // KDEBUG(1) << "Kademlia::update_k_bucket" << endl;  // update k-bucket  if(id == _id)    return;  if(!flyweight[id]) {    // KDEBUG(2) << "Kademlia::update_k_bucket says " << printID(id) << " doesn't exist yet" << endl;    insert(id, ip, RTT);  } else if(RTT) {    flyweight[id]->RTT = RTT;  }  touch(id);}// }}}// {{{ Kademlia::common_prefixunsigned

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -