⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 merkle_syncer.c

📁 基于DHT的对等协议
💻 C
字号:
#include <chord.h>#include "merkle_syncer.h"#include "qhash.h"#include "async.h"#include <id_utils.h>#include <comm.h>#include <modlogger.h>#define warning modlogger ("merkle", modlogger::WARNING)#define info  modlogger ("merkle", modlogger::INFO)#define trace modlogger ("merkle", modlogger::TRACE)// {{{ General utility functionsinline const strbuf &strbuf_cat (const strbuf &sb, merkle_stat status){  return rpc_print (sb, status, 0, NULL, NULL);}voidformat_rpcnode (merkle_tree *ltree, u_int depth, const merkle_hash &prefix,		merkle_node *node, merkle_rpc_node *rpcnode){  rpcnode->depth = depth;  rpcnode->prefix = prefix;  rpcnode->count = node->count;  rpcnode->hash = node->hash;  rpcnode->isleaf = node->isleaf ();  if (!node->isleaf ()) {    rpcnode->child_hash.setsize (64);    for (int i = 0; i < 64; i++)      rpcnode->child_hash[i] = node->child_hash (i);  } else {    vec<merkle_hash> keys = ltree->database_get_keys (depth, prefix);    if (keys.size () != rpcnode->count) {      // This means that the tree has changed since the lookup.      // (Perhaps a key was added or expired.)      // Don't panic; just fudge it.      warn << "format_rpcnode: mismatched count "	   << keys.size () << " != " << rpcnode->count	   << " at depth " << depth << " / " << prefix << "\n";      // Lose extra keys if too many.      while (keys.size () > 64)        keys.pop_back ();      rpcnode->count = keys.size ();      if (!keys.size ()) {	rpcnode->hash = 0;      } else {	sha1ctx sc;	for (u_int i = 0; i < keys.size (); i++)	  sc.update (keys[i].bytes, keys[i].size);	sc.final (rpcnode->hash.bytes);      }    }    rpcnode->child_hash.setsize (keys.size ());    for (u_int i = 0; i < keys.size (); i++) {      rpcnode->child_hash[i] = keys[i];    }  }}// Check whether [l1, r1] overlaps [l2, r2] on the circle.static booloverlap (const bigint &l1, const bigint &r1, const bigint &l2, const bigint &r2){  // There might be a more efficient way to do this..  return (betweenbothincl (l1, r1, l2) || betweenbothincl (l1, r1, r2)	  || betweenbothincl (l2, r2, l1) || betweenbothincl (l2, r2, r1));}static voidcompare_keylists (const vec<chordID> &lkeys,		  const vec<chordID> &vrkeys,		  chordID rngmin, chordID rngmax,		  missingfnc_t missingfnc){  // populate a hash table with the remote keys  qhash<chordID, int, hashID> rkeys;  for (u_int i = 0; i < vrkeys.size (); i++) {    if (betweenbothincl (rngmin, rngmax, vrkeys[i])) {      // trace << "remote key: " << vrkeys[i] << "\n";      rkeys.insert (vrkeys[i], 1);    }  }  // do I have something he doesn't have?  for (unsigned int i = 0; i < lkeys.size (); i++) {    if (!rkeys[lkeys[i]] &&	betweenbothincl (rngmin, rngmax, lkeys[i])) {      trace << "remote missing [" << rngmin << ", "	    << rngmax << "] key=" << lkeys[i] << "\n";      (*missingfnc) (lkeys[i], false);    } else {      if (rkeys[lkeys[i]]) trace << "remote has " << lkeys[i] << "\n";      else trace << "out of range: " << lkeys[i] << "\n";      rkeys.remove (lkeys[i]);    }  }  //anything left: he has and I don't  qhash_slot<chordID, int> *slot = rkeys.first ();  while (slot) {    trace << "local missing [" << rngmin << ", "	  << rngmax << "] key=" << slot->key << "\n";    (*missingfnc) (slot->key, true);    slot = rkeys.next (slot);  }}// }}}// {{{ merkle_getkeyrange// {{{ merkle_getkeyrange declarationsclass merkle_getkeyrange {private:  uint vnode;  dhash_ctype ctype;  bigint rngmin;  bigint rngmax;  bigint current;  missingfnc_t missing;  rpcfnc_t rpcfnc;  vec<chordID> lkeys;  cbv completecb;  void finish ();  void doRPC (int procno, ptr<void> in, void *out, aclnt_cb cb);  void go ();  void getkeys_cb (ref<getkeys_arg> arg, ref<getkeys_res> res, clnt_stat err);public:  ~merkle_getkeyrange () {}  merkle_getkeyrange (uint vnode, dhash_ctype ctype,		      bigint rngmin, bigint rngmax,		      const vec<chordID> &plkeys,		      missingfnc_t missing, rpcfnc_t rpcfnc,		      cbv completecb)    : vnode (vnode), ctype (ctype), rngmin (rngmin),      rngmax (rngmax), current (rngmin),      missing (missing), rpcfnc (rpcfnc), lkeys (plkeys),      completecb (completecb)    { go (); }};// }}}// {{{ merkle_getkeyrange utilityvoidmerkle_getkeyrange::finish (){  delaycb (0, completecb);  delete this;}voidmerkle_getkeyrange::doRPC (int procno, ptr<void> in, void *out, aclnt_cb cb){  // Must resort to bundling all values into one argument since  // async/callback.h is configured with too few parameters.  struct RPC_delay_args args (merklesync_program_1, procno, in, out, cb,			      NULL);  (*rpcfnc) (&args);}// }}}voidmerkle_getkeyrange::go (){  if (!betweenbothincl (rngmin, rngmax, current)) {    trace << "merkle_getkeyrange::go () ==> DONE\n";    finish ();    return;  }  ref<getkeys_arg> arg = New refcounted<getkeys_arg> ();  arg->ctype = ctype;  arg->vnode = vnode;  arg->rngmin = current;  arg->rngmax = rngmax;  ref<getkeys_res> res = New refcounted<getkeys_res> ();  doRPC (MERKLESYNC_GETKEYS, arg, res,	 wrap (this, &merkle_getkeyrange::getkeys_cb, arg, res));}voidmerkle_getkeyrange::getkeys_cb (ref<getkeys_arg> arg, ref<getkeys_res> res,				clnt_stat err){  if (err) {    warn << "GETKEYS: rpc error " << err << "\n";    finish ();    return;  } else if (res->status != MERKLE_OK) {    warn << "GETKEYS: protocol error " << res->status << "\n";    finish ();    return;  }//  warn << "getkeys_cb: [" << arg->rngmin << "," << arg->rngmax << "] "//       << res->resok->keys.size () << " keys, with "//       << (res->resok->morekeys ? "more" : "no") << " keys left.\n";  // Assuming keys are sent back in increasing clockwise order  vec<chordID> rkeys;  for (u_int i = 0; i < res->resok->keys.size (); i++)    rkeys.push_back (res->resok->keys[i]);  chordID sentmax = rngmax;  if (res->resok->morekeys && res->resok->keys.size () > 0)    sentmax = res->resok->keys.back ();  compare_keylists (lkeys, rkeys, current, sentmax, missing);  current = incID (sentmax);  if (!res->resok->morekeys) {    finish ();    return;  }  go ();}// }}}// {{{ merkle_syncer// {{{ merkle_syncer utilitymerkle_syncer::merkle_syncer (uint vnode, dhash_ctype ctype,			      ptr<merkle_tree> ltree,			      rpcfnc_t rpcfnc, missingfnc_t missingfnc)  : vnode (vnode), ctype (ctype), ltree (ltree), rpcfnc (rpcfnc),    missingfnc (missingfnc), completecb (cbi_null),    outstanding_sendnodes (0),    outstanding_keyranges (0){  deleted = New refcounted<bool> (false);  fatal_err = NULL;  sync_done = false;}merkle_syncer::~merkle_syncer (){  *deleted = true;}voidmerkle_syncer::dump (){  warn << "THIS: " << (u_int)this << "\n";  warn << "  st.size () " << st.size () << "\n";}voidmerkle_syncer::doRPC (int procno, ptr<void> in, void *out, aclnt_cb cb){  // Must resort to bundling all values into one argument since  // async/callback.h is configured with too few parameters.  struct RPC_delay_args args (merklesync_program_1, procno, in, out, cb,			      NULL);  (*rpcfnc) (&args);}voidmerkle_syncer::setdone (){  if (sync_done) {    warning << "duplicate setdone for " << (u_int) this << "\n";    return;  }  if (completecb != cbi_null) {    // Return 0 if everything was ok, 1 otherwise.    completecb (fatal_err.cstr () != NULL);    completecb = cbi_null;  }  sync_done = true;}voidmerkle_syncer::error (str err){  warn << (u_int)this << ": SYNCER ERROR: " << err << "\n";  fatal_err = err;  setdone ();}strmerkle_syncer::getsummary (){  assert (sync_done);  strbuf sb;  sb << "[" << local_rngmin << "," << local_rngmax << "] ";  if (fatal_err)    sb << fatal_err;  return sb;}// }}}voidmerkle_syncer::sync (bigint rngmin, bigint rngmax, cbi cb){  assert (outstanding_sendnodes == 0);  assert (outstanding_keyranges == 0);  assert (st.size () == 0);  sync_done = false;  local_rngmin = rngmin;  local_rngmax = rngmax;  completecb = cb;  // start at the root of the merkle tree  sendnode (0, 0);}voidmerkle_syncer::sendnode (u_int depth, const merkle_hash &prefix){  ref<sendnode_arg> arg = New refcounted<sendnode_arg> ();  ref<sendnode_res> res = New refcounted<sendnode_res> ();  u_int lnode_depth;  merkle_node *lnode = ltree->lookup (&lnode_depth, depth, prefix);  // OK to assert this: since depth-1 is an index node, we know that  //                    it created all of its depth children when  //                    it split. --FED  assert (lnode);  assert (lnode_depth == depth);  format_rpcnode (ltree, depth, prefix, lnode, &arg->node);  arg->vnode = vnode;  arg->ctype = ctype;  arg->rngmin = local_rngmin;  arg->rngmax = local_rngmax;  ltree->lookup_release (lnode);  outstanding_sendnodes++;  doRPC (MERKLESYNC_SENDNODE, arg, res,	 wrap (mkref (this), &merkle_syncer::sendnode_cb, deleted, arg, res));}voidmerkle_syncer::sendnode_cb (ptr<bool> deleted,			    ref<sendnode_arg> arg, ref<sendnode_res> res,			    clnt_stat err){  if (*deleted || sync_done)    return;  outstanding_sendnodes--;  if (err) {    error (strbuf () << "SENDNODE: rpc error " << err);    return;  } else if (res->status != MERKLE_OK) {    warn << "SENDNODE: protocol error " << res->status << "\n";  } else {    merkle_rpc_node *rnode = &res->resok->node;    merkle_node *lnode = ltree->lookup_exact (rnode->depth,	rnode->prefix);    if (lnode) {      compare_nodes (local_rngmin, local_rngmax, lnode, rnode);      ltree->lookup_release (lnode);    } else {      // If we no longer have a node at this address, it must mean      // we used to but deletions have shrank our tree.      // Let's just skip this subtree and get it the next time.      warn << "lookup failed: " << rnode->prefix 	   << " at " << rnode->depth << "\n";    }  }  next ();}voidmerkle_syncer::next (void){  trace << "local range [" <<  local_rngmin << "," << local_rngmax << "]\n";  assert (!sync_done);  assert (!fatal_err);  // st is queue of pending index nodes  while (st.size ()) {    pair<merkle_rpc_node, int> &p = st.back ();    merkle_rpc_node *rnode = &p.first;    assert (!rnode->isleaf);    merkle_node *lnode = ltree->lookup_exact (rnode->depth, rnode->prefix);    if (!lnode || lnode->isleaf ()) {      // Inconsistencies are to be expected; skip this subtree for now.      warnx << "lookup_exact didn't match for " << rnode->prefix << " at depth " << rnode->depth << "\n";      if (lnode)	ltree->lookup_release (lnode);      st.pop_back ();      continue;    }    trace << "starting from slot " << p.second << "\n";    while (p.second < 64) {      u_int i = p.second;      p.second += 1;      trace << "CHECKING: " << i << " of " << rnode->prefix << " at depth " << rnode->depth << "\n";      bigint remote = static_cast<bigint> (rnode->child_hash[i]);      bigint local = static_cast<bigint> (lnode->child_hash (i));      u_int depth = rnode->depth + 1;      //prefix is the high bits of the first key      // the node is responsible for.      merkle_hash prefix = rnode->prefix;      prefix.write_slot (rnode->depth, i);      bigint slot_rngmin = static_cast<bigint> (prefix);      bigint slot_width = bigint (1) << (160 - 6*depth);      bigint slot_rngmax = slot_rngmin + slot_width - 1;      bool overlaps = overlap (local_rngmin, local_rngmax, slot_rngmin, slot_rngmax);      strbuf tr;      if (remote != local) {	tr << " differ. local " << local << " != remote " << remote;	if (overlaps) {	  tr << " .. sending\n";	  sendnode (depth, prefix);	  trace << tr;	  ltree->lookup_release (lnode);	  return;	} else {	  tr << " .. not sending\n";	}      } else {	tr << " same. local " << local << " == remote " << remote << "\n";      }      trace << tr;    }    ltree->lookup_release (lnode);    assert (p.second == 64);    st.pop_back ();  }  trace << "DONE with internal nodes in NEXT\n";  if (!outstanding_keyranges && !outstanding_sendnodes) {    setdone ();    trace << "All OK!\n";  }}voidmerkle_syncer::compare_nodes (bigint rngmin, bigint rngmax,	       merkle_node *lnode, merkle_rpc_node *rnode){  trace << (lnode->isleaf ()  ? "L" : "I")       << " vs "       << (rnode->isleaf ? "L" : "I")       << " at " << rnode->prefix << " depth " << rnode->depth << "\n";  if (lnode->hash == rnode->hash) {    trace << "Hashes identical!\n";    return;  }  if (!lnode->isleaf () && !rnode->isleaf) {    st.push_back (pair<merkle_rpc_node, int> (*rnode, 0));    return;  }  vec<chordID> lkeys = ltree->database_get_IDs (rnode->depth,						rnode->prefix);  if (rnode->isleaf) {    vec<chordID> rkeys;    for (u_int i = 0; i < rnode->child_hash.size (); i++)	rkeys.push_back (static_cast<bigint> (rnode->child_hash[i]));    compare_keylists (lkeys, rkeys, rngmin, rngmax, missingfnc);  } else if (lnode->isleaf () && !rnode->isleaf) {    bigint tmpmin = static_cast<bigint> (rnode->prefix);    bigint node_width = bigint (1) << (160 - 6*rnode->depth);    bigint tmpmax = tmpmin + node_width - 1;    assert (tmpmin < tmpmax);    // further constrain to be within the host's range of interest    // This can be purely inside, at the start, at the end,    // or at the outsides.    if (rngmin < rngmax) {      // Completely contained, or only partial overlap      if (between (tmpmin, tmpmax, rngmin))	tmpmin = rngmin;      if (between (tmpmin, tmpmax, rngmax))	tmpmax = rngmax;      outstanding_keyranges++;      vNew merkle_getkeyrange (vnode, ctype, tmpmin, tmpmax, lkeys,	  missingfnc, rpcfnc,	  wrap (mkref (this), &merkle_syncer::collect_keyranges, deleted));    } else {      if (betweenbothincl (rngmin, maxID, tmpmin) &&	  betweenbothincl (0, rngmax, tmpmax))      {	// node is completely one of our ends.	outstanding_keyranges++;	vNew merkle_getkeyrange (vnode, ctype, tmpmin, tmpmax, lkeys,	    missingfnc, rpcfnc,	    wrap (mkref (this), &merkle_syncer::collect_keyranges, deleted));      } else {	// partial overlap; check right and left hand sides.	if (betweenbothincl (rngmin, maxID, tmpmax)) {	  outstanding_keyranges++;	  vNew merkle_getkeyrange (vnode, ctype, rngmin, tmpmax, lkeys,	      missingfnc, rpcfnc,	      wrap (mkref (this), &merkle_syncer::collect_keyranges, deleted));	}	if (betweenbothincl (0, rngmax, tmpmin)) {	  outstanding_keyranges++;	  vNew merkle_getkeyrange (vnode, ctype, tmpmin, rngmax, lkeys,	      missingfnc, rpcfnc,	      wrap (mkref (this), &merkle_syncer::collect_keyranges, deleted));	}      }    }  }}voidmerkle_syncer::collect_keyranges (ptr<bool> deleted){  if (*deleted || sync_done)    return;  assert (outstanding_keyranges > 0);  outstanding_keyranges--;  if (!outstanding_keyranges)    next ();}// }}}/* vim:set foldmethod=marker: */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -