⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 test_merkle_syncer.c

📁 chord 源码 http://pdos.csail.mit.edu/chord/
💻 C
字号:
// Test driver for the merkle syncer: builds two merkle trees (SERVER and
// SYNCER), each backed by its own on-disk database, connects them through
// a local socketpair carrying the transport RPC program, and runs
// merkle_syncer::sync over the whole 160-bit key range.

#include <errno.h>
#include <chord.h>
#include <id_utils.h>
#include "merkle.h"
#include <transport_prot.h>
#include <comm.h>

// State of the side that answers merkle RPCs.
struct {
  ptr<dbfe> db;
  ptr<merkle_tree> tree;
  ptr<merkle_server> server;
  ptr<asrv> srv;
} SERVER;

// State of the side that initiates the synchronization.
struct {
  ptr<dbfe> db;
  ptr<merkle_tree> tree;
  ptr<merkle_syncer> syncer;
  ptr<aclnt> clnt;
} SYNCER;

#define SERVER_DBNAME "db1"
#define SYNCER_DBNAME "db2"

// Payload stored with every block inserted by this test.
ptr<dbrec> FAKE_DATA = New refcounted<dbrec> ("FAKE", strlen ("FAKE"));

// XXX: PUT THIS FUNCTION IN THE MERKLE DIRECTORY
//
// Removes any existing database at `dbname` (via `rm -r`), then opens a
// fresh one with the options set below.  Exits the process if the
// database cannot be opened.
static ptr<dbfe>
create_database (char *dbname)
{
  ptr<dbfe> db = New refcounted<dbfe> ();

  // set up the options we want
  dbOptions opts;
  opts.addOption ("opt_async", 1);
  opts.addOption ("opt_cachesize", 1000);
  opts.addOption ("opt_nodesize", 4096);

  // XXX ugh?
  // Bounded formatting: refuse to run a truncated command rather than
  // overflowing the buffer or removing the wrong path.
  char cmd[80];
  int len = snprintf (cmd, sizeof (cmd), "rm -r %s", dbname);
  if (len < 0 || len >= static_cast<int> (sizeof (cmd)))
    fatal << "create_database: database name too long: " << dbname << "\n";
  // Best effort: the result is ignored, as the path may simply not exist.
  system (cmd);

  if (int err = db->opendb (dbname, opts)) {
    warn << "open returned: " << strerror (err) << err << "\n";
    exit (-1);
  }
  return db;
}

// Keys reported by the syncer through sendblock(); test() drains both
// queues into the matching tree.
vec<chordID> keys_for_server;
vec<chordID> keys_for_syncer;

// Callback handed to merkle_syncer: queues `blockID` for the syncer's
// tree when `missingLocal` is true, otherwise for the server's tree.
static void
sendblock (bigint blockID, bool missingLocal)
{
  if (missingLocal)
    keys_for_syncer.push_back (blockID);
  else
    keys_for_server.push_back (blockID);
}

// Completion callback for the transport RPC issued by doRPC().
//   proc - XDR routine used to decode the inner result into `out`
//   res  - transport-level reply; owned by this callback and always freed
//   cb   - caller's callback, invoked exactly once with the final status
//   out  - destination for the decoded inner result
//   err  - status of the transport call
static void
doRPCcb (xdrproc_t proc, dorpc_res *res, aclnt_cb cb, void *out, clnt_stat err)
{
  if (err) {
    warnx << "doRPC: err = " << err << "\n";
    cb (err);
  } else {
    // Only look at the reply payload once the call is known to have
    // succeeded.
    xdrmem x ((char *) res->resok->results.base (),
	      res->resok->results.size (), XDR_DECODE);
    if (!proc (x.xdrp (), out)) {
      warnx << "failed to unmarshall result\n";
      cb (RPC_CANTSEND);
    } else {
      cb (err);
    }
  }
  // Freed on every path, including the unmarshalling failure.
  delete res;
}

// called by syncer to perform merkle RPC to server
//
// Marshals the arguments described by `a` into a dorpc_arg and sends it
// as TRANSPORTPROC_DORPC over SYNCER.clnt; doRPCcb() finishes the call.
static void
doRPC (RPC_delay_args *a)
{
  // form the transport RPC
  ptr<dorpc_arg> arg = New refcounted<dorpc_arg> ();

  // Other fields don't matter.
  arg->progno = a->prog.progno;
  arg->procno = a->procno;

  xdrproc_t inproc = a->prog.tbl[a->procno].xdr_arg;
  xdrproc_t outproc = a->prog.tbl[a->procno].xdr_res;
  assert (outproc);

  xdrsuio x (XDR_ENCODE);
  if ((!inproc) || (!inproc (x.xdrp (), (void *) a->in))) {
    fatal << "failed to marshall args\n";
  }

  // Copy the marshalled bytes into the opaque argument field.
  int args_len = x.uio ()->resid ();
  arg->args.setsize (args_len);
  void *marshalled_args = suio_flatten (x.uio ());
  memcpy (arg->args.base (), marshalled_args, args_len);
  free (marshalled_args);

  dorpc_res *res = New dorpc_res (DORPC_OK);
  SYNCER.clnt->call (TRANSPORTPROC_DORPC, arg, res,
		     wrap (&doRPCcb, outproc, res, a->cb, a->out));
}

// Parallel vectors filled by addHandler(): handlers[k] serves
// handledProgs[k].
vec<const rpc_program *> handledProgs;
vec<cbdispatch_t> handlers;

// Server-side dispatcher for the transport program: finds the registered
// program matching the request, unmarshals the inner arguments and hands
// them to that program's handler.
static void
transport_dispatch (svccb *sbp)
{
  if (!sbp) {
    warnx << "transport server eof\n";
    return;
  }
  dorpc_arg *arg = sbp->template getarg<dorpc_arg> ();

  chord_node_wire nw;
  bzero (&nw, sizeof (chord_node_wire));

  // `i` must outlive the loop: it selects the handler further down.
  const rpc_program *prog (NULL);
  unsigned int i (0);
  for (; i < handledProgs.size (); i++)
    if (arg->progno == (int) handledProgs[i]->progno) {
      prog = handledProgs[i];
      break;
    }
  if (!prog)
    fatal << "dispatch: no handler registered for program "
	  << arg->progno << "\n";

  char *arg_base = (char *) (arg->args.base ());
  int arg_len = arg->args.size ();

  xdrmem x (arg_base, arg_len, XDR_DECODE);
  xdrproc_t proc = prog->tbl[arg->procno].xdr_arg;
  assert (proc);

  void *unmarshalled_args = prog->tbl[arg->procno].alloc_arg ();
  if (!proc (x.xdrp (), unmarshalled_args)) {
    warn << "dispatch: error unmarshalling arguments: "
	 << arg->progno << "." << arg->procno << "\n";
    xdr_delete (prog->tbl[arg->procno].xdr_arg, unmarshalled_args);
    sbp->replyref (rpcstat (DORPC_MARSHALLERR));
    return;
  }
  user_args *ua = New user_args (sbp, unmarshalled_args,
				 prog, arg->procno, 0);
  ua->me_ = New refcounted<location> (make_chord_node (nw));
  handlers[i] (ua);
}

// called by server to register handler fielding merkle RPCs
static void
addHandler (const rpc_program &prog, cbdispatch_t cb)
{
  handledProgs.push_back (&prog);
  handlers.push_back (cb);
}

// Drops every global reference created by setup().
void
finish ()
{
  // Force all destructors to be called, hopefully.
  handledProgs.clear ();
  handlers.clear ();

  SYNCER.syncer = NULL;
  SERVER.server = NULL;
  SYNCER.clnt = NULL;
  SERVER.srv  = NULL;
  SYNCER.tree = NULL;
  SERVER.tree = NULL;
  SYNCER.db   = NULL;
  SERVER.db   = NULL;
}

// Creates both databases and trees, connects an asrv/aclnt pair over a
// UNIX socketpair, and instantiates the merkle syncer and server.
void
setup ()
{
  warn << " => setup() +++++++++++++++++++++++++++\n";
  err_flush ();

  SERVER.db = create_database (SERVER_DBNAME);
  SYNCER.db = create_database (SYNCER_DBNAME);
  SERVER.tree = New refcounted<merkle_tree> (SERVER.db);
  SYNCER.tree = New refcounted<merkle_tree> (SYNCER.db);

  // these are closed by axprt_stream's dtor, right?
  int fds[2];
  // Called outside assert() so the sockets still get created when
  // assertions are compiled out.
  if (socketpair (AF_UNIX, SOCK_STREAM, 0, fds) != 0)
    fatal << "socketpair: " << strerror (errno) << "\n";
  warn << "sockets: " << fds[0] << ":" << fds[1] << "\n";
  SERVER.srv = asrv::alloc (axprt_stream::alloc (fds[0]), transport_program_1);
  SYNCER.clnt = aclnt::alloc (axprt_stream::alloc (fds[1]), transport_program_1);
  assert (SERVER.srv && SYNCER.clnt);

  SERVER.srv->setcb (wrap (&transport_dispatch));
  SYNCER.syncer = New refcounted<merkle_syncer> (SYNCER.tree,
						 wrap (doRPC),
						 wrap (sendblock));
  SERVER.server = New refcounted<merkle_server> (SERVER.tree,
						 wrap (addHandler));

  warn << " <= setup() DONE!\n";
  err_flush ();
}

// Inserts `count` blocks with randomized keys into `tr`, asserting that
// each key is present in neither database beforehand.
void
addrand (ptr<merkle_tree> tr, int count)
{
  for (int i = 0; i < count; i++) {
    err_flush ();
    // Hard stop: the whole process exits once 40000 blocks were added.
    if (i == 40000)
      exit (0);

    merkle_hash key;
    key.randomize ();
    block *b = New block (key, FAKE_DATA);
    assert (!database_lookup (SERVER.db, b->key));
    assert (!database_lookup (SYNCER.db, b->key));
    tr->insert (b);

    if ((i % 1000) == 0) {
      warn << "a) inserted " << i << " blocks..of " << count << "\n";
      err_flush ();
    }
  }
}

// Inserts the same `count` randomized keys into both `tr1` and `tr2`.
void
addrand (ptr<merkle_tree> tr1, ptr<merkle_tree> tr2, int count)
{
  for (int i = 0; i < count; i++) {
    warn << "i=" << i << "\n";
    err_flush ();
    // Hard stop: the whole process exits once 40000 blocks were added.
    if (i == 40000)
      exit (0);

    merkle_hash key;
    key.randomize ();
    block *b = New block (key, FAKE_DATA);
    assert (!database_lookup (SERVER.db, key));
    assert (!database_lookup (SYNCER.db, key));
    tr1->insert (b);
    tr2->insert (New block (key, FAKE_DATA));

    if ((i % 1000) == 0) {
      warn << "b) inserted " << i << " blocks..of " << count << "\n";
      err_flush ();
    }
  }
}

// Inserts blocks keyed merkle_hash(0) .. merkle_hash(count - 1) into `tr`.
void
addinc (ptr<merkle_tree> tr, int count)
{
  for (int i = 0; i < count; i++)
    tr->insert (New block (merkle_hash (i), FAKE_DATA));
}

// Inserts blocks keyed merkle_hash(0) .. merkle_hash(count - 1) into both
// trees.
void
addinc (ptr<merkle_tree> tr1, ptr<merkle_tree> tr2, int count)
{
  for (int i = 0; i < count; i++) {
    tr1->insert (New block (merkle_hash (i), FAKE_DATA));
    tr2->insert (New block (merkle_hash (i), FAKE_DATA));
  }
}

// Prints the root hash and count of both trees, then each tree's stats.
void
dump_stats ()
{
  warn << "\n\n=======================================================================\n";
  warn << "SERVER.tree->root " << SERVER.tree->root.hash
       << " cnt " << SERVER.tree->root.count << "\n";
  warn << "SYNCER.tree->root " << SYNCER.tree->root.hash
       << " cnt " << SYNCER.tree->root.count << "\n";

  warn  << "+++++++++++++++++++++++++SERVER.tree++++++++++++++++++++++++++++++\n";
  SERVER.tree->compute_stats ();
  warn  << "+++++++++++++++++++++++++SYNCER.tree++++++++++++++++++++++++++++++\n";
  SYNCER.tree->compute_stats ();

  //warn  << "++++++++++++++++++++++++++SERVER.db+++++++++++++++++++++++++++++++\n";
  //SERVER.tree->db->dump_stats ();
  //warn  << "++++++++++++++++++++++++++SYNCER.db+++++++++++++++++++++++++++++++\n";
  //SYNCER.tree->db->dump_stats ();
  //warn  << "++++++++++++++++++++++++client1+++++++++++++++++++++++++++++\n";
  //host1->clnt->dump_stats ();
  //warn  << "++++++++++++++++++++++++client2+++++++++++++++++++++++++++++\n";
  //host2->clnt->dump_stats ();
}

// Empties `keys` from the front, inserting a FAKE_DATA block for each key
// into `tree`.
static void
drain_keys (vec<chordID> &keys, ptr<merkle_tree> tree)
{
  while (keys.size ()) {
    chordID k = keys.pop_front ();
    merkle_hash key = to_merkle_hash (id2dbrec (k));
    // NOTE(review): a stack-allocated block is passed here, whereas
    // addrand()/addinc() pass heap-allocated ones -- confirm that
    // merkle_tree::insert does not keep the pointer.
    block b (key, FAKE_DATA);
    tree->insert (&b);
  }
}

// Runs one synchronization round.  One tree receives `large_sz` random
// blocks and the other `progress / (data_points - 1)` of that amount;
// odd values of `progress` put the large set on the server, even values
// put it on the syncer.  `data_points` must be at least 2.
void
test (uint progress, uint data_points)
{
  // The size computation divides by (data_points - 1).
  if (data_points < 2)
    fatal << "test: data_points must be at least 2\n";

  // 2^30 / 2^13 = 131072 blocks, scaled down by 100 => 1310 blocks.
  uint64 large_sz = (1 << 30) / (1 << 13);  // 1 GB
  large_sz /= 100;
  uint64 small_sz = (large_sz * progress) / (data_points - 1);

  warn << "\n\n\n\n############################################################\n";
  warn << "REPLICA TEST " << large_sz << "/" << small_sz << "\n";
  setup ();

  if (progress % 2) {
    addrand (SERVER.tree, large_sz);
    addrand (SYNCER.tree, small_sz);
  } else {
    addrand (SERVER.tree, small_sz);
    addrand (SYNCER.tree, large_sz);
  }

  warn  << "+++++++++++++++++++++++++server tree++++++++++++++++++++++++++++++\n";
  SERVER.tree->compute_stats ();
  warn  << "+++++++++++++++++++++++++syncer tree++++++++++++++++++++++++++++++\n";
  SYNCER.tree->compute_stats ();
  err_flush ();

  warn << "\n\n ************************* RUNNING TEST ************************\n";
  bigint rngmin  = 0;
  bigint rngmax = (bigint (1) << 160)  - 1;
  SYNCER.syncer->sync (rngmin, rngmax);

  // Pump the event loop until the syncer reports completion, applying the
  // keys queued by sendblock() after every pass.
  while (!SYNCER.syncer->done ()) {
    acheck ();
    drain_keys (keys_for_server, SERVER.tree);
    drain_keys (keys_for_syncer, SYNCER.tree);
  }

  warn << "\n\n *********************** DONE *****************************\n";
  dump_stats ();
  finish ();
}

// Runs test() for every data point in [start_point, data_points).
int
main ()
{
  u_int start_point = 0;
  u_int data_points = 100;
  for (u_int i = start_point; i < data_points; i++)
    test (i, data_points);
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -