⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dhblock_chash_srv.c

📁 chord 源码 http://pdos.csail.mit.edu/chord/
💻 C
字号:
#include <chord.h>#include <dhash.h>#include <dhash_common.h>#include <dhashcli.h>#include <libadb.h>#include <dhblock_chash.h>#include <dhblock_chash_srv.h>#include "pmaint.h"#include <merkle.h>#include <merkle_server.h>#include <configurator.h>#include <locationtable.h>#include <ida.h>#include <modlogger.h>#define warning modlogger ("dhblock_chash", modlogger::WARNING)#define info  modlogger ("dhblock_chash", modlogger::INFO)#define trace modlogger ("dhblock_chash", modlogger::TRACE)#ifdef DMALLOC#include <dmalloc.h>#endifstruct rjchash : public repair_job {  rjchash (blockID key, ptr<location> w, ptr<dhblock_chash_srv> bsrv);  const ptr<dhblock_chash_srv> bsrv;  void execute ();  // 1. Check cache db and send frag if present.  // 2. Else retrieve the block  // 3. Cache it and send frag.  // Async callbacks  void cache_check_cb (adb_status stat, chordID key, str d);  void retrieve_cb (dhash_stat err, ptr<dhash_block> b, route r);  void send_frag (str block);  void local_store_cb (dhash_stat stat);  void send_frag_cb (dhash_stat err, bool present);};dhblock_chash_srv::dhblock_chash_srv (ptr<vnode> node,				      ptr<dhashcli> cli,				      str desc,				      str dbname,				      str dbext,				      cbv donecb) :  dhblock_srv (node, cli, desc, dbname, dbext, false, donecb),  cache_db (NULL),  msrv (NULL),  mtree (NULL),  pmaint_obj (NULL){  pmaint_obj = New pmaint (cli, node, mkref (this));   cache_db = New refcounted<adb> (dbname, "ccache");  mtree = New merkle_tree ();  mtree->set_rehash_on_modification (false);  db->getkeys (0, false, wrap (this, &dhblock_chash_srv::populate_mtree));  // Don't create msrv until populate_mtree is done.  // Any merkle RPCs will get a MERKLE_ERR from dhash_impl::merkle_dispatch}voiddhblock_chash_srv::populate_mtree (adb_status stat, vec<chordID> keys, vec<u_int32_t> aux){  // aux can be ignored; not needed for chash  if (stat != ADB_COMPLETE && stat != ADB_OK) {    warn << "dhblock_chash_srv::populate_mtree: unexpected adb status "          << stat << "\n";    return;  }  for (size_t i = 0; i < keys.size (); i++) {    mtree->insert (keys[i]);  }  if (stat != ADB_COMPLETE) {    db->getkeys (incID (keys.back ()), false,		 wrap (this, &dhblock_chash_srv::populate_mtree));  } else {    mtree->hash_tree ();    mtree->set_rehash_on_modification (true);    // Ready to start synchronizing!    msrv = New merkle_server (mtree);    (*donecb)();  }}dhblock_chash_srv::~dhblock_chash_srv (){  stop ();  if (msrv) {    delete msrv;    msrv = NULL;  }  if (mtree) {    delete mtree;    mtree = NULL;  }  if (pmaint_obj) {    delete pmaint_obj;    pmaint_obj = NULL;  }}voiddhblock_chash_srv::start (bool randomize){  dhblock_srv::start (randomize);  // XXX disable pmaint until DHASHPROC_OFFER is fixed.  if (0 && pmaint_obj)    pmaint_obj->start ();}voiddhblock_chash_srv::stop (){  dhblock_srv::stop ();  if (pmaint_obj)    pmaint_obj->stop ();}voiddhblock_chash_srv::store (chordID key, str d, cb_dhstat cb){  char *action;  if (!mtree->key_exists (key)) {    action = "N"; // New    db_store (key, d, cb);    mtree->insert (key);  } else {    action = "R";    cb (DHASH_OK);  }  // Force a BSM update just in case it was confused.  db->update (key, node->my_location (), true);    bigint h = compute_hash (d.cstr (), d.len ());  info << "db write: " << node->my_ID () << " " << action       << " " << key << " " << d.len ()        << " " << h       << "\n";}voiddhblock_chash_srv::generate_repair_jobs (){  u_int32_t frags = dhblock_chash::num_dfrags ();  db->getblockrange (node->my_pred ()->id (), node->my_location ()->id (),		     frags, REPAIR_QUEUE_MAX - repair_qlength (),		     wrap (this, &dhblock_chash_srv::localqueue, frags));  return;}voiddhblock_chash_srv::localqueue (u_int32_t frags,    clnt_stat err, adb_status stat, vec<block_info> blocks){  if (err) {    return;  } else if (stat == ADB_ERR) {    warning << "dhblock_chash_srv::localqueue: adb error, failing.\n";    return;  }  trace << "chash-localqueue (" << node->my_ID() << "): repairing " 	<< blocks.size() << " blocks with " << frags 	<< " frags\n";  if( blocks.size() > 0 ) {    trace << "first block=" << blocks[0].k << "\n";  }  //don't assume we are holding the block  // i.e. -> put ourselves on this of nodes to check for the block  vec<ptr<location> > nmsuccs = node->succs ();  vec<ptr<location> > succs;  succs.push_back (node->my_location ());  for (size_t j = 0; j < nmsuccs.size (); j++)    succs.push_back (nmsuccs[j]);  bhash<chordID, hashID> holders;  for (size_t i = 0; i < blocks.size (); i++) {        // Should always be true, but fails occasionally for me. maybe the db    // was non transactionally correcting? -- strib, 2/8/06    //assert (blocks[i].on.size () == frags);    holders.clear ();    for (size_t j = 0; j < blocks[i].on.size (); j++)      holders.insert (blocks[i].on[j].x);    blockID key (blocks[i].k, DHASH_CONTENTHASH);    ptr<location> w = NULL;    u_int32_t reps = 0;    for (size_t j = 0; j < succs.size (); j++) {      w = succs[j];      if (!holders[w->id ()] && reps < dhblock_chash::num_efrags () - frags) {	ptr<repair_job> job = New refcounted<rjchash> (key, w, mkref (this));	repair_add (job);	reps++;	break;      }    }  }  if (repair_qlength () < REPAIR_QUEUE_MAX) {    // Expect blocks to be sorted (since DB_DUPSORT is set)    chordID nstart;    if (stat == ADB_COMPLETE) {      frags++;      nstart = node->my_pred ()->id ();    } else {      nstart = incID( blocks.back ().k );    }    if (frags < dhblock_chash::num_efrags ())      db->getblockrange (nstart, node->my_location ()->id (),	  frags, REPAIR_QUEUE_MAX - repair_qlength (),	  wrap (this, &dhblock_chash_srv::localqueue, frags));  }}// XXX Ugh// This code is called in response to an RPC.  Unfortunately, now// we need to go out to disk to figure out the best way to respond.voiddhblock_chash_srv::offer (user_args *sbp, dhash_offer_arg *arg){  dhash_offer_res res (DHASH_OK);  res.resok->accepted.setsize (arg->keys.size ());  res.resok->dest.setsize (arg->keys.size ());  //XXX copied code  vec<ptr<location> > nmsuccs = node->succs ();  //don't assume we are holding the block  // i.e. -> put ourselves on this of nodes to check for the block  vec<ptr<location> > succs;  succs.push_back (node->my_location ());  for (unsigned int j = 0; j < nmsuccs.size (); j++)    succs.push_back(nmsuccs[j]);  //XXX end copied#if 0  for (u_int i = 0; i < arg->keys.size (); i++) {    chordID key = arg->keys[i];    u_int count = bsm->pcount (key, succs);    chordID pred = node->my_pred ()->id ();    bool mine = between (pred, node->my_ID (), key);    // belongs to me and isn't replicated well    if (mine && count < dhblock_chash::num_efrags ()) {      res.resok->accepted[i] = DHASH_SENDTO;      ptr<location> l = bsm->best_missing (key, node->succs ());      trace << "server: sending " << key << ": count=" << count 	    << " to=" << l->id () << "\n";      l->fill_node (res.resok->dest[i]);      bsm->unmissing (l, key);    } else {      trace << "server: holding " << key << ": count=" << count << "\n";      res.resok->accepted[i] = DHASH_HOLD;    }   }#endif /* 0 */  sbp->reply (&res);}voiddhblock_chash_srv::stats (vec<dstat> &s){  s.push_back (dstat ("frags on disk", mtree->root.count));  return;}//// Repair Logic Implementation//rjchash::rjchash (blockID key, ptr<location> w,		  ptr<dhblock_chash_srv> bsrv) :    repair_job (key, w),    bsrv (bsrv){}voidrjchash::execute (){  bsrv->cache_db->fetch (key.ID,      wrap (mkref (this), &rjchash::cache_check_cb));}voidrjchash::cache_check_cb (adb_status stat, chordID k, str d){  if (stat == ADB_OK) {    assert (key.ID == k);    send_frag (d);  } else {    bsrv->cli->retrieve (key,	wrap (mkref (this), &rjchash::retrieve_cb));  }}static void cache_store_cb (adb_status stat);voidrjchash::retrieve_cb (dhash_stat err, ptr<dhash_block> b, route r){  if (err) {    trace << "retrieve missing block " << key << " failed: " << err << "\n";    // We'll probably try again later.    // XXX maybe make sure we don't try too "often"?  } else {    assert (b);    bsrv->cache_db->store (key.ID, b->data, wrap (cache_store_cb));    send_frag (b->data);  }}static voidcache_store_cb (adb_status stat){  if (stat != ADB_OK)     warn << "store error caching block: " << stat << "\n";}voidrjchash::send_frag (str block){  u_long m = Ida::optimal_dfrag (block.len (), dhblock::dhash_mtu ());  if (m > dhblock_chash::num_dfrags ())    m = dhblock_chash::num_dfrags ();  str frag = Ida::gen_frag (m, block);  //if the block will be sent to us, just store it instead of sending  // an RPC to ourselves. This will happen a lot, so the optmization  // is probably worth it.  if (where == bsrv->node->my_location ()) {    bsrv->store (key.ID, frag,	wrap (mkref (this), &rjchash::local_store_cb));  } else {    bsrv->cli->sendblock (where, key, frag, 	wrap (mkref (this), &rjchash::send_frag_cb));  }}voidrjchash::local_store_cb (dhash_stat stat){  if (stat != DHASH_OK) {    warning << "dhblock_chash_srv database store error: "	    << stat << "\n";  } else {    info << "repair: " << bsrv->node->my_ID ()	 << " sent " << key << " to " << where->id () <<  " (me).\n";  }}voidrjchash::send_frag_cb (dhash_stat err, bool present){  strbuf x;  x << "repair: " << bsrv->node->my_ID ();  if (!err) {    bsrv->db->update (key.ID, where, true);    x << " sent ";  } else {    x << " error sending ";  }  x << key << " to " << where->id ();  if (err)    x << " (" << err << ")";  info << x << ".\n";}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -