📄 recroute.c
字号:
u_long m = ra->succs_desired; u_long tofill = (cs.size () < m) ? cs.size () : m; ca->body.robody->successors.setsize (tofill); for (size_t i = 0; i < tofill; i++) cs[i]->fill_node (ca->body.robody->successors[i]); ca->retries = ra->retries; ptr<location> l = this->locations->lookup_or_create (make_chord_node (ra->origin)); doRPC (l, recroute_program_1, RECROUTEPROC_COMPLETE, ca, NULL, wrap (this, &recroute<T>::recroute_sent_complete_cb)); // We don't really care if this is lost beyond the RPC system's // retransmits.}template<class T>voidrecroute<T>::dorecroute_sendroute (recroute_route_arg *ra, ptr<location> p){ // Construct a new recroute_route_arg. chord_node_wire me; this->my_location ()->fill_node (me); ptr<recroute_route_arg> nra = New refcounted<recroute_route_arg> (); *nra = *ra; nra->path.setsize (ra->path.size () + 1); for (size_t i = 0; i < ra->path.size (); i++) { nra->path[i] = ra->path[i]; } nra->path[ra->path.size ()] = me; if (p->id () != this->my_ID ()) { recroute_route_stat *res = New recroute_route_stat (RECROUTE_ACCEPTED); rtrace << this->my_ID () << ": dorecroute (" << ra->routeid << ", " << ra->x << "): forwarding to " << p->id () << "\n"; vec<chordID> failed; doRPC (p, recroute_program_1, RECROUTEPROC_ROUTE, nra, res, wrap (this, &recroute<T>::recroute_hop_cb, nra, p, failed, res), wrap (this, &recroute<T>::recroute_hop_timeout_cb, nra, p, failed)); } else { //XXX we're dropping this (instead of sending a failure message) // since I'm sending a bunch of crazy parallel lookups and we're // guessing that maybe another one will succeed. // worst case: the lookup fails when the sweeper goes off. rtrace << this->my_ID () << " next hop is me. dropping\n"; }}template<class T>voidrecroute<T>::recroute_hop_cb (ptr<recroute_route_arg> nra, ptr<location> p, vec<chordID> failed, recroute_route_stat *res, clnt_stat status){ if (!status && *res == RECROUTE_ACCEPTED) { delete res; res = NULL; rtrace << this->my_ID () << ": dorecroute (" << nra->routeid << ", " << nra->x << "): message accepted\n"; return; } rtrace << this->my_ID () << ": dorecroute (" << nra->routeid << ", " << nra->x << ") forwarding to " << p->id () << " failed (" << status << "," << *res << ").\n"; delete res; if (failed.size () > 3) { // XXX hardcoded constant rtrace << this->my_ID () << ": dorecroute (" << nra->routeid << ", " << nra->x << ") failed too often. Discarding.\n"; ptr<recroute_complete_arg> ca = New refcounted<recroute_complete_arg> (); ca->body.set_status (RECROUTE_ROUTE_FAILED); p->fill_node (ca->body.rfbody->failed_hop); ca->body.rfbody->failed_stat = status; ca->routeid = nra->routeid; ca->retries = nra->retries; ca->path = nra->path; ptr<location> o = this->locations->lookup_or_create (make_chord_node (nra->origin)); doRPC (o, recroute_program_1, RECROUTEPROC_COMPLETE, ca, NULL, wrap (this, &recroute<T>::recroute_sent_complete_cb)); // We don't really care if this is lost beyond the RPC system's // retransmits. return; }}template<class T>boolrecroute<T>::recroute_hop_timeout_cb (ptr<recroute_route_arg> nra, ptr<location> p, vec<chordID> failed, chord_node n, int rexmit_number){ if (rexmit_number == 0 && failed.size () <= 3) { nra->retries++; failed.push_back (p->id ()); ptr<location> l = this->closestpred (nra->x, failed); if (l->id () != this->my_ID ()) { rtrace << this->my_ID () << ": dorecroute (" << nra->routeid << ", " << nra->x << "): TIMEOUT now forwarding to " << l->id () << "\n"; recroute_route_stat *nres = New recroute_route_stat (RECROUTE_ACCEPTED); doRPC (p, recroute_program_1, RECROUTEPROC_ROUTE, nra, nres, wrap (this, &recroute<T>::recroute_hop_cb, nra, l, failed, nres), wrap (this, &recroute<T>::recroute_hop_timeout_cb, nra, p, failed)); } else { return false; } } return true;}template<class T>voidrecroute<T>::recroute_sent_complete_cb (clnt_stat status){ if (status) rwarning << this->my_ID () << ": recroute_complete lost, status " << status << ".\n"; // We're not going to do anything more clever about this. It's dropped. Fini.}template<class T>voidrecroute<T>::dopenult (user_args *sbp, recroute_penult_arg *ra){ chord_node_wire me; this->my_location ()->fill_node (me); rtrace << this->my_ID () << ": dopenult (" << ra->routeid << ", " << ra->x << "): complete.\n"; ptr<recroute_complete_arg> ca = New refcounted<recroute_complete_arg> (); ca->routeid = ra->routeid; ca->path.setsize (ra->path.size () + 1); for (size_t i = 0; i < ra->path.size (); i++) { ca->path[i] = ra->path[i]; } ca->path[ra->path.size ()] = me; ca->retries = ra->retries; u_long m = ra->succs_desired; vec<ptr<location> > cs = this->succs (); if (ra->successors.size () + cs.size () < m) { warn << "misdirected PENULT: " << ra->successors.size () << " + " << cs.size () << " < " << m << "\n"; ca->body.set_status (RECROUTE_ROUTE_FAILED); ca->body.rfbody->failed_stat = RECROUTE_ROUTE_FAILED; this->my_location ()->fill_node(ca->body.rfbody->failed_hop); } else { ca->body.set_status (RECROUTE_ROUTE_OK); ca->body.robody->successors.setsize (m); size_t lastind = 0; for (size_t i = 0; i < ra->successors.size (); i++) { chord_node foo = make_chord_node (ra->successors[i]); // rtrace << my_ID () << ": dopenult (" << ra->routeid // << " filling " << i << "\n"; ca->body.robody->successors[i] = ra->successors[i]; if (foo.x == this->my_ID ()) { lastind = i; // rtrace << my_ID () << ": dopenult (" << ra->routeid << "): i'm at " // << i << "\n"; break; } } u_long tofill = m - (lastind + 1); if (tofill > cs.size ()) tofill = cs.size (); for (size_t i = 0; i < tofill; i++) { // rtrace << my_ID () << ": dopenult (" << ra->routeid // << ") filling2 " << i + lastind + 1<< "\n"; cs[i]->fill_node (ca->body.robody->successors[i + lastind + 1]); } ca->retries = ra->retries; } ptr<location> l = this->locations->lookup_or_create (make_chord_node (ra->origin)); doRPC (l, recroute_program_1, RECROUTEPROC_COMPLETE, ca, NULL, wrap (this, &recroute<T>::recroute_sent_complete_cb)); // We don't really care if this is lost beyond the RPC system's // retransmits. sbp->reply (NULL); sbp = NULL;}template<class T>voidrecroute<T>::docomplete (user_args *sbp, recroute_complete_arg *ca){ rtrace << this->my_ID () << ": docomplete: routeid " << ca->routeid << " has returned! retries: " << ca->retries << "\n"; route_recchord *router = routers[ca->routeid]; if (!router) { chord_node src; sbp->fill_from (&src); rtrace << this->my_ID () << ": docomplete: unknown routeid " << ca->routeid << " from host " << src << "\n"; sbp->reply (NULL); return; } routers.remove (router); router->handle_complete (sbp, ca);}template<class T>voidrecroute<T>::stats () const{ T::stats (); warnx << "Outstanding routing lookups:\n"; route_recchord *ri = routers.first (); while (ri != NULL) { timespec ts = ri->start_time (); warnx << " " << ri->routeid_ << " for " << ri->key () << " started " << ts.tv_sec << "." << ts.tv_nsec << "\n"; ri = routers.next (ri); }}template<class T>voidrecroute<T>::find_succlist (const chordID &x, u_long m, cbroute_t cb, ptr<chordID> guess){ route_recchord *ri = static_cast<route_recchord *> (produce_iterator_ptr (x)); if (shave) { rtrace << this->my_ID () << ": find_succlist (" << x << ", " << m << ") is shaving.\n"; ri->set_desired (m); } ri->first_hop (wrap (this, &recroute<T>::find_succlist_cb, cb, ri), guess);}template<class T>voidrecroute<T>::find_succlist_cb (cbroute_t cb, route_recchord *ri, bool done){ assert (done); // Expect that we are only called when totally done. vec<chord_node> cs = ri->successors (); cb (cs, ri->path (), ri->status ()); delete ri; return;}// override produce_iterator*template<class T>ptr<route_iterator>recroute<T>::produce_iterator (chordID xi) { ptr<route_recchord> ri = New refcounted<route_recchord> (mkref (this), xi); routers.insert (ri); return ri;}template<class T>ptr<route_iterator>recroute<T>::produce_iterator (chordID xi, const rpc_program &uc_prog, int uc_procno, ptr<void> uc_args) { ptr<route_recchord> ri = New refcounted<route_recchord> (mkref (this), xi, uc_prog, uc_procno, uc_args); routers.insert (ri); return ri;}template<class T>route_iterator *recroute<T>::produce_iterator_ptr (chordID xi) { route_recchord *ri = New route_recchord (mkref (this), xi); routers.insert (ri); return ri;}template<class T>route_iterator *recroute<T>::produce_iterator_ptr (chordID xi, const rpc_program &uc_prog, int uc_procno, ptr<void> uc_args) { route_recchord *ri = New route_recchord (mkref (this), xi, uc_prog, uc_procno, uc_args); routers.insert (ri); return ri;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -