⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 adbd.c

📁 chord 源码 http://pdos.csail.mit.edu/chord/
💻 C
📖 第 1 页 / 共 2 页
字号:
#include <async.h>
#include <arpc.h>
#include <wmstr.h>
#include <ihash.h>
#include <sha1.h>

#include <adb_prot.h>
#include <id_utils.h>
#include <dbfe.h>

// {{{ Globals
static bool dbstarted (false);
static str dbsock;
static int clntfd (-1);

// Server-side buffer size; used below to derive default result limits.
static const u_int32_t asrvbufsize (1024*1025);

class dbmanager;
static dbmanager *dbm;
// }}}

// {{{ DB key conversion
/*
 * id_to_str: turn a chordID into the string used as a database key.
 *
 * The ID's printed form is left-padded with '0' characters up to exactly
 * 2*sha1::hashsize characters (not 2*hashsize bytes of raw hash), so every
 * key has the same width and keys sort correctly as byte strings.
 */
str
id_to_str (const chordID &key)
{
  const str digits = strbuf () << key;
  strbuf padded;
  // One '0' per missing character; none if digits is already full width.
  int missing = 2*sha1::hashsize - digits.len ();
  while (missing-- > 0)
    padded << "0";
  padded << digits;
  assert (padded.tosuio ()->resid () == 2*sha1::hashsize);
  return padded;
}

/*
 * str_to_dbt: make D describe the bytes of S.
 *
 * D is zeroed first (so no flags are set) and then borrows S's buffer; it
 * does not copy it, so S must outlive every use of D.
 */
inline void
str_to_dbt (const str &s, DBT *d)
{
  bzero (d, sizeof (*d));
  d->data = (void *) s.cstr ();
  d->size = s.len ();
}

/*
 * dbt_to_id: parse a database key (as produced by id_to_str) back into a
 * chordID.  A key that str2chordID rejects is reported through fatal.
 */
inline chordID
dbt_to_id (const DBT &dbt)
{
  const str text (static_cast<char *> (dbt.data), dbt.size);
  chordID id;
  const bool parsed = str2chordID (text, id);
  if (!parsed)
    fatal << "Invalid chordID as database key: " << text << "\n";
  return id;
}
// }}}

// {{{ DB for Namespace
/* For each namespace (e.g. vnode + ctype, and shared cache),
 * we potentially need to store several things:
 *   1. chordID -> data.  20 bytes -> potentially large bytes
 *   2. chordID -> auxdata (e.g. noauth hash). 20 bytes -> 4 bytes-ish
 *   3. BSM data (chordID -> [vnodeid, ...]. 20 bytes -> 6*16 bytes-ish
 *                vnodeid -> [chordID, ...])
 * with potentially the ability to iterate (3) ordered by len([vnodeid...]).
 * This object encapsulates all that stuff.
*/ // {{{ getnumreplicas: Secondary key extractor for bsm datastatic intgetnumreplicas (DB *sdb, const DBT *pkey, const DBT *pdata, DBT *skey){  u_int32_t *numreps = (u_int32_t *) malloc(sizeof(u_int32_t));  adb_vbsinfo_t vbs;  if (!buf2xdr (vbs, pdata->data, pdata->size)) {    hexdump hd (pdata->data, pdata->size);    warn << "getnumreplicas: unable to unmarshal pdata.\n"           << hd << "\n";    return -1;  }  *numreps = htonl (vbs.d.size ());  // this flag should mean that berkeley db will free this memory when it's  // done with it.  skey->flags = DB_DBT_APPMALLOC;  skey->data = numreps;  skey->size = sizeof (*numreps);  return 0;}// }}}// {{{ BerkeleyDB transaction API versioning wrappersstatic inline intdbns_txn_begin (DB_ENV *dbe, DB_TXN **t){  int r;#if DB_VERSION_MAJOR >= 4  r = dbe->txn_begin (dbe, NULL, t, 0);#else  r = txn_begin (dbe, NULL, t, 0);#endif  return r;}static inline intdbns_txn_abort (DB_ENV *dbe, DB_TXN *t){  int r;#if DB_VERSION_MAJOR >= 4  r = t->abort (t);#else  r = txn_abort (t, 0);#endif   return r;}static inline intdbns_txn_commit (DB_ENV *dbe, DB_TXN *t){  int r;#if DB_VERSION_MAJOR >= 4  r = t->commit (t, 0);#else  r = txn_commit (t, 0);#endif   return r;}// }}}class dbns {  friend class dbmanager;  str name;  ihash_entry<dbns> hlink;  DB_ENV *dbe;  DB *datadb;  DB *auxdatadb;  DB *bsdb;  DB *bsdx;  // XXX should have a instance-level lock that governs datadb/auxdatadb.  
// Private updater, for different txn handling  int updateone (const chordID &key, const adb_bsinfo_t &bsinfo, bool present,	         DB_TXN *t);public:  dbns (const str &dbpath, const str &name, bool aux);  ~dbns ();  void warner (const char *method, const char *desc, int r);  void sync ();  bool hasaux () { return auxdatadb != NULL; };  // Primary data management  int insert (const chordID &key, const str data, u_int32_t auxdata);  int insert (const chordID &key, DBT &data, DBT &auxdata);  int lookup (const chordID &key, str &data);  int del (const chordID &key);  int getkeys (const chordID &start, size_t count, bool getaux,      rpc_vec<adb_keyaux_t, RPC_INFINITY> &out);   // Block status information  int getblockrange_all (const chordID &start, const chordID &stop,    int count, rpc_vec<adb_bsloc_t, RPC_INFINITY> &out);  int getblockrange_extant (const chordID &start, const chordID &stop,    int extant, int count, rpc_vec<adb_bsloc_t, RPC_INFINITY> &out);  int getkeyson (const adb_vnodeid &n, const chordID &start,		 const chordID &stop, int count, 		 rpc_vec<adb_keyaux_t, RPC_INFINITY> &out);  int update (const chordID &block, const adb_bsinfo_t &bsinfo, bool present);  int updatebatch (rpc_vec<adb_updatearg, RPC_INFINITY> &uargs);  int getinfo (const chordID &block, rpc_vec<adb_vnodeid, RPC_INFINITY> &out);};// {{{ dbns::dbnsdbns::dbns (const str &dbpath, const str &name, bool aux) :  name (name),  dbe (NULL),  datadb (NULL),  auxdatadb (NULL),  bsdb (NULL),  bsdx (NULL){#define DBNS_ERRCHECK(desc) \  if (r) {		  \    fatal << desc << " returned " << r << ": " << db_strerror (r) << "\n"; \    return;		  \  }  assert (dbpath[dbpath.len () - 1] == '/');  strbuf fullpath ("%s%s", dbpath.cstr (), name.cstr ());  int r = -1;  r = dbfe_initialize_dbenv (&dbe, fullpath, false, 1024);  DBNS_ERRCHECK ("dbe->open");  r = dbfe_opendb (dbe, &datadb, "db", DB_CREATE, 0);  DBNS_ERRCHECK ("datadb->open");  if (aux) {    r = dbfe_opendb (dbe, &auxdatadb, "auxdb", DB_CREATE, 
0);    DBNS_ERRCHECK ("auxdatadb->open");  }  r = dbfe_opendb (dbe, &bsdb, "bsdb", DB_CREATE, 0);  DBNS_ERRCHECK ("bsdb->open");  r = dbfe_opendb (dbe, &bsdx, "bsdx", DB_CREATE, 0, /* dups = */ true);  DBNS_ERRCHECK ("bsdx->open");  r = bsdb->associate (bsdb, NULL, bsdx, getnumreplicas, DB_AUTO_COMMIT);  DBNS_ERRCHECK ("bsdb->associate (bsdx)");  warn << "dbns::dbns (" << dbpath << ", " << name << ", " << aux << ")\n";}// }}}// {{{ dbns::~dbnsdbns::~dbns (){#define DBNS_DBCLOSE(x)			\  if (x) {				\    (void) x->close (x, 0); x = NULL;	\  }  // Start with main databases  DBNS_DBCLOSE(datadb);  DBNS_DBCLOSE(auxdatadb);  // Close secondary before the primary  DBNS_DBCLOSE(bsdx);  DBNS_DBCLOSE(bsdb);  // Shut down the environment  DBNS_DBCLOSE(dbe);#undef DBNS_DBCLOSE  warn << "dbns::~dbns (" << name << ")\n";}// }}}// {{{ dbns::warnervoiddbns::warner (const char *method, const char *desc, int r){  timespec ts;  strbuf t;  clock_gettime (CLOCK_REALTIME, &ts);  t.fmt ("%d.%06d ", int (ts.tv_sec), int (ts.tv_nsec/1000));  warn << t << ": " << method << ": " << desc << ": " << db_strerror (r) << "\n";}// }}}// {{{ dbns::syncvoiddbns::sync (){#if (DB_VERSION_MAJOR < 4)   txn_checkpoint (dbe, 0, 0, 0);#else   dbe->txn_checkpoint (dbe, 0, 0, 0);#endif}// }}}// {{{ dbns::insert (chordID, DBT, DBT)intdbns::insert (const chordID &key, DBT &data, DBT &auxdata){  int r = 0;  str key_str = id_to_str (key);  DBT skey;  str_to_dbt (key_str, &skey);  if (auxdatadb) {    // To keep auxdata in sync, use an explicit transaction    DB_TXN *t = NULL;    r = dbns_txn_begin (dbe, &t);    r = datadb->put (datadb, t, &skey, &data, 0);    if (r) {      warner ("dbns::insert", "data put error", r);      r = dbns_txn_abort (dbe, t);    }     r = auxdatadb->put (auxdatadb, t, &skey, &auxdata, 0);    if (r) {      warner ("dbns::insert", "auxdata put error", r);      r = dbns_txn_abort (dbe, t);    } else {      r = dbns_txn_commit (dbe, t);    }    if (r)      warner ("dbns::insert", 
"abort/commit error", r);  } else {    // Use implicit transaction    r = datadb->put (datadb, NULL, &skey, &data, DB_AUTO_COMMIT);    if (r)      warner ("dbns::insert", "put error", r);  }  return r;}// }}}// {{{ dbns::insert (chordID, str, u_int32_t)intdbns::insert (const chordID &key, const str data, u_int32_t auxdata){  DBT datadbt;  str_to_dbt (data, &datadbt);  DBT auxdatadbt;  bzero (&auxdatadbt, sizeof (auxdatadbt));  auxdatadbt.size = sizeof (u_int32_t);  u_int32_t nauxdata = htonl (auxdata);  auxdatadbt.data = &nauxdata;   return insert (key, data, auxdata);}// }}}// {{{ dbns::lookupintdbns::lookup (const chordID &key, str &data){  int r = 0;  str key_str = id_to_str (key);  DBT skey;  str_to_dbt (key_str, &skey);  DBT content;  bzero (&content, sizeof (content));  // Implicit transaction  r = datadb->get (datadb, NULL, &skey, &content, 0);  if (r) {    if (r != DB_NOTFOUND)      warner ("dbns::fetch", "get error", r);    return r;  }  data.setbuf ((const char *) (content.data), content.size);  return 0;}// }}}// {{{ dbns::delintdbns::del (const chordID &key){  int r = 0;  str key_str = id_to_str (key);  DBT skey;  str_to_dbt (key_str, &skey);  // Implicit transaction  r = datadb->del (datadb, NULL, &skey, DB_AUTO_COMMIT);  if (r && r != DB_NOTFOUND)    warner ("dbns::del", "del error", r);  return r;}// }}}// {{{ dbns::getkeysintdbns::getkeys (const chordID &start, size_t count, bool getaux, rpc_vec<adb_keyaux_t, RPC_INFINITY> &out){  int r = 0;  DBC *cursor;  // Fully serialized reads of auxdatadb  if (auxdatadb)     r = auxdatadb->cursor (auxdatadb, NULL, &cursor, 0);  else    r = datadb->cursor (datadb, NULL, &cursor, 0);  if (r) {    warner ("dbns::getkeys", "cursor open", r);    (void) cursor->c_close (cursor);    return r;  }  // Could possibly improve efficiency here by using SleepyCat's bulk reads  str key_str = id_to_str (start);  DBT key;  str_to_dbt (key_str, &key);  DBT data_template;  bzero (&data_template, sizeof (data_template));  // If 
DB_DBT_PARTIAL and data.dlen == 0, no data is read.  if (!auxdatadb || !getaux)    data_template.flags = DB_DBT_PARTIAL;  DBT data = data_template;  u_int32_t limit = count;  if (count < 0)    limit = (u_int32_t) (asrvbufsize/(1.5*sizeof (adb_keyaux_t)));  // since we set a limit, we know the maximum amount we have to allocate  out.setsize( limit );  u_int32_t elements = 0;  r = cursor->c_get (cursor, &key, &data, DB_SET_RANGE);  while (elements < limit && !r) {    out[elements].key = dbt_to_id (key);    if (getaux)      out[elements].auxdata = ntohl (*(u_int32_t *)data.data);    elements++;    bzero (&key, sizeof (key));    data = data_template;    r = cursor->c_get (cursor, &key, &data, DB_NEXT);  }  if( elements < limit ) {    out.setsize( elements );  }  if (r && r != DB_NOTFOUND)    warner ("dbns::getkeys", "cursor get", r);  (void) cursor->c_close (cursor);  return r;}// }}}// {{{ dbns::getblockrangeintdbns::getblockrange_all (const chordID &start, const chordID &stop,   int count, rpc_vec<adb_bsloc_t, RPC_INFINITY> &out){  int r = 0;  DBC *cursor;  r = bsdb->cursor (bsdb, NULL, &cursor,0);  if (r) {    warner ("dbns::getblockrange_all", "cursor open", r);    (void) cursor->c_close (cursor);    return r;  }  chordID cur = start;  str key_str = id_to_str (start);  DBT key;  str_to_dbt (key_str, &key);  DBT data;  bzero (&data, sizeof (data));  u_int32_t limit = count;  if (count < 0)    limit = (asrvbufsize/(sizeof (adb_bsloc_t)/2));  r = cursor->c_get (cursor, &key, &data, DB_SET_RANGE);  // Loop around the ring if at end.  
if (r == DB_NOTFOUND) {    bzero (&key, sizeof (key));    r = cursor->c_get (cursor, &key, &data, DB_FIRST);  }  // since we set a limit, we know the maximum amount we have to allocate  out.setsize( limit );  u_int32_t elements = 0;  while (!r && elements < limit)  {    adb_vbsinfo_t vbs;    chordID k = dbt_to_id (key);    if (!betweenrightincl (cur, stop, k)) {      r = DB_NOTFOUND;      break;    }    cur = k;    if (buf2xdr (vbs, data.data, data.size)) {      out[elements].block = k;      out[elements].hosts.setsize( vbs.d.size() );      // explicit deep copy      for( u_int32_t i = 0; i < vbs.d.size(); i++ ) {	out[elements].hosts[i].n = vbs.d[i].n;	out[elements].hosts[i].auxdata = vbs.d[i].auxdata;      }      elements++;    } else {      warner ("dbns::getblockrange_all", "XDR unmarshalling failed", 0);    }     bzero (&key, sizeof (key));    bzero (&data, sizeof (data));    r = cursor->c_get (cursor, &key, &data, DB_NEXT);    if (r == DB_NOTFOUND)      r = cursor->c_get (cursor, &key, &data, DB_FIRST);  }  if (r && r != DB_NOTFOUND)    warner ("dbns::getblockrange_all", "cursor get", r);  if( elements < limit ) {    out.setsize( elements );  }  (void) cursor->c_close (cursor);  return r;}intdbns::getblockrange_extant (const chordID &start, const chordID &stop,    int extant, int count, rpc_vec<adb_bsloc_t, RPC_INFINITY> &out){  // XXX maybe think of a way to improve the degree of abstraction.  //     for cursors.  
int r = 0;  DBC *cursor;  r = bsdx->cursor (bsdx, NULL, &cursor,0);  if (r) {    warner ("dbns::getblockrange_extant", "cursor open", r);    (void) cursor->c_close (cursor);    return r;  }  extant = htonl (extant);  DBT ekey;  bzero (&ekey, sizeof (ekey));  ekey.data = &extant;  ekey.size = sizeof (extant);  str key_str = id_to_str (start);  DBT key;  str_to_dbt (key_str, &key);  chordID cur = start;  DBT data;  bzero (&data, sizeof (data));  u_int32_t limit = count;  if (count < 0)    limit = (asrvbufsize/(sizeof (adb_bsloc_t)/2));  // Find only those who have extant as requested  // find the one with the primary key greater or equal to   // the start key  r = cursor->c_pget (cursor, &ekey, &key, &data, DB_GET_BOTH_RANGE);  // Also must remember to start to loop around the ring  if (r == DB_NOTFOUND) {    bzero (&key, sizeof (key));    // I guess it didn't work.  Just try any old key with this extant    r = cursor->c_pget (cursor, &ekey, &key, &data, DB_SET);  }  // since we set a limit, we know the maximum amount we have to allocate  out.setsize( limit );  u_int32_t elements = 0;  while (!r && elements < limit)  {    adb_vbsinfo_t vbs;    chordID k = dbt_to_id (key);    // NOTE: this assumes that the keys are stored in order of the    // primary keys in the secondary db.  Also assumes the DB_GET_BOTH_RANGE    // call above works as expected.    
if (!betweenrightincl (cur, stop, k)) {      r = DB_NOTFOUND;      break;    }    cur = k;    if (buf2xdr (vbs, data.data, data.size)) {      out[elements].block = k;      out[elements].hosts.setsize( vbs.d.size() );      // explicit deep copy      for( u_int32_t i = 0; i < vbs.d.size(); i++ ) {	out[elements].hosts[i].n = vbs.d[i].n;	out[elements].hosts[i].auxdata = vbs.d[i].auxdata;      }      elements++;    } else {      warner ("dbns::getblockrange_extant", "XDR unmarshalling failed", 0);    }     bzero (&key, sizeof (key));    bzero (&data, sizeof (data));    r = cursor->c_pget (cursor, &ekey, &key, &data, DB_NEXT_DUP);    if (r == DB_NOTFOUND)      r = cursor->c_pget (cursor, &ekey, &key, &data, DB_SET);  }  if (r && r != DB_NOTFOUND)    warner ("dbns::getblockrange_extant", "cursor get", r);  if( elements < limit ) {    out.setsize( elements );  }  (void) cursor->c_close (cursor);  return r;}// }}}// {{{ dbns::getkeysonintdbns::getkeyson (const adb_vnodeid &n, const chordID &start,    const chordID &stop, int count, rpc_vec<adb_keyaux_t, RPC_INFINITY> &out){  int r = 0;  DBC *cursor;  // Fully serialized reads of bsdb  r = bsdb->cursor (bsdb, NULL, &cursor, 0);  if (r) {    warner ("dbns::getkeyson", "cursor open", r);    (void) cursor->c_close (cursor);    return r;  }  chordID cur = start;  // Could possibly improve efficiency here by using SleepyCat's bulk reads  str key_str = id_to_str (start);  DBT key;  str_to_dbt (key_str, &key);  DBT data;  bzero (&data, sizeof (data));  u_int32_t limit = count;  if (count < 0)    limit = (u_int32_t) (asrvbufsize/(1.5*sizeof (adb_keyaux_t)));  // Position the cursor if possible  r = cursor->c_get (cursor, &key, &data, DB_SET_RANGE);  // Also must remember to start to loop around the ring  if (r == DB_NOTFOUND) {    bzero (&key, sizeof (key));    r = cursor->c_get (cursor, &key, &data, DB_FIRST);  }  // since we set a limit, we know the maximum amount we have to allocate  out.setsize( limit );  u_int32_t elements = 0; 
 // Each adb_keyaux_t is 24ish bytes; leave some slack  while (!r && elements < limit)  {    adb_vbsinfo_t vbs;    chordID curkey = dbt_to_id (key);    if (!betweenrightincl (cur, stop, curkey)) {      r = DB_NOTFOUND;      break;    }    cur = curkey;    if (buf2xdr (vbs, data.data, data.size)) {      size_t dx;      for (dx = 0; dx < vbs.d.size (); dx++) {	if (memcmp (&vbs.d[dx].n, &n, sizeof (n)) == 0) break;      }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -