⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 maintwalk.c

📁 基于DHT的对等协议
💻 C
字号:
/* * Walk around the current ring and update any local copies * of their synchronization data. * * Code adapted from tools/walk.C and maint/maint* */#include <db.h>#include <arpc.h>#include <aios.h>#include <comm.h>#include "rpclib.h"#include <merkle.h>#include <merkle_tree_bdb.h>#include <merkle_sync_prot.h>void usage (){  warnx << "Usage: " << progname <<     " [-d maintdir]"    " [-t maxtotaltime]"    " -j <host>:<port>\n";  exit (1);}// {{{ Globalsstatic chordID wellknown_ID = -1;// Sequential is what we believe is the correct sequencing of nodesstatic vec<chord_node> sequential;static const char *localdatapath;static u_int64_t totalbytes = 0;static u_int64_t starttime = 0;static chordID idmax = (chordID (1) << NBIT) - 1;static dhash_ctype ctypes[] = {  DHASH_CONTENTHASH,  DHASH_KEYHASH,  DHASH_NOAUTH};// }}}// {{{ Prototypesstruct sync_info {  chord_node n;  dhash_ctype ctype;  u_int32_t ndiffs;  ptr<merkle_tree> localtree;  ptr<merkle_syncer> msyncer;};void getsucc (const chord_node &n);void getsucc_cb1 (chord_node curr, chord_nodelistextres *res,    clnt_stat err);void getsucc_cb2 (chord_node curr);void sync_with (const chord_node &n, cbv cb);void sync_with_cb1 (u_int64_t start, cbv cb, chord_node n, int fd);void sync_with_cb2 (u_int64_t start, sync_info *si, cbv cb, int err);// }}}// {{{ Misc Utilitystatic str ctype2ext (dhash_ctype c) {  switch (c) {  case DHASH_CONTENTHASH:    return "c";    break;  case DHASH_KEYHASH:    return "k";    break;  case DHASH_NOAUTH:    return "n";    break;  default:    fatal << "bad ctype\n";  }  return "unknown";}voidfail (str msg){  aout << "Total time elapsed: " << (getusec () - starttime)/1000 << "ms\n";  aout << "Total bytes sent:   " << totalbytes << "\n";  if (msg)    fatal << msg;  exit (1);}static voiddoRPCer (ptr<aclnt> c, RPC_delay_args *args){  assert ((args->prog.progno == c->rp.progno) &&          (args->prog.versno == c->rp.versno));  c->call (args->procno, args->in, args->out, args->cb);}void handle_missing (sync_info *si, chordID key, bool missing_local){  si->ndiffs++;  if (missing_local) {    si->localtree->insert (key);  } else {    si->localtree->remove (key);  }}voidtrack_aclnt (aclnt_acct_t a){  totalbytes += a.len;}// }}}// {{{ Main logicvoidgetsucc (const chord_node &n){  chord_nodelistextres *res = New chord_nodelistextres ();  doRPC (n, chord_program_1, CHORDPROC_GETSUCC_EXT, &n.x, res,	 wrap (&getsucc_cb1, n, res));}voidgetsucc_cb1 (chord_node curr, chord_nodelistextres *res, clnt_stat err){  if (err != 0 || res->status != CHORD_OK) {    aout << "failed to get a reading from " << curr << "; skipping.\n";    sequential.pop_front ();    if (sequential.size () == 0) {      fatal << "too many consecutive failures.\n";    }    delete res;    getsucc (sequential[0]);    return;  }    size_t sz = res->resok->nlist.size ();  vec<chord_node> zs;  for (size_t i = 0; i < sz; i++) {    chord_node z = make_chord_node (res->resok->nlist[i].n);    zs.push_back (z);  }  delete res;  // XXX Steal verification code from walk.C?  curr = zs[0];  // ensure we talked to who we think we should be talking to.  assert (curr.x == sequential[0].x);  sequential = zs;  sequential.pop_front ();  sync_with (curr, wrap (&getsucc_cb2, curr));}voidgetsucc_cb2 (chord_node curr){  // wrapped around ring. done.  if (betweenrightincl (curr.x, sequential[0].x, wellknown_ID)) {    aout << "Total time elapsed: " << (getusec () - starttime)/1000 << "ms\n";    aout << "Total bytes sent:   " << totalbytes << "\n";    exit (0);  }    // Out of nodes, done.  if (!sequential.size ())    exit (0);  getsucc (sequential[0]);}voidsync_with (const chord_node &n, cbv cb){  tcpconnect (n.r.hostname, n.r.port-1,      wrap (&sync_with_cb1, getusec (), cb, n));}static int nout (0);voidsync_with_cb1 (u_int64_t start, cbv cb, chord_node n, int fd){  if (fd < 0) {    delaycb (0, cb);    return;  }  ptr<axprt_stream> x = axprt_stream::alloc (fd);  ptr<aclnt> client = aclnt::alloc (x, merklesync_program_1);  client->set_acct_hook (wrap (&track_aclnt));  for (size_t i = 0; i < sizeof (ctypes)/sizeof(ctypes[0]); i++) {    dhash_ctype ctype = ctypes[i];    strbuf succtreepath; succtreepath << localdatapath << "/"       << n.x << "." << ctype2ext (ctype);    sync_info *si = New sync_info ();    si->n = n;    si->ctype = ctype;    si->ndiffs = 0;    si->localtree = New refcounted<merkle_tree_bdb>       (str (succtreepath).cstr (), /* join = */ false, /* ro = */ false );    si->msyncer = New refcounted<merkle_syncer> (	n.vnode_num, ctype,	si->localtree,	wrap (&doRPCer, client),	wrap (&handle_missing, si));    si->msyncer->sync (0, idmax,	wrap (&sync_with_cb2, start, si, cb));    nout++;  }}voidsync_with_cb2 (u_int64_t start, sync_info *si, cbv cb, int err){  nout--;  aout << si->n << " " << ctype2ext (si->ctype)       << " with "       << si->ndiffs << " updates.\n";  if (!nout) {    aout << si->n << " complete in "          << (getusec () - start)/1000 << "ms\n";    delaycb (0, cb);  }  delete si;}// }}}intmain (int argc, char *argv[]){  setprogname (argv[0]);  unsigned int maxtime (0);  str host = "not set";  unsigned short port (0);  localdatapath = "./maintdata/";  int ch;  while ((ch = getopt (argc, argv, "d:j:t:")) != -1) {    switch (ch) {    case 'd':      localdatapath = optarg;      break;    case 'j':       {	char *bs_port = strchr(optarg, ':');	if (!bs_port) usage ();	*bs_port = 0;	bs_port++;	if (inet_addr (optarg) == INADDR_NONE) {	  //yep, this blocks	  struct hostent *h = gethostbyname (optarg);	  if (!h) {	    warn << "Invalid address or hostname: " << optarg << "\n";	    usage ();	  }	  struct in_addr *ptr = (struct in_addr *)h->h_addr;	  host = inet_ntoa (*ptr);	} else	  host = optarg;	port = atoi (bs_port);	break;      }    case 't':      maxtime = atoi (optarg);      break;    }  }  if (host == "not set")    usage ();  {    struct stat sb;    if (stat (localdatapath, &sb) < 0) {      if (errno != ENOENT ||	  (mkdir (localdatapath, 0755) < 0 && errno != EEXIST))	fatal ("%s: %m\n", localdatapath);      if (stat (localdatapath, &sb) < 0)	fatal ("stat (%s): %m\n", localdatapath);      warn << "Created " << localdatapath << " for maintenance state.\n";    }    if (!S_ISDIR (sb.st_mode))      fatal ("%s: not a directory\n", localdatapath);  }  wellknown_ID = make_chordID (host, port, 0);  chord_node wellknown_node;  wellknown_node.x = wellknown_ID;  wellknown_node.r.hostname = host;  wellknown_node.r.port = port;  wellknown_node.vnode_num = 0;  sequential.push_back (wellknown_node);  getsucc (wellknown_node);  starttime = getusec ();  if (maxtime > 0)    delaycb (maxtime, wrap (&fail,	  strbuf ("timed out after %d seconds\n", maxtime)));  sigcb (SIGINT, wrap (&fail, "Received SIGINT\n"));  sigcb (SIGHUP, wrap (&fail, "Received SIGHUP\n"));  sigcb (SIGTERM, wrap (&fail, "Received SIGTERM\n"));  amain ();}// -*-c++-*-// vim: filetype=cpp  foldmethod=marker

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -