maint_global.t

来自「基于DHT的对等协议」· T 代码 · 共 132 行

T
132
字号
// -*-c++-*-// vim: filetype=cpp  foldmethod=marker//// TODO:// Remove keys that are successfully repaired.// Remove keys that the remote side already has (would handle above case).// - We want to have a list of objects to save... those are the//   ones that are going to be repaired.  I want something like SQS.#include <arpc.h>#include <misc_utils.h>#include <id_utils.h>#include <rpclib.h>#include <chord_prot.h>#include <merkle.h>#include <maint_policy.h>voidmaint_global::handle_missing (ptr<locationcc> d,    chordID key, bool missing_local){  if (missing_local) {    // XXX optimize merkle implementation to not request locally    //     missing keys so that we don't have to waste this bandwidth    //     on sync?    return;  }  // Keys that are missing remotely need to be pushed to the remote node.  for (size_t i = 0; i < maintqueue.size (); i++) {    if (maintqueue[i] == key)      return;  }  warn << "global enqueue " << key << " from " << m->host << " to " << d->chordnode () << "\n";  maintqueue.push_back (key);  maintdest.push_back (d);  // The maintainer will check this queue when someone asks for repairs.  // dhblock_srv will check whether or not it is responsible for the  // key to decide if it is okay to delete at the end of a repair.}maint_global::maint_global (maintainer *m) :  m (m),  rngmin (incID (m->host.x)),  rngmax (incID (m->host.x)){}maint_global::~maint_global (){}TAMED voidmaint_global::next (cbv donecb){  VARS {    vec<chordID> keys;    ptr<locationcc> succ (NULL);    ptr<chord_findarg> arg (NULL);    ptr<chord_nodelistres> keysucc (NULL);    clnt_stat err;  };  // Look up first successor key of rngmin in m->db.  // NB: get_keyrange doesn't support wrapping around 0  if (rngmin < m->host.x) {    keys = m->localtree ()->get_keyrange (rngmin, m->host.x, 1);  } else {    keys = m->localtree ()->get_keyrange (rngmin, maxID, 1);    if (!keys.size ())      keys = m->localtree ()->get_keyrange (0, m->host.x, 1);  }  if (!keys.size ()) {    goto nextOUT;  }   // warn << m->host.x << " maint_global: searching for " << keys[0] << "\n";  // Find its successor.  arg = New refcounted<chord_findarg>;  arg->x = keys[0];  arg->return_succs = false;  keysucc = New refcounted<chord_nodelistres>;  BLOCK {    doRPC (m->host, chord_program_1, CHORDPROC_FINDROUTE,	arg, keysucc, @(err));  }  if (err || keysucc->status) {    // doRPC already printed out an error    goto nextOUT;  }  {    chord_node n = make_chord_node (keysucc->resok->nlist.back ());    succ = locationcc::alloc (n);  }  // Prep for next round  rngmax = succ->id ();  // ...and continue if this range is skippable.  if (succ->id () == m->host.x)  {    // warn << m->host.x << " maint_global: succ is self\n";    goto nextOUT;  } else {    // warn << m->host.x << " maint_global: succ is " << succ->id () << "\n";    // Anything in the range of our predecessor list should be handled by    // regular maintenance.    for (size_t i = 0; i < m->preds.size (); i++) {      if (succ->id () == m->preds[i]->id ())	goto nextOUT;    }  }  //   sync_with (successor) ... for its primary range  //     queue up a list of objects to send this host.  //     make this queue available for get_repairs.  BLOCK {    warn << m->host.x << " maint_global: syncing (" << rngmin << ", " << rngmax << ") with " << succ->id () << "\n";    m->sync->sync_with (succ, rngmin, rngmax,	m->localtree (),	wrap (this, &maint_global::handle_missing, succ),	@());  }nextOUT:  if (succ)    rngmin = incID (succ->id ());  (*donecb) ();}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?