⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 syncer.c

📁 chord 源码 http://pdos.csail.mit.edu/chord/
💻 C
字号:
#include <arpc.h>#include <../devel/rpclib.h>#include <comm.h>#include <dhash_prot.h>#include <locationtable.h>#include <location.h>#include <merkle_tree.h>#include <merkle_syncer.h>#include <syncer.h>static int sync_trace (getenv ("SYNC_TRACE") ? atoi (getenv ("SYNC_TRACE")) : 0);syncer::syncer (ptr<locationtable> locations,		ptr<location> h,		str dbpath,		str dbname,		dhash_ctype ctype,		u_int dfrags, u_int efrags)  : locations (locations), ctype (ctype), dfrags (dfrags), efrags (efrags),    tmptree (NULL), host_loc (h),    db (New refcounted<adb> (dbpath, dbname)),    cur_succ (0),    replica_timer (300){     warn << "new syncer: \n"        << "   dbpath: " << dbpath << "\n"       << "    dbext: " << dbname << "\n"       << "    ctype: " << ctype << "\n"       << " d/efrags: " << dfrags << "/" << efrags << "\n";  locations->insert (h);  locations->pin (h->id ());    if (sync_trace >= 10)    replica_timer = sync_trace;    // Initially randomize a little.  int delay = random_getword () % replica_timer;  delaycb (delay, wrap(this, &syncer::sync_replicas)); }syncer::~syncer (){  delete tmptree;  tmptree = NULL;  replica_syncer = NULL;  db = NULL;}voidsyncer::doRPC (const rpc_program &prog,		int procno, const void *in, void *out, aclnt_cb cb){  chord_node dst;  host_loc->fill_node (dst);  ::doRPC (dst, prog, procno, in, out, cb);}voidsyncer::update_pred (cb_location cb){  ptr<chordID> id = New refcounted<chordID> (host_loc->id ());  chord_noderes *res = New chord_noderes ();  doRPC (chord_program_1, CHORDPROC_GETPREDECESSOR,	 id, res,	 wrap (this, &syncer::update_pred_cb, cb, res) );}voidsyncer::update_pred_cb (cb_location cb, chord_noderes *res, clnt_stat err){  if (err) {    warn << "my local node is down?\n";    (*cb) (NULL);  } else {    chord_node n = make_chord_node (*res->resok);    ptr<location> x = locations->lookup_or_create (n);    locations->insert (x);    cb (x);  }  delete res;}voidsyncer::get_succlist (cb_locationlist cb){  ptr<chordID> ga = New refcounted<chordID> (host_loc->id ());  chord_nodelistres *lst = New chord_nodelistres ();  doRPC (chord_program_1,	 CHORDPROC_GETSUCCLIST, 	 ga, lst, wrap (this, &syncer::get_succlist_cb, lst, cb));}voidsyncer::get_succlist_cb (chord_nodelistres *res,		 cb_locationlist cb,		 clnt_stat status){  vec<ptr<location> > ret;  if (!status) {    size_t sz = res->resok->nlist.size ();    for (size_t i = 0; i < sz; i++) {      chord_node n = make_chord_node (res->resok->nlist[i]);      ptr<location> s = locations->lookup_or_create (n);      locations->insert (s);      ret.push_back (s);    }  }  cb (ret);  delete res;}voidsyncer::sync_replicas (){  if (replica_syncer && !replica_syncer->done ()) {    // still working on the last sync    delaycb (replica_timer, wrap(this, &syncer::sync_replicas));   } else {    warn << "sync_replicas: starting (ctype = " << ctype << ")\n";    update_pred (wrap (this, &syncer::sync_replicas_predupdated));   } }voidsyncer::sync_replicas_predupdated (ptr<location> pred){  if (!pred) {    delaycb (replica_timer, wrap (this, &syncer::sync_replicas));     return;  }  warn << "sync_replicas: my pred is " << pred << "\n";  get_succlist (wrap (this, &syncer::sync_replicas_gotsucclist, pred));}voidsyncer::sync_replicas_gotsucclist (ptr<location> pred,			   vec<ptr<location> > succs) {  if (succs.size () < 2) {    delaycb (replica_timer, wrap (this, &syncer::sync_replicas));     return;  }      // succs[0] is the vnode we are working for  // pred = locations->closestpredloc (succs[0]);  assert (pred);  assert (succs[0]);  assert (host_loc);   cur_succ++; // start at 1 (0 is me)  if (efrags > 0 && cur_succ >= efrags) cur_succ = 1;  else if (cur_succ >= succs.size ()) cur_succ = 1;  assert(succs[cur_succ]);  //sync with the next node  if (tmptree) {    delete tmptree;  }  u_int64_t start = getusec ();  tmptree = New merkle_tree ();  tmptree->set_rehash_on_modification (false);  db->getkeyson (succs[cur_succ], pred->id (), succs[0]->id (),      wrap (this, &syncer::populate_tree, start, pred, succs));}voidsyncer::populate_tree (u_int64_t start,    ptr<location> pred, vec<ptr<location> > succs,    adb_status astat, vec<chordID> blocks, vec<u_int32_t> aux){  if (astat != ADB_OK && astat != ADB_COMPLETE) {    warn << "syncer adbd error: " << astat << "\n";    delete tmptree; tmptree = NULL;    delaycb (replica_timer, wrap (this, &syncer::sync_replicas));     return;  }  // XXX ugh  switch (ctype) {  case DHASH_CONTENTHASH:    for (size_t i = 0; i < blocks.size (); i++) {      tmptree->insert (blocks[i]);    }    break;  case DHASH_KEYHASH:  case DHASH_NOAUTH:    for (size_t i = 0; i < blocks.size (); i++) {      tmptree->insert (blocks[i], aux[i]);    }    break;  default:    fatal << "syncer::populate_tree: unexpected ctype " << ctype << "\n";    break;  }  if (astat != ADB_COMPLETE) {    // Get more, picking up from where we left off    const chordID last (blocks.back ());    db->getkeyson (succs[cur_succ], incID(last), succs[0]->id (),	wrap (this, &syncer::populate_tree, start, pred, succs));    return;  }  // move on to tree done  tmptree->hash_tree ();  tmptree->set_rehash_on_modification (true);  warn << host_loc->id () << " tree build: "        << getusec () - start << " usecs\n";  replica_syncer = New refcounted<merkle_syncer>     (ctype, tmptree,      wrap (this, &syncer::doRPC_unbundler, succs[cur_succ]),     wrap (this, &syncer::missing, succs[cur_succ]));    bigint rngmin = pred->id ();  bigint rngmax = succs[0]->id ();  warn << host_loc->id () << " syncing with " << succs[cur_succ]        << " (succ #" << cur_succ << ")"       << " for range [ " << rngmin << ", " << rngmax << " ]\n";    replica_syncer->sync (rngmin, rngmax);    delaycb (replica_timer, wrap (this, &syncer::sync_replicas)); }voidsyncer::doRPC_unbundler (ptr<location> dst, RPC_delay_args *args){  chord_node n;  dst->fill_node (n);  ::doRPC (n, args->prog, args->procno, args->in, args->out, args->cb);}voidsyncer::missing (ptr<location> from,		 bigint key, bool missing_local,		 bool round_over){  if (round_over) return;  // if he tells us that we're missing it, then he has it.  // otherwise, we found out he doesn't have it.  // XXX this switch business is kinda gross.  switch (ctype) {  case DHASH_CONTENTHASH:    db->update (key, from, missing_local, true);    break;  case DHASH_KEYHASH:  case DHASH_NOAUTH:    {      chordID aux = (key & 0xFFFFFFFF);      chordID dbkey = (key >> 32) << 32;      db->update (dbkey, from, aux.getui (), missing_local, true);    }    break;  default:    fatal << "syncer::missing: unexpected ctype " << ctype << "\n";    break;  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -