⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cd.c

📁 chord 源码 http://pdos.csail.mit.edu/chord/
💻 C
字号:
/*
 * cd -- control daemon that drives a chord node on behalf of a single
 * controller process.  The controller is reached over a unix-domain
 * socket (-C) and issues the CD_* RPCs declared in cd_prot.h.
 */
#include <misc_utils.h>
#include <configurator.h>
#include <qhash.h>
#include "chord.h"
#include <sys/types.h>
#include "cd_prot.h"
#include <location.h>
#include <locationtable.h>
#include <debruijn.h>
#include <fingerroute.h>
#include <fingerroutepns.h>
#include <recroute.h>
#include <modlogger.h>

#define info modlogger ("cd")

/* Name of the trace log handed to modlogger; set by -T, defaulted in main. */
static const char *logfname;
/* The single chord instance; NULL until CD_NEWCHORD succeeds. */
ptr<chord> chordnode;
/* Path of the control socket given with -C. */
static str ctlsocket;
/* Virtual nodes created by CD_NEWCHORD, keyed by their chord ID. */
static qhash<chordID, ref<vnode>, hashID> vnodes;

/* Associates a wire-level routing mode with the factory that builds it. */
struct routing_mode_desc {
  cd_routing_mode m;
  vnode_producer_t producer;
};

/* List of routing modes. */
routing_mode_desc modes[] = {
  { MODE_SUCC, wrap (vnode::produce_vnode) },
  { MODE_CHORD, wrap (fingerroute::produce_vnode) },
  { MODE_DEBRUIJN, wrap (debruijn::produce_vnode) },
  { MODE_PNS, wrap (fingerroutepns::produce_vnode) },
  { MODE_PNSREC, wrap (recroute<fingerroutepns>::produce_vnode) },
  { MODE_CHORDREC, wrap (recroute<fingerroute>::produce_vnode) },
  { MODE_TCPPNSREC, wrap (recroute<fingerroutepns>::produce_vnode) },
};
int nmodes = sizeof (modes)/sizeof(modes[0]);

void stats ();
void stop ();
void halt ();

// =====================================

/*
 * Completion callback for CD_LOOKUP.  On CHORD_OK the reply carries the
 * route that was followed (drained from r) and the successor nodes.
 *
 * The result lives on the stack: reply() only receives a pointer to it,
 * so the heap object the previous version allocated was never released.
 */
static void
lookup_cb (svccb *sbp, vec<chord_node> nodes, route r, chordstat stat)
{
  cd_lookup_res res;
  res.set_stat (stat);
  if (stat == CHORD_OK) {
    chord_node_wire_plus_id cnw;
    while (!r.empty ()) {
      ptr<location> l = r.pop_front ();
      l->fill_node (cnw.wire);
      cnw.id = l->id ();
      res.resok->route.push_back (cnw);
    }

    for (unsigned int i = 0; i < nodes.size (); i++) {
      location l = location (nodes[i]);
      l.fill_node (cnw.wire);
      cnw.id = l.id ();
      res.resok->successors.push_back (cnw);
    }
  }
  sbp->reply (&res);
}

/*
 * Completion callback shared by CD_GETSUCCLIST and CD_GETPREDLIST.
 * On CHORD_OK every returned node is copied into the reply.
 */
static void
getsucclist_cb (svccb *sbp, vec<chord_node> nodes, chordstat stat)
{
  cd_getsucclist_res res;
  res.set_stat (stat);
  if (stat == CHORD_OK) {
    chord_node_wire_plus_id cnw;
    for (unsigned int i = 0; i < nodes.size (); i++) {
      location l = location (nodes[i]);
      l.fill_node (cnw.wire);
      cnw.id = l.id ();
      res.resok->nodes.push_back (cnw);
    }
  }
  sbp->reply (&res);
}

/*
 * RPC dispatcher for the control connection.  A NULL sbp means the
 * connection went away, which terminates the daemon.
 */
void
cd_dispatch (ptr<asrv> s, svccb *sbp)
{
  if (!sbp) {
    // Close the server
    s->setcb (NULL);
    // Since cd is meant for one-time use, now that my one connection
    // has shutdown, shutdown the server too
    halt ();
    return;
  }

  info << "received cd " << sbp->proc () << "\n";

  switch (sbp->proc ()) {
  case CD_NULL:
    sbp->reply (NULL);
    break;

  case CD_EXIT:
    sbp->reply (NULL);
    halt ();
    break;

  case CD_NEWCHORD:
    {
      cd_newchord_arg *a = sbp->Xtmpl getarg<cd_newchord_arg> ();

      // Empty host names default to this machine's address.
      chord_hostname myname = a->myname;
      chord_hostname wellknownhost = a->wellknownhost;
      if (myname.len () == 0) {
        myname = my_addr ();
      }
      if (wellknownhost.len () == 0) {
        wellknownhost = my_addr ();
      }

      cd_newchord_res res;
      if (chordnode) {
        // Only one chord instance may exist at a time.
        res.set_stat (CHORD_NOTINRANGE);
      } else {
        // Find routing mode
        int i;
        bool success = false;
        for (i = 0; i < nmodes; i++) {
          if (a->routing_mode == modes[i].m) {
            success = true;
            break;
          }
        }

        if (!success) {
          res.set_stat (CHORD_NOTINRANGE);
        } else {
          chordnode = New refcounted<chord> (myname,
                                             a->myport,
                                             modes[i].producer,
                                             a->nvnodes,
                                             a->maxcache);
          chordnode->startchord ();
          chordnode->join (wellknownhost, a->wellknownport, false);
          res.set_stat (CHORD_OK);
          res.resok->nvnodes = chordnode->num_vnodes ();
          // Remember each vnode so later requests can address it by ID.
          for (int j = 0; j < res.resok->nvnodes; j++) {
            ptr<vnode> vn = chordnode->get_vnode (j);
            chordID id = vn->my_ID ();
            vnodes.insert (id, vn);
            res.resok->vnodes.push_back (id);
          }
        }
      }
      sbp->reply (&res);
    }
    break;

  case CD_UNNEWCHORD:
    {
      // NOTE(review): the vnodes table is not emptied here, so it keeps
      // references to the old vnodes -- confirm whether that is intended.
      cd_newchord_res res;
      if (chordnode) {
        chordnode = NULL;
        res.set_stat (CHORD_OK);
      } else {
        res.set_stat (CHORD_NOTINRANGE);
      }
      sbp->reply (&res);
    }
    break;

  case CD_LOOKUP:
    {
      cd_lookup_arg *a = sbp->Xtmpl getarg<cd_lookup_arg> ();
      ptr<vnode> vn = vnodes[a->vnode];
      if (vn == NULL) {
        cd_lookup_res res;
        res.set_stat (CHORD_NOTINRANGE);
        sbp->reply (&res);
      } else {
        vn->find_successor (a->key, wrap (lookup_cb, sbp));
      }
    }
    break;

  case CD_GETSUCCLIST:
    {
      cd_getsucclist_arg *a = sbp->Xtmpl getarg<cd_getsucclist_arg> ();
      ptr<vnode> vn = vnodes[a->vnode];
      if (vn == NULL) {
        // Reply with the result type of this procedure (the same type
        // getsucclist_cb sends), not cd_lookup_res.
        cd_getsucclist_res res;
        res.set_stat (CHORD_NOTINRANGE);
        sbp->reply (&res);
      } else {
        vn->get_succlist (vn->my_location (), wrap (getsucclist_cb, sbp));
      }
    }
    break;

  case CD_GETPREDLIST:
    {
      cd_getsucclist_arg *a = sbp->Xtmpl getarg<cd_getsucclist_arg> ();
      ptr<vnode> vn = vnodes[a->vnode];
      if (vn == NULL) {
        // Same result type as the success path in getsucclist_cb.
        cd_getsucclist_res res;
        res.set_stat (CHORD_NOTINRANGE);
        sbp->reply (&res);
      } else {
        vn->get_predlist (vn->my_location (), wrap (getsucclist_cb, sbp));
      }
    }
    break;

  default:
    sbp->reject (PROC_UNAVAIL);
    break;
  }
}

// =====================================

/* Attaches a cd_program_1 server to transport x, dispatching to cd_dispatch. */
void
control_accept (ref<axprt_stream> x)
{
  ptr<asrv> srv;
  srv = asrv::alloc (x, cd_program_1, NULL);
  srv->setcb (wrap (&cd_dispatch, srv));
}

/*
 * Connects to the control socket named by ctlsocket and starts serving
 * control RPCs on it.  Terminates via fatal if the connect fails.
 */
static void
startcontroller ()
{
  // XXX Are there any security issues here?
  int fd = unixsocket_connect (ctlsocket);
  if (fd < 0)
    fatal << "Error opening control socket: " << strerror (errno) << "\n";
  //unlink(ctlsocket);
  // NOTE(review): 1024*1025 looks like it may have been meant as
  // 1024*1024; left unchanged -- confirm.
  ref<axprt_stream> x = axprt_stream::alloc (fd, 1024*1025);
  control_accept (x);
}

/* Zeroes the per-procedure counters of prog (no-op without RPC_PROGRAM_STATS). */
void
clear_stats (const rpc_program &prog)
{
#ifdef RPC_PROGRAM_STATS
  bzero (prog.outcall_num, sizeof (prog.outcall_num));
  bzero (prog.outcall_bytes, sizeof (prog.outcall_bytes));
  bzero (prog.outcall_numrex, sizeof (prog.outcall_numrex));
  bzero (prog.outcall_bytesrex, sizeof (prog.outcall_bytesrex));
  bzero (prog.outreply_num, sizeof (prog.outreply_num));
  bzero (prog.outreply_bytes, sizeof (prog.outreply_bytes));
#endif
}

/*
 * Prints the per-procedure counters of prog followed by a subtotal.
 * A running grand total is kept across calls: `first' resets it and
 * prints the column header, `last' prints it.
 */
void
dump_rpcstats (const rpc_program &prog, bool first, bool last)
{
  // NOTE(review): this cast drops the upper half of the address where
  // pointers are wider than u_int; left unchanged.
  warn << "dump_rpcstats: " << (u_int)&prog << "\n";

  // In arpc/rpctypes.h -- if defined
#ifdef RPC_PROGRAM_STATS
  static rpc_program total;

  str fmt1 ("%-40s %15s %15s %15s %15s %15s %15s\n");
  str fmt2 ("%-40s %15d %15d %15d %15d %15d %15d\n");

  if (first) {
    bzero (&total, sizeof (total));
    warn.fmt (fmt1,
	      "",
	      "outcall_num","outcall_bytes",
	      "outcall_numrex","outcall_bytesrex",
	      "outreply_num","outreply_bytes");
  }

  // Slot 0 of each counter array is used as the accumulator.
  rpc_program subtotal;
  bzero (&subtotal, sizeof (subtotal));
  for (size_t procno = 0; procno < prog.nproc; procno++) {
    // Entries whose name is exactly one character long are skipped.
    if (strlen (prog.tbl[procno].name) == 1)
      continue;
    warn.fmt (fmt2,
	      prog.tbl[procno].name,
	      prog.outcall_num[procno],
	      prog.outcall_bytes[procno],
	      prog.outcall_numrex[procno],
	      prog.outcall_bytesrex[procno],
	      prog.outreply_num[procno],
	      prog.outreply_bytes[procno]);

    subtotal.outcall_num[0] += prog.outcall_num[procno];
    subtotal.outcall_bytes[0] += prog.outcall_bytes[procno];
    subtotal.outcall_numrex[0] += prog.outcall_numrex[procno];
    subtotal.outcall_bytesrex[0] += prog.outcall_bytesrex[procno];
    subtotal.outreply_num[0] += prog.outreply_num[procno];
    subtotal.outreply_bytes[0] += prog.outreply_bytes[procno];
  }

  str tmp = strbuf () << "SUMMARY " << prog.name;
  warn.fmt (fmt2,
	    tmp.cstr (),
	    subtotal.outcall_num[0],
	    subtotal.outcall_bytes[0],
	    subtotal.outcall_numrex[0],
	    subtotal.outcall_bytesrex[0],
	    subtotal.outreply_num[0],
	    subtotal.outreply_bytes[0]);

  warn << "TOTAL " << prog.name << "  out*_num "
       << subtotal.outcall_num[0]
          + subtotal.outcall_numrex[0] + subtotal.outreply_num[0]
       << " out*_bytes "
       << subtotal.outcall_bytes[0]
          + subtotal.outcall_bytesrex[0] + subtotal.outreply_bytes[0]
       << "\n";
  warn << "\n";

  total.outcall_num[0] += subtotal.outcall_num[0];
  total.outcall_bytes[0] += subtotal.outcall_bytes[0];
  total.outcall_numrex[0] += subtotal.outcall_numrex[0];
  total.outcall_bytesrex[0] += subtotal.outcall_bytesrex[0];
  total.outreply_num[0] += subtotal.outreply_num[0];
  total.outreply_bytes[0] += subtotal.outreply_bytes[0];

  if (last) {
    warn.fmt (fmt2,
	      "SUMMARY all protocols",
	      total.outcall_num[0],
	      total.outcall_bytes[0],
	      total.outcall_numrex[0],
	      total.outcall_bytesrex[0],
	      total.outreply_num[0],
	      total.outreply_bytes[0]);
    warn << "TOTAL all protocols      out*_num "
	 << total.outcall_num[0]
	    + total.outcall_numrex[0] + total.outreply_num[0]
	 << " out*_bytes "
	 << total.outcall_bytes[0]
	    + total.outcall_bytesrex[0] + total.outreply_bytes[0]
	 << "\n";
  }
#endif
}

/*
 * Dumps and then clears the RPC counters of the chord and cd programs,
 * and re-arms itself through delaycb (1, 0, ...).
 */
void
bandwidth ()
{
  warn << gettime () << " bandwidth\n";
  static bool first_call = true;
  extern const rpc_program chord_program_1;
  extern const rpc_program cd_program_1;

  if (!first_call) {
    // don't dump on the first call, because stats
    // have not been cleared yet.
    dump_rpcstats (chord_program_1, true, false);
    dump_rpcstats (cd_program_1, false, true);
  }

  clear_stats (chord_program_1);
  clear_stats (cd_program_1);

  warn << gettime () << " bandwidth delaycb\n";
  delaycb (1, 0, wrap (bandwidth));
  first_call = false;
}

/* SIGUSR1 handler: prints the chord node's statistics and state. */
void
stats ()
{
  warn << "STATS:\n";
  // The signal can arrive before CD_NEWCHORD has created the node, or
  // after CD_UNNEWCHORD released it.
  if (!chordnode) {
    warn << "no chord node is running\n";
    return;
  }
  //      bandwidth ();
  chordnode->stats ();
  strbuf x;
  chordnode->print (x);
  warnx << x;
}

/* SIGUSR2 handler: stops the chord node if one exists. */
void
stop ()
{
  if (chordnode)
    chordnode->stop ();
}

/* Releases the chord node and exits the process with status 0. */
void
halt ()
{
  warnx << "Exiting on command.\n";
  info << "stopping.\n";
  chordnode = NULL;
  exit (0);
}

/* Prints the command-line synopsis and exits with status 1. */
static void
usage ()
{
  warnx << "Usage: " << progname
	<< " -C <control socket> "
    "[-O <conf file>] "
    "[-b logbase] "
    "[-s <server select mode>] "
    "[-L <warn/fatal/panic output file name>] "
    "[-T <trace file name (aka new log)>] "
    "[-t]"
    "\n";
  exit (1);
}

/*
 * Entry point: installs signal handlers, parses options, opens the log
 * files, applies configuration overrides, connects to the controller and
 * enters the event loop.
 */
int
main (int argc, char **argv)
{
  setprogname (argv[0]);
  mp_clearscrub ();
  // sfsconst_init ();
  random_init ();
  sigcb (SIGUSR1, wrap (&stats));
  sigcb (SIGUSR2, wrap (&stop));
  sigcb (SIGHUP, wrap (&halt));
  sigcb (SIGINT, wrap (&halt));

  int ch;
  int ss_mode = -1;
  int lbase = 1;
  ctlsocket = "";
  logfname = "cd-trace.log";
  char *cffile = NULL;

  // NOTE: "p:" is accepted by getopt but has no case below, so -p ends
  // up in usage ().
  while ((ch = getopt (argc, argv, "b:C:L:O:p:s:T:t"))!=-1)
    switch (ch) {
    case 'b':
      lbase = atoi (optarg);
      break;
    case 'C':
      ctlsocket = optarg;
      break;
    case 'L':
      {
	int logfd = open (optarg, O_RDWR | O_CREAT, 0666);
	// open () reports failure with a negative value; 0 is a valid
	// descriptor.
	if (logfd < 0)
	  fatal << "Could not open logfile " << optarg << " for appending\n";
	lseek (logfd, 0, SEEK_END);
	errfd = logfd;
	break;
      }
    case 'O':
      cffile = optarg;
      break;
    case 's':
      ss_mode = atoi (optarg);
      break;
    case 't':
      modlogger::setmaxprio (modlogger::TRACE);
      break;
    case 'T':
      logfname = optarg;
      break;
    default:
      usage ();
      break;
    }

  if (ctlsocket.len () == 0) {
    fatal << "Must specify -C\n";
  }

  {
    int logfd = open (logfname, O_WRONLY|O_APPEND|O_CREAT, 0666);
    // Report the file that was actually opened; optarg is no longer
    // meaningful once option parsing has finished.
    if (logfd < 0)
      fatal << "Couldn't open " << logfname << " for append.\n";
    modlogger::setlogfd (logfd);
  }

  if (cffile) {
    bool ok = Configurator::only ().parse (cffile);
    // A bad configuration file is a user error and must be caught in
    // every build, so this is not left to assert.
    if (!ok)
      fatal << "Could not parse configuration file " << cffile << "\n";
  }

  if (ss_mode >= 0) {
    // ss_mode is a bit mask: bit 0 is rejected, bits 1 and 2 select the
    // two chord options below.
    if (ss_mode & 1) {
      fatal << "DHash ordered successors not supported by cd.\n";
    }
    Configurator::only ().set_int ("chord.greedy_lookup",
				   ((ss_mode & 2) ? 1 : 0));
    Configurator::only ().set_int ("chord.find_succlist_shaving",
				   ((ss_mode & 4) ? 1 : 0));
  }

  // XXX The following should be made an option to NEWVNODE
  if (lbase != 1) {
    Configurator::only ().set_int ("debruijn.logbase", lbase);
  }

  Configurator::only ().dump ();

  {
    // Record the full command line in the trace log.
    strbuf x = strbuf ("starting: ");
    for (int i = 0; i < argc; i++) { x << argv[i] << " "; }
    x << "\n";
    info << x;
  }

  time_t now = time (NULL);
  warn << "cd starting up at " << ctime ((const time_t *)&now);
  warn << " running with options: \n";
  warn << "  control socket: " << ctlsocket << "\n";
  warn << "  ss mode: " << ss_mode << "\n";

  startcontroller ();

  info << "starting amain.\n";
  amain ();
}

// This is needed to instantiate recursive routing classes.
#include <recroute.C>
template class recroute<fingerroutepns>;
template class recroute<fingerroute>;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -