📄 server.c
字号:
memcpy (rpc_res->resok->results.base (), marshalled_res, res_len); xfree (marshalled_res); } assert (rpc_res->status == DORPC_OK); u_int64_t diff = getusec () - init_time; track_proctime (*prog, procno, diff); //reply sbp->reply (rpc_res); delete rpc_res; delete this;}voidvnode_impl::ping (ptr<location> x, cbping_t cb){ //talk directly to the RPC manger to get the dead behaviour rpcm->doRPC_dead (x, transport_program_1, TRANSPORTPROC_NULL, NULL, NULL, wrap (this, &vnode_impl::ping_cb, x, cb));}voidvnode_impl::ping_cb (ptr<location> x, cbping_t cb, clnt_stat status) { if (status) { x->set_alive (false); cb (CHORD_RPCFAILURE); } else { x->set_alive (true); cb (CHORD_OK); }}voidvnode_impl::check_dead_node_cb (ptr<location> l, time_t nbackoff, chordstat s){ unsigned int i=0; chordID id = l->id (); for (i=0; i<dead_nodes.size (); i++) if (dead_nodes[i]->id () == id) break; if (s != CHORD_OK) { if (i == dead_nodes.size ()) dead_nodes.push_back (l); delaycb (nbackoff, wrap (this, &vnode_impl::check_dead_node, l, nbackoff)); } else { /* Take it off the dead list */ if (i != dead_nodes.size ()) { dead_nodes[i] = dead_nodes[0]; dead_nodes.pop_front (); } // Insertion should ensure that node is set to alive and in table // (Liveness should also have been set by ping_cb) ptr<location> nl = locations->insert (l); if (nl != l) { warn << "duplicate location " << l << "\n"; } notify (my_succ (), myID); }}voidvnode_impl::check_dead_node (ptr<location> l, time_t backoff){ int cap; assert (Configurator::only ().get_int ("chord.checkdead_max", cap)); timespec ts; clock_gettime (CLOCK_REALTIME, &ts); if ((ts.tv_sec - l->dead_time ()) > 86400 * 30) { // Throw away nodes that are really really dead, // though they remain in the location table. warnx << "Node " << l << " dead for 30 days\n"; return; } backoff *= 2; if (backoff > cap) backoff = cap; ping (l, wrap (this, &vnode_impl::check_dead_node_cb, l, backoff)); warnx << gettime () << " pinging dead node " << l << "\n";}static inline const char *tracetime (){ static str buf (""); if (aclnttime) { timespec ts; clock_gettime (CLOCK_REALTIME, &ts); buf = strbuf (" %d.%06d", int (ts.tv_sec), int (ts.tv_nsec/1000)); } return buf;}// Stolen from aclnt::init_callstatic voidprintreply (aclnt_cb cb, str name, void *res, void (*print_res) (const void *, const strbuf *, int, const char *, const char *), clnt_stat err){ if (aclnttrace >= 3) { if (err) warn << "ACLNT_TRACE:" << tracetime () << " reply " << name << ": " << err << "\n"; else if (aclnttrace >= 4) { warn << "ACLNT_TRACE:" << tracetime () << " reply " << name << "\n"; if (aclnttrace >= 5 && print_res) print_res (res, NULL, aclnttrace - 4, "REPLY", ""); } } (*cb) (err);}voiderr_cb (aclnt_cb cb){ cb (RPC_CANTSEND);}longvnode_impl::doRPC (ref<location> l, const rpc_program &prog, int procno, ptr<void> in, void *out, aclnt_cb cb, cbtmo_t cb_tmo /* = NULL */, bool stream /* = false */){ //check to see if this is alive ptr<location> loc = locations->lookup (l->id ()); if (loc && !loc->alive()) { warn << "doRPC on dead node " << l->id () << "\n"; delaycb (0, wrap (&err_cb, cb)); return -1; } //form the transport RPC ptr<dorpc_arg> arg = New refcounted<dorpc_arg> (); //header l->fill_node (arg->dest); me_->fill_node (arg->src); arg->progno = prog.progno; arg->procno = procno; //marshall the args ourself xdrproc_t inproc = prog.tbl[procno].xdr_arg; xdrsuio x (XDR_ENCODE); if ((!inproc) || (!inproc (x.xdrp (), in))) { fatal << "failed to marshall args\n"; cb (RPC_CANTSEND); return 0; } else { int args_len = x.uio ()->resid (); arg->args.setsize (args_len); void *marshalled_args = suio_flatten (x.uio ()); memcpy (arg->args.base (), marshalled_args, args_len); xfree (marshalled_args); // This is the real call; cf transport tracking in stp_manager. track_call (prog, procno, args_len); ref<dorpc_res> res = New refcounted<dorpc_res> (DORPC_OK); xdrproc_t outproc = prog.tbl[procno].xdr_res; u_int32_t xid = random_getword (); aclnt_cb cbw = wrap (this, &vnode_impl::doRPC_cb, l, outproc, out, cb, res); // Stolen (mostly) from aclnt::init_call if (aclnttrace >= 2) { str name; const rpcgen_table *rtp; rtp = &prog.tbl[procno]; assert (rtp); name = strbuf ("%s:%s fake_xid=%x", prog.name, rtp->name, xid) << " on " << l->address () << ":" << l->vnode (); warn << "ACLNT_TRACE:" << tracetime () << " call " << name << "\n"; if (aclnttrace >= 5 && rtp->print_arg) rtp->print_arg (in, NULL, aclnttrace - 4, "ARGS", ""); if (aclnttrace >= 3 && cb != aclnt_cb_null) cbw = wrap (printreply, cbw, name, out, rtp->print_res); } if (!stream) return rpcm->doRPC (me_, l, transport_program_1, TRANSPORTPROC_DORPC, arg, res, cbw, wrap(this, &vnode_impl::tmo, cb_tmo, prog.progno, procno, args_len)); else return rpcm->doRPC_stream (me_, l, transport_program_1, TRANSPORTPROC_DORPC, arg, res, cbw); }}boolvnode_impl::tmo (cbtmo_t cb_tmo, int progno, int procno, int args_len, chord_node n, int r){ bool cancel = false; if (cb_tmo) cancel = cb_tmo (n, r); // Track the rexmit for the real program; // rpccb_chord::timeout_cb will track for transport_prot. if (!cancel) track_rexmit (progno, procno, args_len); return cancel;}voidvnode_impl::doRPC_cb (ptr<location> l, xdrproc_t proc, void *out, aclnt_cb cb, ref<dorpc_res> res, clnt_stat err) { if (err) { ptr<location> reall = locations->lookup (l->id ()); if (reall && reall->alive ()) { warn << "got error " << err << ", but " << l << " is still marked alive\n"; reall->set_alive (false); } else if (!reall) { locations->insert (l); l->set_alive (false); } if (!l->alive () && checkdead_int > 0) { // benjie: no longer alive, put it on the dead_nodes list so // we can try to contact it periodically unsigned i=0; for (i=0; i<dead_nodes.size (); i++) if (dead_nodes[i]->id () == l->id ()) break; if (i == dead_nodes.size ()) { dead_nodes.push_back (l); delaycb (checkdead_int, wrap (this, &vnode_impl::check_dead_node, l, checkdead_int)); } } cb (err); } else if (res->status != DORPC_OK) cb (RPC_CANTRECV); else { float distance = l->distance (); chord_node n = make_chord_node (res->resok->src); l->set_coords (n); Coord u_coords (n); update_coords (u_coords, distance); // This should reset the age of the node to zero because // remote side always provides an age of zero for self // and locationtable will pull in updates that are younger. if (me_->id () != n.x) locations->insert (n); //unmarshall the result and copy to out xdrmem x ((char *)res->resok->results.base (), res->resok->results.size (), XDR_DECODE); assert (proc); if (!proc (x.xdrp (), out)) { fatal << "failed to unmarshall result\n"; cb (RPC_CANTSEND); } else cb (err); }}voidvnode_impl::update_error (float actual, float expect, float rmt_err){ if (actual < 0) return; float rel_error = fabs (expect - actual)/actual; float pred_err = me_->coords ().err (); float npe = -1.0; if (pred_err < 0) npe = rel_error; else if (rmt_err < 0) npe = pred_err; else { // ce is our pred error, he is the remote pred error // squaring them punishes high error nodes relatively more. float ce = pred_err*pred_err; float he = rmt_err*rmt_err; //this is our new prediction error found by combining our prediction // error with the remote node's error float new_pred_err = rel_error*(ce/(he + ce)) + pred_err*(he/(he + ce)); //don't just take the new prediction, EWMA it with old predictions npe = (19*pred_err + new_pred_err)/20.0; //cap it at 1.0 if (npe > 1.0) npe = 1.0; } me_->set_coords_err (npe);}#define MAXDIM 10voidvnode_impl::update_coords (Coord uc, float ud){ // warn << myID << " --- starting update -----\n"; Coord coords = me_->coords (); Coord v = uc; float rmt_err = uc.err (); float actual = ud; float expect = coords.distance_f (uc); if (actual >= 0 && actual < 1000000) { //ignore timeouts //track our prediction error update_error (actual, expect, rmt_err); // force magnitude: > 0 --> stretched float grad = expect - actual; v.vector_sub (coords); float len = v.plane_norm (); while (len < 0.0001) { for (int i = 0; i < NCOORD; i++) v.coords[i] = (double)(random () % 400 - 200) / 1.0; //if (USING_HT) v.ht += fabs((double)(random () % 10 - 5) / 10.0); len = v.plane_norm (); } len = v.norm (); float unit = 1.0/sqrtf(len); // scalar_mult(v, unit) is unit force vector // times grad gives the scaled force vector v.scalar_mult (unit*grad); //timestep is the scaled ratio of our prediction error // and the remote node's prediction error float pred_err = me_->coords ().err (); float timestep; if (pred_err > 0 && rmt_err > 0) timestep = 0.1 * (pred_err)/(pred_err + rmt_err); else if (pred_err > 0) timestep = 0.0; else timestep = 1.0; v.scalar_mult(timestep); //flip sign on height v.ht = -v.ht; coords.vector_add (v);#ifdef VIVALDI_DEBUG char b[1024]; snprintf (b, 1024, "coord hop: %f - %f = %f ; len=%f ts=%f (%f %f)\n", expect, actual, grad, len, timestep, pred_err, rmt_err); warn << b; coords.print ("coords "); uc.print ("uc "); warn << "----------------\n";#endif if (coords.ht <= 100) coords.ht = 100; me_->set_coords (coords); } else { char b[32]; snprintf (b, 32, "%f", actual); trace << "COORD: ignored actual of " << b << "\n"; }}longvnode_impl::doRPC (const chord_node &n, const rpc_program &prog, int procno, ptr<void> in, void *out, aclnt_cb cb, cbtmo_t cb_tmo, bool stream){ ptr<location> l = locations->lookup_or_create (n); return doRPC (l, prog, procno, in, out, cb, cb_tmo, stream);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -