📄 kelips.c
字号:
// ip: someone random we know about.// ip1: random contact of ip in the target group.// ip2: who ip1 thinks has the key.boolKelips::lookup2(lookup_args *a) { vector<IPAddress> l = all(); if(l.size() < 1) return false; IPAddress ip = l[random() % l.size()]; IPAddress ip1 = 0; bool ok = xRPC(ip, 2, &Kelips::handle_lookup2, &(a->key), &ip1, STAT_LOOKUP, &(a->total_to),&(a->num_to)); a->history.push_back(ip); if(!ok || ip1 == 0 || (now()-a->start >= _max_lookup_time)) return false; IPAddress ip2 = 0; assert(id2group(a->key) == id2group(ip1)); //ok = xRPC(ip1, 2, &Kelips::handle_lookup1, &(a->key), &ip2, STAT_LOOKUP, &(a->total_to),&(a->num_to)); lookup1_args aaa; aaa.key = a->key; aaa.dst_ip = ip1; ok = xRPC(ip1, 2, &Kelips::handle_lookup1, &aaa, &ip2, STAT_LOOKUP, &(a->total_to),&(a->num_to)); a->history.push_back(ip1); if(!ok || ip2 == 0 || (now()-a->start>=_max_lookup_time)) return false; bool done = false; ok = xRPC(ip2, 2, &Kelips::handle_lookup_final, &(a->key), &done, STAT_LOOKUP, &(a->total_to),&(a->num_to)); a->history.push_back(ip2); return(ok && done &&(now()-a->start<_max_lookup_time));}// Someone in our group wants us to return them a// random contact in the key's group.voidKelips::handle_lookup2(ID *kp, IPAddress *res){ if((*res = find_by_id(*kp)) != 0) return; vector<IPAddress> cl = grouplist(id2group(*kp)); if(cl.size() > 0) *res = cl[random() % cl.size()]; else *res = 0;#if 0 printf("%qd %d handle_lookup2(%qd) %d\n", now(), ip(), *kp, *res);#endif}// Do we have the given node/key ID in our local state?IPAddressKelips::find_by_id(ID key){ vector<IPAddress> l = all(); for(u_int i = 0; i < l.size(); i++) if(ip2id(l[i]) == key) return l[i]; return 0;}// Someone outside the group is asking us which node is// responsible for the given key.void//Kelips::handle_lookup1(ID *kp, IPAddress *res)Kelips::handle_lookup1(lookup1_args *kp, IPAddress *res){ //ID key = *kp; ID key = kp->key; assert(Network::Instance()->alive(kp->dst_ip)); assert(ip() == kp->dst_ip); assert(id2group(key) == group()); if(id() == key){ *res = ip(); return; } *res = find_by_id(key);#if 0 if(*res == 0) printf("%qd %d handle_lookup1(%qd) failed\n", now(), ip(), key);#endif}voidKelips::handle_lookup_final(ID *kp, bool *done){ if(*kp == id()){ *done = true; } else { *done = false; }#if 0 printf("%qd %d handle_lookup_final(%qd) %s\n", now(), ip(), *kp, *done == true ? "ok" : "OOPS");#endif}voidKelips::insert(Args *a){}// A new node is asking us to tell it about some random// existing nodes. This is basically "pull" gossip.// Only the well-known-node should receive this RPC.// It remembers the caller to help seed its _info.voidKelips::handle_join(IPAddress *caller, vector<Info> *ret){ gotinfo(Info(*caller, now()), -1); // XXX caller should supply an Info // send a super-big ration on join, per Indranil's e-mail. *ret = gossip_msg(ip2group(*caller), 20, 20);}// This node has just learned about another node.// Remember the information, prepare to gossip it.// Enforce the invariant that we have at most 2 contacts// for each foreign group.// If rtt != -1, it's newly measured.voidKelips::gotinfo(Info i, int rtt){ if(i._ip == ip()) return; assert(i._ip); if(_info.find(i._ip) == _info.end()){ int g = ip2group(i._ip); bool add = false; if(g == group()){ add = true; } else { IPAddress x = victim(g); // pick the lamest contact to replace. assert(x == 0 || ip2group(x) == g); if(x == 0){ add = true; } else if(i.age() < 4 * _info[x]->age()){ Info *in = _info[x]; assert(in); _info.erase(x); delete in; add = true; } } if(add){ _info[i._ip] = New Info(i); _info[i._ip]->_rounds = _item_rounds; _info[i._ip]->_rtt = -1; } } else if (i._heartbeat > _info[i._ip]->_heartbeat){ _info[i._ip]->_heartbeat = i._heartbeat; if(_info[i._ip]->_rtt == 9999){ // we once got an RPC timeout, but node seems to have restarted. _info[i._ip]->_rtt = -1; } } if(rtt != -1 && _info.find(i._ip) != _info.end()) _info[i._ip]->_rtt = rtt;}// Return a list of all the IP addresses in _info.vector<IPAddress>Kelips::all(){ vector<IPAddress> l; for(map<IPAddress, Info *>::const_iterator ii = _info.begin(); ii != _info.end(); ++ii){ l.push_back(ii->first); } return l;}// Return the list of the IP addresses in _info in our group.vector<IPAddress>Kelips::grouplist(int g){ vector<IPAddress> l; for(map<IPAddress, Info *>::const_iterator ii = _info.begin(); ii != _info.end(); ++ii){ if(ip2group(ii->second->_ip) == g) l.push_back(ii->first); } return l;}// Return the list of the IP addresses *not* in our group.vector<IPAddress>Kelips::notgrouplist(int g){ vector<IPAddress> l; for(map<IPAddress, Info *>::const_iterator ii = _info.begin(); ii != _info.end(); ++ii){ if(ip2group(ii->second->_ip) != g) l.push_back(ii->first); } return l;}// Given a list of nodes, limit it to ones that are new (or old).// "new" means _rounds > 0.vector<IPAddress>Kelips::newold(vector<IPAddress> a, bool xnew){ vector<IPAddress> b; for(u_int i = 0; i < a.size(); i++) if((_info[a[i]]->_rounds > 0) == xnew) b.push_back(a[i]); return b;}// Randomize the order of a list of nodes.vector<IPAddress>Kelips::randomize(vector<IPAddress> a){ if(a.size() < 1) return a; for(u_int i = 0; i < a.size(); i++){ int j = random() % a.size(); IPAddress tmp = a[i]; a[i] = a[j]; a[j] = tmp; } return a;}// Given a list of nodes in l, add a ration of them to msg,// half new and the rest old.voidKelips::newold_msg(vector<Info> &msg, vector<IPAddress> l, u_int ration){ u_int n = 0; { // Half the ration for newly learned nodes. vector<IPAddress> nl = randomize(newold(l, true)); for(u_int i = 0; n <= ration / 2 && i < nl.size(); i++, n++){ Info *ip = _info[nl[i]]; assert(ip->_rounds > 0); ip->_rounds -= 1; msg.push_back(*ip); } } { // The remainder of the ration for existing nodes. vector<IPAddress> ol = randomize(newold(l, false)); for(u_int i = 0; n < ration && i < ol.size(); i++, n++){ Info *ip = _info[ol[i]]; assert(ip->_rounds == 0); msg.push_back(*ip); } }}// Create a gossip message.// Always include an entry for ourselves.// g allows targeted "pull" gossip during join, critical// to avoid disconnection!// XXX ought to send newer heartbeats preferentially????vector<Kelips::Info>Kelips::gossip_msg(int g, u_int gr, u_int cr){ vector<Info> msg; // Include this node w/ new heartbeat in every gossip. msg.push_back(Info(ip(), now())); // Add some nodes from our group. newold_msg(msg, grouplist(g), gr); // Add some contact nodes. newold_msg(msg, notgrouplist(g), cr); assert(msg.size() <= 1 + gr + cr); return msg;}voidKelips::handle_ping(void *xx, void *yy){}// One round of gossiping.// Pick a few items to send, and send them to a few other nodes.// XXX ought to send to nearby nodes preferentially (Section 2.1).voidKelips::gossip(void *junk){ if(_live){ vector<Info> msg = gossip_msg(group(), _group_ration, _contact_ration); { vector<IPAddress> gl = randomize(grouplist(group())); for(u_int i = 0; i < _group_targets && i < gl.size(); i++){ //since each gossip contains the heartbeat timer for a node, add extra 4 byte xRPC(gl[i], 2 * msg.size(), &Kelips::handle_gossip, &msg, (void *) 0); } } { vector<IPAddress> cl = randomize(notgrouplist(group())); for(u_int i = 0; i < _contact_targets && i < cl.size(); i++){ xRPC(cl[i], 2 * msg.size(), &Kelips::handle_gossip, &msg, (void *) 0); } } // ping one random node to find its RTT. { vector<IPAddress> l = all(); for(int iters = 0; l.size() > 0 && iters < 10; iters++){ IPAddress xip = l[random() % l.size()]; if(_info[xip]->_rtt == -1){ xRPC(xip, 2, &Kelips::handle_ping, (void*)0, (void*)0); break; } } } } delaycb(random() % (2 *_round_interval), &Kelips::gossip, (void *) 0);}voidKelips::handle_gossip(vector<Info> *msg, void *ret){ u_int i; for(i = 0; i < msg->size(); i++){ gotinfo((*msg)[i], -1); }}// Periodically get rid of _info entries that have// expired heartbeats.voidKelips::purge(void *junk){ vector<IPAddress> l = all(); for(u_int i = 0; i < l.size(); i++){ Info *in = _info[l[i]]; int to = (ip2group(in->_ip) == group() ? _timeout : 2*_timeout); if(in->_heartbeat + to < now()){#if 0 printf("%qd %d timed out %d %s\n", now(), ip(), in->_ip, node_key_alive(ip2id(in->_ip)) ? "oops" : "ok");#endif _info.erase(l[i]); delete in; } } delaycb(_purge_time, &Kelips::purge, (void *) 0);}// Called by KelipsObserver::init_state() with the complete list// of nodes to help us initialize our routing tables faster (i.e. cheat).// So in most nodes it's called before join().voidKelips::init_state(const set<Node*> *lid){ for(set<Node*>::const_iterator i = lid->begin(); i != lid->end(); ++i) { Kelips *k = dynamic_cast<Kelips*>(*i); assert(k); if(k->ip() == ip()) continue; Time rtt = 2 * Network::Instance()->gettopology()->latency(ip(), k->ip()); gotinfo(Info(k->ip(), now()), rtt); }}// KelipsObserver wants to know if we've stabilized.// lid is a list of all live nodes.boolKelips::stabilized(vector<ID> lid){ // Do we know about all nodes in our own group? for(u_int i = 0; i < lid.size(); i++) if(id2group(lid[i]) == group() && find_by_id(lid[i])) return false; // Do we know of two contacts from each other group? int *cc = (int *) malloc(_k * sizeof(int)); memset(cc, '\0', _k * sizeof(int)); for(u_int i = 0; i < lid.size(); i++) if(find_by_id(lid[i])) cc[id2group(lid[i])] += 1; for(int i = 0; i < _k; i++){ if(cc[i] < _n_contacts){ delete cc; return false; } } delete cc; return true;}// Kelips just did an RPC. The request contained nsent IDs,// the reply contained nrecv IDs. Update the RPC statistics.// Jinyang/Jeremy convention:// 20 bytes header, 4 bytes/ID, 1 byte/other// (Kelips paper says 40 bytes per gossip entry...)voidKelips::rpcstat(bool ok, IPAddress dst, int latency, int nitems, uint type){ if (Node::collect_stat()) { _rpc_bytes += 20 + nitems * 4; // paper says 40 bytes per node entry record_bw_stat( type, nitems, 0 ); if(ok) { _rpc_bytes += 20; record_bw_stat( type, 0, 0 ); } } if(ok) gotinfo(Info(dst, now()), latency); if(ok == false && _info.find(dst) != _info.end()){ _info[dst]->_rtt = 9999; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -