📄 accordion.c
字号:
for (HashMap<ConsistentHash::CHID, Time>::iterator i = _dead.begin(); i != _dead.end(); ++i) { list<IDMap>* s = (*i).value(); if (s) delete s; } */}/* ------------------------ lookup ----------------------------------*/voidAccordion::lookup(Args *args){ IDMap succ = loctable->succ(_me.id+1); if (!succ.ip) { if (!_join_scheduled || (now()-_join_scheduled) > 20000) delaycb(0,&Accordion::join,(Args *)0); ADEBUG(2) << "lookup key failed not yet joined" << endl; record_lookup_stat(_me.ip, _me.ip, 0, false, false, 0, 0, 0); return; } lookup_args la; lookup_ret lr; bzero(&la,sizeof(lookup_args)); la.key = args->nget<ConsistentHash::CHID>("key",0,16); la.no_drop = true; la.nexthop = _me; la.from = _me; la.from.alivetime = now()-_last_joined_time; la.src = _me; la.src.alivetime = now()-_last_joined_time; la.src.timestamp = now(); la.ori.ip = 0; la.m = 1; la.parallelism = _parallelism; la.type = TYPE_USER_LOOKUP; la.learnsz = _learn_num; la.deadnodes.clear(); lr.done = false; lr.v.clear(); lr.is_succ = false; _outstanding_lookups.insert(la.key, now()); ADEBUG(2) << "start lookup key " << printID(la.key) << endl; if (_recurs) next_recurs(&la,NULL); else next_iter(&la,&lr);}/* ------------------------ lookup (iterative) ------------------------*/voidAccordion::next_iter(lookup_args *la, lookup_ret *lr){ if (now() >= _next_adjust) adjust_parallelism(); IDMap succ = loctable->succ(_me.id+1); if (ConsistentHash::between(_me.id,succ.id,la->key)) { la->from = _me; la->nexthop = _me; lr->v.push_back(succ); lr->done = true; donelookup_handler(la,lr); return; }else if (lr->done) { _progress.remove(la->key); la->nexthop = _me; donelookup_handler(la,lr); return; } double ttt; int para; if (!_fixed_stab_int) { //calculate the parallelism needed para = (_rate_queue->quota()+_burst_sz)/(2*PKT_OVERHEAD+ 8*_learn_num); // para = para/est_hops; if (para > _parallelism) para = _parallelism; else if (para > la->parallelism) para = la->parallelism; else if (para <= 0) para = 1; ttt = para > 1? (1-(exp(log(0.1)/(double)para))):0.9; } else { para = _parallelism; ttt = _fixed_lookup_to; } vector<IDMap> nexthops = loctable->preds(la->key, para, LOC_HEALTHY, ttt); ConsistentHash::CHID mostprog = _progress.find(la->key); list<IDMap> *s = _sent.find(la->key); if (!s) { s = New list<IDMap>; s->clear(); _sent.insert(la->key,s); } list<IDMap> *d = _dead.find(la->key); if (!d) { d = New list<IDMap>; d->clear(); _dead.insert(la->key,d); } uint sentout = 0; uint i=0,j=0; IDMap nh; bool seen; while (sentout < para) { if (i < nexthops.size()) nh = nexthops[i++]; else if (lr && j < lr->v.size()) nh = lr->v[j++]; else break; if (!mostprog || ConsistentHash::between(mostprog,la->key,nh.id)) { seen = false; list<IDMap>::iterator li; for (li = s->begin(); li != s->end(); ++li) { if ((*li).ip == nh.ip) { seen = true; break; }else if (ConsistentHash::between((*li).id,la->key,nh.id)) { break; } } if (!seen) { s->insert(li,nh); lookup_args *lla = New lookup_args; lla->key = la->key; lla->no_drop = la->no_drop; lla->from = la->from; lla->src = la->src; lla->ori = la->ori; lla->m = la->m; lla->parallelism = la->parallelism; lla->type = la->type; lla->learnsz = la->learnsz; lla->to_num = la->to_num; lla->to_lat = la->to_lat; lla->timeout = (Time) ttt; lla->from.alivetime = now() - _last_joined_time; lla->prevhop = la->nexthop; lla->nexthop = nh; lla->hops = la->hops+1; lla->deadnodes.clear(); for (list<IDMap>::iterator di = d->begin(); di != d->end(); ++di) { if (ConsistentHash::between(nh.id,la->key,(*di).id)) lla->deadnodes.push_back((*di)); else break; } lookup_ret *llr = New lookup_ret; llr->is_succ = false; llr->done = false; llr->v.clear(); ADEBUG(4) << " key moha " << printID(la->key) << " to " << nh.ip << "," << printID(nh.id) << " dead " << (lla->deadnodes.size()?lla->deadnodes[0].ip:0) << endl; _rate_queue->do_rpc(nh.ip, &Accordion::next, &Accordion::next_iter_cb, lla, llr, 0, TYPE_USER_LOOKUP, PKT_SZ(1+2*lla->deadnodes.size(),0), PKT_SZ(0,0),TIMEOUT(_me.ip, nh.ip)); sentout++; } } } uint outstanding = _forwarded_nodrop.find(la->key); ADEBUG(4) << "next_iter key " << printID(la->key) << " mostprog " << printID(mostprog) << " from " << la->nexthop.ip << "," << printID(la->nexthop.id) << " quota " << _rate_queue->quota() << "," << para << "," << _parallelism << "," << sentout<< "," << outstanding << endl; if (!outstanding && !sentout) { assert(nexthops.size()); nh = nexthops[0]; lookup_args *lla = New lookup_args; lla->key = la->key; lla->no_drop = la->no_drop; lla->from = la->from; lla->src = la->src; lla->ori = la->ori; lla->m = la->m; lla->parallelism = la->parallelism; lla->type = la->type; lla->learnsz = la->learnsz; lla->to_num = la->to_num; lla->to_lat = la->to_lat; lla->timeout = (Time) ttt; lla->from.alivetime = now() - _last_joined_time; lla->prevhop = la->nexthop; lla->nexthop = nh; lla->hops = la->hops+1; lla->deadnodes.clear(); for (list<IDMap>::iterator di = d->begin(); di != d->end(); ++di) { if (ConsistentHash::between(nh.id,la->key,(*di).id)) lla->deadnodes.push_back((*di)); else break; } lookup_ret *llr = New lookup_ret; llr->is_succ = false; llr->done = false; llr->v.clear(); ADEBUG(4) << " key resend " << printID(la->key) << " to " << nh.ip << "," << printID(nh.id) << " dead " << (lla->deadnodes.size()?lla->deadnodes[0].ip:0) << endl; _rate_queue->do_rpc(nh.ip, &Accordion::next, &Accordion::next_iter_cb, lla, llr, 0, TYPE_USER_LOOKUP, PKT_SZ(1+2*lla->deadnodes.size(),0), PKT_SZ(0,0),TIMEOUT(_me.ip, nh.ip)); sentout++; } _forwarded_nodrop.insert(la->key,outstanding+sentout);}voidAccordion::next(lookup_args *la, lookup_ret *lr){ loctable->update_ifexists(la->from,0); la->nexthop.alivetime = now() - _last_joined_time; for (uint i = 0; i < la->deadnodes.size(); i++) loctable->del_node(la->deadnodes[i]); IDMap succ = loctable->succ(_me.id+1); if (ConsistentHash::between(_me.id,succ.id,la->key)) { lr->done = true; la->from = _me; lr->v = loctable->succs(_me.id+1,la->m); } else { assert(la->nexthop.ip == _me.ip); if (!_fixed_stab_int) //lr->v = loctable->next_close_hops(la->key, la->learnsz, la->from, la->timeout); lr->v = loctable->preds(la->key, la->learnsz, LOC_HEALTHY, la->timeout); else lr->v = loctable->preds(la->key, la->learnsz, LOC_HEALTHY, _fixed_stab_to); } ADEBUG(4) << " KEY " << printID(la->key) << " from " << la->from.ip << " dead " << (la->deadnodes.size()?la->deadnodes[0].ip:0) << " next " << (lr->v.size()>0?lr->v[0].ip:0) << "," << printID((lr->v.size()?lr->v[0].id:0)) << " succ " << succ.ip << "," << printID(succ.id) << " locsz " << loctable->live_size(la->timeout) << endl;}intAccordion::next_iter_cb(bool b, lookup_args *la, lookup_ret *lr){ int ret_sz = 0; uint outstanding = _forwarded_nodrop.find(la->key); outstanding--; if (alive()) { Time delta = now()-la->nexthop.timestamp; if (delta < la->nexthop.alivetime && delta > 5000) add_stat((double)(la->nexthop.alivetime-delta)/(double)(la->nexthop.alivetime), b); la->nexthop.timestamp = now(); if ((b) && (la->nexthop.ip!=_me.ip)) { loctable->update_ifexists(la->nexthop,0); for (uint i = 0; i < lr->v.size(); i++) loctable->add_node(lr->v[i]); ADEBUG(4) << "next_iter_cb key " << printID(la->key) << "src " << la->src.ip << " ori " << la->ori.ip << " from " << la->nexthop.ip << "," << printID(la->nexthop.id) << " learnsz " << la->learnsz << " learnt " << lr->v.size() << ": " << print_succs(lr->v) << " outstanding " << outstanding << " locsz " << loctable->size(LOC_HEALTHY,0.9) << " livesz " << loctable->live_size(0.9) << " done? " << (lr->done?1:0)<< endl; ret_sz = PKT_SZ(2*lr->v.size(),0); /* if ((la->prevhop.ip!=_me.ip) && lr->v.size() > 0) { alert_args *aa = New alert_args; aa->v.clear(); aa->dn.ip = 0; for (uint i = 0; i < lr->v.size(); i++) aa->v.push_back(lr->v[i]); _rate_queue->do_rpc(la->prevhop.ip, &Accordion::alert_nodes, &Accordion::alert_cb, aa, (lookup_ret *)NULL, 0, TYPE_USER_LOOKUP, PKT_SZ(2*lr->v.size(),0), PKT_SZ(0,0),TIMEOUT(_me.ip, la->prevhop.ip)); } */ } else { la->to_num++; la->to_lat += TIMEOUT(_me.ip, la->nexthop.ip); loctable->del_node(la->nexthop); ret_sz = 0; list<IDMap> *d = _dead.find(la->key); assert(d); if (d) { list<IDMap>::iterator di; for (di = d->begin(); di!=d->end();++di) { if (ConsistentHash::between((*di).id,la->key,la->nexthop.id)) { d->insert(di,la->nexthop); break; } } if (di == d->end()) d->push_back(la->nexthop); } ADEBUG(4) << "next_iter_cb key " << printID(la->key) << "src " << la->src.ip << " ori " << la->ori.ip << " from " << la->nexthop.ip << "," << printID(la->nexthop.id) << " DEAD " << (d?d->size():0) << endl;/* if (la->prevhop.ip!=_me.ip) { alert_args *aa = New alert_args; aa->v.clear(); aa->dn = la->nexthop; _rate_queue->do_rpc(la->prevhop.ip, &Accordion::alert_nodes, &Accordion::alert_cb, aa, (lookup_ret *)NULL, 0, TYPE_USER_LOOKUP, PKT_SZ(2,0), PKT_SZ(0,0),TIMEOUT(_me.ip, la->prevhop.ip)); } */ } Time t = _outstanding_lookups.find(la->key); if (outstanding) _forwarded_nodrop.insert(la->key,outstanding); else _forwarded_nodrop.remove(la->key); if (t) { ConsistentHash::CHID prog = _progress.find(la->key); if (b && (!prog || ConsistentHash::between(prog,la->key,la->nexthop.id))) { _progress.insert(la->key,la->nexthop.id); next_iter(la,lr); }else if (!outstanding && _outstanding_lookups.find(la->key)) next_iter(la,lr); } outstanding = _forwarded_nodrop.find(la->key); if (!outstanding) alert_lookup_nodes(la->key,la->timeout); } delete la; if (lr) delete lr; return ret_sz;}voidAccordion::alert_lookup_nodes(ConsistentHash::CHID key, Time to){ list<IDMap> *s = _sent.find(key); if (!s) return; list<IDMap> *dd = _dead.find(key); vector<IDMap> v; v.clear(); while (s->size()) { if (dd->size() && s->front().ip == dd->front().ip) { s->pop_front(); dd->pop_front(); }else if (!dd->size() || ConsistentHash::between(dd->front().id,key,s->front().id)) { v.push_back(s->front()); s->pop_front(); }else dd->pop_front(); } //if (_rate_queue->critical()) return; //vector<IDMap> v = loctable->preds(key, _learn_num, LOC_HEALTHY, to); for (list<IDMap>::iterator i = s->begin(); i != s->end(); ++i) { alert_args *la = New alert_args; la->v.clear(); for (uint j = 0; j < v.size(); j++) la->v.push_back(v[j]); la->d.clear(); la->k = key; la->src = _me; for (list<IDMap>::iterator jj = dd->begin(); jj!=dd->end();++jj) la->d.push_back(*jj); _rate_queue->do_rpc((*i).ip, &Accordion::alert_nodes, &Accordion::alert_cb, la, (lookup_ret *)NULL, 0, TYPE_USER_LOOKUP, PKT_SZ(2*(la->v.size()+la->d.size()),0), PKT_SZ(0,0), TIMEOUT(_me.ip, (*i).ip)); } if (s) { delete s; _sent.remove(key); } if (dd) { delete dd; _dead.remove(key); }}voidAccordion::alert_nodes(alert_args *la, lookup_ret *lr){ ADEBUG(4) << " alert_nodes key " << printID(la->k) << " " << print_succs(la->v) << " " << la->src.ip << " dead " << print_succs(la->d) << endl; for (uint i = 0; i < la->v.size(); i++) { if (la->v[i].ip == 0) ADEBUG(4) << " wierd " << endl; loctable->add_node(la->v[i]); } for (uint i = 0; i < la->d.size(); i++) loctable->del_node(la->d[i]);}intAccordion::alert_cb(bool b, alert_args *la, lookup_ret *lr){ if (la) delete la; return 0;}/* ------------------------ lookup (recursive) ----------------------- */voidAccordion::donelookup_handler(lookup_args *la, lookup_ret *lr){ if (la->nexthop.ip == _me.ip) { la->nexthop.alivetime = now() - _last_joined_time; }else abort(); if (la->from.ip!=_me.ip) { la->from.timestamp = now(); loctable->add_node(la->from); } if (la->ori.ip) { lookup_args *lla = New lookup_args; bcopy(la,lla,sizeof(lookup_args)); la->nexthop = la->ori; lookup_ret *llr = New lookup_ret; llr->v = lr->v; if (lr->v.size() > 0) { la->ori.timestamp = now(); loctable->add_node(la->ori); } _rate_queue->do_rpc(la->ori.ip, &Accordion::join_handler, &Accordion::null_cb, lla, llr, 0, TYPE_JOIN_LOOKUP, PKT_SZ(2*llr->v.size(),0), PKT_SZ(0,0),TIMEOUT(_me.ip, la->ori.ip)); return; }; assert(lr->done); Time t = _outstanding_lookups.find(la->key);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -