📄 accordion.c
字号:
#include "accordion.h"
#include <id_utils.h>
#include <misc_utils.h>
#include "accordion_table.h"
#include "pred_list.h"
#include "succ_list.h"
#include <location.h>
#include <locationtable.h>
#include <modlogger.h>
#include <configurator.h>

/* jy: Accordion ugliness so far
   use recargs->retry's first bit to signify primary lookup path
   from redundant ones.
   use recargs->succs_desired's first byte to signify the parallelism
   used by previous hop */

#define acctrace modlogger ("accordion")
#define MAX_RETRIES 5

/* File-local object whose constructor registers the default values of
   the accordion.* configuration keys during static initialization. */
static struct accordion_config_init {
  accordion_config_init ();
} aci;

/* Registers defaults for accordion.budget, accordion.burst and
   accordion.stopearly with the Configurator; asserts that every
   registration succeeded. */
accordion_config_init::accordion_config_init ()
{
  bool ok = true;

#define set_int Configurator::only ().set_int
#define set_str Configurator::only ().set_str

  ok = ok && set_int ("accordion.budget", 10);
  ok = ok && set_int ("accordion.burst", 100);
  ok = ok && set_int ("accordion.stopearly", 0);

  assert (ok);

#undef set_int
#undef set_str
}

/* Factory: returns a new reference-counted accordion built from the
   given chord node, rpc manager and location. */
ref<vnode>
accordion::produce_vnode (ref<chord> _chordnode,
                          ref<rpc_manager> _rpcm,
                          ref<location> _l)
{
  return New refcounted<accordion> (_chordnode,_rpcm,_l);
}

// override produce_iterator*

/* Creates a reference-counted route_accordion for key xi, records it in
   routers, and returns it. */
ptr<route_iterator>
accordion::produce_iterator (chordID xi)
{
  ptr<route_accordion> ri = New refcounted<route_accordion> (mkref (this), xi);
  routers.insert (ri);
  return ri;
}

/* Same as above, additionally passing the upcall program, procedure
   number and arguments through to the route_accordion constructor. */
ptr<route_iterator>
accordion::produce_iterator (chordID xi,
                             const rpc_program &uc_prog,
                             int uc_procno,
                             ptr<void> uc_args)
{
  ptr<route_accordion> ri = New refcounted<route_accordion> (mkref (this), xi,
                                                             uc_prog,
                                                             uc_procno,
                                                             uc_args);
  routers.insert (ri);
  return ri;
}

/* Raw-pointer variant: the route_accordion is allocated with plain New
   (not refcounted), recorded in routers, and returned. */
route_iterator *
accordion::produce_iterator_ptr (chordID xi)
{
  route_accordion *ri = New route_accordion (mkref (this), xi);
  routers.insert (ri);
  return ri;
}

/* Raw-pointer variant that also forwards the upcall program, procedure
   number and arguments. */
route_iterator *
accordion::produce_iterator_ptr (chordID xi,
                                 const rpc_program &uc_prog,
                                 int uc_procno,
                                 ptr<void> uc_args)
{
  route_accordion *ri = New route_accordion (mkref (this), xi,
                                             uc_prog,
                                             uc_procno,
                                             uc_args);
  routers.insert (ri);
  return ri;
}

/* Reads the accordion.* configuration, initializes the bandwidth
   bookkeeping members, registers the RPC handler for
   accordion_program_1 and schedules the first call to periodic (). */
void
accordion::init ()
{
  bool ok = Configurator::only ().get_int ("accordion.budget", budget_);
  assert (ok);
  ok = Configurator::only ().get_int ("accordion.burst", burst_);
  assert (ok);
  ok = Configurator::only ().get_int ("accordion.stopearly", stopearly_);
  // Checked like the two reads above; a default for this key is
  // registered by accordion_config_init in this file.
  assert (ok);

  // The configured burst value is scaled by the budget.
  burst_ = burst_ * budget_;
  bavail_t = timenow;
  bavail = burst_;
  me_->set_budget (budget_);

  addHandler (accordion_program_1, wrap (this,&accordion::dispatch));
  delaycb (60, wrap (this, &accordion::periodic));

  lookups_ = explored_ = 0;
  para_ = 3;
}

/* Starts the finger table and then runs the base-class stabilization. */
void
accordion::stabilize ()
{
  fingers_->start ();
  vnode_impl::stabilize ();
}

/* Constructs the base vnode_impl, creates the accordion_table over
   locations, and calls init (). */
accordion::accordion (ref<chord> _chord,
                      ref<rpc_manager> _rpcm,
                      ref<location> _l)
  : vnode_impl(_chord,_rpcm,_l)
{
  fingers_ = New refcounted<accordion_table> (mkref(this), locations);
  init ();
}

/* RPC entry point.  Requests for any program other than
   accordion_program_1 are handed to vnode_impl::dispatch; the rest are
   routed by procedure number, and unknown procedures are rejected with
   PROC_UNAVAIL. */
void
accordion::dispatch (user_args *a)
{
  if (a->prog->progno != accordion_program_1.progno) {
    vnode_impl::dispatch (a);
    return;
  }

  //grab the src
  // src is only filled in (and inserted into locations) for FILLGAP,
  // LOOKUP and LOOKUP_COMPLETE; it stays null for other procedures.
  ptr<location> src;
  if (a->procno == ACCORDIONPROC_FILLGAP
      || a->procno == ACCORDIONPROC_LOOKUP
      || a->procno == ACCORDIONPROC_LOOKUP_COMPLETE) {
    chord_node srcn;
    a->fill_from (&srcn);
    src = locations->insert (srcn);
  }

  switch (a->procno) {
  case ACCORDIONPROC_NULL:
    a->reply (NULL);
    break;
  case ACCORDIONPROC_FILLGAP:
    {
      accordion_fillgap_arg *ra = a->Xtmpl getarg<accordion_fillgap_arg> ();
      dofillgap (a,ra, src);
    }
    break;
  case ACCORDIONPROC_LOOKUP:
    {
      recroute_route_arg *ra = a->Xtmpl getarg<recroute_route_arg> ();
      doaccroute (a,ra);
    }
    break;
  case ACCORDIONPROC_LOOKUP_COMPLETE:
    {
      recroute_complete_arg *ca = a->Xtmpl getarg<recroute_complete_arg> ();
      docomplete (a, ca, src);
    }
    break;
  case ACCORDIONPROC_GETFINGERS_EXT:
    {
      dogetfingers_ext (a);
      break;
    }
  default:
    {
      acctrace << (myID>>144) << ": dispatch rejected " << a->procno << "\n";
      a->reject (PROC_UNAVAIL);
      break;
    }
  }
}

/* responding to others' FILLGAP message */
/* Asks the finger table for fingers relative to src up to ra->end using
   the requester's parallelism ra->para, and replies with them. */
void
accordion::dofillgap(user_args *sbp, accordion_fillgap_arg *ra, ptr<location> src)
{
  chord_nodelistres res (CHORD_OK);
  res.resok->nlist.setsize (1);
  my_location ()->fill_node (res.resok->nlist[0]);

  ptr<location> end = New refcounted<location> (make_chord_node (ra->end));
  vec<ptr<location> > fs = fingers_->get_fingers (src, end->id (), ra->para);

  acctrace << (myID>>144) << ": dofillgap " << " src " << (src->id ()>>144)
           << " end " << (end->id ()>>144) << " para " << ra->para
           << " sz " << fs.size () << "\n";

  // NOTE(review): fill_nodelistres is defined elsewhere; whether it keeps
  // or overwrites the single entry set above is not visible here.
  fill_nodelistres (&res, fs);
  sbp->reply (&res);
}

/* called by accordion_table to initiate a FILLGAP message */
/* Sends ACCORDIONPROC_FILLGAP to n carrying this node's current
   parallelism and the gap end.  The heap-allocated result is released
   in fill_gap_cb. */
void
accordion::fill_gap (ptr<location> n, ptr<location> end, cbchordIDlist_t cb)
{
  chord_nodelistres *res = New chord_nodelistres (CHORD_OK);
  ptr<accordion_fillgap_arg> fa = New refcounted<accordion_fillgap_arg> ();
  fa->para = para_;
  end->fill_node (fa->end);

  acctrace << (myID>>144) << ": fill_gap " << (my_location ()->id ()>>144)
           << " from " << n->id () << " end " << (end->id ()>>144) << "\n";

  bytes_sent (BYTES_PER_ID);
  doRPC (n, accordion_program_1, ACCORDIONPROC_FILLGAP, fa, res,
         wrap (mkref (this), &accordion::fill_gap_cb, n, cb, res),
         wrap (mkref (this), &accordion::fill_gap_timeout_cb, n));
}

/* Completion callback for fill_gap.
   - RPC error:      cb receives a list holding only l's node and
                     CHORD_RPCFAILURE.
   - non-OK status:  cb receives a list holding only l's node and
                     res->status.
   - success:        cb receives every node in the reply and CHORD_OK.
   res is deleted before returning. */
void
accordion::fill_gap_cb (ptr<location> l, cbchordIDlist_t cb,
                        chord_nodelistres *res, clnt_stat err)
{
  vec<chord_node> nlist;
  chord_node n;

  if ((err) || (res->status)) {
    l->fill_node (n);
    nlist.push_back (n);
  }

  if (err)
    cb (nlist, CHORD_RPCFAILURE);
  else if (res->status)
    cb (nlist, res->status);
  else {
    bytes_sent (BYTES_PER_ID * res->resok->nlist.size ());
    for (unsigned i = 0; i < res->resok->nlist.size (); i++)
      nlist.push_back (make_chord_node (res->resok->nlist[i]));
    cb (nlist, CHORD_OK);
  }
  delete res;
}

/* Timeout callback passed to doRPC by fill_gap.  When rexmit_number is 0
   the node l is removed from the finger table.  Always returns true.
   The chord_node argument n is not used. */
bool
accordion::fill_gap_timeout_cb (ptr<location> l,
                                chord_node n,
                                int rexmit_number)
{
  if (rexmit_number == 0) {
    fingers_->del_node (l->id ());
  }
  return true;
}

/* Replies with the extended finger list produced by the finger table. */
void
accordion::dogetfingers_ext (user_args *sbp)
{
  chord_nodelistextres res (CHORD_OK);
  fingers_->fill_nodelistresext (&res);
  sbp->reply (&res);
}

/* Periodic maintenance: adjusts para_ from the lookups_/explored_
   counters, resets both counters, drops expired entries from sent, and
   reschedules itself. */
void
accordion::periodic()
{
  //adjust the parallelism level
#define MAX_PARALLELISM 6
  int old_p = para_;
  if (lookups_ < explored_) {
    // Increase by one, capped at MAX_PARALLELISM.
    para_++;
    if (para_ > MAX_PARALLELISM)
      para_ = MAX_PARALLELISM;
  }else if (!explored_ && lookups_) {
    // Halve, but never go below 1.
    para_ = para_/2;
    if (!para_)
      para_ = 1;
  }
  lookups_ = explored_ = 0;
  acctrace << (myID>>144) << " re-adjust parallelism from " << old_p
           << " to " << para_ << "\n";

  //traverse the hash table that remembers
  //all routeid forwarded, delete those that are old
  sentinfo *curr = sent.first ();
  sentinfo *next;
  while (curr) {
    // Fetch the successor first: curr may be removed below.
    next = sent.next (curr);
#define ROUTEID_EXPIRE_TIME 100
    if (curr->sent_t && (timenow-curr->sent_t > ROUTEID_EXPIRE_TIME))
      sent.remove (curr);
    curr = next;
  }
  delaycb (ROUTEID_EXPIRE_TIME, wrap (this, &accordion::periodic));
}

/* NOTE(review): only the beginning of this function is visible in this
   excerpt; the statements below are reproduced unchanged up to the point
   where the visible source ends. */
void
accordion::doaccroute (user_args *sbp, recroute_route_arg *ra)
{
  chord_nodelistres res (CHORD_OK);
  ptr<location> loc;
  ptr<location> ori = New refcounted<location> (make_chord_node (ra->origin));
  locations->insert (ori);
  // loc is the last entry of the recorded path if there is one,
  // otherwise the origin.
  if (ra->path.size () > 0)
    loc = New refcounted<location> (make_chord_node (ra->path[ra->path.size ()-1]));
  else
    loc = ori;
  if (loc->id () != myID) {
    vec<ptr<location> > fs = fingers_->get_fingers (loc, ra->x, (ra->succs_desired >> 24));
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -