⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dhblock_replicated_srv.c

📁 chord 源码 http://pdos.csail.mit.edu/chord/
💻 C
字号:
#include <chord.h>#include <dhash.h>#include <dhash_common.h>#include "dhashcli.h"#include <configurator.h>#include <location.h>#include "dhash_store.h"#include "dhblock_replicated.h"#include "dhblock_replicated_srv.h"#include <merkle_tree.h>#include <merkle_server.h>#include <merkle_misc.h>#include <modlogger.h>#define warning modlogger ("dhblock_replicated", modlogger::WARNING)#define info    modlogger ("dhblock_replicated", modlogger::INFO)#define trace   modlogger ("dhblock_replicated", modlogger::TRACE)#ifdef DMALLOC#include <dmalloc.h>#endif// key to store data under in the DB. Low 32-bits are zerochordIDdhblock_replicated_srv::id_to_dbkey (chordID key){  //get a key with the low bits cleard out  key = key >> 32;  key = key << 32;  assert (key > 0);  //assert that there were some high bits left  return key;}chordIDdhblock_replicated_srv::idaux_to_mkey (chordID key, u_int32_t hash){  chordID mkey = id_to_dbkey (key) | hash;  return mkey;}dhblock_replicated_srv::dhblock_replicated_srv (ptr<vnode> node,						ptr<dhashcli> cli,						str desc,						str dbname,						str dbext,						dhash_ctype ctype,						cbv donecb) :  dhblock_srv (node, cli, desc, dbname, dbext, true, donecb),  ctype (ctype),  mtree (NULL),  msrv (NULL){  mtree = New merkle_tree ();  mtree->set_rehash_on_modification (false);  db->getkeys (bigint(0), true, wrap (this, &dhblock_replicated_srv::populate_mtree));}voiddhblock_replicated_srv::populate_mtree (adb_status stat, vec<chordID> keys, vec<u_int32_t> aux){  if (stat != ADB_COMPLETE && stat != ADB_OK) {    warn << "dhblock_replicated_srv::populate_mtree: unexpected adb status "          << stat << "\n";    return;  }  // aux contains pre-computed hashes of the low-order  // bytes of the key.  for (unsigned int i = 0; i < keys.size (); i++) {    mtree->insert (keys[i], aux[i]);  }  if (stat != ADB_COMPLETE) { // more keys    db->getkeys (incID (keys.back ()), true,		 wrap (this, &dhblock_replicated_srv::populate_mtree));  } else {    mtree->hash_tree ();    mtree->set_rehash_on_modification (true);    msrv = New merkle_server (mtree);    (*donecb)();  }}voiddhblock_replicated_srv::fetch (chordID k, cb_fetch cb){  chordID db_key = id_to_dbkey (k);  return db->fetch (db_key, cb);}voiddhblock_replicated_srv::store (chordID key, str new_data, cb_dhstat cb){  chordID dbKey = id_to_dbkey (key);  // Serialize writers to this key in the order that they arrive here.  // Let subclasses deal with the actual logic of updating the databases.  if( _paused_stores[dbKey] != NULL ) {    vec<cbv> *waiters = *(_paused_stores[dbKey]);    waiters->push_back (wrap (db, &adb::fetch, dbKey,			      wrap (this, 				    &dhblock_replicated_srv::store_after_fetch_cb, 				    new_data, cb)));    return;  } else {    // We must clear this out before calling cb on all possible paths    vec<cbv> *waiters = New vec<cbv> ();    _paused_stores.insert (dbKey, waiters);    db->fetch (dbKey, 	       wrap (this, &dhblock_replicated_srv::store_after_fetch_cb, 		     new_data, cb));  }}voiddhblock_replicated_srv::store_after_fetch_cb (str new_data,    cb_dhstat cb, adb_status err, chordID dbkey, str old_data) {  if (err == ADB_ERR) {    finish_store (dbkey);    cb (DHASH_DBERR);    return;  }  if (err == ADB_NOTFOUND)     old_data = "";  // Hand off the real work to my subclass.  real_store (dbkey, old_data, new_data,      wrap (this, &dhblock_replicated_srv::store_after_rstore_cb, dbkey, cb));}voiddhblock_replicated_srv::store_after_rstore_cb (chordID dbkey,    cb_dhstat cb, dhash_stat stat){  finish_store (dbkey);  cb (stat);}voiddhblock_replicated_srv::finish_store (chordID key){  assert (_paused_stores[key] != NULL);  vec<cbv> *waiters = *(_paused_stores[key]);  if (waiters->size () > 0) {    cbv cb = waiters->pop_front ();    (*cb) ();  } else {    _paused_stores.remove (key);    delete waiters;  }}voiddhblock_replicated_srv::generate_repair_jobs (){  // iterate over all known blocks in bsm, if we aren't in the list,  // must fetch.  then, for the ones where we are in the list, compare  // with everyone else and update them.  db->getblockrange (node->my_pred ()->id (), node->my_location ()->id (),		     -1, REPAIR_QUEUE_MAX - repair_qlength (),		     wrap (this, &dhblock_replicated_srv::localqueue));}voiddhblock_replicated_srv::localqueue (clnt_stat err, adb_status stat, vec<block_info> blocks){  if (err) {    return;  } else if (stat == ADB_ERR) {    return;  }  trace << "rep-localqueue: repairing " << blocks.size() << " blocks\n";  vec<ptr<location> > succs = node->succs ();  ptr<location> me = node->my_location ();  qhash<chordID, u_int32_t, hashID> holders;  for (size_t i = 0; i < blocks.size (); i++) {    holders.clear ();    for (size_t j = 0; j < blocks[i].on.size (); j++) {      holders.insert (blocks[i].on[j].x, blocks[i].aux[j]);    }    ptr<repair_job> job = NULL;    blockID key (blocks[i].k, ctype);    u_int32_t *myaux = holders[me->id ()];    if (!myaux) {      real_repair (key, me, NULL, NULL, NULL);    } else {      // num_replica - 1 since me is handled separately      for (size_t j = 0; 	   j < dhblock_replicated::num_replica () - 1 && j < succs.size ();	   j++)       {	u_int32_t *theiraux = holders[succs[j]->id ()];	real_repair (key, me, myaux, succs[j], theiraux);      }    }  }  if (repair_qlength () < REPAIR_QUEUE_MAX &&      stat != ADB_COMPLETE)  {    // Expect blocks to be sorted (since DB_DUPSORT is set)    chordID nstart = incID( blocks.back ().k );    db->getblockrange (nstart, node->my_location ()->id (),	-1, REPAIR_QUEUE_MAX - repair_qlength (),	wrap (this, &dhblock_replicated_srv::localqueue));  }}//// Repair Logic Implementation////rjrep::rjrep (blockID key, ptr<location> w,     ptr<dhblock_replicated_srv> bsrv) :  repair_job (key, w),  bsrv (bsrv){}voidrjrep::execute (){  if (where == bsrv->node->my_location ()) {    bsrv->cli->retrieve (key,         wrap (mkref (this), &rjrep::repair_retrieve_cb));  } else {    bsrv->cli->sendblock (where, key, bsrv,        wrap (mkref (this), &rjrep::repair_send_cb));  }}voidrjrep::repair_send_cb (dhash_stat err, bool something){  if (err)     warn << "rjrep (" << key << "): error sending block " << err << "\n";}static voidstorecb (blockID key, dhash_stat err){  if (err) warn << "rjrep (" << key << ") err: " << err << "\n";}voidrjrep::repair_retrieve_cb (dhash_stat err, ptr<dhash_block> b, route r){  if (err) {    warn << "rjrep (" << key<< "): retrieve during repair failed: " << err << "\n";    return;  }    bsrv->store (key.ID, b->data, wrap (storecb, key));}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -