📄 chord.c
字号:
//find a free rpc slot uint ii = 0; for (ii = 0; ii < parallel; ii++) { if (rpcslots[(rpc_i+ii)%parallel]->free) { rpc_i = (rpc_i+ii)%parallel; p = rpcslots[rpc_i]; p->free = false; break; } } assert(ii<parallel); p->link = h; totalrpc++; assert(h.to.ip > 0 && h.to.ip < 3000); //recorded.push_back(h); CDEBUG(2) << "key " << printID(key) << "sending to next hop node "<< h.to.ip << "," << printID(h.to.id) << endl; record_stat(me.ip,h.to.ip,type,1+na.deadnodes.size(),0); assert((h.to.ip == me.ip) || (h.to.ip!=h.from.ip)); rpc = asyncRPC(h.to.ip, &Chord::next_handler, &na, &(p->ret), TIMEOUT(me.ip,h.to.ip)); rpcset.insert(rpc); resultmap[rpc] = rpc_i; outstanding++; } assert(outstanding > 0); //fill out all my allowed parallel connections, wait for response before = now(); donerpc = rcvRPC(&rpcset, ok); rpc_i = resultmap[donerpc]; assert(rpc_i>=0 && rpc_i<parallel); if (a) a->latency += (now()-before); outstanding--; reuse = rpcslots[rpc_i]; assert(!reuse->free); reuse->free = true; //count statistics about hop count and timeouts num_hops[rpc_i]++; if (!ok) { num_timeouts[rpc_i]++; time_timeouts[rpc_i]+=TIMEOUT(me.ip,reuse->link.to.ip); } if (!alive()) goto DONE; if (ok) { record_stat(reuse->link.to.ip,me.ip,type,reuse->ret.done?reuse->ret.next.size():reuse->ret.v.size()); if (reuse->link.from.ip == me.ip) { assert(reuse->ret.dst.ip == reuse->link.to.ip); loctable->update_ifexists(reuse->ret.dst); } CDEBUG(2) << "key " << printID(key) << "outstanding " << outstanding << " deadsz " << na.deadnodes.size() << " from " << reuse->link.to.ip << "," << printID(reuse->link.to.id) << "done? " << (reuse->ret.done?1:0) << " nextsz " << reuse->ret.next.size() << " savefinishedsz " << savefinished.size() << endl; if ((reuse->link.from.ip == me.ip) && (!static_sim)) loctable->update_ifexists(reuse->link.to); if (reuse->ret.done) { lastfinished = reuse->link; goto DONE;//success } if (a && a->ipkey && reuse->link.to.ip == a->ipkey) { lastfinished = reuse->link; goto DONE; } //XXX: to be fixed, does not look like it will work for parallel lookup if ((reuse->ret.next.size() == 0 ) && (outstanding == 0)) {// && reuse->link.to.ip == lastfinished.to.ip) //this will fail coz i cannot mark this node as dead lastfinished = reuse->link; goto DONE;//failed } list<hop_info>::iterator iter = tasks.begin(); IDMap tmptmp; for (uint i = 0; i < reuse->ret.next.size(); i++) { tmptmp = reuse->ret.next[i]; if (asked.find(reuse->ret.next[i].ip) != asked.end()) continue; if (_learn) learn_info(reuse->ret.next[i]); if (ConsistentHash::betweenrightincl(reuse->ret.next[i].id, key, lastfinished.to.id)) continue; h.from = reuse->link.to; h.to = reuse->ret.next[i]; h.hop = reuse->link.hop + 1; while (iter != tasks.end()) { if (ConsistentHash::between(iter->to.id, key, reuse->ret.next[i].id)) break; iter++; } assert(h.to.ip != me.ip); tasks.insert(iter, h); asked[h.to.ip] = true; } // any improvement? if (na.retry && lastfinished.to.ip == reuse->link.to.ip) if ((tasks.size() == 0) || ConsistentHash::betweenrightincl(tasks.front().to.id, key, lastfinished.to.id)) goto DONE; //insert into the history of finished rpcs iter = savefinished.begin(); while (iter != savefinished.end()) { if (ConsistentHash::betweenrightincl(me.id, iter->to.id, reuse->link.to.id)) break; iter++; } if (iter == savefinished.end() || reuse->link.to.ip != iter->to.ip) savefinished.insert(iter, reuse->link); lastfinished = savefinished.back(); CDEBUG(2) << "key " << printID(key) << "from " << reuse->link.to.ip << "," << printID(reuse->link.to.id) << "next? " << (reuse->ret.next.size()>0?reuse->ret.next[0].ip:0) << "," << (reuse->ret.next.size()>0?printID(reuse->ret.next[0].id):0) << " task top " << (tasks.size()>0?tasks.front().to.ip:0) << "," << (tasks.size()>0?printID(tasks.front().to.id):0) << " tasksz " << tasks.size() << " lastfinished " << lastfinished.to.ip << "," << printID(lastfinished.to.id) << endl; } else { if (reuse->link.to.ip == me.ip) { //a very special wierd case, the node has gone down and up during the RPC goto DONE; } CDEBUG(2) << "key " << printID(key) << "outstanding " << outstanding << " deadsz " << na.deadnodes.size() << " from " << reuse->link.to.ip << "," << printID(reuse->link.to.id) << "DEAD savefinishedsz " << savefinished.size() << " lastfinished " << lastfinished.to.ip << endl; if (reuse->link.from.ip == me.ip) { if (_learn) to_be_replaced.push_back(reuse->link.to); } //notify alert_args *aa = New alert_args; aa->n = reuse->link.to; record_stat(me.ip,reuse->link.from.ip,type,1); assert(reuse->link.from.ip > 0 && reuse->link.from.ip < 3000); rpc = asyncRPC(reuse->link.from.ip, &Chord::alert_handler, aa,(void *)NULL); alertset.insert(rpc); alertmap[rpc] = aa; alertoutstanding++; if (ConsistentHash::betweenleftincl(lastfinished.to.id, key, reuse->link.to.id)) { na.deadnodes.push_back(reuse->link.to); } if (lastfinished.to.ip == reuse->link.to.ip) { assert(savefinished.size()>0); savefinished.pop_back(); CDEBUG(2) << "key " << printID(key) << "lastfinished " << lastfinished.to.ip << "," << printID(lastfinished.to.id) << "dead new last finished " << savefinished.back().to.ip << endl; lastfinished = savefinished.back(); } } if (a && a->latency >= _max_lookup_time) { goto DONE; } }DONE: //get rid the crap nodes that i've learned if (learntable) { for (uint i = 0; i < na.deadnodes.size(); i++) learntable->del_node(na.deadnodes[i],true); IDMap replacement; for (uint i = 0; i < to_be_replaced.size();i++) replace_node(to_be_replaced[i],replacement); } //jesus christ i'm done, however, i need to clean up my shit assert(reuse); assert(rpc_i >=0 && rpc_i<parallel); if (a) { a->num_to += num_timeouts[rpc_i]; a->total_to += time_timeouts[rpc_i]; a->hops += num_hops[rpc_i]; } if ((type == TYPE_USER_LOOKUP) && (alive())) { /* Topology *t = Network::Instance()->gettopology(); printf("%s lookup key %qx, hops %d totalrpc %d (lookup info hop %u, num_to %u, retry %u)\n", ts(), key, lastfinished.hop, totalrpc,a->hops,a->num_to,a->retrytimes); printf("%s key %qx route: ", ts(), key); IDMap last; last = lastfinished.to; for (uint i = recorded.size()-1; i >= 0; i--) { if (last.ip == me.ip) break; if (recorded[i].to.ip == last.ip) { printf("(%u,%qx %llu) ", recorded[i].to.ip, recorded[i].to.id, (uint)2*t->latency(me.ip, last.ip)); last = recorded[i].from; } } printf("\n"); */ if ((reuse->ret.done) && (reuse->ret.correct)) results = reuse->ret.v; }else { if (reuse->ret.done) results = reuse->ret.v; } if (lasthop) { *lasthop = reuse->link.to; } for (uint i = 0; i < outstanding; i++) { donerpc = rcvRPC(&rpcset, ok); rpc_i = resultmap[donerpc]; assert(rpc_i>=0 && rpc_i < parallel); reuse = rpcslots[rpc_i]; if (ok) record_stat(reuse->link.to.ip,me.ip,type,reuse->ret.done?reuse->ret.next.size():reuse->ret.v.size()); } for (uint i = 0; i < parallel; i++) { delete rpcslots[i]; } for (uint i = 0;i < alertoutstanding; i++) { donerpc = rcvRPC(&alertset, ok); alert_args *aa = alertmap[donerpc]; if (ok) record_stat(aa->n.ip,me.ip,type,0); delete alertmap[donerpc]; } return results;}/* the recursive query goes directly back to the sender. */voidChord::final_recurs_hop(next_recurs_args *args, next_recurs_ret *ret){ lookup_path tmp; tmp.n = me; tmp.tout = 0; ret->path.push_back(tmp); ret->finish_time = now();}vector<Chord::IDMap>Chord::find_successors_recurs(CHID key, uint m, uint type, IDMap *lasthop, lookup_args *a) { next_recurs_args fa; fa.key = key; fa.type = type; fa.m = m; fa.src = me; CDEBUG(3) << "find_successors_recurs start key " << printID(key) << endl; //do the parallel recursive lookup thing //doRPC(me.ip, &Chord::next_recurs_handler, args, ret); hash_map<unsigned, next_recurs_ret*> resultmap; next_recurs_ret *reuse = NULL; next_recurs_ret *p = NULL; uint outstanding, parallel; bool ok; Time before = now(); vector<IDMap> results; results.clear(); lookup_path tmp; unsigned rpc, donerpc; RPCSet rpcset; if (a) { fa.ipkey = a->ipkey; parallel = _parallel; } else { fa.ipkey = 0; parallel = 1; } fa.src = me; IDMap nexthop; nexthop.id = key; nexthop.ip = 0; outstanding = 0; assert(!_stopearly_overshoot || _parallel == 1); //i do not know how to do parallel lookup with stopearly and overshoot while (1) { if (!alive()) return results; IDMap succ = loctable->succ(me.id+1,LOC_HEALTHY); if (succ.ip == 0) { if (!_join_scheduled) { _join_scheduled++; delaycb(0, &Chord::join, (Args *)0); CDEBUG(2) << " find_successors_recurs key " << printID(key) << "no succ " << endl; } if (lasthop) *lasthop = me; return results; } while (outstanding < parallel) { if (_stopearly_overshoot) nexthop = me; else { nexthop = loctable->next_hop(nexthop.id-1); if(nexthop.ip == 0) { if (reuse) { delete reuse; reuse = NULL; } break; } } if (reuse) { p = reuse; reuse = NULL; }else { p = New next_recurs_ret; p->path.clear(); } p->correct = false; p->lasthop = me; p->v.clear(); p->finish_time = 0; if ((a && a->ipkey && me.ip == a->ipkey) || (ConsistentHash::between(me.id,succ.id,key))) { if (fa.m == 1) { p->v.push_back(succ); }else{ p->v = loctable->succs(key,fa.m,LOC_HEALTHY); } p->correct = check_correctness(key,p->v); p->finish_time = now(); reuse = p; goto RECURS_DONE; } if (nexthop.ip!=me.ip) { tmp.n = nexthop; tmp.tout = 0; p->path.push_back(tmp); } CDEBUG(2) << " key " << (a?a->ipkey:0) << "," << printID(key) << "via nexthop " << nexthop.ip << "," << printID(nexthop.id) << "outstanding " << outstanding << " parallel " << parallel << endl; record_stat(me.ip,nexthop.ip,type,1); p->nexthop = nexthop; p->prevhop = me; assert(!reuse); rpc = asyncRPC(nexthop.ip, &Chord::next_recurs_handler, &fa, p, TIMEOUT(me.ip,nexthop.ip)); rpcset.insert(rpc); resultmap[rpc] = p; outstanding++; } assert(outstanding>0); donerpc = rcvRPC(&rpcset, ok); outstanding--; reuse = resultmap[donerpc]; if (ok) { loctable->update_ifexists(reuse->nexthop); record_stat(reuse->nexthop.ip,me.ip,type,resultmap[donerpc]->v.size()); goto RECURS_DONE; }else{ //do a long check to see if next hop is really dead assert(reuse->path.size()>0); IDMap n = reuse->path[reuse->path.size()-1].n; CDEBUG(2) << " key " << (a?a->ipkey:0) << "," << printID(key) << "nexthop " << n.ip << "," << printID(n.id) << "failed outstanding " << outstanding <<endl; reuse->path[reuse->path.size()-1].tout=1; IDMap replacement; //if ((!_learn) || (!replace_node(n,replacement))) { int check = loctable->add_check(n); if (check == LOC_ONCHECK) { alert_args *tmp = New alert_args; tmp->n = n; tmp->dst = me.ip; delaycb(1, &Chord::alert_delete, tmp); } replace_node(n,replacement); //} } }RECURS_DONE: assert(reuse); if (lasthop) { *lasthop = (reuse->lasthop); } CDEBUG(1)<<" key " << printID(key) <<"finished lasthop "<< reuse->lasthop.ip <<","<< printID(reuse->lasthop.id) << "vsize " << reuse->v.size() <<" correct? "<< (reuse->correct?1:0) <<" hops "<<reuse->path.size()<<endl; if (a) { IDMap prev = me; if (_recurs_direct) { assert(reuse->finish_time > 0); a->latency += (reuse->finish_time-before); }else{ a->latency += (now()-before); } a->hops += reuse->path.size(); for (uint j= 0; j< reuse->path.size(); j++) { if (reuse->path[j].tout>0) { a->num_to++; a->total_to += TIMEOUT(prev.ip,reuse->path[j].n.ip); }else{ prev = reuse->path[j].n; } } } if (!a || reuse->correct) { results = reuse->v; }else{ results.clear(); } if (reuse->lasthop.ip && _learn) learn_info(reuse->lasthop); delete reuse; //garbage collection for (uint i = 0; i < outstanding; i++) { donerpc = rcvRPC(&rpcset, ok); if (ok) { loctable->update_ifexists(resultmap[donerpc]->nexthop); record_stat(resultmap[donerpc]->nexthop.ip,me.ip,type,resultmap[donerpc]->v.size()); } if ((_learn) && (resultmap[donerpc]->lasthop.ip)) learn_info(resultmap[donerpc]->lasthop); delete resultmap[donerpc]; } return results;}char *Chord::print_path(vector<lookup_path> &p, char *tmp){ char *begin = tmp; tmp += sprintf(tmp,"<"); for (uint i = 0; i < p.size(); i++) { tmp += sprintf(tmp, " %u",p[i].n.ip); } tmp += sprintf(tmp," >"); return begin;}//XXX: in a dynamic environment, the current implementation has a //wierd mechanism for implementing timeout.//it sends an empty reply to the sender to indicate that //some node currently holding the packet has died//in a real implemenation, the sender should implement some kind of //timeout mechanism.voidChord::next_recurs_handler(next_recurs_args *args, next_recurs_ret *ret){ vector<IDMap> succs; IDMap succ; bool r; lookup_path tmp; IDMap next; uint sz, i; check_static_init(); assert(alive()); if (_learn) { assert(args->src.ip>0 && args->src.ip < 100000); learn_info(args->src); if (ret->prevhop.ip!=args->src.ip) learn_info(ret->prevhop);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -