📄 chord.c
字号:
fix_successor(); if (!alive()) return; succs = loctable->succs(me.id+1, _nsucc, LOC_HEALTHY); assert(_nsucc >= 3); if ((_nsucc > 1) && ((succs.size() < 0.5*_nsucc) || (now()-_last_succlist_stabilized > _stab_succlist_timer))) { _last_succlist_stabilized = now(); fix_successor_list(); } if (!alive()) return; fix_predecessor(); if (!alive()) return; IDMap pred = loctable->pred(me.id-1, LOC_ONCHECK); IDMap succ = loctable->succ(me.id+1,LOC_ONCHECK); CDEBUG(3) << "chord_stabilize done pred "<< pred.ip <<","<< printID(pred.id) << "succ " << succ.ip << "," << printID(succ.id) << endl;}boolChord::stabilized(vector<CHID> lid){ vector<CHID>::iterator iter; iter = find(lid.begin(), lid.end(), me.id); assert(iter != lid.end()); vector<IDMap> succs = loctable->succs(me.id+1, _nsucc, LOC_ONCHECK); if (succs.size() != _nsucc) return false; for (unsigned int i = 1; i <= _nsucc; i++) { iter++; if (iter == lid.end()) iter = lid.begin(); if (succs[i-1].id != *iter) { printf("%s not stabilized, %5d succ should be %16qx instead of (%u, %16qx)\n", ts(), i-1, *iter, succs[i-1].ip, succs[i-1].id); return false; } } return true;}voidChord::oracle_node_died(IDMap n){ //assume a dead node will affect routing tables of those that contain it IDMap tmp = loctable->succ(n.id, LOC_ONCHECK); if (tmp.ip != n.ip) return; vector<IDMap> ids = ChordObserver::Instance(NULL)->get_sorted_nodes(); uint sz = ids.size(); int my_pos = find(ids.begin(), ids.end(), me) - ids.begin(); //lost my predecessor IDMap pred = loctable->pred(me.id-1); if (tmp.ip == pred.ip) { loctable->del_node(n); loctable->add_node(ids[(my_pos-1)%sz]); CDEBUG(3) << "chord_oracle_node_died pred del " << n.ip << "," << printID(n.id) << " add " << ids[(my_pos-1)%sz].ip << "," << printID(ids[(my_pos-1)%sz].id) << endl; return; } //lost one of my successors vector<IDMap> succs = loctable->succs(me.id+1,_nsucc,LOC_ONCHECK); IDMap last = succs[succs.size()-1]; if (ConsistentHash::betweenrightincl(me.id, succs[succs.size()-1].id, n.id)) { loctable->del_node(n); loctable->add_node(ids[(my_pos+_nsucc)%sz],true); vector<IDMap> newsucc = loctable->succs(me.id+1,_nsucc,LOC_ONCHECK); CDEBUG(3) << "chord_oracle_node_died succ del " << n.ip << "," << printID(n.id) << "add " << ids[(my_pos+_nsucc)%sz].ip << "," << printID(ids[(my_pos+_nsucc)%sz].id) << endl; }}voidChord::oracle_node_joined(IDMap n){ IDMap tmp = loctable->succ(n.id, LOC_ONCHECK); if (tmp.ip == n.ip) return; //is the new node my predecessor? IDMap pred = loctable->pred(me.id-1); if (ConsistentHash::between(pred.id,me.id, n.id)) {// loctable->del_node(pred); loctable->add_node(n); CDEBUG(3) << "chord_oracle_node_joined pred del " << pred.ip << "," << printID(pred.id) << "add " << n.ip << "," << printID(n.id) << endl; return; } //is the new node one of my successor? vector<IDMap> succs = loctable->succs(me.id+1,_nsucc,LOC_ONCHECK); //uint ssz = succs.size(); if (succs.size() > 0 && ConsistentHash::between(me.id,succs[succs.size()-1].id,n.id)) { // loctable->del_node(succs[succs.size()-1]); loctable->add_node(n,true); // vector<IDMap> newsucc = loctable->succs(me.id+1,_nsucc,LOC_ONCHECK); // uint nssz = newsucc.size(); //assert(nssz == _nsucc); CDEBUG(3) << "chord_oracle_node_joined succ add " << n.ip << "," << printID(n.id) << endl; }}voidChord::initstate(){ vector<IDMap> ids = ChordObserver::Instance(NULL)->get_sorted_nodes(); uint sz = ids.size(); uint my_pos = find(ids.begin(), ids.end(), me) - ids.begin(); assert(ids[my_pos].id == me.id); //add successors and (each of the successor's predecessor) for (uint i = 1; i <= _nsucc; i++) { loctable->add_node(ids[(my_pos + i) % sz],true); } //add predecessor loctable->add_node(ids[(my_pos-1) % sz]); IDMap succ1 = loctable->succ(me.id+1); CDEBUG(3) << "chord_init_state sz " << loctable->size() << " succ " << succ1.ip << "," << printID(succ1.id) << endl; _inited = true;}//pings predecessor and fix my predecessor pointer if //old predecessor's successor pointer has changedvoidChord::fix_predecessor(){ //ping my predecessor IDMap pred = loctable->pred(me.id-1); if ((!pred.ip) || (pred.ip == me.ip)) return; bool ok; get_predsucc_args gpa; get_predsucc_ret gpr; gpa.pred = false; gpa.m = 1; ok = failure_detect(pred, &Chord::get_predsucc_handler, &gpa, &gpr, TYPE_FIXPRED_UP); if (ok) record_stat(pred.ip,me.ip,TYPE_FIXPRED_UP,gpr.v.size()); if (!alive()) return; if (ok) { loctable->update_ifexists(gpr.dst); if (gpr.v.size() > 0) { IDMap tmp = gpr.v[0]; loctable->add_node(gpr.v[0]); if (gpr.v[0].ip == me.ip) { _inited = true; } } } else loctable->del_node(pred); }//fix successor notify predecessor of its sucessor changevoidChord::fix_successor(void *x){ IDMap succ1, succ2,pred; get_predsucc_args gpa; get_predsucc_ret gpr; bool ok; gpa.m = 0; gpa.pred = true; alert_args aa; assert(alive()); //while (1) { aa.n.ip = 0; succ1 = loctable->succ(me.id+1,LOC_ONCHECK); if (succ1.ip == 0 || succ1.ip == me.ip) { //sth. wrong, i lost my succ, join again if (!_join_scheduled) { _join_scheduled++; CDEBUG(3) << "fix_successor re-join" << endl; delaycb(0, &Chord::join, (Args *)0); } return; } ok = failure_detect(succ1,&Chord::get_predsucc_handler, &gpa, &gpr, TYPE_FIXSUCC_UP,0); if (ok) record_stat(succ1.ip,me.ip,TYPE_FIXSUCC_UP,2); if (!alive()) return; if (!ok) { CDEBUG(3) << "fix_successor old succ " << succ1.ip << "," << printID(succ1.id) << "dead" << endl; loctable->del_node(succ1, true); //successor dead, force delete aa.n = succ1; } else { assert(gpr.dst.ip == succ1.ip); loctable->update_ifexists(gpr.dst); CDEBUG(3) << "fix_successor succ " << succ1.ip << "," << printID(succ1.id) << " his pred is " << gpr.n.ip << "," << printID(gpr.n.id) << endl; if (!gpr.n.ip) { //sigh, that person is clueless about his pred }else if (gpr.n.ip && gpr.n.ip == me.ip) { return; }else if ((gpr.n.ip && gpr.n.ip!= me.ip)) { if (ConsistentHash::between(me.id, succ1.id, gpr.n.id)) { loctable->add_node(gpr.n,true); //successor has changed, i should stabilize it immeidately } else { assert(gpr.n.ip == me.ip || ConsistentHash::between(gpr.n.id, succ1.id, me.id)); pred = loctable->pred(me.id-1, LOC_HEALTHY); if (ConsistentHash::between(pred.id,me.id,gpr.n.id)) loctable->add_node(gpr.n); //my successor's predecessor is behind me //notify my succ of his predecessor change notify_args na; notify_ret nr; na.me = me; ok = failure_detect(succ1, &Chord::notify_handler, &na, &nr, TYPE_FIXSUCC_UP, 2, 0); if (ok) record_stat(succ1.ip,me.ip,TYPE_FIXSUCC_UP,0); if(!alive()) return; if (!ok) { CDEBUG(3) << "fix_successor notify succ " << succ1.ip << "," << printID(succ1.id) << "dead" << endl; loctable->del_node(succ1,true); //successor dead, force delete aa.n = succ1; } } } } //notify my new successor of this node's death if (aa.n.ip) { succ2 = loctable->succ(me.id+1, LOC_ONCHECK); aa.n = succ1; if (succ2.ip) { ok = failure_detect(succ2, &Chord::alert_handler, &aa, (void*)NULL, TYPE_FIXSUCC_UP,1,0); if (ok) { //i should not immediately stabilize new successor, //i should wait for this new successor to discover the failure himself record_stat(succ2.ip,me.ip,TYPE_FIXSUCC_UP,0); } else { loctable->del_node(succ2,true); } return; } } //}}voidChord::fix_successor_list(){ IDMap succ = loctable->succ(me.id+1); if (!succ.ip) return; get_predsucc_args gpa; get_predsucc_ret gpr; bool ok; gpa.m = _nsucc; gpa.pred = false; gpr.v.clear(); ok = failure_detect(succ, &Chord::get_predsucc_handler, &gpa, &gpr,TYPE_FIXSUCCLIST_UP); if (ok) record_stat(succ.ip,me.ip,TYPE_FIXSUCCLIST_UP, gpr.v.size()); if (!alive()) return; if (!ok) { loctable->del_node(succ,true); }else{ loctable->update_ifexists(gpr.dst); //scs[0] might not be succ anymore vector<IDMap> scs = loctable->succs(me.id + 1, _nsucc, LOC_DEAD); assert(scs.size() <= _nsucc); uint scs_i, gpr_i; gpr_i = scs_i = 0; while (scs_i < scs.size()) { if (scs[scs_i].ip == succ.ip) break; scs_i++; } scs_i++; while (1) { if (gpr_i >= gpr.v.size()) { break; } if (scs_i >= scs.size()) { while (scs_i < _nsucc && gpr_i < gpr.v.size()) { loctable->add_node(gpr.v[gpr_i],true); gpr_i++; scs_i++; } break; } assert(scs_i < scs.size() && scs_i < _nsucc); if (scs[scs_i].ip == gpr.v[gpr_i].ip) { loctable->add_node(scs[scs_i],true); gpr_i++; scs_i++; } else if (ConsistentHash::between(me.id,gpr.v[gpr_i].id,scs[scs_i].id)) { //delete the successors that my successor failed to pass to me CDEBUG(3) << "fix_successor_list del " << scs_i << " " << gpr_i << " " << scs[scs_i].ip << "," << printID(scs[scs_i].id) << gpr.v[gpr_i].ip << printID(gpr.v[gpr_i].id) << endl; loctable->del_node(scs[scs_i],true); //force delete scs_i++; } else { loctable->add_node(gpr.v[gpr_i], true); gpr_i++; } } if (vis) { bool change = false; scs = loctable->succs(me.id + 1, _nsucc); for (uint i = 0; i < scs.size (); i++) { if ((i >= lastscs.size ()) || lastscs[i].id != scs[i].id) { change = true; } } if (change) { printf ( "vis %llu succlist %16qx", now (), me.id); for (uint i = 0; i < scs.size (); i++) { printf ( " %16qx", scs[i].id); } printf ( "\n"); } lastscs = scs; } }}voidChord::notify_handler(notify_args *args, notify_ret *ret){ assert(!static_sim); loctable->add_node(args->me);}voidChord::alert_handler(alert_args *args, void *ret){ assert(!static_sim); if (vis) printf ("vis %llu delete %16qx %16qx\n", now (), me.id, args->n.id); //due to non-transitivity problem and avoid long timeout during lookup //i check before i delete, //this will make find_successor() slower, but fortunately iterative lookup is async bool b = failure_detect(args->n, &Chord::null_handler,(void *)NULL, &(args->n), TYPE_MISC,0,0,TIMEOUT_RETRY-1); if (!alive()) return; if (!b) { loctable->del_node(args->n); CDEBUG(3) << "alert_handler del " << args->n.ip << "," << printID(args->n.id) << endl; }else{ record_stat(args->n.ip,me.ip,TYPE_MISC,0,0); loctable->update_ifexists(args->n); }}voidChord::get_predsucc_handler(get_predsucc_args *args, get_predsucc_ret *ret){ assert(!static_sim); ret->dst = me; if (args->pred) ret->n = loctable->pred(me.id-1); else ret->n.ip = 0; if (args->m > 0) ret->v = loctable->succs(me.id+1,args->m,LOC_HEALTHY);}voidChord::dump(){ _isstable = true; printf("myID is %16qx %5u\n", me.id, me.ip); printf("===== %16qx =====\n", me.id); printf ("ring size %d\n", loctable->size ()); vector<IDMap> v = loctable->succs(me.id+1, _nsucc); for (unsigned int i = 0; i < v.size(); i++) { printf("%16qx: succ %5d : %16qx\n", me.id, i, v[i].id); } IDMap p = loctable->pred(me.id-1); printf("pred is %5u,%16qx\n", p.ip, p.id);}voidChord::leave(Args *args){ assert(!static_sim); crash (args); loctable->del_all(); ChordObserver::Instance(NULL)->delnode(me);}voidChord::crash(Args *args){ ChordObserver::Instance(NULL)->delnode(me); if (vis) printf ("vis %llu crash %16qx\n", now (), me.id); if ((Node::collect_stat()) && (now()-_last_join_time>600000)){ uint rsz = loctable->size(LOC_HEALTHY,0.0); rtable_sz.push_back(rsz); } // XXX: Thomer says: not necessary //node()->crash (); _inited = false; loctable->del_all(); CDEBUG(1) << "crashed " << endl; notifyObservers((ObserverInfo *)"crash"); if (_learn) { learntable->del_all(); }}/*************** LocTable ***********************/#define MINTIMEOUT 30000LocTable::LocTable(){ _evict = false;} void LocTable::init(Chord::IDMap m){ ring.repok(); me = m; idmapwrap *elm = New idmapwrap(me); ring.insert(elm); full = me.id+1; lastfull = 0;}voidLocTable::del_all(){ assert (ring.repok ()); idmapwrap *next; idmapwrap *cur; for (cur = ring.first(); cur; cur = next) { next = ring.next(cur); ring.remove(cur->id); bzero(cur, sizeof(*cur)); delete cur; } assert (ring.repok ());}LocTable::~LocTable(){ del_all();}LocTable::idmapwrap *LocTable::get_naked_node(ConsistentHash::CHID id){ if (id == me.id) return NULL; else return ring.search(id);}//get the succ node including or after this id Chord::IDMap
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -