adbd.c

来自「基于DHT的对等协议」· C语言 代码 · 共 1,538 行 · 第 1/3 页

C
1,538
字号
#include <async.h>
#include <arpc.h>
#include <wmstr.h>
#include <ihash.h>
#include <sha1.h>
#include <parseopt.h>
#include <adb_prot.h>
#include <id_utils.h>
#include <dbfe.h>
#include <merkle_tree_bdb.h>

#include <sys/types.h>
#include <dirent.h>

// {{{ Globals
static bool dbstarted (false);
static str dbsock;
static int clntfd (-1);
static const u_int32_t asrvbufsize (1024*1025);

// Total disk allowed for data per dbns, in bytes.
// Default: unlimited
static u_int64_t quota (0);

// How many keys to try and expire at a time between stores.
static u_int32_t expire_batch_size (24);
// Threshold percentage for expiring on insert
static u_int32_t expire_threshold (90);
// Run an expiration of the mtree every this many seconds.
static u_int32_t expire_mtree_interval (60);
// If an object will expire in this many seconds, ignore it.
static u_int32_t expire_buffer (15 * 60);

// How frequently to consider checkpointing the database.
// This affects the amount of disk I/O that's going to happen.
static u_int32_t sync_interval (60);
static u_int32_t max_unchkpt_log_size (1024); // KB

// This is the key used to access the master metadata record.
// The master metadata record contains:
//   Total size of objects put into a dbns,
//   The next expiration time for an object.
static DBT master_metadata;

class dbmanager;
static dbmanager *dbm;
// }}}
// {{{ IO timing
// When true, io_start/io_finish measure and log elapsed time.
static bool iotime (false);

// Returns the current CLOCK_REALTIME reading in microseconds,
// or 0 when IO timing is disabled.
static inline u_int64_t
io_start ()
{
  if (!iotime)
    return 0;
  timespec ts;
  clock_gettime (CLOCK_REALTIME, &ts);
  u_int64_t key = ts.tv_sec * INT64(1000000) + ts.tv_nsec / 1000;
  return key;
}

// Logs "IO: <s> in <elapsed>us", where elapsed is measured from t
// (a value previously returned by io_start).  No-op when IO timing
// is disabled.
static inline void
io_finish (u_int64_t t, strbuf s)
{
  if (!iotime)
    return;
  timespec ts;
  clock_gettime (CLOCK_REALTIME, &ts);
  u_int64_t now = ts.tv_sec * INT64(1000000) + ts.tv_nsec / 1000;
  warn << "IO: " << s << " in " << (now - t) << "us\n";
}

// Flips IO timing on or off.
static void
toggle_iotime ()
{
  iotime = !iotime;
}
// }}}
// {{{ DB key conversion
// Fills *d so that it refers to the big-endian raw magnitude of key.
// NOTE: d->data points at a function-local static buffer, so the
// result is only valid until the next call to id_to_dbt.
inline void
id_to_dbt (const chordID &key, DBT *d)
{
  static char buf[sha1::hashsize]; // XXX bug waiting to happen?
  bzero (d, sizeof (*d));
  bzero (buf, sizeof (buf)); // XXX unnecessary; handled by rawmag.
  // We want big-endian for BTree mapping efficiency
  mpz_get_rawmag_be (buf, sizeof (buf), &key);
  d->size = sizeof (buf);
  d->data = (void *) buf;
}

// Fills *d so that it refers to the bytes of s.  No copy is made:
// d->data aliases s.cstr (), so s must outlive any use of *d.
inline void
str_to_dbt (const str &s, DBT *d)
{
  bzero (d, sizeof (*d));
  d->size = s.len ();
  d->data = (void *) s.cstr ();
}

// Converts a DBT holding exactly sha1::hashsize big-endian bytes
// back into a chordID.  Asserts on any other size.
inline chordID
dbt_to_id (const DBT &dbt)
{
  chordID id;
  assert (dbt.size == sha1::hashsize);
  mpz_set_rawmag_be (&id, static_cast<char *> (dbt.data), dbt.size);
  return id;
}
// }}}

// {{{ DB for Namespace
/* For each namespace (e.g. vnode + ctype),
 * we need to store several things:
 *   1. chordID -> data.  20 bytes -> potentially large bytes
 *   2. chordID -> metadata: 20 bytes -> ...
 *        auxdata (e.g. noauth hash) 4 + 4*n bytes
 *        expiration time
 *        size
 *
 * The goal is to support the following operations:
 *   A. Read/write of data objects
 *   B. Merkle tree updates (shared with maintd via BDB)
 */
// {{{ getexpire: Secondary key extractor for metadata
// Berkeley DB secondary-key callback registered via
// metadatadb->associate ().  Produces a 4-byte key in *skey:
//   - 0 for the master metadata record,
//   - the record's expiration time in network byte order otherwise.
// Returns 0 on success, -1 if pdata cannot be unmarshalled, and
// ENOMEM if the key buffer cannot be allocated.  On failure *skey is
// left untouched, so no allocation is handed to (or leaked past) BDB.
static int
getexpire (DB *sdb, const DBT *pkey, const DBT *pdata, DBT *skey)
{
  u_int32_t value = 0;

  const bool is_master =
    pkey->size == master_metadata.size &&
    !memcmp (pkey->data, master_metadata.data, pkey->size);

  if (!is_master) {
    adb_metadata_t md;
    if (!buf2xdr (md, pdata->data, pdata->size)) {
      hexdump hd (pdata->data, pdata->size);
      warn << "getexpire: unable to unmarshal pdata.\n" << hd << "\n";
      return -1;
    }
    // Ensure big-endian for proper BDB sorting.
    value = htonl (md.expiration);
  }

  // Allocate only once we know we will succeed, so that the error
  // path above never leaves a malloc'ed buffer behind.
  u_int32_t *expire = (u_int32_t *) malloc (sizeof (*expire));
  if (!expire)
    return ENOMEM;
  *expire = value;

  // this flag should mean that berkeley db will free this memory when it's
  // done with it.
  skey->flags = DB_DBT_APPMALLOC;
  skey->data = expire;
  skey->size = sizeof (*expire);
  return 0;
}
// }}}
// {{{ dbns declarations
// One database namespace: a BDB environment holding a metadata
// database, a by-expiration secondary index on it, and a Merkle tree
// sharing the same environment.
class dbns {
  friend class dbmanager;

  str name;
  ihash_entry<dbns> hlink;

  const bool aux;

  DB_ENV *dbe;

  str datapath;        // "<dbpath><name>/data", created in the constructor
  DB *metadatadb;      // primary: key -> adb_metadata_t
  DB *byexpiredb;      // secondary on metadatadb, keyed by getexpire ()

  u_int32_t last_mtree_time;
  merkle_tree_bdb *mtree;

  // In-memory copy of the master metadata record; refreshed from
  // metadatadb at the start of every update_metadata () call.
  adb_master_metadata_t mmd;

  timecb_t *mtree_tcb;
  void mtree_cleaner ();

  timecb_t *sync_tcb;
  void periodic_sync ();

  int expire_walk (u_int32_t limit, u_int32_t start, u_int32_t end,
    vec<DBT> &victims, vec<adb_metadata_t> &victim_metadata);
  int update_metadata (bool add, u_int64_t sz, u_int32_t expiration, DB_TXN *t = NULL);

public:
  dbns (const str &dbpath, const str &name, bool aux, str logpath = NULL);
  ~dbns ();

  void warner (const char *method, const char *desc, int r);

  void sync (bool force = false);

  bool hasaux () { return aux; };
  str getname () { return name; }

  int get_metadata (const chordID &key, adb_metadata_t &metadata, DB_TXN *t = NULL);

  // Primary data management
  int insert (const chordID &key, DBT &data, u_int32_t auxdata = 0, u_int32_t exptime = 0);
  int lookup (const chordID &key, str &data, adb_metadata_t &md);
  int lookup_nextkey (const chordID &key, chordID &nextkey);
  int del (const chordID &key, u_int32_t auxdata);
  int getkeys (const chordID &start, size_t count, bool getaux,
      rpc_vec<adb_keyaux_t, RPC_INFINITY> &out);

  u_int32_t quotacheck (u_int64_t q) {
    // Returns percentage between 0 and 100
    if (q == 0) return 0;
    if (q < mmd.size) return 100;
    return u_int64_t (100) * mmd.size / q;
  }
  int expire_mtree ();
  int expire (u_int32_t limit = 0, u_int32_t t = 0);

  str time2fn (u_int32_t exptime);
  int write_object (const chordID &key, DBT &data, u_int32_t t);
  int read_object (const chordID &key, str &data, adb_metadata_t &md);
  int expire_objects (u_int32_t exptime);
};
// }}}
// {{{ dbns::dbns
// Opens (creating if needed) the environment at <dbpath><name>, the
// data directory beneath it, the Merkle tree, and the metadata
// database with its by-expiration secondary index, then starts the
// mtree_cleaner and periodic_sync cycles.
//   dbpath  - must be non-empty and end in '/'.
//   logpath - if set, logs are directed to <logpath>/<name>.
// Any BDB open failure is reported through fatal.
dbns::dbns (const str &dbpath, const str &name, bool aux, str logpath) :
  name (name),
  aux (aux),
  dbe (NULL),
  metadatadb (NULL),
  byexpiredb (NULL),
  last_mtree_time (0),
  mtree (NULL),
  mtree_tcb (NULL),
  sync_tcb (NULL)
{
  bzero (&mmd, sizeof (mmd));
#define DBNS_ERRCHECK(desc) \
  if (r) {		  \
    fatal << desc << " returned " << r << ": " << db_strerror (r) << "\n"; \
    return;		  \
  }
  // Guard the length first: indexing len () - 1 on an empty path
  // would read out of bounds.
  assert (dbpath.len () > 0 && dbpath[dbpath.len () - 1] == '/');

  strbuf fullpath ("%s%s", dbpath.cstr (), name.cstr ());

  str logconf = NULL;
  if (logpath)
    logconf = strbuf ("set_lg_dir %s/%s\n", logpath.cstr (), name.cstr ());

  int r = -1;
  r = dbfe_initialize_dbenv (&dbe, fullpath, false, 10*1024, logconf);
  DBNS_ERRCHECK ("dbe->open");

  datapath = fullpath << "/data";
  // The directory normally already exists when re-opening a
  // namespace; anything other than EEXIST is worth reporting.
  if (mkdir (datapath, 0755) < 0 && errno != EEXIST)
    warn ("dbns::dbns: mkdir (%s) failed: %m\n", datapath.cstr ());

  mtree = New merkle_tree_bdb (dbe, /* ro = */ false);

  r = dbfe_opendb (dbe, &metadatadb, "metadatadb", DB_CREATE, 0);
  DBNS_ERRCHECK ("metadatadb->open");
  r = dbfe_opendb (dbe, &byexpiredb, "byexpiredb", DB_CREATE, 0, /* dups = */ true);
  DBNS_ERRCHECK ("byexpiredb->open");
  r = metadatadb->associate (metadatadb, NULL, byexpiredb, getexpire, DB_AUTO_COMMIT);
  DBNS_ERRCHECK ("metadatdb->associate (byexpiredb)");

  mtree_cleaner ();
  periodic_sync ();
  warn << "dbns::dbns (" << dbpath << ", " << name << ", " << aux << ")\n";
}
// }}}
// {{{ dbns::~dbns
// Cancels pending timers, forces a checkpoint, then tears down the
// Merkle tree, the databases and finally the environment.
dbns::~dbns ()
{
  if (mtree_tcb) {
    timecb_remove (mtree_tcb);
    mtree_tcb = NULL;
  }
  if (sync_tcb) {
    timecb_remove (sync_tcb);
    sync_tcb = NULL;
  }
  sync (/* force = */ true);

#define DBNS_DBCLOSE(x)			\
  if (x) {				\
    (void) x->close (x, 0); x = NULL;	\
  }

  // Close out the merkle tree which shares our db environment
  delete mtree;
  mtree = NULL;
  // Close secondary before the primary for metadata
  DBNS_DBCLOSE(byexpiredb);
  DBNS_DBCLOSE(metadatadb);
  // Shut down the environment
  DBNS_DBCLOSE(dbe);
#undef DBNS_DBCLOSE
  warn << "dbns::~dbns (" << name << ")\n";
}
// }}}
// {{{ dbns::warner
// Logs "<sec>.<usec> : <method>: <desc>: <db_strerror (r)>".
void
dbns::warner (const char *method, const char *desc, int r)
{
  timespec ts;
  strbuf t;
  clock_gettime (CLOCK_REALTIME, &ts);
  t.fmt ("%d.%06d ", int (ts.tv_sec), int (ts.tv_nsec/1000));
  warn << t << ": " << method << ": " << desc << ": " << db_strerror (r) << "\n";
}
// }}}
// {{{ dbns::periodic_sync
// Runs a (non-forced) sync and re-arms itself after sync_interval
// seconds plus 0-9 seconds derived from tsnow.tv_nsec.
void
dbns::periodic_sync ()
{
  sync_tcb = NULL;
  sync ();
  sync_tcb = delaycb (sync_interval + (tsnow.tv_nsec % 10),
      wrap (this, &dbns::periodic_sync));
}
// }}}
// {{{ dbns::sync
// Requests a transaction checkpoint on the environment, passing
// max_unchkpt_log_size (KB) and 10 as the thresholds.  With force
// set, DB_FORCE is passed as well.
void
dbns::sync (bool force)
{
  int flags = 0;
  if (force)
    flags = DB_FORCE;
#if (DB_VERSION_MAJOR < 4)
  txn_checkpoint (dbe, max_unchkpt_log_size, 10, flags);
#else
  dbe->txn_checkpoint (dbe, max_unchkpt_log_size, 10, flags);
#endif
}
// }}}
// {{{ dbns::update_metadata (bool, u_int64_t, u_int32_t, DB_TXN *)
// Reads the master metadata record (with DB_RMW) into mmd, adjusts
// the total size by sz (adding when add is true, subtracting
// otherwise) and writes the record back, all under transaction t.
// Returns:
//   0        on success,
//   ENOSPC   if adding sz would exceed a non-zero quota (nothing is
//            written and mmd.size keeps the value read from the db),
//   -1       if the stored record cannot be unmarshalled,
//   other    the BDB error from get/put.
// Removing more than the recorded total is reported through fatal.
int
dbns::update_metadata (bool add, u_int64_t sz, u_int32_t expiration, DB_TXN *t)
{
  DBT md; bzero (&md, sizeof (md));
  int r = metadatadb->get (metadatadb, t, &master_metadata, &md, DB_RMW);
  switch (r) {
    case 0:
      if (!buf2xdr (mmd, md.data, md.size))
	return -1;
      break;
    case DB_NOTFOUND:
      bzero (&mmd, sizeof (mmd));
      break;
    default:
      return r;
      break;
  }

  if (add) {
    // Check the quota before touching mmd so that a refused insert
    // does not leave an inflated size in the in-memory copy that
    // quotacheck () reads.
    if (quota && mmd.size + sz > quota)
      return ENOSPC;
    mmd.size += sz;
  } else {
    if (mmd.size < sz) {
      fatal << "dbns::update_metadata: small size: "
            << mmd.size << " < " << sz << "\n";
    }
    mmd.size -= sz;
  }

  // NOTE(review): as written this block can never modify
  // mmd.expiration: the outer test requires mmd.expiration == 0, so
  // "expiration < mmd.expiration" is always false, and sz is unsigned,
  // so "sz < 0" is always false.  The intended policy is not visible
  // here; left unchanged pending confirmation.
  if (mmd.expiration == 0 && expiration > 0) {
    if (sz > 0 && expiration < mmd.expiration)
      mmd.expiration = expiration;
    else if (sz < 0) {
      assert (expiration >= mmd.expiration);
      mmd.expiration = expiration;
    }
  }

  str md_str = xdr2str (mmd);
  str_to_dbt (md_str, &md);
  r = metadatadb->put (metadatadb, t, &master_metadata, &md, 0);
  return r;
}
// }}}
// {{{ dbns::get_metadata
// Fetches and unmarshals the metadata record for key under
// transaction t.  Returns 0 on success, the BDB error from get
// (DB_NOTFOUND is returned silently, anything else is also logged),
// or -1 if the stored record cannot be unmarshalled.
int
dbns::get_metadata (const chordID &key, adb_metadata_t &metadata, DB_TXN *t)
{
  DBT skey;
  id_to_dbt (key, &skey);

  DBT md;
  bzero (&md, sizeof (md));

  int r = metadatadb->get (metadatadb, t, &skey, &md, 0);
  if (r) {
    if (r != DB_NOTFOUND)
      warner ("dbns::get_metadata", "metadatadb->get", r);
    return r;
  }
  if (!buf2xdr (metadata, md.data, md.size))
    return -1;
  return 0;
}
// }}}
dbns::insert (chordID, DBT, DBT)intdbns::insert (const chordID &key, DBT &data, u_int32_t auxdata, u_int32_t exptime){  int r = 0;  DB_TXN *t = NULL;  r = dbfe_txn_begin (dbe, &t);  assert (r == 0);  adb_metadata_t oldmetadata;  r = get_metadata (key, oldmetadata, t);  if (r != DB_NOTFOUND) {    dbfe_txn_abort (dbe, t);    return r;  }  r = update_metadata (true, data.size, exptime, t);  if (r) {    dbfe_txn_abort (dbe, t);    // Even if r == ENOSPC, we can't afford to blow a lot of time    // here doing expiration.    return r;  }  // Prep BDB objects.  DBT skey;  id_to_dbt (key, &skey);  const char *err = "";  int ret;  if (exptime > timenow + expire_buffer) {    // Only add to Merkle tree if this object is worth repairing.    if (hasaux ()) {      err = "mtree->insert aux";      r = mtree->insert (key, auxdata, t);    } else {      err = "mtree->insert";      r = mtree->insert (key, t);      // insert may return DB_KEYEXIST in which case we need      // not do any more work here.    }    if (r) {      if (r != DB_KEYEXIST)	warner ("dbns::insert", err, r);      ret = dbfe_txn_abort (dbe, t);      if (ret)	warner ("dbns::insert", "abort/commit error", ret);      return r;    }  }  u_int32_t offset = write_object (key, data, exptime);  if (offset < 0) {    int saved_errno = errno;    warn ("dbns::insert: write_object failed: %m\n");    ret = dbfe_txn_abort (dbe, t);    if (ret)      warner ("dbns::insert", "abort/commit error", ret);    return saved_errno;  }  adb_metadata_t md;  md.size = data.size;  md.auxdata = auxdata;  md.expiration = exptime;  md.offset = offset;  str md_str = xdr2str (md);  DBT metadata;  str_to_dbt (md_str, &metadata);  err = "metadatadb->put";  r = metadatadb->put (metadatadb, t, &skey, &metadata, 0);  if (r) {    assert (r != DB_KEYEXIST); // Already checked.    
ret = dbfe_txn_abort (dbe, t);  } else {    ret = dbfe_txn_commit (dbe, t);  }  if (ret)    warner ("dbns::insert", "abort/commit error", ret);  return r;}// }}}// {{{ dbns::lookupintdbns::lookup (const chordID &key, str &data, adb_metadata_t &md){  int r = 0;  r = read_object (key, data, md);  // read_object returns -1 on error, 0 otherwise.  if (r) {    // Treat all errors as not found.    return DB_NOTFOUND;  }  return 0;}// }}}// {{{ dbns::lookup_nextintdbns::lookup_nextkey (const chordID &key, chordID &nextkey){  int r = 0;  DBT skey;  id_to_dbt (key, &skey);  DBT content;  bzero (&content, sizeof (content));  DBC *cursor;  r = metadatadb->cursor (metadatadb, NULL, &cursor, 0);  if (r) {    warner ("dbns::lookup_nextkey", "cursor open", r);    (void) cursor->c_close (cursor);

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?