⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 kademlia.c

📁 P2P模拟器
💻 C
📖 第 1 页 / 共 5 页
字号:
  update_k_bucket(lr.rid, wkn, now() - before);  // put all nodes that wkn told us in k-buckets  // NB: it's questionable whether we can do a touch here.  we haven't  // actually talked to the node.  NODES_ITER(&lr.results) {    if(!flyweight[i->id] && i->id != _id) {      insert(i->id, i->ip, 0, i->timeouts);      touch(i->id);    }  }  // get our ``successor'' and compute length of prefix we have in common  vector<k_nodeinfo*> successors;  _root->find_node(_id, &successors);  closer::n = _id;  sort(successors.begin(), successors.end(), closer());  NodeID succ_id = successors[0]->id;  //KDEBUG(2) << "join: succ ip is " << successors[0]->ip << " id is " << printID(succ_id) << endl;  unsigned cpl = common_prefix(_id, succ_id);  // all entries further away than him need to be refreshed.  this is similar to  // stabilization.  flip a bit and set the rest of the ID to something random.  for(unsigned i=idsize-cpl+1; i<idsize; i++) {    NodeID random_key = _id ^ (((Kademlia::NodeID) 1) << i);    for(int j=i-1; j>=0; j--)      random_key ^= (((NodeID) random() & 0x1) << j);    lookup_args la(_id, ip(), random_key);    la.stattype = Kademlia::STAT_JOIN;    lookup_result lr;    // if we now believe our successor died, then start again    if(!flyweight[succ_id]) {      clear();      goto join_restart;    }    // do a refresh on that bucket    do_lookup(&la, &lr);    if(!alive())      return;    // fill our finger table    NODES_ITER(&lr.results)      if(!flyweight[i->id] && i->id != _id)        insert(i->id, i->ip, 0, i->timeouts);        //  ... but not touch.  we didn't actually talk to the node.  }  _joined = true;  if(stabilize_timer)    delaycb(stabilize_timer, &Kademlia::reschedule_stabilizer, (void *) 0);}// }}}// {{{ Kademlia::crashvoidKademlia::crash(Args *args){  // destroy k-buckets  //KDEBUG(1) << "Kademlia::crash ip " << ip() << endl;  // assert(alive());  //_root->collapse();  //jy  delete _root;  _root = NULL;  for(HashMap<NodeID, k_nodeinfo*>::iterator i = flyweight.begin(); i; i++)    Kademlia::pool->push(i.value());  flyweight.clear();  _joined = false;}// }}}// {{{ Kademlia::clearvoidKademlia::clear(){  // destroy k-buckets  // KDEBUG(1) << "Kademlia::clear" << endl;  for(HashMap<NodeID, k_nodeinfo*>::iterator i = flyweight.begin(); i; i++)    Kademlia::pool->push(i.value());  flyweight.clear();  _root->collapse();}// }}}// {{{ Kademlia::lookup//// we assume lookups are only for node IDs, but we have to cheat to do this.voidKademlia::lookup(Args *args){  if(!_joined || !alive())    return;  IPAddress key_ip = args->nget<NodeID>("key");  if(!Network::Instance()->getnode(key_ip)->alive())    return;  // find node with this IP  Kademlia *k = (Kademlia*) Network::Instance()->getnode(key_ip);  NodeID key = k->id();  //KDEBUG(0) << "Kademlia::lookup: ip " << key_ip << " id " << printID(key) << endl;  lookup_wrapper_args *lwa = New lookup_wrapper_args();  lwa->ipkey = key_ip;  lwa->key = key;  lwa->starttime = now();  lwa->attempts = 0;  lookup_wrapper(lwa);}// }}}// {{{ Kademlia::lookup_wrappervoidKademlia::lookup_wrapper(lookup_wrapper_args *args){  static unsigned outcounter = 0;  if(!alive()) {    delete args;    return;  }  find_value_args fa(_id, ip(), args->key);  find_value_result fr;  find_value(&fa, &fr);  Time after = now();  args->attempts++;  // if we found the node, ping it.  if(args->key == fr.succ.id) {    ping_args pa(fr.succ.id, fr.succ.ip);    ping_result pr;    Time pingbegin = now();    // assert(_nodeid2kademlia[fr.succ.id]->ip() == fr.succ.ip);    if(!doRPC(fr.succ.ip, &Kademlia::do_ping, &pa, &pr, timeout(fr.succ.ip)) && alive()) {      if(collect_stat()) _lookup_dead_node++;      if(flyweight[fr.succ.id] && !Kademlia::learn_stabilize_only)        erase(fr.succ.id);      fr.timeouts++;    }    after = now();    record_lookup_stat(ip(), fr.succ.ip, after - args->starttime, true, true,		       fr.hops, fr.timeouts, 0);    if(outcounter++ >= 1000) {      KDEBUG(0) <<  pingbegin - args->starttime << "ms lookup (" << args->attempts << "a, " << fr.hops << "h, " << fr.rpcs << "r, " << fr.timeouts << "t), " << after - pingbegin << "ms ping, " << after - args->starttime << "ms total." << endl;      outcounter = 0;    }    // cout << pingbegin - args->starttime << endl;    if(collect_stat()) {      _good_lookups++;      _good_attempts += args->attempts;      _good_total_latency += (after - args->starttime);      _good_lookup_latency += (pingbegin - args->starttime);      _good_ping_latency += (after - pingbegin);      _good_hops += fr.hops;      _good_timeouts += fr.timeouts;      _good_rpcs += fr.rpcs;      _good_hop_latency += fr.latency;    }    delete args;    return;  }  //node crashed and some other nodes rejoined into its old master node,  //deleting the crashed node's entry, this cannot be relied upon to   //determine node liveliness  //bool alive_and_joined = (*_nodeid2kademlia)[args->key]->alive() &&  //                        (*_nodeid2kademlia)[args->key]->_joined;  bool alive_and_joined = Network::Instance()->alive(args->ipkey)&&                          ((Kademlia*)Network::Instance()->getnode(args->ipkey))->_joined;  IPAddress target_ip = args->ipkey;  // we're out of time.  if(now() - args->starttime > Kademlia::max_lookup_time) {    if(alive_and_joined) {      if(collect_stat()) _bad_failures++;      record_lookup_stat(ip(), target_ip, after - args->starttime, false, false, fr.hops, fr.timeouts, 0);    } else {      if(alive_and_joined) _ok_failures++;      record_lookup_stat(ip(), target_ip, after - args->starttime, true, false, fr.hops, fr.timeouts, 0);    }    if(collect_stat()) {      _bad_attempts += args->attempts;      _bad_lookup_latency += (after - args->starttime);      _bad_hops += fr.hops;      _bad_timeouts += fr.timeouts;      _bad_rpcs += fr.rpcs;      _bad_hop_latency += fr.latency;    }    delete args;    return;  }  // try again in a bit.  delaycb(100, &Kademlia::lookup_wrapper, args);}// }}}// {{{ Kademlia::do_pingvoidKademlia::do_ping(ping_args *args, ping_result *result){  // put the caller in the tree, but never ourselves  // KDEBUG(1) << "Kademlia::do_ping from " << printID(args->id) << endl;  if(!Kademlia::learn_stabilize_only)    update_k_bucket(args->id, args->ip);}// }}}// {{{ Kademlia::util for find_value and do_lookup#define SEND_RPC(x, ARGS, RESULT, HOPS, WHICH_ALPHA) {                                  \    find_node_args *fa = New find_node_args(_id, ip(), ARGS->key);                      \    fa->stattype = ARGS->stattype;                                                      \    find_node_result *fr = New find_node_result;                                        \    fr->hops = HOPS;                                                                    \    fr->which_alpha = WHICH_ALPHA;                                                      \    record_stat(ARGS->stattype, 1, 0);                                                  \    unsigned rpc = asyncRPC(x.ip, &Kademlia::find_node, fa, fr, timeout(x.ip)); \    callinfo *ci = New callinfo(x, fa, fr);                                             \    ci->before = now();                                                                 \    rpcset->insert(rpc);                                                                \    outstanding_rpcs->insert(rpc, ci);                                                  \}#define CREATE_REAPER(STAT) {                                                           \      if(!outstanding_rpcs->size() && !is_dead->size() ) {                              \        delete rpcset;                                                                  \        delete outstanding_rpcs;                                                        \        if(Kademlia::death_notification) {                                              \          for( HashMap<NodeID, vector<IPAddress> *>::iterator i=who_told_me->begin(); i != who_told_me->end(); ++i ) {                                                 \            vector<IPAddress> *v = i.value();                                           \            if( v != NULL ) delete v;                                                   \          }                                                                             \        }                                                                               \        delete who_told_me;                                                             \        delete is_dead;                                                                 \        return;                                                                         \      }                                                                                 \      reap_info *ri = New reap_info();                                                  \      ri->k = this;                                                                     \      ri->rpcset = rpcset;                                                              \      ri->outstanding_rpcs = outstanding_rpcs;                                          \      ri->who_told_me = who_told_me;                                                    \      ri->is_dead = is_dead;                                                            \      ri->stat = STAT;                                                                  \      ThreadManager::Instance()->create(Kademlia::reap, (void*) ri);                    \      return;                                                                           \}// }}}// {{{ Kademlia::find_valuevoidKademlia::find_value(find_value_args *fargs, find_value_result *fresult){  //KDEBUG(0) << "Kademlia::find_value: node ip " << fargs->ip << " id " << printID(fargs->id) << " does find_value for " << printID(fargs->key) << endl;    // assert(alive());  HashMap<NodeID, bool> asked;  HashMap<NodeID, unsigned> hops;  HashMap<unsigned, unsigned> timeouts;  update_k_bucket(fargs->id, fargs->ip);  fresult->rid = _id;  fresult->hops = 0;  // find alpha successors for this key  closer::n = fargs->key;  closerRTT::n = fargs->key;  set<k_nodeinfo, closerRTT> successors;  vector<k_nodeinfo*> tmp;  _root->find_node(fargs->key, &tmp);  for(unsigned i=0; i<tmp.size(); i++) {    successors.insert((*tmp[i]));    hops.insert(tmp[i]->id, 0);  }  // we can't do anything but return ourselves  if(!successors.size()) {    fresult->succ = _me;    return;  }  // return if we're done already  fresult->succ = *successors.begin();  if(fresult->succ.id == fargs->key)    return;  // send out the first alpha RPCs  HashMap<unsigned, callinfo*> *outstanding_rpcs = New HashMap<unsigned, callinfo*>;  RPCSet *rpcset = New RPCSet;  {    unsigned a = 0;    for(set<k_nodeinfo, closer>::const_iterator i=successors.begin(); i != successors.end() && a < Kademlia::alpha; ++i, ++a) {      k_nodeinfo ki = *i;      SEND_RPC(ki, fargs, fresult, hops[ki.id], a);      fresult->rpcs++;      asked.insert(ki.id, true);      successors.erase(ki);      timeouts.insert(a, 0);    }  }  // data structures for death notifications:  HashMap<NodeID, vector<IPAddress> * > *who_told_me =     New HashMap<NodeID, vector<IPAddress> * >;  HashMap<NodeID, bool> *is_dead = New HashMap<NodeID, bool>;  // now send out a new RPC for every single RPC that comes back  unsigned useless_replies = 0;  NodeID last_before_merge = ~fargs->key;  unsigned last_returned_alpha;  while(true) {    bool ok;    unsigned donerpc = rcvRPC(rpcset, ok);    callinfo *ci = (*outstanding_rpcs)[donerpc];    outstanding_rpcs->remove(donerpc);    bool improved = false;    last_returned_alpha = ci->fr->which_alpha;    k_nodeinfo ki;    if(!alive()) {      stat_type st = ci->fa->stattype;      delete ci;      CREATE_REAPER(st); // returns    }    // node was dead    closer::n = fargs->key;    closerRTT::n = fargs->key;    if(!ok) {      if(flyweight[ci->ki.id] && !Kademlia::learn_stabilize_only)        erase(ci->ki.id);      if(Kademlia::death_notification) {	// inform whoever sent me this person that they are dead.	is_dead->insert( ci->ki.id, true );      }      timeouts.insert(last_returned_alpha, timeouts[last_returned_alpha]+1);      delete ci;      if(!successors.size() && outstanding_rpcs->size()==0) {        CREATE_REAPER(ci->fa->stattype); // returns      }      goto next_candidate;    }    // node was ok    assert(ci->fr->results.size() <= Kademlia::k_tell);    record_stat(ci->fa->stattype, ci->fr->results.size(), 0);    if(!Kademlia::learn_stabilize_only)      update_k_bucket(ci->ki.id, ci->ki.ip, now() - ci->before);    // put all the ones better than what we knew so far in successors, provided    // we haven't asked them already.    if(successors.size())      last_before_merge = successors.rbegin()->id;    for(unsigned i=0; i<ci->fr->results.size(); i++) {      k_nodeinfo ki = ci->fr->results[i];      if( Kademlia::death_notification ) {	if( (*who_told_me)[ki.id] == NULL ) {	  who_told_me->insert( ki.id, New vector<IPAddress> );	}	((*who_told_me)[ki.id])->push_back( ci->ki.ip );      }      if(asked.find_pair(ki.id))        continue;      successors.insert(ki);      if(!hops.find_pair(ki.id))        hops.insert(ki.id, ci->fr->hops+1);    }    // cut out elements beyond index k    while(successors.size() > Kademlia::k)      successors.erase(*successors.rbegin());    // found the key    ki = *successors.begin();    if(ki.id == fargs->key) {      fresult->hops = hops[ki.id];      fresult->succ = ki;      fresult->timeouts = timeouts[last_returned_alpha];      stat_type st = ci->fa->stattype;      delete ci;      CREATE_REAPER(st); // returns    }    // if our standing hasn't improved,    improved = (distance(successors.begin()->id, fargs->key) < distance(fresult->succ.id, fargs->key));    useless_replies = improved ? 0 : useless_replies++;    // if the last alpha RPCs didn't yield anything better, give up.  I don't    // think this should ever happen.  It means we went down a dead end alpha    // times.    //    // XXX: Thomer completely pulled this out of his ass.    if(!successors.size() || useless_replies > Kademlia::alpha) {      stat_type st = ci->fa->stattype;      delete ci;      CREATE_REAPER(st); // returns    }    delete ci;next_candidate:    // assert((unsigned) outstanding_rpcs->size() == Kademlia::alpha - 1);    // XXX: wrong.  messes up last_alpha_rpc    while((outstanding_rpcs->size() < (int) Kademlia::alpha) && successors.size()) {      k_nodeinfo front = *successors.begin();      if(Kademlia::distance(front.id, fargs->key) < Kademlia::distance(fresult->succ.id, fargs->key))        fresult->succ = front;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -