⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 test_merkle_syncer.c

📁 基于DHT的对等协议
💻 C
字号:
#include <chord.h>
#include <modlogger.h>
#include <id_utils.h>
#include <db.h>
#include "merkle.h"
#include "merkle_tree_disk.h"
#include "merkle_tree_bdb.h"
#include <location.h>
#include <transport_prot.h>
#include <comm.h>

// Test driver for merkle_syncer / merkle_server.  A "server" tree and a
// "syncer" tree live in the same process and talk to each other over a
// socketpair using the transport RPC program; main () runs a series of
// synchronization scenarios and asserts on how many keys moved.

// {{{ Globals
// Server half of the test: the tree being synchronized against, the
// merkle_server answering RPCs for it, and the RPC server endpoint.
static struct {
  ptr<merkle_tree> tree;
  ptr<merkle_server> server;
  ptr<asrv> srv;
} SERVER;

// Syncer half of the test: the local tree, the merkle_syncer driving the
// synchronization, and the RPC client endpoint used by doRPC ().
static struct {
  ptr<merkle_tree> tree;
  ptr<merkle_syncer> syncer;
  ptr<aclnt> clnt;
} SYNCER;

// Counters reset at the start of each runsync (): keys inserted into
// SERVER.tree (pushed) and into SYNCER.tree (pulled) during that run.
u_int32_t nkeyspushed = 0;
u_int32_t nkeyspulled = 0;

// Queues filled by sendblock () and drained by runsync ().
vec<chordID> keys_for_server;
vec<chordID> keys_for_syncer;
// }}}
// {{{ Merkle Tree setup/teardown harness
// Base class for the per-backend harnesses.  Constructing a harness
// populates SERVER.tree and SYNCER.tree; the destructor removes whatever
// on-disk state the backend left behind.
struct harness_t {
  harness_t () {}
  virtual ~harness_t () {}
};
ptr<harness_t> harness (NULL);

// In-memory trees; nothing to clean up.
struct mem_harness_t : public harness_t {
  mem_harness_t () {
    SERVER.tree = New refcounted<merkle_tree_mem> ();
    SYNCER.tree = New refcounted<merkle_tree_mem> ();
  }
  ~mem_harness_t () {
  }
};

// File names used by the disk-backed trees.
str server_index = "/tmp/server.index.mrk";
str server_internal = "/tmp/server.internal.mrk";
str server_leaf = "/tmp/server.leaf.mrk";
str syncer_index = "/tmp/syncer.index.mrk";
str syncer_internal = "/tmp/syncer.internal.mrk";
str syncer_leaf = "/tmp/syncer.leaf.mrk";

// ".ro" siblings of the files above.  They are only unlinked here;
// presumably merkle_tree_disk creates them -- TODO confirm.
str server_index_ro = "/tmp/server.index.mrk.ro";
str server_internal_ro = "/tmp/server.internal.mrk.ro";
str server_leaf_ro = "/tmp/server.leaf.mrk.ro";
str syncer_index_ro = "/tmp/syncer.index.mrk.ro";
str syncer_internal_ro = "/tmp/syncer.internal.mrk.ro";
str syncer_leaf_ro = "/tmp/syncer.leaf.mrk.ro";

// Disk-backed trees; the destructor unlinks every file named above.
struct disk_harness_t : public harness_t {
  disk_harness_t () {
    SERVER.tree = New refcounted<merkle_tree_disk> (server_index,
        server_internal, server_leaf, true);
    SYNCER.tree = New refcounted<merkle_tree_disk> (syncer_index,
        syncer_internal, syncer_leaf, true);
  }
  ~disk_harness_t () {
    unlink (server_index);
    unlink (server_internal);
    unlink (server_leaf);
    unlink (server_index_ro);
    unlink (server_internal_ro);
    unlink (server_leaf_ro);

    unlink (syncer_index);
    unlink (syncer_internal);
    unlink (syncer_leaf);
    unlink (syncer_index_ro);
    unlink (syncer_internal_ro);
    unlink (syncer_leaf_ro);
  }
};

// BDB-backed trees; the destructor removes both database directories.
struct bdb_harness_t : public harness_t {
  bdb_harness_t () {
    SERVER.tree = New refcounted<merkle_tree_bdb> ("/tmp/server.bdb", false, false);
    SYNCER.tree = New refcounted<merkle_tree_bdb> ("/tmp/syncer.bdb", false, false);
  }
  ~bdb_harness_t () {
    system ("rm -rf /tmp/server.bdb");
    system ("rm -rf /tmp/syncer.bdb");
  }
};

// Backend selected for the run: "bdb" (default), "disk" or "mem".
str mode ("bdb");

// Build the harness matching `mode'; any other value is fatal.
ptr<harness_t>
allocate_harness ()
{
  if (mode == "bdb")
    return New refcounted<bdb_harness_t> ();
  else if (mode == "disk")
    return New refcounted<disk_harness_t> ();
  else if (mode == "mem")
    return New refcounted<mem_harness_t> ();
  else
    fatal << "Unknown mode " << mode << "\n";
  return NULL;
}
// }}}
// {{{ RPC Magic
// Completion callback for the transport RPC issued by doRPC ().
//   proc - XDR routine that decodes the inner result into `out'
//   res  - transport-level reply; owned by this callback and deleted
//          on every path
//   cb   - the caller's completion callback
//   out  - caller-supplied storage for the decoded result
//   err  - status of the transport call
static void
doRPCcb (xdrproc_t proc, dorpc_res *res, aclnt_cb cb, void *out, clnt_stat err)
{
  if (err) {
    warnx << "doRPC: err = " << err << "\n";
    assert (!err);
    // Only reached when assertions are compiled out.
    cb (err);
    delete res;
    return;
  }

  // Build the decoder only once the call is known to have succeeded.
  // NOTE(review): the reply's own status is not checked before resok is
  // read -- confirm the server cannot answer with a non-OK dorpc_res here.
  xdrmem x ((char *)res->resok->results.base (),
            res->resok->results.size (), XDR_DECODE);
  if (!proc (x.xdrp (), out)) {
    warnx << "failed to unmarshall result\n";
    cb (RPC_CANTSEND);
    delete res;  // previously leaked on this path
    return;
  }

  cb (err);
  delete res;
}

// called by syncer to perform merkle RPC to server
//
// Marshals the arguments described by `a' into a dorpc_arg, sends it as
// TRANSPORTPROC_DORPC over SYNCER.clnt, and arranges for doRPCcb () to
// decode the reply into a->out and invoke a->cb.
static void
doRPC (RPC_delay_args *a)
{
  //form the transport RPC
  ptr<dorpc_arg> arg = New refcounted<dorpc_arg> ();

  // Other fields don't matter.
  arg->progno = a->prog.progno;
  arg->procno = a->procno;

  xdrproc_t inproc = a->prog.tbl[a->procno].xdr_arg;
  xdrproc_t outproc = a->prog.tbl[a->procno].xdr_res;
  assert (outproc);

  xdrsuio x (XDR_ENCODE);
  if ((!inproc) || (!inproc (x.xdrp (), (void *)a->in))) {
    fatal << "failed to marshall args\n";
  }

  // Copy the encoded arguments into the transport request.
  int args_len = x.uio ()->resid ();
  arg->args.setsize (args_len);
  x.uio ()->copyout (arg->args.base ());

  dorpc_res *res = New dorpc_res (DORPC_OK);
  SYNCER.clnt->call (TRANSPORTPROC_DORPC, arg, res,
                     wrap (&doRPCcb, outproc, res, a->cb, a->out));
}

// Parallel arrays filled by addHandler (): handlers[i] serves
// handledProgs[i].
vec<const rpc_program *> handledProgs;
vec<cbdispatch_t> handlers;

// Server-side callback for the transport program.  Finds the registered
// program matching the request, decodes the inner arguments and passes
// them to that program's handler wrapped in a user_args.
static void
transport_dispatch (svccb *sbp)
{
  if (!sbp) {
    warnx << "transport server eof\n";
    return;
  }
  dorpc_arg *arg = sbp->Xtmpl getarg<dorpc_arg> ();

  chord_node_wire nw;
  bzero (&nw, sizeof (chord_node_wire));

  // One index variable is shared by the search and the dispatch below.
  // The original redeclared `i' in the for statement, which shadowed the
  // outer one and made handlers[i] always call handlers[0].
  const rpc_program *prog (NULL);
  unsigned int i (0);
  for (i = 0; i < handledProgs.size (); i++)
    if (arg->progno == (int)handledProgs[i]->progno) {
      prog = handledProgs[i];
      break;
    }
  if (!prog)
    // Fail with a diagnostic rather than dereferencing NULL below.
    fatal << "dispatch: no handler registered for program "
          << arg->progno << "\n";

  char *arg_base = (char *)(arg->args.base ());
  int arg_len = arg->args.size ();

  xdrmem x (arg_base, arg_len, XDR_DECODE);
  xdrproc_t proc = prog->tbl[arg->procno].xdr_arg;
  assert (proc);

  void *unmarshalled_args = prog->tbl[arg->procno].alloc_arg ();
  if (!proc (x.xdrp (), unmarshalled_args)) {
    warn << "dispatch: error unmarshalling arguments: "
         << arg->progno << "." << arg->procno << "\n";
    xdr_delete (prog->tbl[arg->procno].xdr_arg, unmarshalled_args);
    sbp->replyref (rpcstat (DORPC_MARSHALLERR));
    return;
  }

  user_args *ua = New user_args (sbp, unmarshalled_args,
                                 prog, arg->procno, 0);
  // nw was zeroed above, so the location is built from an all-zero node.
  ua->me_ = New refcounted<location> (make_chord_node (nw));
  handlers[i] (ua);
}

// called by server to register handler fielding merkle RPCs
static void
addHandler (const rpc_program &prog, cbdispatch_t cb)
{
  handledProgs.push_back (&prog);
  handlers.push_back (cb);
}
// }}}
// {{{ Testing framework - setup/finish/addrand/dump_stats
// Callback handed to the merkle_syncer in setup ().  Queues blockID on
// keys_for_syncer when missingLocal is set, otherwise on keys_for_server;
// runsync () performs the actual inserts.
static void
sendblock (bigint blockID, bool missingLocal)
{
  if (missingLocal)
    keys_for_syncer.push_back (blockID);
  else
    keys_for_server.push_back (blockID);
}

// Call compute_stats () on both trees, each under a banner.
void
dump_stats ()
{
  warn  << "+++++++++++++++++++++++++SERVER.tree++++++++++++++++++++++++++++++\n";
  SERVER.tree->compute_stats ();
  // SERVER.tree->dump ();

  warn  << "+++++++++++++++++++++++++SYNCER.tree++++++++++++++++++++++++++++++\n";
  SYNCER.tree->compute_stats ();
  // SYNCER.tree->dump ();
}

// Create a fresh pair of trees (via the harness for the current mode),
// connect an RPC server and client over a socketpair, and create the
// merkle_syncer / merkle_server that the tests exercise.
void
setup ()
{
  warn << "===> setup() +++++++++++++++++++++++++++\n";
  harness = allocate_harness ();

  // these are closed by axprt_stream's dtor, right?
  int fds[2];
  // The call is kept outside assert () so the sockets are still created
  // when assertions are compiled out with NDEBUG.
  if (socketpair (AF_UNIX, SOCK_STREAM, 0, fds) != 0)
    fatal << "socketpair failed\n";
  warn << "  sockets: " << fds[0] << ":" << fds[1] << "\n";

  SERVER.srv = asrv::alloc (axprt_stream::alloc (fds[0]), transport_program_1);
  SYNCER.clnt = aclnt::alloc (axprt_stream::alloc (fds[1]), transport_program_1);
  assert (SERVER.srv && SYNCER.clnt);
  SERVER.srv->setcb (wrap (&transport_dispatch));

  SYNCER.syncer = New refcounted<merkle_syncer> (0, DHASH_CONTENTHASH,
                                                 SYNCER.tree,
                                                 wrap (doRPC),
                                                 wrap (sendblock));
  SERVER.server = New refcounted<merkle_server> (SERVER.tree);
  addHandler (merklesync_program_1,
      wrap (SERVER.server, &merkle_server::dispatch));

  err_flush ();
}

// Undo setup (): forget the registered handlers and drop every global
// reference so the objects (and finally the harness) can be destroyed.
void
finish ()
{
  // Force all destructors to be called, hopefully.
  handledProgs.clear ();
  handlers.clear ();
  SYNCER.syncer = NULL;
  SERVER.server = NULL;
  SYNCER.clnt = NULL;
  SERVER.srv  = NULL;
  SYNCER.tree = NULL;
  SERVER.tree = NULL;
  harness = NULL;
}

// Insert `count' randomized keys into `tr', logging progress every
// 1000 insertions.
void
addrand (ptr<merkle_tree> tr, int count)
{
  for (int i = 0; i < count; i++) {
    merkle_hash key;
    key.randomize ();
    tr->insert (key);
    if ((i % 1000) == 0) {
      warn << "inserted " << i << " blocks..of " << count << "\n";
      err_flush ();
    }
  }
}

// Remove up to `count' keys from `tr'.  Each attempt fetches keys in the
// range [start, idmax], where start is 0 or, when `rand' is set, a
// randomized key.  Gives up after 2*count attempts.
void
removesome (ptr<merkle_tree> tr, int count, bool rand = false)
{
  static bigint idmax = (bigint (1) << 160) - 1;
  int maxtries = 2*count;
  int nremoved = 0;
  while (nremoved < count && maxtries > 0) {
    maxtries--;
    merkle_hash key (0);
    if (rand)
      key.randomize ();
    vec<chordID> keys = tr->get_keyrange (static_cast<bigint> (key), idmax, count);
    while (keys.size () && (nremoved < count)) {
      nremoved++;
      tr->remove (keys.back ());
      keys.pop_back ();
    }
  }
  warnx << "Removed " << nremoved << " keys\n";
}

// Run each tree's own check_invariants ().
void
check_invariants ()
{
  warn << "Checking server invariants... ";
  SERVER.tree->check_invariants ();
  warn << "OK\n";

  warn << "Checking syncer invariants... ";
  SYNCER.tree->check_invariants ();
  warn << "OK\n";
}

// Compare the root hashes of the two trees; print both roots and abort
// via fatal when they differ.
void
check_equal_roots ()
{
  warn << "Checking that roots are equal... ";
  merkle_node *serv_root = SERVER.tree->get_root ();
  merkle_node *sync_root = SYNCER.tree->get_root ();
  if (serv_root->hash != sync_root->hash) {
    warn << "SERVER.tree->root " << serv_root->hash << " cnt " << serv_root->count << "\n";
    warn << "SYNCER.tree->root " << sync_root->hash << " cnt " << sync_root->count << "\n";
    fatal << "NOT OK!\n";
  }
  SERVER.tree->lookup_release (serv_root);
  SYNCER.tree->lookup_release (sync_root);
  warn << "OK\n";
}
// }}}

// Synchronize the range [rngmin, rngmax] and loop until the syncer
// reports done.  Each iteration runs acheck () and then applies the keys
// queued by sendblock (), counting them in nkeyspushed / nkeyspulled.
// With `perturb' set, each iteration first adds or removes 64 keys on a
// randomly chosen tree.
void
runsync (chordID rngmin, chordID rngmax, bool perturb = false)
{
  nkeyspushed = 0;
  nkeyspulled = 0;
  SYNCER.syncer->sync (rngmin, rngmax);
  while (!SYNCER.syncer->done ()) {
    if (perturb) {
      // Bit 1 picks the tree, bit 0 picks add vs. remove.
      long int bits = random ();
      ptr<merkle_tree> t = ((bits>>1)&0x1) ? SYNCER.tree : SERVER.tree;
      if (bits & 0x1)
        addrand (t, 64);
      else
        removesome (t, 64);
    }
    acheck ();

    while (keys_for_server.size ()) {
      chordID k = keys_for_server.pop_front ();
      merkle_hash key (k);
      SERVER.tree->insert (key);
      nkeyspushed++;
    }
    while (keys_for_syncer.size ()) {
      chordID k = keys_for_syncer.pop_front ();
      merkle_hash key (k);
      SYNCER.tree->insert (key);
      nkeyspulled++;
    }
  }
}

// Usage: prog [mode]
//   mode is "bdb" (default), "disk" or "mem".
// NOTE(review): with more than one argument the log priority is raised to
// TRACE but argv[1] is NOT used as the mode -- confirm this is intended.
int
main (int argc, char *argv[])
{
  if (argc > 2)
    modlogger::setmaxprio (modlogger::TRACE);
  if (argc == 2)
    mode = argv[1];

  // Make sure no old state remains on disk.
  // NOTE(review): no harness exists yet, so this call only resets the
  // globals; files are removed only by harness destructors.
  finish ();

  bigint idzero = 0;
  bigint idmax  = (bigint (1) << 160)  - 1;

  // Empty A, Empty B, Complete range
  // ==> A should equal B.
  // ==> Resync should move no keys.
  setup ();
  runsync (idzero, idmax);
  check_invariants ();
  assert (nkeyspushed == 0);
  assert (nkeyspulled == 0);
  check_equal_roots ();
  runsync (idzero, idmax);
  check_invariants ();
  assert (nkeyspushed == 0);
  assert (nkeyspulled == 0);
  dump_stats ();
  finish ();

  // Empty A, Non-empty B, Complete range
  // ==> A should equal B.
  // ==> Resync should move no keys.
  setup ();
  addrand (SERVER.tree, 256);
  runsync (idzero, idmax);
  check_invariants ();
  check_equal_roots ();
  runsync (idzero, idmax);
  check_invariants ();
  assert (nkeyspushed == 0);
  assert (nkeyspulled == 0);
  dump_stats ();
  finish ();

  // Empty syncer, non-empty server, random partial range [a, b]:
  // the syncer must pull exactly the server keys inside the range.
  for (size_t c = 0; c < 10; c++) {
    setup ();
    addrand (SERVER.tree, 512);
    vec<chordID> allkeys = SERVER.tree->get_keyrange (0, idmax, 512);
    chordID a = make_randomID ();
    chordID b = make_randomID ();
    warnx << "sync " << a << " " << b << "\n";
    bhash<chordID, hashID> filtered;
    filtered.clear ();
    for (size_t i = 0; i < allkeys.size (); i++) {
      if (betweenbothincl (a, b, allkeys[i]))
        filtered.insert (allkeys[i]);
    }
    runsync (a, b);
    assert (nkeyspulled == filtered.size ());
    assert (nkeyspushed == 0);
    warnx << "Expecting " << filtered.size () << " keys\n";
    vec<chordID> y = SYNCER.tree->get_keyrange (0, idmax, 512);
    bool bad = false;
    for (size_t i = 0; i < y.size (); i++) {
      if (!filtered[y[i]]) {
        warnx << "Unexpected key: " << y[i] << "\n";
        bad = true;
      }
      filtered.remove (y[i]);
    }
    if (filtered.size ()) {
      warnx << "Missing " << filtered.size () << " keys\n";
      warnx << "(syncer pulled " << nkeyspulled << ")\n";
      bad = true;
    }
    assert (!bad);
    finish ();
  }

  // Both trees non-empty, random partial range [a, b]: the syncer must
  // pull exactly the in-range server keys, and afterwards hold every
  // in-range key from either tree.
  for (size_t c = 0; c < 10; c++) {
    setup ();
    addrand (SERVER.tree, 512);
    addrand (SYNCER.tree, 512);
    chordID a = make_randomID ();
    chordID b = make_randomID ();
    warnx << "sync " << a << " " << b << "\n";
    bhash<chordID, hashID> filtered;
    filtered.clear ();
    vec<chordID> allkeys = SERVER.tree->get_keyrange (0, idmax, 512);
    for (size_t i = 0; i < allkeys.size (); i++) {
      if (betweenbothincl (a, b, allkeys[i]))
        filtered.insert (allkeys[i]);
    }
    // Only the server's in-range keys are expected to be pulled.
    unsigned int expected = filtered.size ();
    allkeys = SYNCER.tree->get_keyrange (0, idmax, 512);
    for (size_t i = 0; i < allkeys.size (); i++) {
      if (betweenbothincl (a, b, allkeys[i]))
        filtered.insert (allkeys[i]);
    }
    runsync (a, b);
    warnx << "Expecting " << expected << " keys\n";
    assert (nkeyspulled == expected);
    vec<chordID> y = SYNCER.tree->get_keyrange (0, idmax, 1024);
    bool bad = false;
    for (size_t i = 0; i < y.size (); i++) {
      if (!betweenbothincl (a, b, y[i]))
        continue;
      if (!filtered[y[i]]) {
        warnx << "Unexpected key: " << y[i] << "\n";
        bad = true;
        continue;
      }
      filtered.remove (y[i]);
    }
    if (filtered.size ()) {
      warnx << "Missing " << filtered.size () << " keys\n";
      warnx << "(syncer pulled " << nkeyspulled << ")\n";
      qhash_slot<chordID, void> *slot = filtered.first ();
      while (slot) {
        warnx << "  " << slot->key << "\n";
        slot = filtered.next (slot);
      }

      bad = true;
    }
    assert (!bad);
    finish ();
  }

  // Non-empty A, Empty B, Complete range
  // ==> A should equal B.
  // ==> Resync should move no keys.
  setup ();
  addrand (SYNCER.tree, 1024);
  runsync (idzero, idmax);
  check_invariants ();
  check_equal_roots ();
  runsync (idzero, idmax);
  check_invariants ();
  assert (nkeyspushed == 0);
  assert (nkeyspulled == 0);
  dump_stats ();
  finish ();

  // Non-empty A, Non-empty B, Complete range
  // ==> A should equal B.
  // ==> Resync should move no keys.
  setup ();
  addrand (SYNCER.tree, 4097);
  addrand (SERVER.tree, 4097);
  runsync (idzero, idmax);
  check_invariants ();
  check_equal_roots ();
  runsync (idzero, idmax);
  check_invariants ();
  assert (nkeyspushed == 0);
  assert (nkeyspulled == 0);

  // Now test resynchronization after writes.
  addrand (SERVER.tree, 257);
  runsync (idzero, idmax);
  check_equal_roots ();
  assert (nkeyspulled == 257);
  assert (nkeyspushed == 0);

  addrand (SYNCER.tree, 257);
  runsync (idzero, idmax);
  check_equal_roots ();
  assert (nkeyspulled == 0);
  assert (nkeyspushed == 257);

  addrand (SERVER.tree, 3);
  addrand (SYNCER.tree, 5);
  runsync (idzero, idmax);
  check_equal_roots ();
  assert (nkeyspulled == 3);
  assert (nkeyspushed == 5);
  dump_stats ();
  finish ();

  // Sync while both trees are being perturbed, then once more without
  // perturbation; the trees must end up equal.
  setup ();
  addrand (SYNCER.tree, 4097);
  addrand (SERVER.tree, 4097);
  runsync (idzero, idmax, true);
  runsync (idzero, idmax, false);
  check_invariants ();
  check_equal_roots ();
  finish ();

  // XXX Should we test various degrees of commonality in A/B?
  //
  // Same as above, but for a partial range.
  // The results are that A will not equal B, but resync won't
  // exchange any keys.
}

/* vim:set foldmethod=marker: */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -