📄 onehop.c
字号:
delaycb(_stab_timer, &OneHop::stabilize, (void *)0);}voidOneHop::leader_stabilize(void *x) { if (_leaderstab_running) return; _leaderstab_running = true; if (!alive()) return; if (!_join_complete) return; /* if (is_slice_leader(me.id, me.id)) { DEBUG(1) << ip() << ":Slice size = " << slice_size << endl; DEBUG(1) << ip() << ":Unit size = " << loctable->unit_size << endl; DEBUG(1) << ip() << ":My Id = " << id() << endl; DEBUG(1) << ip() << "I am succ of (slice mid) " << slice(me.id)*slice_size + slice_size/2 << endl; DEBUG(1) << ip() << "I should also be succ of (unit mid) " << slice(me.id)*slice_size + unit(me.id)*(loctable->unit_size) + (loctable->unit_size)/2 << endl; DEBUG(1) << ip() << "Code claims unit succ is " << unit_leader(me.id).ip << " with id " << unit_leader(me.id).id << endl; DEBUG(1) << ip() << "My unit is " << unit(me.id) << " and claimed succ's unit is " << unit(unit_leader(me.id).id) << endl; assert(is_unit_leader(me.id, me.id)); } */ notifyevent_args send_unit; send_unit.sender = me; while ((inner_log.size() > 0) && (send_unit.log.size() < MAX_IDS_MSG)) { send_unit.log.push_back(inner_log.front()); inner_log.pop_front(); } bool ok = false; if (send_unit.log.size() > 0) { while (!ok) { IDMap succ = loctable->succ(me.id+1); //ping the successor to see if it is alive and piggyback a log general_ret gr; bw data = 20 + 4*send_unit.log.size(); ok = fd_xRPC(succ.ip, &OneHop::notify_rec_handler, &send_unit, &gr, ONEHOP_LEADER_STAB, send_unit.log.size()); if (ok) record_stat(succ.ip,me.ip,ONEHOP_LEADER_STAB,0,1); if (!alive()) return; //very ugly hack to fix accouting -- change after deadline if (!((slice(succ.id) == slice(me.id)) && (unit(succ.id) == unit(me.id)))) { if (is_slice_leader(me.id, me.id)) { leader_bandwidth = leader_bandwidth - data; leader_messages--; } else { bandwidth = bandwidth - data; messages--; } } if (!ok) { LogEntry *e = New LogEntry(succ, DEAD, now()); leader_log.push_back(*e); send_unit.log.push_back(*e); delete e; if (me.id != succ.id) loctable->del_node(succ.id); } if (ok && !gr.has_joined) { DEBUG(5) << now() << ":" << ip() << "," << printID(id()) << "Sending to an incompletely joined node\n"; if (me.id != succ.id) loctable->del_node(succ.id); } } ok = false; while (!ok) { IDMap pred = loctable->pred(me.id-1); //ping the pred to see if it is alive and piggyback a log general_ret gr; bw data = 20+ 4*send_unit.log.size(); ok = fd_xRPC(pred.ip, &OneHop::notify_rec_handler, &send_unit, &gr, ONEHOP_LEADER_STAB, send_unit.log.size()); if (ok) record_stat(pred.ip,me.ip,ONEHOP_LEADER_STAB,0,1); if (!alive()) return; //very ugly hack to fix accouting -- change after deadline if (!((slice(pred.id) == slice(me.id)) && (unit(pred.id) == unit(me.id)))) { if (is_slice_leader(me.id, me.id)) { leader_bandwidth = leader_bandwidth - data; leader_messages--; } else { bandwidth = bandwidth - data; messages--; } } if (!ok) { LogEntry *e = New LogEntry(pred, DEAD, now()); leader_log.push_back(*e); send_unit.log.push_back(*e); delete e; if (me.id != pred.id) loctable->del_node(pred.id); } if (ok && !gr.has_joined) { DEBUG(5) << now() << ":" << ip() << "," << printID(id()) << "Sending to an incompletely joined node\n"; if (me.id != pred.id) loctable->del_node(pred.id); } } /* vector<IDMap> all_nodes = loctable->get_all(); for (uint i=0; i < all_nodes.size(); i++) { if (!alive()) { DEBUG(1) << ip() << ":Unit leader: received events but died before sending them! Yikes!\n"; return; } ok = false; if ((slice(all_nodes[i].id) == slice(me.id)) && (unit(all_nodes[i].id) == unit(me.id))) { general_ret gr; ok = xRPC(all_nodes[i].ip, &OneHop::notify_rec_handler, &send_unit, &gr); if (!ok) { LogEntry *e = new LogEntry(all_nodes[i], DEAD, now()); leader_log.push_back(*e); send_unit.log.push_back(*e); loctable->del_node(all_nodes[i],true); } if (ok && !gr.has_joined) { DEBUG(1) << ip() << "Sending to an incompletely joined node\n"; loctable->del_node(all_nodes[i],true); } } }*/ } if (is_slice_leader(me.id, me.id) && (leader_log.size() + outer_log.size() >= 5)) { notifyevent_args send_in, send_out; send_in.sender = me; send_out.sender = me; while ((leader_log.size() > 0) && (send_in.log.size() < MAX_IDS_MSG)) { send_in.log.push_back(leader_log.front()); send_out.log.push_back(leader_log.front()); leader_log.pop_front(); } while ((outer_log.size() > 0) && (send_in.log.size() < MAX_IDS_MSG)) { send_in.log.push_back(outer_log.front()); outer_log.pop_front(); } if (send_out.log.size() > 0) tot_interslice++; if (send_in.log.size() > 0) { tot_intraslice++; exp_intraslice += _u ; } vector<IDMap> all_nodes = loctable->get_all(); if (send_in.log.size() > 0) { for (uint i=0; i < all_nodes.size(); i++) { if (!alive()) { DEBUG(3) << now() << ":" << ip() << "," << printID(id()) << "Received events but died before sending them! Yikes!\n"; return; } //all logs are already incorporated, do not send to myself //send mesg to all unit leaders if ((slice(me.id) == slice(all_nodes[i].id)) && is_unit_leader(all_nodes[i].id, all_nodes[i].id)) { general_ret gr; ok = fd_xRPC(all_nodes[i].ip, &OneHop::notify_unit_leaders, &send_in, &gr, ONEHOP_LEADER_STAB, send_in.log.size()); if (ok) record_stat(all_nodes[i].ip,me.ip,ONEHOP_LEADER_STAB,0,1); if (!alive()) return; if (ok) { if (gr.has_joined) { act_intraslice++; //DEBUG(2) << ip() << ":Sent intra-slice to " << all_nodes[i].id << " in unit " << unit(all_nodes[i].id) << endl; } if ((!gr.has_joined) && (me.id != all_nodes[i].id)) { loctable->del_node(all_nodes[i].id); DEBUG(5) << now() << ":" << ip() << "," << printID(id()) << "Sending to an incompletely joined node -- failed\n"; } } else { LogEntry *e = New LogEntry(all_nodes[i], DEAD, now()); leader_log.push_back(*e); send_in.log.push_back(*e); send_out.log.push_back(*e); delete e; if (me.id != all_nodes[i].id) loctable->del_node(all_nodes[i].id); } } if (me.id != all_nodes[i].id) { if (send_out.log.size() > 0) { if ((is_slice_leader(slice(all_nodes[i].id)*slice_size, all_nodes[i].id))) { general_ret gr; ok = fd_xRPC(all_nodes[i].ip, &OneHop::notify_other_leaders, &send_out, &gr, ONEHOP_LEADER_STAB, send_out.log.size()); if (ok) record_stat(all_nodes[i].ip,me.ip,ONEHOP_LEADER_STAB,0,1); if (!alive()) return; if (ok) { act_interslice++; if (!gr.has_joined) { if (me.id != all_nodes[i].id) loctable->del_node(all_nodes[i].id); DEBUG(5) << now() << ":" << ip() << "," << printID(id()) << "Sending to an incompletely joined node\n"; } } if (!ok) { LogEntry *e = New LogEntry(all_nodes[i], DEAD, now()); outer_log.push_back(*e); send_in.log.push_back(*e); delete e; if (me.id != all_nodes[i].id) loctable->del_node(all_nodes[i].id); } } } } } for (int i=0; i < _u; i++) { CHID n = slice(me.id)*slice_size + i*loctable->unit_size; if (loctable->is_empty_unit (n)) total_empty++; } total_count++; } } _leaderstab_running = false;}voidOneHop::ping_handler(notifyevent_args *args, general_ret *ret) { ret->correct = true; if (_join_complete) ret->has_joined = true; else ret->has_joined = false; IDMap succ_node = loctable->succ(args->sender.id); if (succ_node.id != args->sender.id) { if (!alive()) return; DEBUG(5) << now() << ":" << ip() << "," << printID(id()) << ":Found new node " << args->sender.ip << " via ping\n"; DEBUG_MSG(args->sender,"ping_handler directly add sender",args->sender); loctable->add_node(args->sender); LogEntry *e = New LogEntry(args->sender, ALIVE, now()); leader_log.push_back(*e); delete e; } if ((slice(args->sender.id) == slice(me.id)) && (unit(args->sender.id) == unit(me.id))) { for (uint i=0; i < args->log.size(); i++) { if (args->sender.id >= me.id) { high_to_low.push_back(args->log[i]); sent_low = false; } else { low_to_high.push_back(args->log[i]); sent_high = false; } if (args->log[i]._state == DEAD) { if (args->log[i]._node.ip == ip()) { DEBUG(3) << now() << ":" << ip() << "," << printID(id()) << ":Panic! People think I am dead, but I'm not\n"; //exit(-1); LogEntry *e = New LogEntry(me, ALIVE, now()); leader_log.push_back(*e); delete e; if (args->sender.id >= me.id) high_to_low.pop_back(); else low_to_high.pop_back(); } else { loctable->del_node(args->log[i]._node.id); } } else { DEBUG_MSG(args->log[i]._node, "ping_handler", args->sender); loctable->add_node(args->log[i]._node); } } } ret->correct = true; //now that all logs have been absorbed, and I know latest info //check if I am really the right neighbor /*if (args->up == 1) { //I am supposed to be the successor //who succ IDMap succ_node = loctable->succ(args->sender.id + 1); if (succ_node.id != me.id) {//I am not the successor assert(ConsistentHash::betweenleftincl(args->sender.id, me.id, succ_node.id)); ret->act_neighbor = succ_node; ret->correct = false; } } else { IDMap pred_node = loctable->pred(args->sender.id - 1); if (pred_node.id != me.id) { assert(ConsistentHash::betweenrightincl(me.id, args->sender.id, pred_node.id)); ret->act_neighbor = pred_node; ret->correct = false; } } */ } voidOneHop::notifyevent_handler(notifyevent_args *args, general_ret *ret){ if (_join_complete) ret->has_joined = true; else ret->has_joined = false; bool ok = false; if (!alive()) { DEBUG(3) << now() << ":" << ip() << "," << printID(id()) << "Received events but died before sending them! Yikes!\n"; return; } bool me_leader = is_slice_leader(me.id, me.id); general_ret gr; ret->act_sliceleader.id = 0; ret->act_sliceleader.ip = 0; if (!me_leader) { //not slice leader, but still got message, forward to correct slice leader //must forward message to the correct slice leader DEBUG(1) << now() << ":" << ip() << "," << printID(id()) << ":I am not slice leader of "<< slice(id()) << ", still I got this message!\n"; while (!ok) { IDMap sliceleader = loctable->slice_leader(me.id); DEBUG(1) << now() << ":" << ip() << "," << printID(id()) << ":Forwarding to " << sliceleader.ip << endl ; ok = fd_xRPC(sliceleader.ip, &OneHop::notifyevent_handler, args, &gr, ONEHOP_NOTIFY, args->log.size()); if (ok) record_stat(sliceleader.ip,me.ip,ONEHOP_NOTIFY, 0,1); if (!alive()) return; if (!ok) { LogEntry *e = New LogEntry(sliceleader, DEAD, now()); leader_log.push_back(*e); args->log.push_back(*e); if (me.id != sliceleader.id) loctable->del_node(sliceleader.id); DEBUG(5) << now() << ":" << ip() << "," << printID(id()) <<":PING! Informing " << slice_leader(me.id).ip << " that old slice leader "<< sliceleader.ip << " is dead\n"; delete e; } else if (!gr.has_joined) { if (me.id != sliceleader.id) loctable->del_node(sliceleader.id); } ok = ok && gr.has_joined; if (ok) { ret->act_sliceleader = sliceleader; LogEntry *e = New LogEntry(sliceleader, ALIVE, now()); leader_log.push_back(*e); delete e; } } } for (uint i=0; i < args->log.size(); i++) { if (args->log[i]._state == DEAD) { if (!alive()) return; if (args->log[i]._node.ip == ip()) { DEBUG(5) << now() << ":" << ip() << "," << printID(id()) << ":Panic! People think I am dead, but I'm not\n"; LogEntry *e = New LogEntry(me, ALIVE, now()); leader_log.push_back(*e); delete e; } else { if (!alive()) return; loctable->del_node(args->log[i]._node.id); if (me_leader || (is_slice_leader(me.id, me.id))) { LogEntry *e = New LogEntry(args->log[i]._node, DEAD, now()); leader_log.push_back(*e); delete e; } } } else { if (!alive()) return; DEBUG_MSG(args->log[i]._node,"notifyevent_handler",args->sender); loctable->add_node(args->log[i]._node); if ((me_leader) || is_slice_leader(me.id, me.id)) { LogEntry *e = New LogEntry(args->log[i]._node, ALIVE, now()); leader_log.push_back(*e); delete e; } } } IDMap succ_node = loctable->succ(args->sender.id); if (succ_node.id != args->sender.id) { if (!alive()) return; DEBUG(5) << now() << ":" << ip() << "," << printID(id()) << ":Found New node " << args->sender.ip << " via notify event\n"; DEBUG_MSG(args->sender,"notifyevent_handler directly add sender", args->sender); loctable->add_node(args->sender); LogEntry *e = New LogEntry(args->sender, ALIVE, now()); leader_log.push_back(*e); delete e; }} void OneHop::notify_rec_handler(notifyevent_args *args, general_ret *ret){ if (_join_complete) ret->has_joined = true; else ret->has_joined = false; if (!alive()) return;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -