⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 server.c

📁 chord 源码 http://pdos.csail.mit.edu/chord/
💻 C
📖 第 1 页 / 共 2 页
字号:
    memcpy (rpc_res->resok->results.base (), marshalled_res, res_len);    xfree (marshalled_res);  }  assert (rpc_res->status == DORPC_OK);  u_int64_t diff = getusec () - init_time;  track_proctime (*prog, procno, diff);   //reply  sbp->reply (rpc_res);  delete rpc_res;  delete this;}voidvnode_impl::ping (ptr<location> x, cbping_t cb){  //talk directly to the RPC manger to get the dead behaviour  rpcm->doRPC_dead (x, transport_program_1, TRANSPORTPROC_NULL, 		    NULL, NULL, 		    wrap (this, &vnode_impl::ping_cb, x, cb));}voidvnode_impl::ping_cb (ptr<location> x, cbping_t cb, clnt_stat status) {  if (status) {    x->set_alive (false);    cb (CHORD_RPCFAILURE);  } else {    x->set_alive (true);    cb (CHORD_OK);  }}voidvnode_impl::check_dead_node_cb (ptr<location> l, time_t nbackoff, chordstat s){  unsigned int i=0;  chordID id = l->id ();  for (i=0; i<dead_nodes.size (); i++)    if (dead_nodes[i]->id () == id)      break;  if (s != CHORD_OK) {    if (i == dead_nodes.size ())      dead_nodes.push_back (l);    delaycb (nbackoff, wrap (this, &vnode_impl::check_dead_node, l, nbackoff));  }  else {    /* Take it off the dead list */    if (i != dead_nodes.size ()) {      dead_nodes[i] = dead_nodes[0];      dead_nodes.pop_front ();    }    // Insertion should ensure that node is set to alive and in table    // (Liveness should also have been set by ping_cb)    ptr<location> nl = locations->insert (l);    if (nl != l) {      warn << "duplicate location  " << l << "\n";    }    notify (my_succ (), myID);  }}voidvnode_impl::check_dead_node (ptr<location> l, time_t backoff){  int cap;  assert (Configurator::only ().get_int ("chord.checkdead_max", cap));  timespec ts;  clock_gettime (CLOCK_REALTIME, &ts);  if ((ts.tv_sec - l->dead_time ()) > 86400 * 30) {    // Throw away nodes that are really really dead,    // though they remain in the location table.    warnx << "Node " << l << " dead for 30 days\n";    return;  }  backoff *= 2;  if (backoff > cap)    backoff = cap;  ping (l, wrap (this, &vnode_impl::check_dead_node_cb, l, backoff));  warnx << gettime () << " pinging dead node " << l << "\n";}static inline const char *tracetime (){  static str buf ("");  if (aclnttime) {    timespec ts;    clock_gettime (CLOCK_REALTIME, &ts);    buf = strbuf (" %d.%06d", int (ts.tv_sec), int (ts.tv_nsec/1000));  }  return buf;}// Stolen from aclnt::init_callstatic voidprintreply (aclnt_cb cb, str name, void *res,	    void (*print_res) (const void *, const strbuf *, int,			       const char *, const char *),	    clnt_stat err){  if (aclnttrace >= 3) {    if (err)      warn << "ACLNT_TRACE:" << tracetime () 	   << " reply " << name << ": " << err << "\n";    else if (aclnttrace >= 4) {      warn << "ACLNT_TRACE:" << tracetime ()	   << " reply " << name << "\n";      if (aclnttrace >= 5 && print_res)	print_res (res, NULL, aclnttrace - 4, "REPLY", "");    }  }  (*cb) (err);}voiderr_cb (aclnt_cb cb){  cb (RPC_CANTSEND);}longvnode_impl::doRPC (ref<location> l, const rpc_program &prog, int procno, 		   ptr<void> in, void *out, aclnt_cb cb,		   cbtmo_t cb_tmo /* = NULL */, bool stream /* = false */){  //check to see if this is alive  ptr<location> loc = locations->lookup (l->id ());  if (loc && !loc->alive()) {    warn << "doRPC on dead node " << l->id () << "\n";    delaycb (0, wrap (&err_cb, cb));    return -1;  }  //form the transport RPC  ptr<dorpc_arg> arg = New refcounted<dorpc_arg> ();  //header  l->fill_node (arg->dest);  me_->fill_node (arg->src);    arg->progno = prog.progno;  arg->procno = procno;  //marshall the args ourself  xdrproc_t inproc = prog.tbl[procno].xdr_arg;  xdrsuio x (XDR_ENCODE);  if ((!inproc) || (!inproc (x.xdrp (), in))) {    fatal << "failed to marshall args\n";    cb (RPC_CANTSEND);    return 0;  } else {    int args_len = x.uio ()->resid ();    arg->args.setsize (args_len);    void *marshalled_args = suio_flatten (x.uio ());    memcpy (arg->args.base (), marshalled_args, args_len);    xfree (marshalled_args);    // This is the real call; cf transport tracking in stp_manager.    track_call (prog, procno, args_len);    ref<dorpc_res> res = New refcounted<dorpc_res> (DORPC_OK);    xdrproc_t outproc = prog.tbl[procno].xdr_res;    u_int32_t xid = random_getword ();    aclnt_cb cbw = wrap (this, &vnode_impl::doRPC_cb, 			l, outproc, out, cb, res);    // Stolen (mostly) from aclnt::init_call    if (aclnttrace >= 2) {      str name;      const rpcgen_table *rtp;      rtp = &prog.tbl[procno];      assert (rtp);      name = strbuf ("%s:%s fake_xid=%x", prog.name, rtp->name, xid)	 	 << " on " << l->address () << ":" << l->vnode ();            warn << "ACLNT_TRACE:" << tracetime () << " call " << name << "\n";      if (aclnttrace >= 5 && rtp->print_arg)	rtp->print_arg (in, NULL, aclnttrace - 4, "ARGS", "");      if (aclnttrace >= 3 && cb != aclnt_cb_null)	cbw = wrap (printreply, cbw, name, out, rtp->print_res);    }      if (!stream)      return rpcm->doRPC (me_, l, transport_program_1, TRANSPORTPROC_DORPC, 			  arg, res, cbw, 			  wrap(this, &vnode_impl::tmo, cb_tmo, 			       prog.progno, procno, args_len));    else      return rpcm->doRPC_stream (me_, l, 				 transport_program_1, TRANSPORTPROC_DORPC, 				 arg, res, cbw);  }}boolvnode_impl::tmo (cbtmo_t cb_tmo, int progno, 		 int procno, int args_len, chord_node n, int r){  bool cancel = false;  if (cb_tmo) cancel = cb_tmo (n, r);  // Track the rexmit for the real program;  // rpccb_chord::timeout_cb will track for transport_prot.  if (!cancel)    track_rexmit (progno, procno, args_len);  return cancel;}voidvnode_impl::doRPC_cb (ptr<location> l, xdrproc_t proc, 		      void *out, aclnt_cb cb, 		      ref<dorpc_res> res, clnt_stat err) {  if (err) {    ptr<location> reall = locations->lookup (l->id ());    if (reall && reall->alive ()) {      warn << "got error " << err << ", but " << l	   << " is still marked alive\n";      reall->set_alive (false);    } else if (!reall) {      locations->insert (l);      l->set_alive (false);    }    if (!l->alive () && checkdead_int > 0) {      // benjie: no longer alive, put it on the dead_nodes list so      // we can try to contact it periodically      unsigned i=0;      for (i=0; i<dead_nodes.size (); i++)	if (dead_nodes[i]->id () == l->id ())          break;      if (i == dead_nodes.size ()) {        dead_nodes.push_back (l);	delaycb (checkdead_int, wrap (this, &vnode_impl::check_dead_node, l, checkdead_int));      }    }    cb (err);  }  else if (res->status != DORPC_OK)    cb (RPC_CANTRECV);  else {    float distance = l->distance ();    chord_node n = make_chord_node (res->resok->src);    l->set_coords (n);    Coord u_coords (n);    update_coords (u_coords,		   distance);    // This should reset the age of the node to zero because    // remote side always provides an age of zero for self    // and locationtable will pull in updates that are younger.    if (me_->id () != n.x)       locations->insert (n);     //unmarshall the result and copy to out    xdrmem x ((char *)res->resok->results.base (), 	      res->resok->results.size (), XDR_DECODE);    assert (proc);    if (!proc (x.xdrp (), out)) {      fatal << "failed to unmarshall result\n";      cb (RPC_CANTSEND);    } else       cb (err);  }}voidvnode_impl::update_error (float actual, float expect, float rmt_err){  if (actual < 0) return;  float rel_error = fabs (expect - actual)/actual;    float pred_err = me_->coords ().err ();  float npe = -1.0;  if (pred_err < 0)    npe = rel_error;  else if (rmt_err < 0)    npe = pred_err;  else {    // ce is our pred error, he is the remote pred error    // squaring them punishes high error nodes relatively more.    float ce = pred_err*pred_err;    float he = rmt_err*rmt_err;    //this is our new prediction error found by combining our prediction    // error with the remote node's error    float new_pred_err = rel_error*(ce/(he + ce)) + pred_err*(he/(he + ce));    //don't just take the new prediction, EWMA it with old predictions    npe = (19*pred_err + new_pred_err)/20.0;    //cap it at 1.0    if (npe > 1.0) npe = 1.0;  }  me_->set_coords_err (npe);}#define MAXDIM 10voidvnode_impl::update_coords (Coord uc, float ud){  //  warn << myID << " --- starting update -----\n";  Coord coords = me_->coords ();  Coord v = uc;  float rmt_err = uc.err ();    float actual = ud;  float expect = coords.distance_f (uc);  if (actual >= 0 && actual < 1000000) { //ignore timeouts    //track our prediction error    update_error (actual, expect, rmt_err);    // force magnitude: > 0 --> stretched    float grad = expect - actual;    v.vector_sub (coords);        float len = v.plane_norm ();    while (len < 0.0001) {      for (int i = 0; i < NCOORD; i++)	v.coords[i] = (double)(random () % 400 - 200) / 1.0;      //if (USING_HT) v.ht += fabs((double)(random () % 10 - 5) / 10.0);      len = v.plane_norm ();    }    len = v.norm ();    float unit = 1.0/sqrtf(len);    // scalar_mult(v, unit) is unit force vector    // times grad gives the scaled force vector    v.scalar_mult (unit*grad);    //timestep is the scaled ratio of our prediction error    // and the remote node's prediction error    float pred_err = me_->coords ().err ();    float timestep;    if (pred_err > 0 && rmt_err > 0)      timestep = 0.1 * (pred_err)/(pred_err + rmt_err);    else if (pred_err > 0)      timestep = 0.0;    else      timestep = 1.0;    v.scalar_mult(timestep);    //flip sign on height    v.ht = -v.ht;    coords.vector_add (v);#ifdef VIVALDI_DEBUG    char b[1024];			           snprintf (b, 1024, "coord hop: %f - %f = %f ; len=%f ts=%f (%f %f)\n", 	      expect, actual, grad, len, timestep, pred_err, rmt_err);    warn << b;    coords.print ("coords ");    uc.print ("uc ");    warn << "----------------\n";#endif         if (coords.ht <= 100) coords.ht = 100;    me_->set_coords (coords);  } else {    char b[32];    snprintf (b, 32, "%f", actual);    trace << "COORD: ignored actual of " << b << "\n";  }}longvnode_impl::doRPC (const chord_node &n, const rpc_program &prog, int procno, 		   ptr<void> in, void *out, aclnt_cb cb, 		    		   cbtmo_t cb_tmo, bool stream){  ptr<location> l = locations->lookup_or_create (n);  return doRPC (l, prog, procno, in, out, cb, cb_tmo, stream);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -