📄 chord.c
字号:
} Topology *t = Network::Instance()->gettopology(); CDEBUG(2) << " next_recurs key " << args->ipkey << "," << printID(args->key) << "arrived pathsz " << ret->path.size() <<" src "<< args->src.ip << endl; while (1) { if (!alive()) { ret->v.clear(); ret->correct = false; ret->finish_time = now(); ret->lasthop.ip = 0; ret->nexthop = me; return; } succs = loctable->succs(me.id+1,_nsucc,LOC_HEALTHY); if (succs.size() == 0) { //lookup failed //XXX do i need to backtrack? ret->v.clear(); vector<IDMap> tmp = loctable->succs(me.id+1,_nsucc,LOC_ONCHECK); //rejoin baby if ((!_join_scheduled) && (tmp.size()==0)) { _join_scheduled++; delaycb(0, &Chord::join, (Args *)0); CDEBUG(2) <<" next_recurs key "<< printID(args->key) << "no succ rejoin "<< endl; } ret->correct = false; ret->lasthop = me; if (args->type==TYPE_USER_LOOKUP && _recurs_direct) { record_stat(me.ip,args->src.ip,args->type,ret->v.size(),0); r = doRPC(args->src.ip, &Chord::final_recurs_hop, args, ret); if (r) record_stat(args->src.ip,me.ip,args->type,0); else ret->finish_time=now(); //who cares about finishtime now } ret->nexthop = me; return; } succ = succs[0]; next = loctable->next_hop(args->key); if ((_ipkey && me.ip == args->ipkey) || (ConsistentHash::betweenrightincl(me.id, succs[0].id, args->key))) { ret->v.clear(); sz = args->m < succs.size()? args->m : succs.size(); for (i = 0; i < sz; i++) { ret->v.push_back(succs[i]); if (ret->v.size() >= args->m) break; } if (args->type == TYPE_USER_LOOKUP) { ret->correct = check_correctness(args->key,ret->v); if (!ret->correct) CDEBUG(3) << "next_recurs_handler key " << printID(args->key) << " incorrect succ " << succs[0].ip << "," << printID(succs[0].id) << endl; } else ret->correct = true; ret->lasthop = me; if (_recurs_direct) { record_stat(me.ip,args->src.ip,args->type,ret->v.size(),0); r = doRPC(args->src.ip, &Chord::final_recurs_hop, args, ret); if (r) record_stat(args->src.ip,me.ip,args->type,0); else ret->finish_time=now(); //who cares about finishtime now } ret->nexthop = me; //the return path return; }else if (_stopearly_overshoot && ret->v.size()==0 && ConsistentHash::betweenrightincl(me.id, succs[succs.size()-1].id, args->key)) { assert(static_sim); uint start = 0; for (i = 0; i < succs.size(); i++) { if ( ConsistentHash::betweenrightincl(me.id, succs[i].id, args->key)) { ret->v.push_back(succs[i]); if (ret->v.size()>1) assert(ConsistentHash::between(args->key,ret->v[ret->v.size()-1].id,ret->v[0].id)); if (!start) start = i; } } if (ret->v.size() >= args->m) { assert(ret->v.size() <= _nsucc); ret->correct = check_correctness(args->key,ret->v); assert(ret->correct); if (_recurs_direct) { record_stat(me.ip,args->src.ip,args->type,1,0); assert(ret->v.size() >= args->m && args->m >= 7); r = doRPC(args->src.ip, &Chord::final_recurs_hop, args, ret); if (r) record_stat(args->src.ip,me.ip,args->type,0); } ret->nexthop = me; return; } int currsz = ret->v.size(); Time min_lat = 1000000; Time lat; for (i = 0; i < succs.size(); i++) { if ((currsz + i + 1)>= args->m) { lat = t->latency(me.ip, succs[i].ip); if (min_lat > lat) { min_lat = lat; next = succs[i]; } } } assert(min_lat < 1000000); }else if (_stopearly_overshoot && ret->v.size() > 0) { assert(_nsucc == succs.size()); assert(static_sim); //handling overshoot, just append my succ list uint start = 0; if (me.ip != ret->v[ret->v.size()-1].ip) { for (start = 0; start < succs.size(); start++) { if (succs[start].ip == ret->v[ret->v.size()-1].ip) { break; } } start++; } if (start > succs.size()) { start = 0; while (start < succs.size()) { if (ConsistentHash::between(me.id,succs[start].id,args->key) && ConsistentHash::between(args->key,succs[start].id, ret->v[ret->v.size()-1].id)) break; start++; } } IDMap tmp,tmp2,tmp3,tmp4; for (i = start; i < succs.size(); i++) { tmp = succs[i]; if (i>1) { tmp2 = succs[i-1]; } tmp3 = ret->v[ret->v.size()-1]; tmp4 = ret->v[0]; ret->v.push_back(succs[i]); if (ret->v.size()>1) assert(ConsistentHash::between(args->key,ret->v[ret->v.size()-1].id,ret->v[0].id)); if (ret->v.size() >= args->m) { assert(ret->v.size() <= _nsucc); ret->correct = check_correctness(args->key,ret->v); assert(ret->correct); if (_recurs_direct) { record_stat(me.ip,args->src.ip,args->type,1,0); assert(ret->v.size() >= args->m); r = doRPC(args->src.ip, &Chord::final_recurs_hop, args, ret); if (r) record_stat(args->src.ip,me.ip,args->type,0); } ret->nexthop = me; return; } } assert(0); } else { if (ret->path.size() >= 30) { printf("WARNING: path too long for key %qx m %d %s: ", args->key, args->m, ts()); for (uint i = 0; i < ret->path.size(); i++) { printf("(%u,%qx,%u) ", ret->path[i].n.ip, ret->path[i].n.id, ret->path[i].tout); } printf("\n"); if (_recurs_direct) { ret->finish_time = now(); //not correct, but what the heck } ret->correct = false; ret->v.clear(); ret->nexthop = me; return; } } assert(_stopearly_overshoot|| (next.ip != me.ip && ConsistentHash::between(me.id, args->key, next.id))); tmp.n = next; tmp.tout = 0; ret->path.push_back(tmp); record_stat(me.ip,next.ip,args->type,1); ret->nexthop = next; ret->prevhop = me; r = doRPC(next.ip, &Chord::next_recurs_handler, args, ret, TIMEOUT(me.ip,next.ip)); if (_learn && !_recurs_direct) { if (ret->lasthop.ip) learn_info(ret->lasthop); for (uint x = 0; x < ret->v.size(); x++) learn_info(ret->v[x]); } if (!_recurs_direct && (!alive())) { CDEBUG(3) << " next_recurs_handler lost key " << printID(args->key)<<endl; ret->lasthop.ip = 0; ret->v.clear(); ret->nexthop = me; ret->correct = false; if (!ret->finish_time) ret->finish_time = now(); return; } if (r) { if (!_recurs_direct) { record_stat(next.ip,me.ip,args->type,ret->v.size()); }else{ record_stat(next.ip,me.ip,args->type,0); } if (!static_sim) loctable->update_ifexists(ret->nexthop); ret->nexthop = me; return; }else{ CDEBUG(3) << "next_recurs_handler key " << printID(args->key) << " nexthop " << next.ip << "," << printID(next.id) << endl; if (vis) printf ("vis %llu delete %16qx %16qx succ %u,%qx\n", now (), me.id, next.id, succ.ip,succ.id); ret->path[ret->path.size()-1].tout = 1; //do a long check to see if next hop is really dead IDMap replacement; if ((!_learn) || (!replace_node(next,replacement))) { int check = loctable->add_check(next); if (check == LOC_ONCHECK) { alert_args *tmp = New alert_args; tmp->n = next; tmp->dst = me.ip; delaycb(1, &Chord::alert_delete, tmp); } } } }}voidChord::null_handler (void *args, IDMap *ret) { return;}voidChord::alert_delete(alert_args *aa){ if (aa->dst != me.ip) record_stat(me.ip,aa->dst,TYPE_MISC,1); doRPC(aa->dst, &Chord::alert_handler, aa, (void *)NULL, TIMEOUT(me.ip,aa->dst)); delete aa;}// Always attempt to go to the predecessor for the key regardless of mvoidChord::next_handler(next_args *args, next_ret *ret){ if (_learn) learn_info(args->src); ret->dst = me; check_static_init(); /* don't trust other people's information about my dead neighbors coz of non-transitivity however, in case this is a retried query, take it more seriously*/ /* if ((args->retry) && (s >= 0)) { for (s = 0; s < (int)succs.size(); s++) { for (j = 0; j < (int)args->deadnodes.size(); j++) { if (succs[s].ip == args->deadnodes[j].ip) { alert_args *tmp = New alert_args; tmp->n = succs[s]; tmp->dst = me.ip; delaycb(1, &Chord::alert_delete, tmp); break; } } if (j >= (int) args->deadnodes.size()) { break; } } } */ if (args->deadnodes.size() > 0) { for (uint i = 0; i < args->deadnodes.size(); i++) { int check = loctable->add_check(args->deadnodes[i]); if (check == -1 || check == LOC_DEAD) { } else { alert_args *tmp = New alert_args; tmp->n = args->deadnodes[i]; tmp->dst = me.ip; delaycb(1, &Chord::alert_delete, tmp); } } } int s; vector<IDMap> succs = loctable->succs(me.id+1, _nsucc, LOC_HEALTHY); if (succs.size() > 0) s = 0; else s = -1; assert((!static_sim) || succs.size() >= _allfrag); ret->v.clear(); ret->next.clear(); if (s < 0) { ret->lastnode = me; ret->done = true; vector<IDMap> tmp = loctable->succs(me.id+1,_nsucc,LOC_ONCHECK); CDEBUG(3) << "next_handler key " << printID(args->key) << " failed due to newjoin deadsz " << args->deadnodes.size() << endl; ret->correct = false; //rejoin baby if ((!_join_scheduled) && (tmp.size()==0)){ _join_scheduled++; delaycb(0, &Chord::join, (Args *)0); } } else if (ConsistentHash::betweenrightincl(me.id, succs[s].id, args->key)) { //XXX: need to take care of < m nodes situation in future for (int i = s; i < (int)succs.size(); i++) { ret->v.push_back(succs[i]); if (ret->v.size() >= args->m) break; } ret->lastnode = me; ret->done = true; if (args->type == TYPE_USER_LOOKUP) { ret->correct = check_correctness(args->key,ret->v); if (!ret->correct) CDEBUG(3) << "key " << printID(args->key) << "incorrect succ " << succs[0].ip << "," << printID(succs[0].id) << endl; }else ret->correct = true; //i don't check correctness for non-user lookups } else { ret->done = false; ret->next.clear(); ret->next = loctable->next_hops(args->key,args->alpha); }}// External event that tells a node to contact the well-known node// and try to join.voidChord::join(Args *args){ if (static_sim) { if ((args) && (!_inited)) notifyObservers((ObserverInfo *)"join"); _inited = true; return; } if (args) { me.ip = ip(); if (_random_id) me.id = ConsistentHash::getRandID(); else me.id = ConsistentHash::ip2chid(me.ip); me.timestamp = 0; loctable->init(me); if (_learn) learntable->init(me); _last_join_time = now(); ChordObserver::Instance(NULL)->addnode(me); notifyObservers((ObserverInfo *)"join"); _join_scheduled++; // XXX: Thomer says: not necessary // node()->set_alive(); //if args is NULL, it's an internal join CDEBUG(1) << "start to join " << endl; }else{ if (!alive()) { _join_scheduled--; return; } CDEBUG(2) << "rescheduled join " << endl; } if (vis) { printf("vis %llu join %16qx\n", now (), me.id); } if (!_wkn.ip) { assert(args); _wkn.ip = args->nget<IPAddress>("wellknown"); assert (_wkn.ip); _wkn.id = dynamic_cast<Chord *>(Network::Instance()->getnode(_wkn.ip))->id(); } find_successors_args fa; find_successors_ret fr; //fa.key = me.id + 1; fa.key = me.id - 1; //try to get multiple successors in case some failed assert(_nsucc > 3); fa.m = _nsucc; bool ok = failure_detect(_wkn, &Chord::find_successors_handler, &fa, &fr, TYPE_JOIN_LOOKUP,1,0); assert(_wkn.ip == fr.dst.ip); _wkn = fr.dst; joins++; if (ok) record_stat(_wkn.ip,me.ip,TYPE_JOIN_LOOKUP, fr.v.size()); if (!alive()) { _join_scheduled--; return; } if (!ok || fr.v.size() < 1) { CDEBUG(2) << "join failed retry later" << endl; _join_scheduled--; if (!_join_scheduled) { delaycb(200, &Chord::join, (Args *)0); _join_scheduled++; } return; } assert (ok); for (uint i = 0; i < fr.v.size(); i++) { if (fr.v[i].ip != me.ip) loctable->add_node(fr.v[i],true); _inited = true; } if (fr.v.size()>1) _last_succlist_stabilized = now(); CDEBUG(1) << "joined succ " << fr.v[0].ip << "," << printID(fr.v[0].id) << endl; if (!_stab_basic_running) { _stab_basic_running = true; delaycb(0, &Chord::reschedule_basic_stabilizer, (void *) 0); }else{ Chord::stabilize(); } if ((loctable->size() < 2) && (alive())) { CDEBUG(2) << "after stabilize join failed" << endl; _join_scheduled--; if (!_join_scheduled) { delaycb(200, &Chord::join, (Args *)0); _join_scheduled++; return; } } _join_scheduled--;}voidChord::reschedule_basic_stabilizer(void *x){ assert(!static_sim); if (!alive()) { _stab_basic_running = false; CDEBUG(3) << "node dead cancel stabilizing " << endl; return; } _stab_basic_running = true; if (_stab_basic_outstanding > 0) { assert(0); }else{ _stab_basic_outstanding++; stabilize(); _stab_basic_outstanding--; assert(_stab_basic_outstanding == 0); } delaycb(_stab_basic_timer, &Chord::reschedule_basic_stabilizer, (void *) 0);}// Which paper is this code from? -- PODC // stabilization is not run in one non-interruptable piece// after each possible yield point, check if the node is dead.voidChord::stabilize(){ assert(!static_sim); if (!_inited) return; IDMap pred1 = loctable->pred(me.id-1, LOC_ONCHECK); IDMap succ1 = loctable->succ(me.id+1, LOC_ONCHECK); vector<IDMap> succs = loctable->succs(me.id+1, _nsucc, LOC_ONCHECK); if (!succ1.ip) return; CDEBUG(3) << "chord_stabilize start pred " << pred1.ip << "," << printID(pred1.id) << "succ " << succ1.ip << "," << printID(succ1.id) << endl;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -