📄 recroute.c
字号:
#include "recroute.h"#include "fingerroutepns.h"#include <coord.h>#include <location.h>#include <locationtable.h>#include <misc_utils.h>#include <modlogger.h>#define rwarning modlogger ("recroute", modlogger::WARNING)#define rinfo modlogger ("recroute", modlogger::INFO)#define rtrace modlogger ("recroute", modlogger::TRACE)template<class T>ref<vnode>recroute<T>::produce_vnode (ref<chord> _chordnode, ref<rpc_manager> _rpcm, ref<location> _l){ return New refcounted<recroute<T> > (_chordnode, _rpcm, _l);}template<class T>recroute<T>::recroute (ref<chord> _chord, ref<rpc_manager> _rpcm, ref<location> _l) : T (_chord, _rpcm, _l), sweep_cb (NULL){ addHandler (recroute_program_1, wrap (this, &recroute<T>::dispatch)); { int x = 1; assert (Configurator::only ().get_int ("chord.find_succlist_shaving", x)); assert (Configurator::only ().get_int ("chord.lookup_timeout", lto)); shave = (x == 1); } sweep_cb = delaycb (lto, 0, wrap (this, &recroute<T>::sweeper));}template<class T>recroute<T>::~recroute (){ if (sweep_cb) { timecb_remove (sweep_cb); sweep_cb = NULL; } route_recchord *r = routers.first (); route_recchord *rn = NULL; while (r != NULL) { rn = routers.next (r); routers.remove (r); delete r; r = rn; }}template<class T>voidrecroute<T>::sweeper (){ sweep_cb = NULL; u_int swept = 0; u_int started = 0; u_int total = 0; timespec now; clock_gettime (CLOCK_REALTIME, &now); timespec maxtime; maxtime.tv_sec = lto; maxtime.tv_nsec = 0; route_recchord *r = routers.first (); route_recchord *rn = NULL; while (r != NULL) { rn = routers.next (r); total++; if (r->started ()) { started++; timespec st = r->start_time (); if (now - st > maxtime) { swept++; routers.remove (r); r->handle_timeout (); } } r = rn; } rtrace << this->my_ID () << ": sweeper: swept " << swept << "/" << started << " started routers (" << total <<" total).\n"; sweep_cb = delaycb (maxtime.tv_sec, wrap (this, &recroute<T>::sweeper));}template<class T>voidrecroute<T>::dispatch (user_args *a){ if (a->prog->progno != recroute_program_1.progno) { T::dispatch (a); return; } switch (a->procno) { case RECROUTEPROC_NULL: a->reply (NULL); break; case RECROUTEPROC_ROUTE: { // go off and do recursive next hop and pass it on. recroute_route_arg *ra = a->template getarg<recroute_route_arg> (); dorecroute (a, ra); } break; case RECROUTEPROC_PENULTIMATE: { recroute_penult_arg *ra = a->template getarg<recroute_penult_arg> (); dopenult (a, ra); } break; case RECROUTEPROC_COMPLETE: { // Try to see if this is one of ours. // If so, we'll need to pass things back up to whoever called us. recroute_complete_arg *ca = a->template getarg<recroute_complete_arg> (); docomplete (a, ca); } break; default: a->reject (PROC_UNAVAIL); break; }}template<class T>voidrecroute<T>::dorecroute (user_args *sbp, recroute_route_arg *ra){ recroute_route_stat rstat (RECROUTE_ACCEPTED); chordID myID = this->my_ID (); strbuf header; header << myID << ": dorecroute (" << ra->routeid << ", " << ra->x << "): "; rtrace << header << "starting; desired = " << ra->succs_desired << "\n"; vec<ptr<location> > cs = this->succs (); u_long m = ra->succs_desired; vec<chordID> failed; ptr<location> p = this->closestpred (ra->x, failed); // the next best guess // Update best guess or complete, depending, if successor is in our // successor list. if (betweenrightincl (myID, cs.back ()->id (), ra->x)) { // Calculate the amount of overlap available in the successor list size_t overlap = 0; size_t succind = 0; for (size_t i = 0; i < cs.size (); i++) { if (betweenrightincl (myID, cs[i]->id (), ra->x)) { // The i+1st successor is the key's successor! overlap = cs.size () - i; succind = i; break; } } rtrace << header << "overlap = " << overlap << " / m = " << m << "\n"; // Try to decide who to talk to next. if (overlap >= m) { // Enough overlap to finish. cs.popn_front (succind); // leave succind+1st succ at front if (succind > 0) rtrace << myID << ": dorecroute (" << ra->routeid << ", " << ra->x << "): skipping " << succind << " nodes.\n"; dorecroute_sendcomplete (ra, cs); sbp->replyref (rstat); sbp = NULL; return; } else if (shave && ((int)m - (int)overlap < (int)cs.size ())) { // Override the absolute best we could've done, which probably // is the predecessor since our succlist spans the key, and // select someone nice and fast to get more successors from. float mindist = -1.0; size_t minind = 0; size_t start = m - overlap; strbuf distbuf; distbuf << "going to choose a distance from "; for (size_t i = start; i < cs.size (); i++) { float dist = Coord::distance_f (this->my_location ()->coords (), cs[i]->coords ()); distbuf << cs[i]->id () << "(" << (int)dist << ") "; if (mindist < 0 || dist < mindist) { mindist = dist; minind = i; } } distbuf << "; i chose " << cs[minind]->id () << "(" << (int)mindist << ")\n"; rtrace << header << distbuf; if (minind < succind) { // This is proximity route selection! p = cs[minind]; } else { ptr<location> nexthop = cs[minind]; rtrace << header << "going for penult from " << nexthop->id () << "\n"; cs.popn_front (succind); // just the overlap please dorecroute_sendpenult (ra, nexthop, p, cs); sbp->replyref (rstat); sbp = NULL; return; } } } dorecroute_sendroute (ra, p); sbp->replyref (rstat); sbp = NULL; return;}template<class T>voidrecroute<T>::dorecroute_sendpenult (recroute_route_arg *ra, ptr<location> nexthop, ptr<location> p, vec<ptr<location> > cs){ // Construct a new recroute_route_arg. chord_node_wire me; this->my_location ()->fill_node (me); ptr<recroute_penult_arg> nra = New refcounted<recroute_penult_arg> (); // XXX this is obnoxious nra->routeid = ra->routeid; nra->origin = ra->origin; nra->x = ra->x; nra->succs_desired = ra->succs_desired; nra->upcall_prog = ra->upcall_prog; nra->upcall_proc = ra->upcall_proc; nra->upcall_args = ra->upcall_args; nra->retries = ra->retries; nra->path.setsize (ra->path.size () + 1); for (size_t i = 0; i < ra->path.size (); i++) { nra->path[i] = ra->path[i]; } nra->path[ra->path.size ()] = me; nra->successors.setsize (cs.size ()); for (size_t i = 0; i < cs.size (); i++) { cs[i]->fill_node (nra->successors[i]); } rtrace << this->my_ID () << ": dorecroute (" << ra->routeid << ", " << ra->x << "): penultforwarding to " << nexthop->id () << "\n"; vec<chordID> failed; failed.push_back (nexthop->id ()); doRPC (nexthop, recroute_program_1, RECROUTEPROC_PENULTIMATE, nra, NULL, wrap (this, &recroute<T>::dorecroute_sendpenult_cb, nra, p, failed)); // XXX support notification?}template<class T>voidrecroute<T>::dorecroute_sendpenult_cb (ptr<recroute_penult_arg> nra, ptr<location> p, vec<chordID> failed, clnt_stat err){ if (err) { // Go back to wherever we were going to go in the first place. // ra->retries++; // dorecroute_sendroute (ra, p); rtrace << this->my_ID () << ": dorecroute (" << nra->routeid << ", " << nra->x << ": er. failure.....\n"; return; }}template<class T>voidrecroute<T>::dorecroute_sendcomplete (recroute_route_arg *ra, const vec<ptr<location> > cs){ chord_node_wire me; this->my_location ()->fill_node (me); // If complete (i.e. we have enough here to satisfy request), // send off a complete RPC. rtrace << this->my_ID () << ": dorecroute (" << ra->routeid << ", " << ra->x << "): complete.\n"; ptr<recroute_complete_arg> ca = New refcounted<recroute_complete_arg> (); ca->body.set_status (RECROUTE_ROUTE_OK); ca->routeid = ra->routeid; ca->path.setsize (ra->path.size () + 1); for (size_t i = 0; i < ra->path.size (); i++) { ca->path[i] = ra->path[i]; } ca->path[ra->path.size ()] = me;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -