📄 accordion.c
字号:
}else{ mee = n; } while (1) { if (newi >= newlist.size()) break; if (oldi >= oldlist.size()) { if (is_succ) { // i need to clean up some shit if ((oldi-1>=0) && (oldlist.size()>0)) sss = loctable->succ(oldlist[oldi-1].id); } while (newi < newlist.size()) { if (is_succ) { while (newlist[newi].ip!=sss.ip && ConsistentHash::between(mee.id,newlist[newi].id,sss.id)) { sss.timestamp = now(); loctable->del_node(sss); sss = loctable->succ(sss.id+1); } } loctable->add_node(newlist[newi],is_succ,false,0,0,true); newi++; oldi++; if (is_succ) { sss = loctable->succ(newlist[newi-1].id+1); } if (is_succ && oldi == _nsucc) break; } if (is_succ && oldi == _nsucc) loctable->last_succ(newlist[(newi-1)]); while (newi < newlist.size()) { loctable->add_node(newlist[newi],is_succ,false,0,0,true); newi++; } break; } if (oldlist[oldi].ip == newlist[newi].ip) { loctable->add_node(newlist[newi],is_succ,false,0,0,true); newi++; oldi++; }else if (ConsistentHash::between(_me.id, newlist[newi].id, oldlist[oldi].id)) { oldlist[oldi].timestamp = now(); loctable->del_node(oldlist[oldi]);//XXX the timestamp is not very correct oldi++; }else { loctable->add_node(newlist[newi],is_succ,false,0,0,true); newi++; } } if (is_succ) { vector<IDMap> updated = loctable->succs(_me.id+1,_nsucc); ADEBUG(4) << "consolidate fix_succ : old (" <<print_succs(oldlist) << ") new: (" << print_succs(newlist) << ") updated (" << print_succs(updated) << ")" << endl; }}/* ---------------------------- empty queue ------------------------------ */voidAccordion::empty_cb(void *x) //wrapper function{ Accordion *c = (Accordion *)x; return c->empty_queue(NULL);}voidAccordion::empty_queue(void *a) { IDMap succ = loctable->succ(_me.id+1); if (!succ.ip){ if (!_join_scheduled || (now()-_join_scheduled)>20000) { ADEBUG(4) << "empty_queue locsz " << loctable->size() << " reschedule join" << endl; delaycb(0,&Accordion::join,(Args *)0); } return; } if (now()>=_next_adjust) adjust_parallelism(); _empty_times++; IDMap askwhom,pred, next; askwhom.ip = _me.ip; double oldest = 0.0; uint op = 0; double tt = _tt; while ((pred.ip == _me.ip) || (!pred.ip)){ if (_fixed_stab_int) //oldest = loctable->pred_biggest_gap(pred, next, 20*_fixed_stab_int, _tt); op = loctable->sample_smallworld(_est_n, askwhom, pred, next, tt, _max_succ_gap); else //oldest = loctable->pred_biggest_gap(pred, next, 20*80*1000/_bw_overhead, _tt); op = loctable->sample_smallworld(_est_n, askwhom, pred,next, tt, _max_succ_gap); tt = 1-((1-tt)/2.0); if (tt > 0.9) break; } if (askwhom.ip == _me.ip || !askwhom.ip) { ADEBUG(4) << "nothing to learn oldest " << oldest << " locsz " << loctable->size() << endl; return; } if ((askwhom.ip != pred.ip) && (!ConsistentHash::between(askwhom.id,next.id,pred.id) || askwhom.ip == next.ip)) fprintf(stderr,"%llu %u %u %u %u %u fuck!\n", now(), _me.ip, askwhom.ip, pred.ip, next.ip, ((Accordion *)Network::Instance()->getnode(askwhom.ip))->budget()); learn_args *la = New learn_args; learn_ret *lr = New learn_ret; Time to = _stab_basic_timer; la->m = _learn_num; la->timeout = (Time) _tt; la->n = askwhom; la->src = _me; la->src.alivetime = now()-_last_joined_time; la->start = pred; la->end = next; ADEBUG(2) << "empty_queue quota " << _rate_queue->quota() << " succsz " << loctable->succ_size() << " locsz " << loctable->size() << " livesz " << loctable->live_size() << " locsz_used " << loctable->size(LOC_HEALTHY,_tt) << " livesz_used " << loctable->live_size(_tt) << " learn from " << la->n.ip << "," << printID(la->n.id) << " start " << la->start.ip << "," << printID(la->start.id) << " end " << la->end.ip << "," << printID(la->end.id) << " old " << (now()-la->n.timestamp) << " para " << _parallelism << " est_tt " << _tt << " op " << op << " statsz " << _stat.size() << " est_n " << _est_n << endl; _rate_queue->do_rpc(askwhom.ip, &Accordion::learn_handler, &Accordion::learn_cb, la, lr, 3, TYPE_FINGER_UP, PKT_SZ(0,1), PKT_SZ(2*la->m,0),TIMEOUT(_me.ip,pred.ip));}voidAccordion::learn_handler(learn_args *la, learn_ret *lr){ la->src.timestamp = now(); loctable->add_node(la->src); IDMap pred = loctable->pred(_me.id-1); lr->is_succ = false; if (la->n.ip == _me.ip) la->n.alivetime = now()-_last_joined_time; else abort(); if (_rate_queue->very_critical()) { IDMap succ = loctable->succ(_me.id+1); lr->v.clear(); if (succ.ip) lr->v.push_back(succ); lr->is_succ = true; }else { if (la->n.ip == _me.ip) { vector<IDMap> scs = loctable->succs(_me.id+1,_nsucc); if (scs.size() && ConsistentHash::between(_me.id,scs[scs.size()-1].id,la->end.id)) { lr->v = loctable->succs(_me.id+1,la->m); lr->is_succ = true; } } if (!lr->is_succ || la->n.ip!=_me.ip) { lr->v = loctable->get_closest_in_gap(la->m, la->start.id,la->end.id, la->src, 20*80*1000/_bw_overhead, la->timeout); if (lr->v.size() < la->m) { lr->v = loctable->succs(la->start.id+1,la->m); } } } ADEBUG(4) << "learn_handler from src " << la->src.ip << " is_succ " << lr->is_succ << " nodes " << print_succs(lr->v) << endl;}intAccordion::learn_cb(bool b, learn_args *la, learn_ret *lr){ uint ret_sz = 0; if (alive()) { Time delta = now()-la->n.timestamp; if (delta < la->n.alivetime && delta > 5000) add_stat((double)(la->n.alivetime-delta)/(double)(la->n.alivetime),b); la->n.timestamp = now(); if (b) { IDMap neighborsucc; if (lr->is_succ) { /* if (lr->v.size()>0) neighborsucc = lr->v[0]; else neighborsucc = la->end; */ loctable->update_ifexists(la->n,true); if (lr->v.size()>0 &&ConsistentHash::distance(la->n.id,lr->v[0].id) > _max_succ_gap) _max_succ_gap = ConsistentHash::distance(la->n.id,lr->v[0].id); //ConsistentHash::CHID gap = ConsistentHash::distance(la->n.id,neighborsucc.id); //_max_succ_gap = gap; if (lr->v.size() > 0) { vector<IDMap> oldlist = loctable->between(la->n.id+1,lr->v[lr->v.size()-1].id); consolidate_succ_list(la->n,oldlist,lr->v,false); if (p2psim_verbose) { vector<IDMap> newlist = loctable->between(la->n.id+1,lr->v[lr->v.size()-1].id); ADEBUG(4) << "learn_cb After consolidate " << la->n.ip << "," << la->n.alivetime << " : " << print_succs(newlist) << endl; } } } else { if (!lr->v.size()) { fprintf(stderr,"%llu me %u from %u,%qx,%llu %qx:%qx\n", now(),_me.ip,la->n.ip,la->n.id,la->n.alivetime,la->start.id,la->end.id); }else { neighborsucc = lr->v[0]; loctable->update_ifexists(la->n); for (uint i = 0; i < lr->v.size(); i++) { IDMap xx = loctable->succ(lr->v[i].id,LOC_HEALTHY); if (xx.ip == lr->v[i].ip && xx.timestamp < lr->v[i].timestamp) add_stat((double)xx.alivetime/(double)(lr->v[i].timestamp - xx.timestamp+xx.alivetime), true); loctable->add_node(lr->v[i]); } } } ADEBUG(4) << "learn_cb quota " << _rate_queue->quota() << " locsz " << loctable->size(LOC_HEALTHY) << " usedsz " << loctable->size(LOC_HEALTHY,la->timeout) << " to " << la->timeout << " livesz " << loctable->live_size(la->timeout) << " learn_cb " << la->m << " from " << la->n.ip << ":" << la->end.ip << " " << printID(la->n.id) << ":" << printID(la->end.id) << " maxgap " << printID(_max_succ_gap) << " is_succ " << (lr->is_succ?1:0) << " sz " << lr->v.size() << " est_n " << _est_n << " ex " << neighborsucc.alivetime << "," << now()-neighborsucc.timestamp << " : " << print_succs(lr->v) << endl; ret_sz = PKT_SZ(2*lr->v.size()+1,0); }else{ loctable->del_node(la->n); //XXX: should not delete a finger after one failure ADEBUG(4) << " learn_cb quota " << _rate_queue->quota() << " node " << la->n.ip << "," << printID(la->n.id)<< "," << (now()-la->n.timestamp) << "," << la->n.alivetime << "," << (double)(la->n.alivetime)/(double)(now()-la->n.timestamp+la->n.alivetime) << " dead " << (b?0:1) << " locsz " << loctable->size(LOC_HEALTHY,la->timeout) << " livesz " << loctable->live_size(la->timeout) << endl; } if (_rate_queue->empty() && !_fixed_stab_int) delaycb(0, &Accordion::empty_queue, (void *)0); } delete la; delete lr; return ret_sz;}stringAccordion::print_succs(vector<IDMap> v){ char buf[1024]; char *b = buf; b += sprintf(b," "); Time nn = now(); for (uint i = 0; i < v.size(); i++) { IDMap haha = v[i]; if (i == v.size()-1) { b += sprintf(b,"%u(%llu:%llu) ", v[i].ip,v[i].alivetime,now()-v[i].timestamp); }else{ b += sprintf(b,"%u(%llu:%llu) ", v[i].ip,v[i].alivetime,now()-v[i].timestamp); } } return string(buf);}stringAccordion::printID(ConsistentHash::CHID id){ char buf[128]; sprintf(buf,"%qx ",id); return string(buf);}bool Accordion::check_pred_correctness(ConsistentHash::CHID k, IDMap n){ IDMap tmp; tmp.id = k; uint pos = upper_bound(ids.begin(),ids.end(),tmp, IDMap::cmp) - ids.begin(); if (pos) pos--; else pos = ids.size()-1; if ((ids[pos].ip == n.ip) || (!Network::Instance()->alive(n.ip))) return true; else { ADEBUG(4) << "key " << printID(k) << "wrong " << n.ip << "," << printID(n.id) << "right " << ids[pos].ip << "," << printID(ids[pos].id) << endl; return false; }}voidAccordion::adjust_parallelism(){ if (!_fixed_stab_int) { uint old_p = _parallelism; if (_empty_times > _lookup_times) _parallelism++; else if (!_empty_times && _lookup_times) _parallelism = _parallelism/2; if (_parallelism < 1) _parallelism = 1; else if (_parallelism > _max_p) _parallelism = _max_p; unsigned long b = Node::collect_stat()?Node::get_out_bw_stat():0; ADEBUG(4) << "adjust_parallelism from " << old_p << " to " << _parallelism << " empty_times " << _empty_times << " lookup_times " << _lookup_times << " bytes " << (b-_last_bytes) << " time " << now()-_last_bytes_time << endl; _last_bytes = b; _last_bytes_time = now(); _empty_times = 0; _lookup_times = 0; while (_next_adjust < now()) _next_adjust += _adjust_interval; _tt = (_parallelism>1)?(est_timeout((exp(log(0.1)/(double)_parallelism)))):est_timeout(0.1); } if ((Node::collect_stat()) && (now()-_last_joined_time>600000)){ uint rsz = loctable->size(LOC_HEALTHY,_tt); rtable_sz.push_back(rsz); if (rtable_sz.size() > 10000) rtable_sz.erase(rtable_sz.begin()); }}voidAccordion::add_stat(double ti, bool live){ return; //XXX i do not think this is important if ((_fixed_stab_int) || (ti <= 0.0) || (ti >= 1.0)) return; Stat s; s.alive = live; s.ti = ti; _stat.push_back(s); if (_stat.size()> EST_TIMEOUT_SZ) adjust_timeout();}doubleAccordion::est_timeout(double prob){ return (1.0-prob); //return _calculated_prob[(uint)(10*prob)];}voidAccordion::adjust_timeout(){ if (_fixed_stab_int) return; uint ssz = _stat.size(); vector<double> new_prob; sort_live.clear(); sort_dead.clear(); for (uint i = 0; i < _stat.size(); i++) { if (_stat[i].alive) sort_live.push_back(_stat[i].ti); else sort_dead.push_back(_stat[i].ti); } sort(sort_live.begin(),sort_live.end()); sort(sort_dead.begin(),sort_dead.end()); _stat.clear(); if (sort_dead.size() == 0 || sort_live.size() ==0) return; new_prob.resize(10); for (uint i = 0; i < 10; i++) new_prob[i] = 0.0; uint lsz = sort_live.size(); uint dsz = sort_dead.size(); uint live = 0, dead = dsz; uint i = 0, j = 0; double tt; while (1) { if (i < lsz && j < dsz) { if (sort_live[i] < sort_dead[j]) { tt = sort_live[i]; live++; i++; }else{ tt = sort_dead[j]; dead--; j++; } }else if (i < lsz) { tt = sort_live[i]; live++; i++; }else if (j < dsz) { tt = sort_dead[j]; dead--; j++; }else break; uint p = (uint)(10.0*dead/(double)(dead+live))+ 1; if (p>=10) p = 9; if (new_prob[p] <= 0.0000001) new_prob[p] = tt; } _last_calculated = now(); ADEBUG(4) << "estimated timeout " << _calculated_prob[1] << endl; if (_calculated_prob[1] > 0.99) { printf("sort_live %u: ",lsz); for (uint i = 0; i < sort_live.size(); i++) printf("%.2f ",sort_live[i]); printf("\n"); printf("sort_dead %u: ",dsz); for (uint i = 0; i < sort_dead.size(); i++) printf("%.2f ",sort_dead[i]); printf("\n"); printf("calculated prob : "); for (uint i = 0; i < _calculated_prob.size();i++) printf("%.2f ",_calculated_prob[i]); printf("\n"); new_prob[1] = 0.95; } for (uint i = 0; i < 10; i++) _calculated_prob[i] = 0.9*_calculated_prob[i] + 0.1*new_prob[i];}#include "p2psim/bighashmap.cc"
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -