📄 accordion.c
字号:
if (t) { if (lr->v.size() == 0) { ADEBUG(2) << "done lookup key " << printID(la->key) << "from " << la->from.ip << "," << printID(la->from.id) << "failed " << endl; record_lookup_stat(_me.ip, la->from.ip, now()-t, false,false, la->hops, la->to_num, la->to_lat); }else{ bool b = check_pred_correctness(la->key, la->from); ADEBUG(2) << "done lookup key " << printID(la->key) << "from " << la->from.ip << "," << printID(la->from.id) << "succ " << lr->v.size() << " " << (lr->v.size()>0?lr->v[0].ip:0) << "," << printID(lr->v.size()>0?lr->v[0].id:0) << "best " << 2*Network::Instance()->gettopology()->latency(_me.ip, la->from.ip) << " lat " << now()-t << " hops " << la->hops << " timeouts " << la->to_num << " correct? " << (b?1:0) << endl; if (now()-t > 4000) { record_lookup_stat(_me.ip, la->from.ip, now()-t, false, false, la->hops, la->to_num, la->to_lat); }else{ record_lookup_stat(_me.ip, la->from.ip, now()-t, true, b, la->hops, la->to_num, la->to_lat); } for (uint i = 0; i < lr->v.size();i++) loctable->add_node(lr->v[i]); } _outstanding_lookups.remove(la->key); }}voidAccordion::next_recurs(lookup_args *la, lookup_ret *lr){ IDMap succ = loctable->succ(_me.id+1); IDMap pred = loctable->pred(_me.id-1); //update my alivetime //XXX bug?? la->nexthop.alivetime = now() - _last_joined_time; //learn the src node if this query la->src.timestamp = now(); loctable->add_node(la->src); la->from.timestamp = now(); loctable->add_node(la->from); if (lr && la->overshoot) { if (_rate_queue->very_critical() || _rate_queue->critical()) la->learnsz = 1; if (!_fixed_stab_int) lr->v = loctable->get_closest_in_gap(la->learnsz, _me.id, la->overshoot, la->from, 20*80*1000/_bw_overhead,la->timeout?la->timeout:est_timeout(0.1)); else lr->v = loctable->get_closest_in_gap(la->learnsz, _me.id,la->overshoot, la->from, 20*_fixed_stab_int,_fixed_stab_to); if (!lr->v.size()) { if (!_rate_queue->critical()) lr->v = loctable->succs(_me.id+1,la->learnsz); else lr->v.clear(); lr->is_succ = true; }else{ lr->is_succ = false; } } //if i have forwarded pkts for this key //and the packet is droppable if (_forwarded.find(la->src.id | la->key)) { ADEBUG(3) << "next_recurs key " << printID(la->key) << "src " << la->src.ip << " ori " << la->ori.ip << " from " << la->from.ip << " forwarded before no_drop? " << (la->no_drop?1:0) << endl; if (!la->no_drop) return; }else _forwarded.insert(la->src.id | la->key, now()); //for adjusting parallelisms in the next discrete interval if (now() >= _next_adjust) adjust_parallelism(); _lookup_times++; if (!succ.ip || succ.ip == _me.ip) { ADEBUG(4) << "next_recurs not joined key " << printID(la->key) << "failed" << endl; if ((!_join_scheduled) || (now()-_join_scheduled) > 20000) delaycb(0,&Accordion::join, (Args *)0); //join again if (lr) { lr->v.clear(); lr->done = false; } return; } if (succ.ip && ConsistentHash::between(_me.id,succ.id,la->key)) { lookup_args *lla = New lookup_args; bcopy(la,lla,sizeof(lookup_args)); lla->from = _me; lla->from.alivetime = now()-_last_joined_time; lla->nexthop = lla->src; lookup_ret *llr = New lookup_ret; llr->done = true; llr->v = loctable->succs(_me.id+1,la->m); ADEBUG(3) << "next_recurs key " << printID(la->key) << "src " << la->src.ip << " ori " << la->ori.ip << " from " << la->from.ip << " no_drop? " << (la->no_drop?1:0) << " done succ " << succ.ip << "," << printID(succ.id) << " quota " << _rate_queue->quota() << " qsz " << _rate_queue->size() << " hops " << lla->hops << endl; _rate_queue->do_rpc(lla->src.ip, &Accordion::donelookup_handler, &Accordion::null_cb, lla, llr, lla->ori.ip?0:1, lla->type, PKT_SZ(2*llr->v.size(),0), PKT_SZ(0,0),TIMEOUT(_me.ip,lla->src.ip)); return; } double ttt; int para; if (!_fixed_stab_int) { para = (_rate_queue->quota()+_burst_sz)/(2*PKT_OVERHEAD+ 8*_learn_num); if (!la->no_drop && _rate_queue->critical()) para = 1; if (para > _parallelism) para = _parallelism; else if (para > la->parallelism) para = la->parallelism; else if (para <= 0) para = 1; ttt = para > 1? est_timeout((exp(log(0.1)/(double)para))):est_timeout(0.1); }else{ para = _parallelism; ttt = _fixed_lookup_to; } //vector<IDMap> nexthops = loctable->preds(la->key, para, LOC_HEALTHY, ttt); vector<IDMap> nexthops = loctable->next_close_hops(la->key, para, _me,ttt); if (_fixed_stab_int) ttt = _fixed_stab_to; /* else if (ttt > 0.5) ttt = est_timeout(0.5); */ uint nsz = nexthops.size(); if (((nsz == 0) || (nexthops[0].ip == _me.ip)) && succ.ip) { fprintf(stderr,"%llu para %u _para %u shit! %u,%qx key %qx succ %qx\n",now(), para, _parallelism, _me.ip,_me.id, la->key, succ.id); abort(); } ADEBUG(3) << "next_recurs " << printID(la->key) << " para " << para << " nsz " << nsz << ": " << print_succs(nexthops) << endl; IDMap overshoot = loctable->succ(la->key); bool sent_success; uint i; for (i = 0; i < nexthops.size(); i++) { if (!ConsistentHash::between(_me.id,la->key,nexthops[i].id)) break; lookup_args *lla = New lookup_args; bcopy(la,lla,sizeof(lookup_args)); if ((la->no_drop) && i == 0) { lla->no_drop = true; } else { lla->no_drop = false; } if (_rate_queue->critical()) lla->learnsz = 1; lla->timeout = (Time) ttt; lla->hops = la->hops+1; lla->nexthop = nexthops[i]; lla->from = _me; //lla->overshoot = overshoot.id; lla->overshoot = (i>=1)?nexthops[i-1].id:la->key; lookup_ret *llr = New lookup_ret; llr->v.clear(); ADEBUG(3) << "next_recurs key " << printID(la->key) << " quota " << _rate_queue->quota() << " qsz " << _rate_queue->size() << " locsz " << loctable->size(LOC_HEALTHY, ttt) << " livesz " << loctable->live_size(ttt) << " src " << la->src.ip << " ori " << la->ori.ip << " from " << la->from.ip << " forward to next " << nexthops[i].ip << "," << printID(nexthops[i].id) << "," << now()-nexthops[i].timestamp << "," << nexthops[i].alivetime << " to " << (double)nexthops[i].alivetime/(double)(nexthops[i].alivetime+ now()-nexthops[i].timestamp) << " hops " << lla->hops << " est_to " << ttt << " est_sz " << _stat.size() << " nsz " << nsz << " i " << i << " nodrop " << lla->no_drop << endl; if (nexthops[i].ip == _me.ip) { fprintf(stderr,"what?! loop?!!!\n"); exit(3); } sent_success = _rate_queue->do_rpc(nexthops[i].ip, &Accordion::next_recurs, &Accordion::next_recurs_cb, lla,llr, lla->ori.ip?0:(lla->no_drop?1:3), lla->type, PKT_SZ(1,0), PKT_SZ(2*lla->learnsz,0),TIMEOUT(_me.ip,nexthops[i].ip)); if (!sent_success) break; } if ((la->no_drop) && (i>0)) _forwarded_nodrop.insert(la->src.id | la->key, (uint)i);}intAccordion::next_recurs_cb(bool b, lookup_args *la, lookup_ret *lr){ int ret_sz = 0; if (alive()) { IDMap succ = loctable->succ(_me.id+1); Time delta = now()-la->nexthop.timestamp; if (delta < la->nexthop.alivetime && delta > 5000) add_stat((double)(la->nexthop.alivetime-delta)/(double)(la->nexthop.alivetime), b); la->nexthop.timestamp = now(); if (b && (la->nexthop.ip!=_me.ip)){ if ((lr->is_succ) && (lr->v.size() > 0)){ loctable->update_ifexists(la->nexthop,true); vector<IDMap> oldlist = loctable->between(la->nexthop.id+1,lr->v[lr->v.size()-1].id); consolidate_succ_list(la->nexthop,oldlist,lr->v,false); vector<IDMap> newlist = loctable->between(la->nexthop.id+1,lr->v[lr->v.size()-1].id); ADEBUG(4) << "next_recurs_cb After consolidate: " << print_succs(newlist) << endl; } else { loctable->update_ifexists(la->nexthop); for (uint i = 0; i < lr->v.size(); i++) { IDMap xx = loctable->succ(lr->v[i].id,LOC_HEALTHY); if (xx.ip == lr->v[i].ip && xx.timestamp < lr->v[i].timestamp && lr->v[i].timestamp - xx.timestamp > 5000) add_stat((double)xx.alivetime/(double)(lr->v[i].timestamp - xx.timestamp+xx.alivetime), true); loctable->add_node(lr->v[i]); } } ADEBUG(4) << "next_recurs_cb key " << printID(la->key) << "src " << la->src.ip << " ori " << la->ori.ip << " from " << la->nexthop.ip << "," << printID(la->nexthop.id) << " learnt " << lr->v.size() << ": " << print_succs(lr->v) << " nodes is_succ " << (lr->is_succ?1:0) << " overshoot " << printID(la->overshoot) << " locsz " << loctable->size(LOC_HEALTHY,0.9) << " livesz " << loctable->live_size(0.9) << endl; ret_sz = PKT_SZ(2*lr->v.size(),0); } else { ADEBUG(4) << "next_recurs_cb key " << printID(la->key) << "src " << la->src.ip << " nexthop " << la->nexthop.ip << " dead " << (b?0:1) << endl; if (!b) loctable->del_node(la->nexthop); ret_sz = 0; } uint outstanding = _forwarded_nodrop.find(la->src.id | la->key); if (outstanding) { if (b && la->no_drop) { _forwarded_nodrop.remove(la->key|la->src.id); ADEBUG(3) << "next_recurs_cb key " << printID(la->key) << "src " << la->src.ip << " ori " << la->ori.ip << " from " << la->nexthop.ip << "," << printID(la->nexthop.id) << "ori " << la->ori.ip << " successfully forwarded nodrop" << endl; }else if (outstanding == 1) { //send again, to myself la->to_lat += TIMEOUT(_me.ip, la->nexthop.ip); la->to_num++; la->no_drop = true; _forwarded_nodrop.remove(la->key|la->src.id); la->parallelism = 1; ADEBUG(3) << "next_recurs_cb key " << printID(la->key) << "src " << la->src.ip << " ori " << la->ori.ip << " from " << la->nexthop.ip << "," << printID(la->nexthop.id) << "ori " << la->ori.ip << (b?" live":" dead") << " restransmit" << endl; next_recurs(la,lr); } else { ADEBUG(3) << "next_recurs_cb key " << printID(la->key) << "src " << la->src.ip << " ori " << la->ori.ip << " from " << la->nexthop.ip << "," << printID(la->nexthop.id) << "ori " << la->ori.ip << (b?" live":" dead") << " outstanding " << (outstanding-1) << endl; _forwarded_nodrop.insert(la->src.id|la->key,(outstanding-1)); } }else { if (succ.ip == la->nexthop.ip && (!b)) { ADEBUG(3) << "next_recurs_cb key " << printID(la->key) << "src " << la->src.ip << " ori " << la->ori.ip << " new succ emerged " << endl; la->no_drop = true; next_recurs(la,lr); } else ADEBUG(3) << "next_recurs_cb key " << printID(la->key) << "src " << la->src.ip << " ori " << la->ori.ip << " from " << la->nexthop.ip << "," << printID(la->nexthop.id) << "ori " << la->ori.ip << (b?" live":" dead") << " dont care" << (outstanding-1) << endl; } } delete la; if (lr) delete lr; return ret_sz;}/* ------------- fix successor routines ----------------- */voidAccordion::stab_succ(void *x){ if (!alive()) { _stab_basic_running = false; return; } fix_pred(NULL); _last_stab = now(); delaycb(_stab_basic_timer, &Accordion::stab_succ, (void *)0);}voidAccordion::fix_pred(void *a){ if (!alive()) return; IDMap pred = loctable->pred(_me.id-1); get_predsucc_args *gpa = New get_predsucc_args; get_predsucc_ret *gpr = New get_predsucc_ret; gpa->m = 1; gpa->n = pred; gpa->src = _me; gpa->src.alivetime = now()-_last_joined_time; gpr->v.clear(); ADEBUG(3) << " fix_pred " << pred.ip << "," << printID(pred.id) << " quota " << _rate_queue->quota() << endl; _rate_queue->do_rpc(pred.ip, &Accordion::get_predsucc_handler, &Accordion::fix_pred_cb, gpa, gpr, 0, TYPE_FIXPRED_UP, PKT_SZ(0,1), PKT_SZ(2,1), TIMEOUT(_me.ip,pred.ip));}intAccordion::fix_pred_cb(bool b, get_predsucc_args *gpa, get_predsucc_ret *gpr){ int ret_sz = 0; if (alive()) { gpa->n.timestamp = now(); ADEBUG(4) << "fix_pred_cb pred " << gpa->n.ip << (b?" alive":" dead") << endl; if (b) { ret_sz = PKT_SZ(2,0); loctable->update_ifexists(gpa->n); if (gpr->v.size()>0) loctable->add_node(gpr->v[0]); } else { loctable->del_node(gpa->n,true); } delaycb(10000,&Accordion::fix_succ,(void*)0); } delete gpa; delete gpr; return ret_sz;}void Accordion::fix_succ(void *a){ if (!alive()) return; IDMap succ = loctable->succ(_me.id+1, LOC_DEAD-1); if (succ.ip == 0) { ADEBUG(1) << "fix_succ locsz " << loctable->size() << " reschedule join" << endl; if ((!_join_scheduled) || (now()-_join_scheduled) > 20000) delaycb(200, &Accordion::join, (Args *)0); return; } get_predsucc_args *gpa = New get_predsucc_args; get_predsucc_ret *gpr = New get_predsucc_ret; gpr->v.clear(); gpa->n = succ; gpa->src = _me; gpa->src.alivetime = now()-_last_joined_time; vector<IDMap> scs = loctable->succs(_me.id+1,_nsucc); if (scs.size() > _nsucc/2) gpa->m = 1; else gpa->m= _nsucc; ADEBUG(2) << "fix_succ succ " << succ.ip << "," << printID(succ.id) << endl; _rate_queue->do_rpc(succ.ip, &Accordion::get_predsucc_handler, &Accordion::fix_succ_cb, gpa,gpr, 0, TYPE_FIXSUCC_UP, PKT_SZ(0,1), PKT_SZ(2*gpa->m,0), TIMEOUT(_me.ip,succ.ip));}voidAccordion::get_predsucc_handler(get_predsucc_args *gpa, get_predsucc_ret *gpr){ if (gpa->n.ip == _me.ip) gpa->n.alivetime = now()-_last_joined_time; else abort(); gpr->pred = loctable->pred(_me.id-1); if (gpa->m) gpr->v = loctable->succs(_me.id+1,gpa->m); gpa->src.timestamp = now(); loctable->add_node(gpa->src);}intAccordion::fix_succ_cb(bool b, get_predsucc_args *gpa, get_predsucc_ret *gpr){ int ret_sz = 0; if (alive()) { vector<IDMap> scs = loctable->succs(_me.id + 1, _nsucc); ADEBUG(3) << "fix_succ_cb get " << gpr->v.size() << " succs, old succ " << gpa->n.ip << "," << printID(gpa->n.id) << (b?" alive":" dead") << " succsz " << scs.size() << "(" << print_succs(scs) << ")" << endl; gpa->n.timestamp = now(); if (b) { ret_sz = PKT_SZ(1+2*gpr->v.size(),0); IDMap succ = loctable->succ(_me.id+1); loctable->update_ifexists(gpa->n); if (gpr->pred.ip == _me.ip) { }else if (gpr->pred.ip != _me.ip) { loctable->add_node(gpr->pred); IDMap newsucc = loctable->succ(_me.id+1); } consolidate_succ_list(gpa->n,scs,gpr->v); vector<IDMap> newscs = loctable->succs(_me.id+1,100); if (newscs.size() > 0) { _est_n = ((ConsistentHash::CHID)-1)/ConsistentHash::distance(_me.id,newscs[newscs.size()-1].id); _est_n = newscs.size()*_est_n; } ADEBUG(3) << "fix_succ_cb pred " << gpr->pred.ip << " new succ " << (newscs.size()>0?newscs[0].ip:0) << "," << (newscs.size()>0?printID(newscs[0].id):"??") << " succsz " << newscs.size() << "(" << print_succs(newscs) << ")" << " retsz " << ret_sz << " newsz " << gpr->v.size() << " est_n " << _est_n << endl; //delaycb(200,&Accordion::fix_pred,(void *)0); } else { loctable->del_node(gpa->n); //XXX: don't delete after one try? delaycb(200,&Accordion::fix_succ, (void *)0); } }else{ _stab_basic_running = false; } delete gpa; delete gpr; return ret_sz;}voidAccordion::consolidate_succ_list(IDMap n, vector<IDMap> oldlist, vector<IDMap> newlist, bool is_succ){ IDMap sss; IDMap mee; uint oldi = 0, newi = 0; if (is_succ) { while (oldi < oldlist.size()) { if (oldlist[oldi].ip == n.ip) break; oldi++; } oldi++; mee = _me;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -