📄 accordion.c
字号:
strbuf tmpbuf; for (unsigned xx = 0; xx < fs.size (); xx++) tmpbuf << (fs[xx]->id ()>>144) << "," << fs[xx]->knownup () << "," << fs[xx]->age () << " "; acctrace << (myID>>144) << ": key " << (ra->x>>144) << " give prev hop " << (loc->id ()>>144) << " para " << (ra->succs_desired >> 24) << " info " << fs.size () << ": " << tmpbuf << "\n"; fill_nodelistres (&res, fs); } ra->succs_desired &= 0x00ffffff; sentinfo *s = sent[ra->routeid]; if (!s) { vec<ptr<location> > cs = succs (); if ((stopearly_ && betweenrightincl (myID, cs.back ()->id (), ra->x)) || (betweenrightincl(myID, cs.front ()->id (), ra->x))) { while (betweenrightincl (myID, ra->x, cs.front ()->id ())) cs.pop_front (); if ((cs.size () >= ra->succs_desired) || betweenrightincl(myID, cs.front ()->id (), ra->x)) { s = New sentinfo (ra->routeid, timenow, 1); sent.insert (s); lookups_++; doaccroute_sendcomplete (ra,cs); } } } if (s || ((ra->retries & 0x80000000) && (bavail< 0))) { //well it's a duplicate or redundant lookup acctrace << (myID>>144) << ": doaccroute key " << (ra->x>>144) << " seen " << (s?1:0) << " routeid " << ra->routeid << " redun " << ((ra->retries & 0x80000000)?1:0) << " bavail " << bavail << "\n"; } else { lookups_++; //calculate parallelism s = New sentinfo (ra->routeid, timenow, 1); sent.insert (s); unsigned pp = get_parallelism (); vec<ptr<location> > nextlocs = fingers_->nexthops (ra->x, pp, s->tried); s->tried.setsize (nextlocs.size ()); s->outstanding = nextlocs.size (); int j = 0; ptr<location> tmp; for (unsigned i = 0; i < nextlocs.size(); i++) { assert (nextlocs[i]->id () != myID); doaccroute_sendroute (ra, nextlocs[i], i, nextlocs.size ()); s->tried[i] = nextlocs[i]; j = i; while (j >0 && betweenrightincl (myID, s->tried[j-1]->id (), s->tried[j]->id ())) { tmp = s->tried[j-1]; s->tried[j-1] = s->tried[j]; s->tried[j] = tmp; j--; } } chordID xxx = nextlocs.size()?(nextlocs[0]->id ()):0; acctrace << (myID>>144) << ": parallel " << pp << " " << nextlocs.size () << " bavail " << bavail << " key " << (ra->x>>144) << " best " << (xxx>>144) << " knownup " << (nextlocs.size()?nextlocs[0]->knownup ():0) << " age " << (nextlocs.size()?nextlocs[0]->age ():0) << "\n"; } if (sbp) sbp->reply (&res); sbp = NULL; return;}voidaccordion::doaccroute_sendroute (recroute_route_arg *ra, ptr<location> p, bool redun, unsigned para){ chord_node_wire me; my_location ()->fill_node (me); ptr<recroute_route_arg> nra = New refcounted<recroute_route_arg> (); *nra = *ra; nra->succs_desired |= (para << 24); if (redun) nra->retries |= 0x80000000; nra->path.setsize (ra->path.size () + 1); for (unsigned i = 0; i < ra->path.size (); i++) nra->path[i] = ra->path[i]; nra->path[ra->path.size ()] = me; chord_nodelistres *res = New chord_nodelistres (CHORD_OK); long *seqnop = (long *)malloc (sizeof(long)); bytes_sent (BYTES_PER_LOOKUP + ra->path.size ()*BYTES_PER_ID); long seqno = doRPC (p, accordion_program_1, ACCORDIONPROC_LOOKUP, nra, res, wrap (this, &accordion::accroute_hop_cb, seqnop, nra, p, res), wrap (this, &accordion::accroute_hop_timeout_cb, seqnop, getusec ()/1000, nra, p)); *seqnop = seqno; acctrace << (myID>>144) << ": doaccroute_sendroute key " << (ra->x>>144) << " send to " << (p->id ()>>144) << " seqno " << seqno << "\n";}voidaccordion::accroute_hop_cb (long *seqp, ptr<recroute_route_arg> nra, ptr<location> p, chord_nodelistres *res, clnt_stat status){ sentinfo* ss = sent[nra->routeid]; if (!status && res->status == CHORD_OK) { bytes_sent (res->resok->nlist.size () * BYTES_PER_ID); if (ss) ss->outstanding = 0;//i.e. no more unncessary retransmissions vec<chord_node> nlist; for (unsigned i = 0; i < res->resok->nlist.size (); i++) nlist.push_back (make_chord_node (res->resok->nlist[i])); fingers_->fill_gap_cb (p, nlist, (chordstat)0); acctrace << (myID>>144) << ": accroute_hop_cb key " << (nra->x>>144) << " routeid " << nra->routeid << " done forwarded to " << (p->id ()>>144) << " knownup " << p->knownup () << " age " << p->age () <<" learnt " << nlist.size () << " seqno " << (*seqp) << "\n"; delete res; res = NULL; return; } delete res;}boolaccordion::accroute_hop_timeout_cb (long *seqp, u_int64_t t, ptr<recroute_route_arg> nra, ptr<location> p, chord_node n, int rexmit_number){ u_int64_t now = getusec ()/1000; sentinfo* ss = sent[nra->routeid]; acctrace << (myID>>144) << ": accroute_hop_TIMEOUT_cb key " << (nra->x>>144) << " next " << p->id () << " seqno " << (*seqp) << " rex " << rexmit_number << "\n"; if (rexmit_number == 0) { fingers_->del_node (p->id ()); if (!ss || ss->outstanding == 1 || (!(nra->retries & 0x80000000))) { nra->retries += (now - t); vec<ptr<location> > nexts = fingers_->nexthops (nra->x, 1, ss->tried); if (nexts.size () > 0 && nexts[0]->id () != myID) { chord_nodelistres *nres = New chord_nodelistres (CHORD_OK); bytes_sent (BYTES_PER_LOOKUP + nra->path.size ()*BYTES_PER_ID); long *nseq = (long *)malloc (sizeof(long)); *nseq = doRPC (nexts[0], accordion_program_1, ACCORDIONPROC_LOOKUP, nra, nres, wrap (this, &accordion::accroute_hop_cb, nseq , nra, nexts[0], nres), wrap (this, &accordion::accroute_hop_timeout_cb, nseq , now, nra, nexts[0])); acctrace << (myID>>144) << ": accroute_hop_timeout_cb key " << (nra->x>>144) << " failed " << (p->id ()>>144)<< " resend next " << (nexts[0]->id ()>>144) << " seq " << (*nseq) << " rex " << rexmit_number << " routeid " << nra->routeid << " retries " << (nra->retries& 0x7fffffff) << " really " << (now-t) << "\n"; return true; } vec<ptr<location> > cs = succs (); chordID succid = (cs.size ()>0?cs[0]->id ():0); if ((stopearly_ && betweenrightincl (myID, cs.back ()->id (), nra->x)) || (betweenrightincl(myID, cs.front ()->id (), nra->x))) { while (betweenrightincl (myID, nra->x, cs.front ()->id ())) cs.pop_front (); nra->succs_desired &= 0x00ffffff; if (cs.size () >= nra->succs_desired) { doaccroute_sendcomplete (nra,cs); //make sure nra is not garbage collected acctrace << (myID>>144) << ": accroute_hop_timeout_cb key " << (nra->x>>144) << " failed " << (p->id ()>>144)<< " completed rex " << rexmit_number << " routeid " << nra->routeid << "\n"; return true; } } acctrace << (myID>>144) << ": accroute_hop_timeuout_cb key " << (nra->x>>144) << " succ " << (succid>>144) << " SHOULD NOT HAVE HAPPEN\n"; } acctrace << (myID>>144) << ": accroute_hop_timeout_cb key " << (nra->x>>144) << " next " << (p->id ()>>144) << " rex " << rexmit_number << " out " << ss->outstanding << " redun? " << ((nra->retries&0x80000000)?1:0) << " no retry\n"; if (ss) ss->outstanding--; } else acctrace << (myID>>144) << ": accroute_hop_timeout_cb key " << (nra->x>>144)<< " next " << (p->id ()>>144) << " rex " << rexmit_number << " out " << ss->outstanding << " routeid " << nra->routeid << " failed dropped\n"; return true;}voidaccordion::docomplete (user_args *sbp, recroute_complete_arg *ca, ptr<location> src){ route_accordion *router = routers[ca->routeid]; sentinfo* ss = sent[ca->routeid]; if (ss) ss->outstanding = 0; if (!router) { acctrace << (myID>>144) << ": docomplete: unknown routeid " << ca->routeid << " from host " << (src->id ()>>144) << "\n"; sbp->reply (NULL); return; } strbuf s; s << (myID>>144) << ": docomplete: key " << (router->key () >>144) << " routeid " << ca->routeid << " num_succ " << ca->body.robody->successors.size () << " returned from " << (src->id ()>>144) << " retries: " << (ca->retries&0x7fffffff); chord_node n; for (size_t i = 0; i < ca->body.robody->successors.size (); i++) { n = make_chord_node (ca->body.robody->successors[i]); locations->insert (n); if (i == 0) s << " succ " << (n.x>>144); } acctrace << s << "\n"; routers.remove (router); router->handle_complete (sbp, ca);}voidaccordion::doaccroute_sendcomplete ( recroute_route_arg *ra, const vec<ptr<location> > cs){ //this has a lot of copy-n-paste from recroute.C chord_node_wire me; my_location ()->fill_node (me); ptr<recroute_complete_arg> ca = New refcounted<recroute_complete_arg> (); ca->body.set_status ((recroute_complete_stat)CHORD_OK); ca->routeid = ra->routeid; ca->path.setsize (ra->path.size () + 1); for (size_t i = 0; i < ra->path.size (); i++) { ca->path[i] = ra->path[i]; } ca->path[ra->path.size ()] = me; u_long m = ra->succs_desired; u_long tofill = (cs.size () < m) ? cs.size () : m; ca->body.robody->successors.setsize (tofill); for (size_t i = 0; i < tofill; i++) cs[i]->fill_node (ca->body.robody->successors[i]); ca->retries = ra->retries; ptr<location> l = locations->insert(make_chord_node (ra->origin)); chordID xxx = cs.size () > 0?cs[0]->id ():0; acctrace << (myID>>144) << ": doaccroute_sendcomplete key " << (ra->x>>144) << " routeid " << ra->routeid << " desired " << ra->succs_desired << " rt " << cs.size () << " " << (xxx>>144) << " ori " << (l->id ()>>144) <<"\n"; bytes_sent (BYTES_PER_LOOKUPCOMPLETE + (ra->path.size ()+ tofill)* BYTES_PER_ID); doRPC (l, accordion_program_1, ACCORDIONPROC_LOOKUP_COMPLETE, ca, NULL, wrap (this, &accordion::accroute_sent_complete_cb, l, ra->x), wrap (this, &accordion::accroute_sent_complete_timeout_cb, l, ra->x));}voidaccordion::accroute_sent_complete_cb(ptr<location> l, chordID x, clnt_stat status){ if (status) acctrace << (myID>>144) << ": accroute_complete lost key " << (x>>144) << " status " << status << ".\n";}boolaccordion::accroute_sent_complete_timeout_cb(ptr<location> l, chordID x, chord_node n, int rexmit_number){ acctrace << (myID>>144) << ": accroute_sent_complete_timeout_cb key " << (x>>144) << " rexmit " << rexmit_number << "\n"; return false;}voidaccordion::fill_nodelistres (chord_nodelistres *res, vec<ptr<location> > fs){ res->resok->nlist.setsize (fs.size ()); for (size_t i = 0; i < fs.size (); i++) fs[i]->fill_node (res->resok->nlist[i]);}voidaccordion::update_bavail (){ if (timenow > 0 && bavail_t > 0) bavail += (budget_ * (timenow-bavail_t)); bavail_t = timenow; if (bavail > burst_) bavail = burst_;}unsigned accordion::get_parallelism (){ update_bavail(); int p = bavail/(BYTES_PER_LOOKUP+BYTES_PER_ID); if (p <= 0) return 1; else return para_;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -