📄 onehop.c
字号:
IDMap succ_node = loctable->succ(me.id+1); DEBUG(1) << now() << ":" << me.ip << "," << printID(me.id) << " succ_ip " << succ_node.ip << " succ_id " << succ_node.id << " join_complete " << _join_complete<< endl; }}voidOneHop::publish(void *v) { if ((((now() - 1) % 100000) == 0) || (now() > 2000000 && now() < 2500000)) { Time interval = (now() - old_time)/1000; DEBUG(1) << "----------------------------------------------\n"; DEBUG(1) << "GRAND STATS\n"; DEBUG(1) << "Total number of lookups in the last cycle:" << lookups - old_lookups << endl; DEBUG(1) << "Total number of one hop failures in the last cycle:" << failed - old_failed<< endl; DEBUG(1) << "Fraction of lookup failures:" << (double)(failed - old_failed)/(double)(lookups - old_lookups) << endl; DEBUG(1) << "Fraction of two hop lookup failures:" << (double)(two_failed - old_two_failed)/(double)(lookups - old_lookups) << endl; DEBUG(1) << "Total number of same slice lookups:" << same_lookups << endl; DEBUG(1) << "Total number of same slice failures:" << same_failed << endl; DEBUG(1) << "Fraction of same slice failures:" << (double)same_failed/(double)same_lookups << endl; DEBUG(1) << "Number of intra-slice messages:" << tot_intraslice << endl; DEBUG(1) << "Number of intra-slice actually received:" << act_intraslice << endl; DEBUG(1) << "Exp number of received intra-slice mesgs:" << exp_intraslice << endl; DEBUG(1) << "Avg number of intra-slice recepients:" << (double)act_intraslice/(double)tot_intraslice << endl; DEBUG(1) << "Avg expected number of intra-slice recepients:" << (double)exp_intraslice/(double)tot_intraslice << endl; DEBUG(1) << "Avg number of empty units:" << (double)total_empty/(double)total_count << endl; DEBUG(1) << "Number of inter-slice messages:" << tot_interslice << endl; DEBUG(1) << "Number of inter-slice actually received:" << act_interslice << endl; DEBUG(1) << "Avg number of inter-slice recepients:" << (double)act_interslice/(double)tot_interslice << endl; DEBUG(1) << "Total number of alive and functioning nodes:" << num_nodes << endl; DEBUG(1) << "Total number of successful joins so far:" << joins << endl; DEBUG(1) << "Total number of crashes so far:" << crashes << endl; DEBUG(1) << "Total number of crashes with non-empty outers:" << nonempty_outers << endl; DEBUG(1) << "Total number of crashes with non-empty leaders:" << nonempty_leaders << endl; DEBUG(1) << "Average number of bytes used per second in the last cycle by normal nodes" << (double)(bandwidth - old_bandwidth)/(double)(num_nodes*interval) << endl; DEBUG(1) << "Average number of bytes used per second in the last cycle by slice leaders" << (double)(leader_bandwidth - old_leader_bandwidth)/(double)(_k*interval) << endl; DEBUG(1) << "Total maintenance traffic per second in the last cycle" << (double)(bandwidth + leader_bandwidth - old_bandwidth - old_leader_bandwidth)/(interval) << endl; DEBUG(1) << "Total lookup traffic per second in the last cycle" << (double)(lookup_bandwidth - old_lookup_bandwidth)/interval << endl; DEBUG(1) << "Average maintenance bandwidth so far " << (double) (bandwidth+leader_bandwidth)*1000/(double)now() << endl; DEBUG(1) << "Average lookup bandwidth so far " << (double) (lookup_bandwidth)*1000/(double)now() << endl; DEBUG(1) << "Average number of messages by normal nodes" << (double)messages*1000/((double)now()*num_nodes) << endl; DEBUG(1) << "Average number of messages by slice leaders" << (double)leader_messages*1000/((double)now()*_k) << endl; DEBUG(1) << "----------------------------------------------\n"; old_lookups = lookups; old_failed = failed; old_two_failed = two_failed; old_bandwidth = bandwidth; old_leader_bandwidth = leader_bandwidth; old_lookup_bandwidth = lookup_bandwidth; old_time = now(); } if (_publish_time>0) delaycb((Time)_publish_time, &OneHop::publish, (void *)0); }voidOneHop::crash(Args *args) { OneHopObserver::Instance(NULL)->delnode(me); DEBUG(1) << now() << ":" << me.ip << ","<< printID(me.id) << " crashed" << endl; if (is_slice_leader(me.id, me.id)) { DEBUG(1) << " -- Slice leader\n"; if (join_time > 0 && Node::collect_stat() && ((now()-join_time) > 180000)) { double avg = (double)1000.0*node_live_outbytes/(double)(now()-join_time); if (avg < 0.01) { printf("%llu: me %u what?! avg %.2f too small last join %llu total_bytes_sent %u\n", now(),ip(),avg, join_time, node_live_outbytes); }else{ OneHop::sliceleader_bw_avg.push_back(avg); } } } //else DEBUG(1) << "\n"; num_nodes--; crashes++; if (outer_log.size() > 0) nonempty_outers++; if (leader_log.size() > 0) nonempty_leaders++; /*jy: don't delete old loctable delete loctable; loctable = New OneHopLocTable(_k, _u); loctable->size(); loctable->set_timeout(0); //no timeouts on loctable entries */ loctable->del_all(); leader_log.clear(); outer_log.clear(); low_to_high.clear(); high_to_low.clear(); _join_complete = false; _wkn.ip = 0;}voidOneHop::join_leader(IDMap la, IDMap sender, Args *args) { IPAddress leader_ip = la.ip; DEBUG(1) << now() << ":" << ip() << "," << printID(id()) << " joining " << leader_ip << " in slice " << slice(id()) << endl; assert(leader_ip); join_leader_args ja; join_leader_ret* jr = New join_leader_ret (_k, _u); ja.key = id(); ja.ip = ip(); //send mesg to node, if it is slice leader will respond with //routing table, else will respond with correct slice leader record_stat( me.ip,leader_ip,ONEHOP_JOIN_LOOKUP,1); bool ok = doRPC (leader_ip, &OneHop::join_handler, &ja, jr, TIMEOUT(me.ip,leader_ip)); if (ok) record_stat(leader_ip,me.ip,ONEHOP_JOIN_LOOKUP,jr->table.size(),1); if (!alive()) { delete jr; return; } bool tmpok = false; if (ok && !jr->is_join_complete) { DEBUG(1) << now()<< ":" << ip() << "," << printID(id()) << " the leader " << leader_ip << " is still in the join process\n"; ok = false; tmpok = true; } if (ok) { //Anjali: Fix below -- will cause n pain later, check timestamp before ignoring //if (jr->exists) return; if (jr->is_slice_leader) { //found correct slice leader or there is no leader, should have initial routing table for (uint i=0; i < jr->table.size(); i++) { DEBUG_MSG(jr->table[i],"join",la); loctable->add_node(jr->table[i]); } _join_complete = true; num_nodes++; joins++; LogEntry *e = New LogEntry(me, ALIVE, now()); leader_log.push_back(*e); delete e; DEBUG(1) << now() << ":" << ip() <<"," << printID(id()) << ":Joined successfully" << endl; stabilize ((void *)0); } else { //should contain updated slice leader info IDMap new_leader = jr->leader; DEBUG(1) << now() << ":" << ip() << "," << printID(id()) << " join_leader new " << new_leader.ip << "," << printID(id()) << endl; join_leader(new_leader, la, args); } } else { //the process failed somewhere, try to contact well known node again //if it could successfully inform, good, else schedule after some time //bool dead_ok = false; //node is really dead, not just in the process of joining, //inform redirecting node if (!tmpok) { DEBUG(4) << now() << ":" << ip() << "," << printID(id()) << ": " << leader_ip << "failed repeating informing " << sender.ip << " that " << leader_ip << " has failed " << endl; //dead_ok = inform_dead (la, sender); test_inform_dead_args *aa = New test_inform_dead_args; aa->justdelete = false; aa->suspect = la; aa->informed = sender; delaycb(0,&OneHop::test_dead_inform,aa); } DEBUG(1) << now() << ":" << ip() << "," << printID(id()) << "rescheduling join" << endl; delaycb (_retry_timer, &OneHop::join, (Args *)0); } delete jr;}voidOneHop::join_handler(join_leader_args *args, join_leader_ret *ret) { CHID node = args->key; //contacted looks into loctable and checks if it is the correct leader //or if there is no correct leader DEBUG(3) << now() << ":" << ip() << "," << printID(id()) << ": Contacted by " << args->ip << " for join\n"; if (!_join_complete) { ret->is_join_complete = false; return; } ret->is_join_complete = true; if (is_slice_leader(node, id()) || loctable->is_empty(node)) { ret->is_slice_leader = true; ret->table = loctable->get_all(); IDMap newnode; newnode.id = node; newnode.ip = args->ip; DEBUG_MSG(newnode,"join_handler",newnode); loctable->add_node(newnode); LogEntry *e = New LogEntry(newnode, ALIVE, now()); leader_log.push_back(*e); delete e; DEBUG(3) << now() << ":" << ip() << "," << printID(id()) << ":accepting " << newnode.ip << " for join\n"; } else { ret->is_slice_leader = false; ret->leader = slice_leader(node); DEBUG(3) << now() << ":" << ip() << "," << printID(id()) << ":not slice leader, not empty, so redirecting "<< args->ip << " to " << (ret->leader).ip <<endl; }}OneHop::~OneHop() { if (alive() && _join_complete) { DEBUG(1) << now() << ":" << ip() << "," << printID(id()) <<":In slice " << slice(id()) << endl; if (is_slice_leader(me.id, me.id)) { DEBUG(1) << now() << ":" << ip() << "," << printID(id()) << ":Slice leader of slice " << slice(me.id) << endl; } } if (me.ip == 1) { if (OneHop::num_violations > 0) printf("Number of violations of sending too big a message %u\n", OneHop::num_violations); Node::print_stats(); printf("<-----STATS----->\n"); sort(OneHop::sliceleader_bw_avg.begin(),OneHop::sliceleader_bw_avg.end()); uint sz = OneHop::sliceleader_bw_avg.size(); if (sz > 0) printf("SLICELEADER_BW:: 50p:%.2f 90p:%.2f 95p:%.2f 99p:%.2f sz:%u\n", OneHop::sliceleader_bw_avg[(uint)(sz*0.5)], OneHop::sliceleader_bw_avg[(uint)(sz*0.9)], OneHop::sliceleader_bw_avg[(uint)(sz*0.95)], OneHop::sliceleader_bw_avg[(uint)(sz*0.99)],sz); printf("<-----ENDSTATS----->\n"); } delete loctable;} voidOneHop::stabilize(void* x){ if (!alive()) return; if (!_join_complete) return; DEBUG(1) << now() << ":" << ip() << "," << printID(id()) <<" stabilize last " << last_stabilize << " slice leader " << slice(id()) << " unit leader " << unit(id()) << endl; last_stabilize = now(); /* //if (now() < 2000000) // countertime = now(); if ((now() >= 2000000) && (now() < 2100000)) //if (((now() - countertime) % 5000) == 0) { // DEBUG(1) << ip() << "Stabilizing \n"; if (is_slice_leader(me.id, me.id)) DEBUG(1) << ip() << ":Slice leader of slice "<< slice(id()) << endl; //} */ if (is_slice_leader(me.id, me.id) || is_unit_leader(me.id, me.id)) leader_stabilize((void *)0); if (!alive()) return; notifyevent_args na, piggyback; na.sender = me; piggyback.sender = me; piggyback.log.clear(); bool ok = false; //I may have been slice leader some time, not any more. inform slice leader while ((!is_slice_leader(me.id, me.id)) && (leader_log.size () > 0) && (na.log.size() < MAX_IDS_MSG)) { //jy: limit size of one msg DEBUG_MSG(leader_log.front()._node,"stabilize non-leader leader->na log", me); na.log.push_back(leader_log.front()); leader_log.pop_front(); } while ((!is_slice_leader(me.id, me.id)) && (outer_log.size () > 0) && (na.log.size() < MAX_IDS_MSG)) { //jy: limit size of one msg DEBUG_MSG(outer_log.front()._node,"stabilize non-leader outer->na log", me); na.log.push_back(outer_log.front()); outer_log.pop_front(); } while ((low_to_high.size() > 0) && (piggyback.log.size()< MAX_IDS_MSG)){ //jy: limit size of one msg DEBUG_MSG(low_to_high.front()._node,"stabilize non-leader low_to_high->piggyback log", me); piggyback.log.push_back(low_to_high.front()); low_to_high.pop_front(); } //successor ping while (!ok) { IDMap succ = loctable->succ(me.id+1); //ping the successor to see if it is alive and piggyback a log general_ret gr; piggyback.up = 1; bw data = 4*piggyback.log.size(); ok = fd_xRPC(succ.ip,&OneHop::ping_handler,&piggyback,&gr, ONEHOP_PING, piggyback.log.size()); if (ok) record_stat(succ.ip,me.ip,ONEHOP_PING,0,1); DEBUG(4) << now() << ":" << ip() << "," << printID(me.id) << " succ " << succ.ip << "," << printID(succ.id) << " ok? " << (ok?1:0) << endl; if (!alive()) return; //very ugly hack to fix accouting -- clean up if (!((slice(succ.id) == slice(me.id)) && (unit(succ.id) == unit(me.id)))) { if (is_slice_leader(me.id, me.id)) { leader_bandwidth = leader_bandwidth - 20 - data; leader_messages--; } else { bandwidth = bandwidth - 20 - data; messages--; } } if (!ok) { if (me.id != succ.id) loctable->del_node(succ.id); DEBUG(5) << now() << ":" << ip() << "," << printID(id()) <<":PING! Informing " << slice_leader(me.id).ip <<" that successor "<< succ.ip << " is dead\n"; LogEntry *e = New LogEntry(succ, DEAD, now()); na.log.push_back(*e); piggyback.log.push_back(*e); delete e; } else if (!gr.has_joined) { if (me.id != succ.id) loctable->del_node(succ.id); } ok = ok && gr.has_joined; //if (ok && (!gr.correct)) // loctable->add_node(gr.act_neighbor); } //predecessor ping and piggyback piggyback.log.clear(); while ((high_to_low.size() > 0) && (piggyback.log.size() < MAX_IDS_MSG)) {//jy: limit max ids in one msg piggyback.log.push_back(high_to_low.front()); high_to_low.pop_front(); } ok = false; while (!ok) { IDMap pred = loctable->pred(me.id-1); DEBUG(4) << now() << ":" << ip() << "," << printID(id()) << " pred " << pred.ip << "," << printID(pred.id) << " ok? " << (ok?1:0) << endl; general_ret gr; piggyback.up = 0; bw data = 20+ 4*piggyback.log.size(); ok = fd_xRPC(pred.ip, &OneHop::ping_handler, &piggyback, &gr, ONEHOP_PING,piggyback.log.size()); record_stat(pred.ip,me.ip,ONEHOP_PING, 0,1); if (!alive()) return; //very ugly hack to fix accouting -- change after deadline if (!((slice(pred.id) == slice(me.id)) && (unit(pred.id) == unit(me.id)))) { if (is_slice_leader(me.id, me.id)) { leader_bandwidth = leader_bandwidth - data; leader_messages--; } else { bandwidth = bandwidth - data; messages--; } } if (!ok) { if (me.id != pred.id) loctable->del_node(pred.id); DEBUG(5) << now() << ":" << ip() << "," << printID(id()) <<":PING! Informing " << slice_leader(me.id).ip << " that predecessor "<< pred.ip << " is dead\n"; LogEntry *e = New LogEntry(pred, DEAD, now()); na.log.push_back(*e); piggyback.log.push_back(*e); delete e; } else if (!gr.has_joined) { if (me.id != pred.id) loctable->del_node(pred.id); } ok = ok && gr.has_joined; //if (ok && (!gr.correct)) // loctable->add_node(gr.act_neighbor); } //if either successor or predecessor has died, notify slice leader ok = false; if (na.log.size() > 0) { while (!ok) { IDMap sliceleader = loctable->slice_leader(me.id); general_ret gr; DEBUG(1) << now() << ":" << ip() << "," << printID(id()) << " stabilize "<< " notifyevent log sz " << na.log.size() << "to sliceleader "<<sliceleader.ip << endl; ok = fd_xRPC(sliceleader.ip, &OneHop::notifyevent_handler,&na,&gr, ONEHOP_NOTIFY,na.log.size()); if (ok) record_stat(sliceleader.ip,me.ip,ONEHOP_NOTIFY,0,1); if (!alive()) return; if (!ok) { LogEntry *e = New LogEntry(sliceleader, DEAD, now()); na.log.push_back(*e); if (me.id != sliceleader.id) loctable->del_node(sliceleader.id); DEBUG(5) << now() << ":" << ip() <<"," << printID(id()) <<":PING! Informing " << slice_leader(me.id).ip << " that old slice leader "<< sliceleader.ip << " is dead\n"; delete e; } else if (!gr.has_joined) { if (me.id != sliceleader.id) loctable->del_node(sliceleader.id); } ok = ok && gr.has_joined; if (ok) { if (gr.act_sliceleader.ip) { DEBUG_MSG(gr.act_sliceleader,"stabilize",sliceleader); loctable->add_node(gr.act_sliceleader); } } } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -