⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 chord.c

📁 chord 源码 http://pdos.csail.mit.edu/chord/
💻 C
字号:
/* * * Copyright (C) 2000 Frans Kaashoek (kaashoek@lcs.mit.edu) * Copyright (C) 2001 Frans Kaashoek (kaashoek@lcs.mit.edu) and  *                    Frank Dabek (fdabek@lcs.mit.edu). * *  Permission is hereby granted, free of charge, to any person obtaining *  a copy of this software and associated documentation files (the *  "Software"), to deal in the Software without restriction, including *  without limitation the rights to use, copy, modify, merge, publish, *  distribute, sublicense, and/or sell copies of the Software, and to *  permit persons to whom the Software is furnished to do so, subject to *  the following conditions: * *  The above copyright notice and this permission notice shall be *  included in all copies or substantial portions of the Software. * *  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, *  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF *  MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND *  NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE *  LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION *  OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION *  WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * */#include <assert.h>#include <qhash.h>#include "chord_impl.h"#include <location.h>#include "stabilize.h"#include "succ_list.h"#include "pred_list.h"#include "comm.h"#include <coord.h>#include <modlogger.h>#include <misc_utils.h>#include <id_utils.h>#include <locationtable.h>#include <configurator.h>ref<vnode>vnode::produce_vnode (ref<chord> _chordnode,		      ref<rpc_manager> _rpcm,		      ref<location> _l){  return New refcounted<vnode_impl> (_chordnode, _rpcm, _l);}// Pure virtual destructors still need definitionsvnode::~vnode () {}chordIDvnode_impl::my_ID () const{  return myID;}ref<location>vnode_impl::my_location (){  return me_;}vnode_impl::vnode_impl (ref<chord> _chordnode,			ref<rpc_manager> _rpcm,			ref<location> _l) :  me_ (_l),  rpcm (_rpcm),  myindex (_l->vnode ()),  myID (_l->id ()),   chordnode (_chordnode){  locations = _chordnode->locations;  warnx << gettime () << " myID is " << myID << "\n";  successors = New refcounted<succ_list> (mkref(this), locations);  predecessors = New refcounted<pred_list> (mkref(this), locations);  stabilizer = New refcounted<stabilize_manager> (myID);  stabilizer->register_client (successors);  stabilizer->register_client (predecessors);  locations->incvnodes ();  addHandler (chord_program_1, wrap(this, &vnode_impl::dispatch));  ngetsuccessor = 0;  ngetpredecessor = 0;  ngetsucclist = 0;  nfindsuccessor = 0;  nhops = 0;  nmaxhops = 0;  nfindpredecessor = 0;  nnotify = 0;  nalert = 0;    ndogetsuccessor = 0;  ndogetpredecessor = 0;  ndonotify = 0;  ndoalert = 0;  ndogetsucclist = 0;  ndogetsucc_ext = 0;  ndogetpred_ext = 0;  assert (Configurator::only ().get_int ("chord.checkdead_interval", checkdead_int));}vnode_impl::~vnode_impl (){  warnx << myID << ": vnode_impl: destroyed\n";  exit (0);}voidvnode_impl::dispatch (user_args *a){  switch (a->procno) {  case CHORDPROC_NULL:     {      assert (0);    }    break;  case CHORDPROC_GETSUCCESSOR:    {      doget_successor (a);    }    break;  case CHORDPROC_GETPREDECESSOR:    {      doget_predecessor (a);    }    break;  case CHORDPROC_NOTIFY:    {      chord_nodearg *na = a->Xtmpl getarg<chord_nodearg> ();      donotify (a, na);    }    break;  case CHORDPROC_ALERT:    {      chord_nodearg *na = a->Xtmpl getarg<chord_nodearg> ();      doalert (a, na);    }    break;  case CHORDPROC_GETSUCCLIST:    {      dogetsucclist (a);    }    break;  case CHORDPROC_GETPREDLIST:    {      dogetpredlist (a);    }    break;  case CHORDPROC_TESTRANGE_FINDCLOSESTPRED:    {      chord_testandfindarg *fa = a->Xtmpl getarg<chord_testandfindarg> ();      doroute (a, fa);    }    break;  case CHORDPROC_GETPRED_EXT:    {      dogetpred_ext (a);    }    break;  case CHORDPROC_GETSUCC_EXT:    {      dogetsucc_ext (a);    }    break;  case CHORDPROC_FINDROUTE:    {      chord_findarg *fa = a->Xtmpl getarg<chord_findarg> ();      dofindroute (a, fa);    }    break;  default:    a->reject (PROC_UNAVAIL);    break;  }}ptr<location>vnode_impl::my_pred() const{  return predecessors->pred ();}ptr<location>vnode_impl::my_succ () const{  return successors->succ ();}voidvnode_impl::stats () const{  warnx << "VIRTUAL NODE STATS " << myID	<< " stable? " << stabilizer->isstable () << "\n";  warnx << "# estimated node in ring "	<< locations->estimate_nodes () << "\n";  warnx << "continuous_timer " << stabilizer->cts_timer ()	<< " backoff " 	<< stabilizer->bo_timer () << "\n";    warnx << "# getsuccesor requests " << ndogetsuccessor << "\n";  warnx << "# getpredecessor requests " << ndogetpredecessor << "\n";  warnx << "# getsucclist requests " << ndogetsucclist << "\n";  warnx << "# notify requests " << ndonotify << "\n";    warnx << "# alert requests " << ndoalert << "\n";    warnx << "# getsuccesor calls " << ngetsuccessor << "\n";  warnx << "# getpredecessor calls " << ngetpredecessor << "\n";  warnx << "# getsucclist calls " << ngetsucclist << "\n";  warnx << "# findsuccessor calls " << nfindsuccessor << "\n";  warnx << "# hops for findsuccessor " << nhops << "\n";  {    char buf[100];    if (nfindsuccessor) {      sprintf (buf, "   Average # hops: %f\n", ((float) nhops)/nfindsuccessor);      warnx << buf;    }  }  warnx << "   # max hops for findsuccessor " << nmaxhops << "\n";  warnx << "# findpredecessor calls " << nfindpredecessor << "\n";  warnx << "# notify calls " << nnotify << "\n";    warnx << "# alert calls " << nalert << "\n";  }voidvnode_impl::print (strbuf &outbuf) const{  outbuf << "======== " << myID << " ====\n";  successors->print (outbuf);  outbuf << "pred : " << my_pred ()->id () << "\n";  outbuf << "=====================================================\n";}voidvnode_impl::stabilize (void){  stabilizer->start ();}voidvnode_impl::join (ptr<location> n, cbjoin_t cb){  ptr<chord_findarg> fa = New refcounted<chord_findarg> ();  fa->x = decID (myID);  fa->return_succs = true;  chord_nodelistres *route = New chord_nodelistres ();  doRPC (n, chord_program_1, CHORDPROC_FINDROUTE, fa, route,	 wrap (this, &vnode_impl::join_getsucc_cb, n, cb, route));}void vnode_impl::join_getsucc_cb (ptr<location> n,			     cbjoin_t cb, chord_nodelistres *route,			     clnt_stat err){  ptr<vnode> v = NULL;  chordstat status = CHORD_OK;    if (err) {    warnx << myID << ": join RPC failed: " << err << "\n";    if (err == RPC_TIMEDOUT) {      // try again. XXX limit the number of retries??      join (n, cb);      delete route;      return;    }    status = CHORD_RPCFAILURE;  } else if (route->status != CHORD_OK) {    status = route->status;  } else if (route->resok->nlist.size () < 1) {    status = CHORD_ERRNOENT;  } else {    // Just insert a possible predecessor and successor.    size_t i = route->resok->nlist.size ();    for (size_t j = 0; j < i; j++) {      warn << my_ID () << " adding " << make_chordID(route->resok->nlist[j]) 	<< "," << route->resok->nlist[j].knownup << "," << route->resok->nlist[j].age 	<< " as an initial node\n";      if (make_chordID (route->resok->nlist[j]) != my_ID ()) 	  locations->insert (make_chord_node (route->resok->nlist[j]));    }    stabilize ();    notify (my_succ (), myID);    v = mkref (this);    status = CHORD_OK;  }  if (status != CHORD_OK) {    warnx << myID << ": join failed, remove from vnodes?\n";    // continue to run, even if join has failed... maybe    // stabilization (with other vnodes) will fix us over time.    stabilize ();    v = mkref (this);  }  cb (v, status);  delete route;}voidvnode_impl::doget_successor (user_args *sbp){  ndogetsuccessor++;    ptr<location> s = my_succ ();  chord_noderes res(CHORD_OK);  s->fill_node (*res.resok);  sbp->reply (&res);}voidvnode_impl::doget_predecessor (user_args *sbp){  ndogetpredecessor++;  ptr<location> p = my_pred ();  chord_noderes res(CHORD_OK);  p->fill_node (*res.resok);  sbp->reply (&res);}voidvnode_impl::do_upcall_cb (char *a, int upcall_prog, int upcall_proc,			  cbupcalldone_t done_cb, bool v){  const rpc_program *prog = chordnode->get_program (upcall_prog);  assert (prog);  xdr_delete (prog->tbl[upcall_proc].xdr_arg, a);  done_cb (v);}voidvnode_impl::do_upcall (int upcall_prog, int upcall_proc,		  void *uc_args, int uc_args_len,		  cbupcalldone_t done_cb){  upcall_record *uc = upcall_table[upcall_prog];  if (!uc) {     warn << "upcall not registered\n";    done_cb (false);    return;  }  const rpc_program *prog = chordnode->get_program (upcall_prog);  if (!prog)     fatal << "bad prog: " << upcall_prog << "\n";    xdrmem x ((char *)uc_args, uc_args_len, XDR_DECODE);  xdrproc_t proc = prog->tbl[upcall_proc].xdr_arg;  assert (proc);    char *unmarshalled_args = (char *)prog->tbl[upcall_proc].alloc_arg ();  if (!proc (x.xdrp (), unmarshalled_args))    fatal << "upcall: error unmarshalling arguments\n";    //run the upcall. It returns a pointer to its result and a length in the cb  cbupcall_t cb = uc->cb;  (*cb)(upcall_proc, (void *)unmarshalled_args,	wrap (this, &vnode_impl::do_upcall_cb, 	      unmarshalled_args, upcall_prog, upcall_proc,	      done_cb));}voidvnode_impl::doroute (user_args *sbp, chord_testandfindarg *fa){  chordID x = fa->x;  chordID succ = my_succ ()->id ();  chord_testandfindres *res = New chord_testandfindres ();    if (betweenrightincl(myID, succ, x) ) {    res->set_status (CHORD_INRANGE);    vec<ptr<location> > succs = successors->succs ();    res->inrange->n.setsize (succs.size ());    for (size_t i = 0; i < succs.size (); i++)      succs[i]->fill_node (res->inrange->n[i]);  } else {    res->set_status (CHORD_NOTINRANGE);    vec<chordID> f;    for (unsigned int i=0; i < fa->failed_nodes.size (); i++)      f.push_back (fa->failed_nodes[i]);    ptr<location> p = closestpred (fa->x, f);    p->fill_node (res->notinrange->n);        vec<ptr<location> > succs = successors->succs ();    res->notinrange->succs.setsize (succs.size ());    for (size_t i = 0; i < succs.size (); i++)      succs[i]->fill_node (res->notinrange->succs[i]);  }  if (fa->upcall_prog)  {    do_upcall (fa->upcall_prog, fa->upcall_proc,	       fa->upcall_args.base (), fa->upcall_args.size (),	       wrap (this, &vnode_impl::upcall_done, fa, res, sbp));  } else {    sbp->reply (res);    delete res;  }}voidvnode_impl::upcall_done (chord_testandfindarg *fa,			 chord_testandfindres *res,			 user_args *sbp,			 bool stop){  if (stop) res->set_status (CHORD_STOP);  sbp->reply (res);  delete res;}voidvnode_impl::donotify (user_args *sbp, chord_nodearg *na){  ndonotify++;  predecessors->update_pred (make_chord_node (na->n));  chordstat res = CHORD_OK;  sbp->reply (&res);}voidvnode_impl::doalert (user_args *sbp, chord_nodearg *na){  ndoalert++;  chord_node n = make_chord_node (na->n);  ptr<location> l = locations->lookup (n.x);  if (l) {    // check whether we cannot reach x either    chord_noderes *res = New chord_noderes (CHORD_OK);    ptr<chordID> v = New refcounted<chordID> (n.x);    doRPC (l, chord_program_1, CHORDPROC_GETSUCCESSOR, v, res,	   wrap (mkref (this), &vnode_impl::doalert_cb, res, n.x));  }  chordstat res = CHORD_OK;  sbp->reply (&res);}voidvnode_impl::doalert_cb (chord_noderes *res, chordID x, clnt_stat err){  if (!err) {    warnx << "doalert_cb: " << x << " is alive\n";  } else {    warnx << "doalert_cb: " << x << " is indeed not alive\n";    // doRPCcb has already killed this node for us.  }  delete res;}voidvnode_impl::dogetsucc_ext (user_args *sbp){  chord_nodelistextres res(CHORD_OK);  ndogetsucc_ext++;  successors->fill_nodelistresext (&res);  sbp->reply (&res);}voidvnode_impl::dogetpred_ext (user_args *sbp){  ndogetpred_ext++;  chord_nodelistextres res(CHORD_OK);  predecessors->fill_nodelistresext (&res);  sbp->reply (&res);}voidvnode_impl::dogetsucclist (user_args *sbp){  ndogetsucclist++;  chord_nodelistres res (CHORD_OK);  successors->fill_nodelistres (&res);  sbp->reply (&res);}voidvnode_impl::dogetpredlist (user_args *sbp){  ndogetpredlist++;  chord_nodelistres res (CHORD_OK);  predecessors->fill_nodelistres (&res);  sbp->reply (&res);}voidvnode_impl::dofindroute (user_args *sbp, chord_findarg *fa){  find_route (fa->x, wrap (this, &vnode_impl::dofindroute_cb, sbp, fa));}voidvnode_impl::dofindroute_cb (user_args *sbp, chord_findarg *fa, 			    vec<chord_node> s, route r, chordstat err){  if (err) {    chord_nodelistres res (CHORD_RPCFAILURE);    sbp->reply (&res);  } else {    chord_nodelistres res (CHORD_OK);    int nnodes_returned = r.size ();    if (fa->return_succs) nnodes_returned += s.size ();    res.resok->nlist.setsize (nnodes_returned);    unsigned int n = 0;    if (fa->return_succs)       for (unsigned int i = 0; i < s.size (); i++, n++) {    	ptr<location> l = New refcounted<location> (s[i]);    	l->fill_node (res.resok->nlist[n]);      }        for (unsigned int i = 0; i < r.size (); i++, n++) {      r[i]->fill_node (res.resok->nlist[n]);    }        sbp->reply (&res);  }}voidvnode_impl::stop (void){  stabilizer->stop ();}vec<ptr<location> > vnode_impl::succs (){  return successors->succs ();}vec<ptr<location> >vnode_impl::preds (){   return predecessors->preds ();}ptr<location>vnode_impl::closestpred (const chordID &x, const vec<chordID> &f){  return successors->closestpred (x, f);}ptr<route_iterator>vnode_impl::produce_iterator (chordID xi){  return New refcounted<route_chord> (mkref (this), xi);}ptr<route_iterator>vnode_impl::produce_iterator (chordID xi, const rpc_program &uc_prog,			      int uc_procno, ptr<void> uc_args){  return New refcounted<route_chord> (mkref (this),				      xi, uc_prog, uc_procno, uc_args);}route_iterator *vnode_impl::produce_iterator_ptr (chordID xi){  return New route_chord (mkref (this), xi);}route_iterator *vnode_impl::produce_iterator_ptr (chordID xi, const rpc_program &uc_prog,				  int uc_procno, ptr<void> uc_args){  return New route_chord (mkref (this), xi, uc_prog, uc_procno, uc_args);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -