maint_policy.t

来自「基于DHT的对等协议」· T 代码 · 共 824 行 · 第 1/2 页

T
824
字号
// -*-c++-*-// vim: filetype=cpp  foldmethod=marker#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <db.h>#include <arpc.h>#include <chord_prot.h>#include <comm.h>#include <merkle.h>#include <merkle_tree_bdb.h>#include <id_utils.h>#include <location.h>#include <libadb.h>#include <rpclib.h>#include <maint_prot.h>#include "maint_policy.h"// {{{ Bandwidth tracker// See comm.[Ch]static inline rpcstats *getstats (int progno, int procno){  str key = strbuf ("%d:%d", progno, procno);  rpcstats *stats = rpc_stats_tab[key];  if (!stats) {    stats = New rpcstats (key);    rpc_stats_tab.insert (stats);  }  return stats;}static voidtrack_aclnt (aclnt_acct_t a){  rpcstats *stats = getstats (a.progno, a.procno);  switch (a.dir) {    case ACCT_SEND:      stats->ncall++;      stats->call_bytes += a.len;      break;    case ACCT_RECV:      stats->nreply++;      stats->reply_bytes += a.len;      break;    default:      fatal ("Unknown RPC direction: %d", a.dir);      break;  }}// }}}// {{{ locationcc - TCP connection managementvec<locationcc *> locationcc::allocated;locationcc::locationcc (const chord_node &n) :    n (n),    tcpc (NULL),    x (NULL),    reapcaller (NULL){  allocated.push_back (this);}ptr<locationcc>locationcc::alloc (const chord_node &n){  for (size_t i = 0; i < allocated.size (); i++) {    if (allocated[i]->n.x == n.x)      return mkref (allocated[i]);  }  return New refcounted<locationcc> (n);}voidlocationcc::aclntmaker (const rpc_program *p, locationcc::aclntcb cb){  ptr<aclnt> c (NULL);  if (x && !x->ateof ()) {    c = aclnt::alloc (x, *p);    c->set_acct_hook (wrap (&track_aclnt));  }  cb (c);}TAMED voidlocationcc::get_stream_aclnt (const rpc_program &p,    callback<void, ptr<aclnt> >::ref cb){  VARS {    int fd (-1);  }  if (x && x->ateof ()) {    if (reapcaller)      timecb_remove (reapcaller);    reapcaller = NULL;    x = NULL;  }  if (x) {    assert (!x->ateof ());    assert (reapcaller);    timecb_remove (reapcaller);    reapcaller = NULL;    ptr<aclnt> c = aclnt::alloc (x, p);    if (c)      c->set_acct_hook (wrap (&track_aclnt));    cb (c);  } else {    assert (!reapcaller);    aclntcbs.push_back (wrap 	(this, &locationcc::aclntmaker, &p, cb));    if (tcpc) return; // Our cb will be called when tcpc finishes.    BLOCK {      tcpc = tcpconnect (n.r.hostname,		  n.r.port-1, // LAME CONVENTION		  @(fd));    }    tcpc = NULL;    if (fd < 0) {      warn << "locationcc: connect to " << n << " failed: " <<	strerror (errno) << "\n";      assert (x == NULL);    } else {      x = axprt_stream::alloc (fd);    }    while (aclntcbs.size ()) {      cbv cb = aclntcbs.pop_back ();      cb ();    }  }  if (x) {    // XXX Should dynamically calculate delay to be just    // longer than the time between the start of two rounds.    reapcaller = delaycb (1200, wrap (this, &locationcc::reaper));  }}voidlocationcc::fill_ipportvn (u_int32_t &a, u_int32_t &b){  sockaddr_in saddr;  bzero (&saddr, sizeof(sockaddr_in));  // saddr.sin_family = AF_INET;  inet_aton (n.r.hostname.cstr (), &saddr.sin_addr);  saddr.sin_port = htons (n.r.port);  /* saddr fields are in network byte order */  a = ntohl (saddr.sin_addr.s_addr);  b = (ntohs (saddr.sin_port) << 16) | n.vnode_num;}voidlocationcc::reaper () {  reapcaller = NULL;  // Forget about the axprt.  This will cause the connection  // to close as soon as any outstanding aclnt's lose their last ref.  x = NULL;}locationcc::~locationcc () {  if (reapcaller) {    timecb_remove (reapcaller);    reapcaller = NULL;  }  for (size_t i = 0; i < allocated.size (); i++) {    if (allocated[i] == this) {      allocated[i] = allocated.back ();      allocated.pop_back ();      break;    }  }}// }}}// {{{ maintainer - base maintainer classstatic str ctype2ext (dhash_ctype c) {  switch (c) {  case DHASH_CONTENTHASH:    return "c";    break;  case DHASH_KEYHASH:    return "k";    break;  case DHASH_NOAUTH:    return "n";    break;  default:    fatal << "bad ctype\n";  }}const u_int32_t maintainer::default_delay = 300;maintainer::maintainer (str path, maint_dhashinfo_t *hostinfo, ptr<syncer> s) :  host (hostinfo->host),  ctype (hostinfo->ctype),  sync (s),  efrags (hostinfo->efrags),  dfrags (hostinfo->dfrags),  db (New refcounted<adb> (hostinfo->dbsock, hostinfo->dbname, hostinfo->hasaux)),  private_path (path),  gm (New refcounted<maint_global> (this)),  running (false),  in_progress (false),  delay (default_delay),  mainttimer (NULL),  ltree (NULL){}maintainer::~maintainer () {  stop ();}voidmaintainer::start (u_int32_t d, bool randomize){  int jitter = 0;  delay = d;  if (mainttimer || in_progress)    return;  running = true;  if (randomize)    jitter = random_getword () % delay;  mainttimer = delaycb (delay + jitter,      wrap (this, &maintainer::start_helper));}void maintainer::start_helper (){  mainttimer = NULL;  // Helper is needed for delaycb since run_cycle is TAMED.  run_cycle (wrap (this, &maintainer::restart, delay));}voidmaintainer::restart (u_int32_t d){  if (running)    start (d);}voidmaintainer::stop (){  if (mainttimer) {    timecb_remove (mainttimer);    mainttimer = NULL;  }  // If we are in-progress, make sure we don't restart later.  // We do not have control to stop an in-process sync.  running = false;}TAMED voidmaintainer::run_cycle (cbv cb){  in_progress = true;  // Run local and global cycle in parallel  BLOCK {    local_maint_cycle (@());    gm->next (@());  }  in_progress = false;  delaycb (0, cb);}TAMED voidmaintainer::local_maint_cycle (cbv cb){  BLOCK {    update_neighbors (@());  }  if (preds.size () > 0) {    BLOCK {       process_neighbors (preds, succs, @());    }  }  (cb) ();}TAMED voidmaintainer::update_neighbors (cbv cb){  VARS {    ptr<chordID> id;    chord_nodelistres *slist;    chord_nodelistres *plist;    clnt_stat e1, e2;    bool changed (false);  }  id = New refcounted<chordID> (host.x);  slist = New chord_nodelistres ();  plist = New chord_nodelistres ();  BLOCK {    doRPC (host, chord_program_1, CHORDPROC_GETPREDLIST, 	   id, plist, @(e1));    doRPC (host, chord_program_1, CHORDPROC_GETSUCCLIST,	   id, slist, @(e2));  }  if (e1 || e2) {    warn << "my local node is down.\n";    changed = true;    goto updateOUT;  }  // Predecessor and predecessor list (for range).  {    size_t sz = plist->resok->nlist.size ();    vec<ptr<locationcc> > npreds;    for (size_t i = 1; i < sz; i++) {      chord_node n = make_chord_node (plist->resok->nlist[i]);      ptr<locationcc> s = locationcc::alloc (n);      npreds.push_back (s);    }    if (npreds.size () != preds.size ())      changed = true;    else {      for (size_t i = 0; i < preds.size (); i++) {	if (npreds[i]->id () != preds[i]->id ()) {	  changed = true;	  break;	}      }    }    preds = npreds;  }  // Successor  {    size_t sz = slist->resok->nlist.size ();    vec<ptr<locationcc> > nsuccs;    for (size_t i = 1; i < sz; i++) {      chord_node n = make_chord_node (slist->resok->nlist[i]);      ptr<locationcc> s = locationcc::alloc (n);      nsuccs.push_back (s);    }    if (nsuccs.size () != succs.size ())      changed = true;    else {      for (size_t i = 0; i < succs.size (); i++) {	if (nsuccs[i]->id () != succs[i]->id ()) {	  changed = true;	  break;	}      }    }    succs = nsuccs;  }updateOUT:  stable = !changed;  if (slist)    delete slist;  if (plist)    delete plist;  cb ();}TAMED voidmaintainer::process_neighbors (    const vec<ptr<locationcc> > &preds,    const vec<ptr<locationcc> > &succs, cbv cb){  // See Carbonite, Passing Tone or other implementations.  cb ();}size_tmaintainer::get_global_repairs (size_t max,      rpc_vec<maint_repair_t, RPC_INFINITY> &repairs){  size_t added = 0;  ptr<locationcc> hostcc = locationcc::alloc (host);  while (gm->maintqueue.size () && repairs.size () < max)  {    maint_repair_t repair;    repair.responsible = false;    repair.id = gm->maintqueue.pop_front ();    hostcc->fill_ipportvn (repair.src_ipv4_addr,	repair.src_port_vnnum);    ptr<locationcc> dst = gm->maintdest.pop_front ();    dst->fill_ipportvn (repair.dst_ipv4_addr,	repair.dst_port_vnnum);    repairs.push_back (repair);    added++;  }  if (added || gm->maintqueue.size ())    warn << host << " added " << added << " gm repairs; " << gm->maintqueue.size () << " remaining.\n";  return added;}// }}}// {{{ Carboniteref<maintainer> carbonite::produce_maintainer (str path, maint_dhashinfo_t *hostinfo, ptr<syncer> s, cbv cb){  return New refcounted<carbonite> (path, hostinfo, s, cb);}carbonite::carbonite (str path, maint_dhashinfo_t *hostinfo, ptr<syncer> s, cbv cb) :    maintainer (path, hostinfo, s){  db->getspaceinfo (wrap (this, &carbonite::init_ltree, cb));}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?