📄 onehop.c
字号:
DEBUG_MSG(args->sender, "notify_rec_handler directly add sender", args->sender); loctable->add_node(args->sender); if (args->log.size() < 1) DEBUG(5) << now() << ":" << ip() << "," << printID(id()) <<":PANIC! Got empty log\n"; for (uint i=0; i < args->log.size(); i++) { if (!alive()) return; if (args->sender.id >= me.id) { high_to_low.push_back(args->log[i]); sent_low = false; } else { low_to_high.push_back(args->log[i]); sent_high = false; } if (args->log[i]._state == DEAD) { if (args->log[i]._node.ip == ip()) { DEBUG(5) << now() << ":" << ip() << "," << printID(id()) << ":Panic! People think I am dead, but I'm not\n"; //exit(-1); LogEntry *e = New LogEntry(me, ALIVE, now()); leader_log.push_back(*e); delete e; if (args->sender.id >= me.id) high_to_low.pop_back(); else low_to_high.pop_back(); } else { loctable->del_node(args->log[i]._node.id); } } else { DEBUG_MSG(args->log[i]._node,"notify_rec_handler",args->sender); loctable->add_node(args->log[i]._node); } }}voidOneHop::notify_other_leaders(notifyevent_args *args, general_ret *ret) { if (_join_complete) ret->has_joined = true; else ret->has_joined = false; if (!alive()) return; IDMap succ_node = loctable->succ(args->sender.id); if (succ_node.id != args->sender.id) { DEBUG(5) << now() << ":" << ip() << "," << printID(id()) << ":Found new node " << args->sender.ip << " via slice leader ping\n"; DEBUG_MSG(args->sender,"notify_other_leaders directly add sender", args->sender); loctable->add_node(args->sender); LogEntry *e = New LogEntry(args->sender, ALIVE, now()); outer_log.push_back(*e); delete e; } if (args->log.size() < 1) DEBUG(5) << now() << ":" << ip() << "," << printID(id()) <<":PANIC! Got empty log\n"; for (uint i=0; i < args->log.size(); i++) { outer_log.push_back(args->log[i]); } notifyevent_args send_in; send_in.sender = me; leader_stabilize((void *)0);}voidOneHop::notify_unit_leaders(notifyevent_args *args, general_ret *ret) { if (_join_complete) ret->has_joined = true; else ret->has_joined = false; if (!alive()) return; if (args->log.size() < 1) DEBUG(5) << now() << ":" << ip() << "," << printID(id()) <<":PANIC! Got empty log\n"; IDMap succ_node = loctable->succ(args->sender.id); if (succ_node.id != args->sender.id) { DEBUG(5) << now() << ":" << ip() << "," << printID(id()) << ":Found new node " << args->sender.ip << " via same slice leader/unit leader ping\n"; DEBUG_MSG(args->sender,"notify_unit_leader",args->sender); loctable->add_node(args->sender); LogEntry *e = New LogEntry(args->sender, ALIVE, now()); leader_log.push_back(*e); args->log.push_back(*e); delete e; } if (!_join_complete) DEBUG(5) << now() << ":" << ip() << "," << printID(id()) << "Wow! Join not complete, still got this message\n"; for (uint i=0; i < args->log.size(); i++) { if (args->log[i]._state == DEAD) { if (!alive()) return; if (args->log[i]._node.ip == ip()) { DEBUG(5) << now() << ":" << ip() << "," << printID(id()) << ":Panic! People think I am dead, but I'm not\n"; LogEntry *e = New LogEntry(me, ALIVE, now()); leader_log.push_back(*e); inner_log.push_back(*e); delete e; } else { inner_log.push_back(args->log[i]); loctable->del_node(args->log[i]._node.id); } } else { inner_log.push_back(args->log[i]); DEBUG_MSG(args->log[i]._node,"notify_unit_leaders",args->sender); loctable->add_node(args->log[i]._node); } } leader_stabilize((void *)0);}voidOneHop::test_dead_inform(test_inform_dead_args *a){ if (a->justdelete) { DEBUG(4) << now() << ":" << ip() << "," << printID(id()) << " test_dead_inform from " << a->informed.ip << " about " << a->suspect.ip << endl; loctable->del_node(a->suspect.id); }else{ bool ok = fd_xRPC (a->suspect.ip, &OneHop::test_dead_handler, (void *)NULL, (void *)NULL, ONEHOP_INFORMDEAD,0); if (ok) { record_stat(a->suspect.ip,me.ip,ONEHOP_INFORMDEAD,0); loctable->add_node(a->suspect); delete a; return; } else loctable->del_node(a->suspect.id); inform_dead_args ia; ia.ip = a->suspect.ip; ia.key = a->suspect.id; ok = fd_xRPC(a->informed.ip,&OneHop::inform_dead_handler, &ia, (void *)NULL, ONEHOP_INFORMDEAD,1); if ((ok)&&(a->informed.ip!=me.ip)) record_stat(a->informed.ip,me.ip,ONEHOP_INFORMDEAD); } delete a;}voidOneHop::test_dead_handler(void *x, void *y){ //do nothing}boolOneHop::inform_dead (IDMap dead, IDMap recv) { IPAddress recv_ip = recv.ip; inform_dead_args ia; ia.ip = dead.ip; ia.key = dead.id; bool ok = fd_xRPC (recv_ip, &OneHop::inform_dead_handler, &ia, (void *)NULL, ONEHOP_INFORMDEAD, 1); if (ok) record_stat(recv_ip,me.ip,ONEHOP_INFORMDEAD); return ok;}void OneHop::inform_dead_handler (inform_dead_args *ia, void *ir) { IDMap dead; dead.id = ia->key; dead.ip = ia->ip; LogEntry *e = New LogEntry(dead, DEAD, now()); leader_log.push_back(*e); delete e; if (me.id != dead.id) loctable->del_node(dead.id);}OneHop::IDMapOneHop::unit_leader(CHID node) { return loctable->unit_leader(node);}OneHop::IDMapOneHop::slice_leader(CHID node) { return loctable->slice_leader(node);}bool OneHop::is_slice_leader(CHID node, CHID explead) { return loctable->is_slice_leader(node, explead);}boolOneHop::is_unit_leader(CHID node, CHID explead) { return loctable->is_unit_leader(node, explead);}ConsistentHash::CHID OneHop::slice (CHID node) { return loctable->slice(node);}ConsistentHash::CHID OneHop::unit (CHID node) { return loctable->unit(node);}OneHop::IDMapOneHopLocTable::succ(ConsistentHash::CHID id) { assert (ring.repok ()); ohidmapwrap *ptr = ring.closestsucc(id); assert(ptr); IDMap n; n.ip = ptr->ip; n.id = ptr->id; return n;}OneHop::IDMapOneHopLocTable::pred(ConsistentHash::CHID id) { assert (ring.repok ()); ohidmapwrap *ptr = ring.closestpred(id); assert(ptr); OneHop::IDMap n; n.ip = ptr->ip; n.id = ptr->id; return n;}boolOneHopLocTable::is_slice_leader (CHID node, CHID explead) { return (explead == slice_leader(node).id);}boolOneHopLocTable::is_unit_leader (CHID node, CHID explead) { return (explead == unit_leader(node).id);}/*bool OneHopLocTable::is_unit_leader(CHID node, CHID explead) { vector<IDMap> uls = unit_leaders(node); //DEBUG(2) << "Number of unit leaders found = " << uls.size() << endl; for (uint i=0; i < uls.size(); i++) { if (explead == uls[i].id) return true; } return false;}*/ OneHop::IDMap OneHopLocTable::slice_leader(CHID node) { IDMap pot_succ; pot_succ.id = 0; pot_succ.ip = 0; if (is_empty(node)) return pot_succ; pot_succ = succ(slice(node)*slice_size + slice_size/2); if ((pot_succ.ip == 0) || (slice(node) != slice(pot_succ.id))){ pot_succ = pred(slice(node)*slice_size + slice_size/2); } return pot_succ;}OneHop::IDMap OneHopLocTable::unit_leader(CHID node) { IDMap pot_succ; pot_succ.id = 0; pot_succ.ip = 0; if (is_empty_unit(node)) return pot_succ; pot_succ = succ(slice(node)*slice_size + unit(node)*unit_size + unit_size/2); if ((pot_succ.ip == 0) || (slice(node) != slice(pot_succ.id)) || unit(node) != unit(pot_succ.id)){ pot_succ = pred(slice(node)*slice_size + unit(node)*unit_size + unit_size/2); } return pot_succ;}vector<OneHop::IDMap>OneHopLocTable::unit_leaders(CHID node) { vector<IDMap> ret_vec; IDMap pot_succ; pot_succ.id = 0; pot_succ.ip = 0; CHID beg = slice(node)*slice_size; CHID def; for (int i=0; i < _u; i++) { CHID n = beg + i*unit_size; if (!is_empty_unit (node)) { def = n + unit_size/2; pot_succ = succ(def); if ((pot_succ.ip == 0) || (unit(def) != unit(pot_succ.id)) || (slice(def) != slice(pot_succ.id))){ pot_succ = pred(def); } if(! ((pot_succ.ip == 0) || (unit(def) != unit(pot_succ.id)) || (slice(def) != slice(pot_succ.id)))) ret_vec.push_back(pot_succ); } else DEBUG(2) << "Empty unit\n"; } return ret_vec;}boolOneHopLocTable::is_empty(CHID node) { //find beginning of slice CHID beg = slice(node)*slice_size; CHID end = beg+slice_size; IDMap pot_succ = succ(beg); if ((pot_succ.ip == 0) || !ConsistentHash::betweenleftincl(beg,end,pot_succ.id)) { return true; } else return false;}boolOneHopLocTable::is_empty_unit(CHID node) { //find beginning of unit CHID beg = slice(node)*slice_size + unit(node)*unit_size; CHID end = beg+unit_size; IDMap pot_succ = succ(beg); if ((pot_succ.ip == 0) || !ConsistentHash::betweenleftincl(beg,end,pot_succ.id)) { return true; } else return false;}voidOneHopLocTable::print() { ohidmapwrap *i = ring.first(); while (i) { DEBUG(3) << i->ip << ":" << i->id << endl; i = ring.next(i); }}voidOneHopLocTable::del_all() { ohidmapwrap *next; ohidmapwrap *cur; for (cur = ring.first(); cur; cur = next) { next = ring.next(cur); ring.remove(cur->id); bzero(cur, sizeof(*cur)); delete cur; } assert (ring.repok ());}voidOneHopLocTable::add_node(IDMap n){ ohidmapwrap *elm = ring.search(n.id); if (!elm) { elm = New ohidmapwrap(n.ip,n.id); ring.insert(elm); }}voidOneHopLocTable::del_node(CHID id){ ohidmapwrap *elm = ring.remove(id); if (elm) delete elm;}vector<OneHop::IDMap>OneHopLocTable::get_all(){ vector<OneHop::IDMap> v; v.clear(); ohidmapwrap *currp; currp = ring.first(); OneHop::IDMap n; while (currp) { n.ip = currp->ip; n.id = currp->id; v.push_back(n); currp = ring.next(currp); } return v;}voidOneHop::initstate(){ vector<IDMap> ids = OneHopObserver::Instance(NULL)->get_sorted_nodes(); for (uint i = 0; i < ids.size(); i++) { loctable->add_node(ids[i]); }}stringOneHop::printID(CHID id){ char buf[128]; sprintf(buf,"%qx ",id); return string(buf);}/*voidOneHop::reschedule_stabilizer(void *x){ if (!node()->alive()) { _stab_running = false; return; } _stab_running = true; if (_stab_outstanding > 0) { }else{ _stab_outstanding++; stabilize(); _stab_outstanding--; assert(_stab_outstanding == 0); } delaycb(_stabtimer, &OneHop::reschedule_stabilizer, (void *)0);}*/
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -