⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 kelips.c

📁 P2P模拟器
💻 C
📖 第 1 页 / 共 2 页
字号:
// ip: someone random we know about.// ip1: random contact of ip in the target group.// ip2: who ip1 thinks has the key.boolKelips::lookup2(lookup_args *a) {  vector<IPAddress> l = all();  if(l.size() < 1)    return false;  IPAddress ip = l[random() % l.size()];  IPAddress ip1 = 0;  bool ok = xRPC(ip, 2, &Kelips::handle_lookup2, &(a->key), &ip1, STAT_LOOKUP, &(a->total_to),&(a->num_to));  a->history.push_back(ip);  if(!ok || ip1 == 0 || (now()-a->start >= _max_lookup_time))    return false;  IPAddress ip2 = 0;  assert(id2group(a->key) == id2group(ip1));  //ok = xRPC(ip1, 2, &Kelips::handle_lookup1, &(a->key), &ip2, STAT_LOOKUP, &(a->total_to),&(a->num_to));  lookup1_args aaa;  aaa.key = a->key;  aaa.dst_ip = ip1;  ok = xRPC(ip1, 2, &Kelips::handle_lookup1, &aaa, &ip2, STAT_LOOKUP, &(a->total_to),&(a->num_to));  a->history.push_back(ip1);  if(!ok || ip2 == 0 || (now()-a->start>=_max_lookup_time))    return false;  bool done = false;  ok = xRPC(ip2, 2, &Kelips::handle_lookup_final, &(a->key), &done, STAT_LOOKUP, &(a->total_to),&(a->num_to));  a->history.push_back(ip2);  return(ok && done &&(now()-a->start<_max_lookup_time));}// Someone in our group wants us to return them a// random contact in the key's group.voidKelips::handle_lookup2(ID *kp, IPAddress *res){  if((*res = find_by_id(*kp)) != 0)    return;  vector<IPAddress> cl = grouplist(id2group(*kp));  if(cl.size() > 0)    *res = cl[random() % cl.size()];  else    *res = 0;#if 0  printf("%qd %d handle_lookup2(%qd) %d\n",         now(), ip(), *kp, *res);#endif}// Do we have the given node/key ID in our local state?IPAddressKelips::find_by_id(ID key){  vector<IPAddress> l = all();  for(u_int i = 0; i < l.size(); i++)    if(ip2id(l[i]) == key)      return l[i];  return 0;}// Someone outside the group is asking us which node is// responsible for the given key.void//Kelips::handle_lookup1(ID *kp, IPAddress *res)Kelips::handle_lookup1(lookup1_args *kp, IPAddress *res){  //ID key = *kp;  ID key = kp->key;  assert(Network::Instance()->alive(kp->dst_ip));  assert(ip() == kp->dst_ip);  assert(id2group(key) == group());  if(id() == key){    *res = ip();    return;  }  *res = find_by_id(key);#if 0  if(*res == 0)    printf("%qd %d handle_lookup1(%qd) failed\n", now(), ip(), key);#endif}voidKelips::handle_lookup_final(ID *kp, bool *done){  if(*kp == id()){    *done = true;  } else {    *done = false;  }#if 0  printf("%qd %d handle_lookup_final(%qd) %s\n",         now(), ip(), *kp, *done == true ? "ok" : "OOPS");#endif}voidKelips::insert(Args *a){}// A new node is asking us to tell it about some random// existing nodes. This is basically "pull" gossip.// Only the well-known-node should receive this RPC.// It remembers the caller to help seed its _info.voidKelips::handle_join(IPAddress *caller, vector<Info> *ret){  gotinfo(Info(*caller, now()), -1); // XXX caller should supply an Info  // send a super-big ration on join, per Indranil's e-mail.  *ret = gossip_msg(ip2group(*caller), 20, 20);}// This node has just learned about another node.// Remember the information, prepare to gossip it.// Enforce the invariant that we have at most 2 contacts// for each foreign group.// If rtt != -1, it's newly measured.voidKelips::gotinfo(Info i, int rtt){  if(i._ip == ip())    return;  assert(i._ip);  if(_info.find(i._ip) == _info.end()){    int g = ip2group(i._ip);    bool add = false;    if(g == group()){      add = true;    } else {      IPAddress x = victim(g); // pick the lamest contact to replace.      assert(x == 0 || ip2group(x) == g);      if(x == 0){        add = true;      } else if(i.age() < 4 * _info[x]->age()){        Info *in = _info[x];        assert(in);        _info.erase(x);        delete in;        add = true;      }    }    if(add){      _info[i._ip] = New Info(i);      _info[i._ip]->_rounds = _item_rounds;      _info[i._ip]->_rtt = -1;    }  } else if (i._heartbeat > _info[i._ip]->_heartbeat){    _info[i._ip]->_heartbeat = i._heartbeat;    if(_info[i._ip]->_rtt == 9999){      // we once got an RPC timeout, but node seems to have restarted.      _info[i._ip]->_rtt = -1;    }  }  if(rtt != -1 && _info.find(i._ip) != _info.end())    _info[i._ip]->_rtt = rtt;}// Return a list of all the IP addresses in _info.vector<IPAddress>Kelips::all(){  vector<IPAddress> l;  for(map<IPAddress, Info *>::const_iterator ii = _info.begin();      ii != _info.end();      ++ii){    l.push_back(ii->first);  }  return l;}// Return the list of the IP addresses in _info in our group.vector<IPAddress>Kelips::grouplist(int g){  vector<IPAddress> l;  for(map<IPAddress, Info *>::const_iterator ii = _info.begin();      ii != _info.end();      ++ii){    if(ip2group(ii->second->_ip) == g)      l.push_back(ii->first);  }  return l;}// Return the list of the IP addresses *not* in our group.vector<IPAddress>Kelips::notgrouplist(int g){  vector<IPAddress> l;  for(map<IPAddress, Info *>::const_iterator ii = _info.begin();      ii != _info.end();      ++ii){    if(ip2group(ii->second->_ip) != g)      l.push_back(ii->first);  }  return l;}// Given a list of nodes, limit it to ones that are new (or old).// "new" means _rounds > 0.vector<IPAddress>Kelips::newold(vector<IPAddress> a, bool xnew){  vector<IPAddress> b;  for(u_int i = 0; i < a.size(); i++)    if((_info[a[i]]->_rounds > 0) == xnew)      b.push_back(a[i]);  return b;}// Randomize the order of a list of nodes.vector<IPAddress>Kelips::randomize(vector<IPAddress> a){  if(a.size() < 1)    return a;  for(u_int i = 0; i < a.size(); i++){    int j = random() % a.size();    IPAddress tmp = a[i];    a[i] = a[j];    a[j] = tmp;  }  return a;}// Given a list of nodes in l, add a ration of them to msg,// half new and the rest old.voidKelips::newold_msg(vector<Info> &msg, vector<IPAddress> l, u_int ration){  u_int n = 0;  {    // Half the ration for newly learned nodes.    vector<IPAddress> nl = randomize(newold(l, true));    for(u_int i = 0; n <= ration / 2 && i < nl.size(); i++, n++){      Info *ip = _info[nl[i]];      assert(ip->_rounds > 0);      ip->_rounds -= 1;      msg.push_back(*ip);    }  }  {    // The remainder of the ration for existing nodes.    vector<IPAddress> ol = randomize(newold(l, false));    for(u_int i = 0; n < ration && i < ol.size(); i++, n++){      Info *ip = _info[ol[i]];      assert(ip->_rounds == 0);      msg.push_back(*ip);    }  }}// Create a gossip message.// Always include an entry for ourselves.// g allows targeted "pull" gossip during join, critical// to avoid disconnection!// XXX ought to send newer heartbeats preferentially????vector<Kelips::Info>Kelips::gossip_msg(int g, u_int gr, u_int cr){  vector<Info> msg;  // Include this node w/ new heartbeat in every gossip.  msg.push_back(Info(ip(), now()));  // Add some nodes from our group.  newold_msg(msg, grouplist(g), gr);  // Add some contact nodes.  newold_msg(msg, notgrouplist(g), cr);  assert(msg.size() <= 1 + gr + cr);  return msg;}voidKelips::handle_ping(void *xx, void *yy){}// One round of gossiping.// Pick a few items to send, and send them to a few other nodes.// XXX ought to send to nearby nodes preferentially (Section 2.1).voidKelips::gossip(void *junk){  if(_live){    vector<Info> msg = gossip_msg(group(), _group_ration, _contact_ration);    {      vector<IPAddress> gl = randomize(grouplist(group()));      for(u_int i = 0; i < _group_targets && i < gl.size(); i++){	//since each gossip contains the heartbeat timer for a node, add extra 4 byte        xRPC(gl[i], 2 * msg.size(), &Kelips::handle_gossip, &msg, (void *) 0);      }    }    {      vector<IPAddress> cl = randomize(notgrouplist(group()));      for(u_int i = 0; i < _contact_targets && i < cl.size(); i++){        xRPC(cl[i], 2 * msg.size(), &Kelips::handle_gossip, &msg, (void *) 0);      }    }    // ping one random node to find its RTT.    {      vector<IPAddress> l = all();      for(int iters = 0; l.size() > 0 && iters < 10; iters++){        IPAddress xip = l[random() % l.size()];        if(_info[xip]->_rtt == -1){          xRPC(xip, 2, &Kelips::handle_ping, (void*)0, (void*)0);          break;        }      }    }  }  delaycb(random() % (2 *_round_interval), &Kelips::gossip, (void *) 0);}voidKelips::handle_gossip(vector<Info> *msg, void *ret){  u_int i;  for(i = 0; i < msg->size(); i++){    gotinfo((*msg)[i], -1);  }}// Periodically get rid of _info entries that have// expired heartbeats.voidKelips::purge(void *junk){  vector<IPAddress> l = all();  for(u_int i = 0; i < l.size(); i++){    Info *in = _info[l[i]];    int to =      (ip2group(in->_ip) == group() ? _timeout : 2*_timeout);    if(in->_heartbeat + to < now()){#if 0      printf("%qd %d timed out %d %s\n",             now(), ip(), in->_ip,             node_key_alive(ip2id(in->_ip)) ? "oops" : "ok");#endif      _info.erase(l[i]);      delete in;    }  }  delaycb(_purge_time, &Kelips::purge, (void *) 0);}// Called by KelipsObserver::init_state() with the complete list// of nodes to help us initialize our routing tables faster (i.e. cheat).// So in most nodes it's called before join().voidKelips::init_state(const set<Node*> *lid){  for(set<Node*>::const_iterator i = lid->begin(); i != lid->end(); ++i) {    Kelips *k = dynamic_cast<Kelips*>(*i);    assert(k);    if(k->ip() == ip())      continue;    Time rtt = 2 * Network::Instance()->gettopology()->latency(ip(), k->ip());    gotinfo(Info(k->ip(), now()), rtt);  }}// KelipsObserver wants to know if we've stabilized.// lid is a list of all live nodes.boolKelips::stabilized(vector<ID> lid){  // Do we know about all nodes in our own group?  for(u_int i = 0; i < lid.size(); i++)    if(id2group(lid[i]) == group() && find_by_id(lid[i]))      return false;  // Do we know of two contacts from each other group?  int *cc = (int *) malloc(_k * sizeof(int));  memset(cc, '\0', _k * sizeof(int));  for(u_int i = 0; i < lid.size(); i++)    if(find_by_id(lid[i]))      cc[id2group(lid[i])] += 1;  for(int i = 0; i < _k; i++){    if(cc[i] < _n_contacts){      delete cc;      return false;    }  }  delete cc;  return true;}// Kelips just did an RPC. The request contained nsent IDs,// the reply contained nrecv IDs. Update the RPC statistics.// Jinyang/Jeremy convention://   20 bytes header, 4 bytes/ID, 1 byte/other// (Kelips paper says 40 bytes per gossip entry...)voidKelips::rpcstat(bool ok, IPAddress dst, int latency, int nitems, uint type){  if (Node::collect_stat()) {    _rpc_bytes += 20 + nitems * 4; // paper says 40 bytes per node entry    record_bw_stat( type, nitems, 0 );    if(ok) {      _rpc_bytes += 20;      record_bw_stat( type, 0, 0 );    }  }  if(ok)    gotinfo(Info(dst, now()), latency);  if(ok == false && _info.find(dst) != _info.end()){    _info[dst]->_rtt = 9999;  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -