adbd.c
来自「基于DHT的对等协议」· C语言 代码 · 共 1,538 行 · 第 1/3 页
C
1,538 行
#include <async.h>
#include <arpc.h>
#include <wmstr.h>
#include <ihash.h>
#include <sha1.h>
#include <parseopt.h>
#include <adb_prot.h>
#include <id_utils.h>
#include <dbfe.h>
#include <merkle_tree_bdb.h>
#include <sys/types.h>
#include <dirent.h>

// {{{ Globals
static bool dbstarted (false);
static str dbsock;
static int clntfd (-1);
static const u_int32_t asrvbufsize (1024*1025);

// Total disk allowed for data per dbns, in bytes.
// Default: unlimited
static u_int64_t quota (0);

// How many keys to try and expire at a time between stores.
static u_int32_t expire_batch_size (24);
// Threshold percentage for expiring on insert
static u_int32_t expire_threshold (90);
// Run an expiration of the mtree every this many seconds.
static u_int32_t expire_mtree_interval (60);
// If an object will expire in this many seconds, ignore it.
static u_int32_t expire_buffer (15 * 60);

// How frequently to consider checkpointing the database.
// This affects the amount of disk I/O that's going to happen.
static u_int32_t sync_interval (60);
static u_int32_t max_unchkpt_log_size (1024); // KB

// This is the key used to access the master metadata record.
// The master metadata record contains:
//   Total size of objects put into a dbns,
//   The next expiration time for an object.
static DBT master_metadata;

class dbmanager;
static dbmanager *dbm;
// }}}

// {{{ IO timing
static bool iotime (false);

// Returns the current wall-clock time in microseconds, or 0 when IO
// timing is disabled.  Pair with io_finish ().
static inline u_int64_t
io_start ()
{
  if (!iotime)
    return 0;
  timespec ts;
  clock_gettime (CLOCK_REALTIME, &ts);
  u_int64_t key = ts.tv_sec * INT64(1000000) + ts.tv_nsec / 1000;
  return key;
}

// Logs the number of microseconds elapsed since t (a value returned by
// io_start ()), labelled with s.  Does nothing when IO timing is disabled.
static inline void
io_finish (u_int64_t t, strbuf s)
{
  if (!iotime)
    return;
  timespec ts;
  clock_gettime (CLOCK_REALTIME, &ts);
  u_int64_t now = ts.tv_sec * INT64(1000000) + ts.tv_nsec / 1000;
  warn << "IO: " << s << " in " << (now - t) << "us\n";
}

// Flips the IO timing flag.
static void
toggle_iotime ()
{
  iotime = !iotime;
}
// }}}

// {{{ DB key conversion
// Points d at the big-endian raw encoding of key.
// WARNING: d->data refers to a single function-local static buffer, so
// the DBT is only valid until the next call to id_to_dbt.
inline void
id_to_dbt (const chordID &key, DBT *d)
{
  static char buf[sha1::hashsize]; // XXX bug waiting to happen?
  bzero (d, sizeof (*d));
  bzero (buf, sizeof (buf)); // XXX unnecessary; handled by rawmag.
  // We want big-endian for BTree mapping efficiency
  mpz_get_rawmag_be (buf, sizeof (buf), &key);
  d->size = sizeof (buf);
  d->data = (void *) buf;
}

// Points d at the bytes of s without copying; s must outlive any use of d.
inline void
str_to_dbt (const str &s, DBT *d)
{
  bzero (d, sizeof (*d));
  d->size = s.len ();
  d->data = (void *) s.cstr ();
}

// Decodes a chordID from a DBT holding exactly sha1::hashsize bytes of
// big-endian raw data; asserts on any other size.
inline chordID
dbt_to_id (const DBT &dbt)
{
  chordID id;
  assert (dbt.size == sha1::hashsize);
  mpz_set_rawmag_be (&id, static_cast<char *> (dbt.data), dbt.size);
  return id;
}
// }}}

// {{{ DB for Namespace
/* For each namespace (e.g. vnode + ctype),
 * we need to store several things:
 *   1. chordID -> data.  20 bytes -> potentially large bytes
 *   2. chordID -> metadata: 20 bytes -> ...
 *        auxdata (e.g. noauth hash) 4 + 4*n bytes
 *        expiration time
 *        size
 *
 * The goal is to support the following operations:
 *   A. Read/write of data objects
 *   B. Merkle tree updates (shared with maintd via BDB)
 */

// {{{ getexpire: Secondary key extractor for metadata
// Builds the secondary (by-expiration) key for a metadata record.
// The master metadata record is given the key 0; every other record is
// keyed by its expiration time in network byte order.
// Returns 0 on success, -1 if pdata cannot be unmarshalled, or ENOMEM
// if the key buffer cannot be allocated.
static int
getexpire (DB *sdb, const DBT *pkey, const DBT *pdata, DBT *skey)
{
  u_int32_t expiration = 0;

  const bool is_master =
    pkey->size == master_metadata.size &&
    !memcmp (pkey->data, master_metadata.data, pkey->size);

  if (!is_master) {
    adb_metadata_t md;
    if (!buf2xdr (md, pdata->data, pdata->size)) {
      hexdump hd (pdata->data, pdata->size);
      warn << "getexpire: unable to unmarshal pdata.\n" << hd << "\n";
      return -1;
    }
    // Ensure big-endian for proper BDB sorting.
    expiration = htonl (md.expiration);
  }

  // Allocate only once the value is known, so that no error path above
  // can leave an orphaned buffer behind.
  u_int32_t *expire = (u_int32_t *) malloc (sizeof (*expire));
  if (!expire)
    return ENOMEM;
  *expire = expiration;

  // this flag should mean that berkeley db will free this memory when it's
  // done with it.
  skey->flags = DB_DBT_APPMALLOC;
  skey->data = expire;
  skey->size = sizeof (*expire);
  return 0;
}
// }}}

// {{{ dbns declarations
// One database namespace: a Berkeley DB environment holding a metadata
// database, a secondary index of that metadata by expiration time, and
// a Merkle tree sharing the same environment.
class dbns {
  friend class dbmanager;

  str name;
  ihash_entry<dbns> hlink;

  const bool aux;

  DB_ENV *dbe;
  str datapath;

  DB *metadatadb;
  DB *byexpiredb;

  u_int32_t last_mtree_time;
  merkle_tree_bdb *mtree;

  // In-memory copy of the master metadata record; refreshed from
  // metadatadb by update_metadata ().
  adb_master_metadata_t mmd;

  timecb_t *mtree_tcb;
  void mtree_cleaner ();

  timecb_t *sync_tcb;
  void periodic_sync ();

  int expire_walk (u_int32_t limit, u_int32_t start, u_int32_t end,
      vec<DBT> &victims, vec<adb_metadata_t> &victim_metadata);
  int update_metadata (bool add, u_int64_t sz, u_int32_t expiration,
      DB_TXN *t = NULL);

public:
  dbns (const str &dbpath, const str &name, bool aux, str logpath = NULL);
  ~dbns ();

  void warner (const char *method, const char *desc, int r);

  void sync (bool force = false);

  bool hasaux () { return aux; };
  str getname () { return name; }

  int get_metadata (const chordID &key, adb_metadata_t &metadata,
      DB_TXN *t = NULL);

  // Primary data management
  int insert (const chordID &key, DBT &data,
      u_int32_t auxdata = 0, u_int32_t exptime = 0);
  int lookup (const chordID &key, str &data, adb_metadata_t &md);
  int lookup_nextkey (const chordID &key, chordID &nextkey);
  int del (const chordID &key, u_int32_t auxdata);
  int getkeys (const chordID &start, size_t count, bool getaux,
      rpc_vec<adb_keyaux_t, RPC_INFINITY> &out);

  u_int32_t quotacheck (u_int64_t q) {
    // Returns percentage between 0 and 100
    if (q == 0)
      return 0;
    if (q < mmd.size)
      return 100;
    return u_int64_t (100) * mmd.size / q;
  }

  int expire_mtree ();
  int expire (u_int32_t limit = 0, u_int32_t t = 0);

  str time2fn (u_int32_t exptime);
  int write_object (const chordID &key, DBT &data, u_int32_t t);
  int read_object (const chordID &key, str &data, adb_metadata_t &md);
  int expire_objects (u_int32_t exptime);
};
// }}}

// {{{ dbns::dbns
// Opens (creating if needed) the environment at dbpath + name, the
// "data" directory beneath it, the Merkle tree, and the metadata
// database with its by-expiration secondary index, then starts the
// periodic mtree cleaner and sync timers.
// dbpath must be non-empty and end in '/'.  Any database error is fatal.
dbns::dbns (const str &dbpath, const str &name, bool aux, str logpath) :
  name (name),
  aux (aux),
  dbe (NULL),
  metadatadb (NULL),
  byexpiredb (NULL),
  last_mtree_time (0),
  mtree (NULL),
  mtree_tcb (NULL),
  sync_tcb (NULL)
{
  bzero (&mmd, sizeof (mmd));

#define DBNS_ERRCHECK(desc) \
  if (r) { \
    fatal << desc << " returned " << r << ": " << db_strerror (r) << "\n"; \
    return; \
  }

  // Check the length first so an empty path cannot index out of range.
  assert (dbpath.len () > 0 && dbpath[dbpath.len () - 1] == '/');

  strbuf fullpath ("%s%s", dbpath.cstr (), name.cstr ());

  str logconf = NULL;
  if (logpath)
    logconf = strbuf ("set_lg_dir %s/%s\n", logpath.cstr (), name.cstr ());

  int r = -1;
  r = dbfe_initialize_dbenv (&dbe, fullpath, false, 10*1024, logconf);
  DBNS_ERRCHECK ("dbe->open");

  datapath = fullpath << "/data";
  // An already existing directory is expected; report anything else.
  if (mkdir (datapath, 0755) < 0 && errno != EEXIST)
    warn ("dbns::dbns: mkdir (%s): %m\n", datapath.cstr ());

  mtree = New merkle_tree_bdb (dbe, /* ro = */ false);

  r = dbfe_opendb (dbe, &metadatadb, "metadatadb", DB_CREATE, 0);
  DBNS_ERRCHECK ("metadatadb->open");
  r = dbfe_opendb (dbe, &byexpiredb, "byexpiredb", DB_CREATE, 0,
      /* dups = */ true);
  DBNS_ERRCHECK ("byexpiredb->open");
  r = metadatadb->associate (metadatadb, NULL, byexpiredb, getexpire,
      DB_AUTO_COMMIT);
  DBNS_ERRCHECK ("metadatdb->associate (byexpiredb)");

  mtree_cleaner ();
  periodic_sync ();

  warn << "dbns::dbns (" << dbpath << ", " << name << ", " << aux << ")\n";
}
// }}}

// {{{ dbns::~dbns
// Cancels pending timers, forces a checkpoint, and closes the Merkle
// tree, the databases and finally the environment.
dbns::~dbns ()
{
  if (mtree_tcb) {
    timecb_remove (mtree_tcb);
    mtree_tcb = NULL;
  }
  if (sync_tcb) {
    timecb_remove (sync_tcb);
    sync_tcb = NULL;
  }
  sync (/* force = */ true);

#define DBNS_DBCLOSE(x) \
  if (x) { \
    (void) x->close (x, 0); x = NULL; \
  }

  // Close out the merkle tree which shares our db environment
  delete mtree;
  mtree = NULL;
  // Close secondary before the primary for metadata
  DBNS_DBCLOSE(byexpiredb);
  DBNS_DBCLOSE(metadatadb);
  // Shut down the environment
  DBNS_DBCLOSE(dbe);
#undef DBNS_DBCLOSE

  warn << "dbns::~dbns (" << name << ")\n";
}
// }}}

// {{{ dbns::warner
// Logs a timestamped message naming the method, the failing operation
// and the textual form of the Berkeley DB error code r.
void
dbns::warner (const char *method, const char *desc, int r)
{
  timespec ts;
  strbuf t;
  clock_gettime (CLOCK_REALTIME, &ts);
  t.fmt ("%d.%06d ", int (ts.tv_sec), int (ts.tv_nsec/1000));
  warn << t << ": " << method << ": " << desc << ": "
       << db_strerror (r) << "\n";
}
// }}}

// {{{ dbns::periodic_sync
// Timer callback: runs sync () and re-arms itself sync_interval seconds
// later, plus 0-9 extra seconds taken from the current nanosecond count.
void
dbns::periodic_sync ()
{
  sync_tcb = NULL;
  sync ();
  sync_tcb = delaycb (sync_interval + (tsnow.tv_nsec % 10),
      wrap (this, &dbns::periodic_sync));
}
// }}}

// {{{ dbns::sync
// Requests a transaction checkpoint of the environment; force passes
// DB_FORCE.  The checkpoint's return value is not examined.
void
dbns::sync (bool force)
{
  int flags = 0;
  if (force)
    flags = DB_FORCE;
#if (DB_VERSION_MAJOR < 4)
  txn_checkpoint (dbe, max_unchkpt_log_size, 10, flags);
#else
  dbe->txn_checkpoint (dbe, max_unchkpt_log_size, 10, flags);
#endif
}
// }}}

// {{{ dbns::update_metadata (bool, u_int64_t, u_int32_t, DB_TXN *)
// Re-reads the master metadata record into mmd under transaction t,
// adds (add == true) or subtracts sz bytes from the recorded size, and
// writes the record back.
// Returns 0 on success, ENOSPC if an addition would exceed the quota,
// -1 if the stored record cannot be unmarshalled, or a Berkeley DB
// error.  Subtracting more than the recorded size is fatal.
int
dbns::update_metadata (bool add, u_int64_t sz, u_int32_t expiration, DB_TXN *t)
{
  DBT md;
  bzero (&md, sizeof (md));
  int r = metadatadb->get (metadatadb, t, &master_metadata, &md, DB_RMW);
  switch (r) {
  case 0:
    if (!buf2xdr (mmd, md.data, md.size))
      return -1;
    break;
  case DB_NOTFOUND:
    // No master record yet: start from an empty one.
    bzero (&mmd, sizeof (mmd));
    break;
  default:
    return r;
    break;
  }

  if (!add && mmd.size < sz) {
    fatal << "dbns::update_metadata: small size: "
          << mmd.size << " < " << sz << "\n";
  }
  if (add)
    mmd.size += sz;
  else
    mmd.size -= sz;
  if (add && quota && mmd.size > quota)
    return ENOSPC;

  // NOTE(review): as written neither inner branch can execute: inside
  // this block mmd.expiration is 0, so "expiration < mmd.expiration" is
  // never true, and sz is unsigned, so "sz < 0" is never true.  The
  // intended update rule is not evident here; logic left unchanged.
  if (mmd.expiration == 0 && expiration > 0) {
    if (sz > 0 && expiration < mmd.expiration)
      mmd.expiration = expiration;
    else if (sz < 0) {
      assert (expiration >= mmd.expiration);
      mmd.expiration = expiration;
    }
  }

  str md_str = xdr2str (mmd);
  str_to_dbt (md_str, &md);
  r = metadatadb->put (metadatadb, t, &master_metadata, &md, 0);
  return r;
}
// }}}

// {{{ dbns::get_metadata
// Fetches and unmarshals the metadata record for key under transaction t.
// Returns 0 on success, DB_NOTFOUND if there is no record, -1 if the
// record cannot be unmarshalled, or another Berkeley DB error (logged).
int
dbns::get_metadata (const chordID &key, adb_metadata_t &metadata, DB_TXN *t)
{
  DBT skey;
  id_to_dbt (key, &skey);
  DBT md;
  bzero (&md, sizeof (md));
  int r = metadatadb->get (metadatadb, t, &skey, &md, 0);
  if (r) {
    if (r != DB_NOTFOUND)
      warner ("dbns::get_metadata", "metadatadb->get", r);
    return r;
  }
  if (!buf2xdr (metadata, md.data, md.size))
    return -1;
  return 0;
}
// }}}

// {{{ dbns::insert (chordID, DBT, DBT)
// Stores data under key inside a single transaction: accounts for its
// size in the master metadata, adds the key to the Merkle tree when the
// object expires later than timenow + expire_buffer, writes the object,
// and records its metadata.
// If get_metadata does not report DB_NOTFOUND (including when a record
// already exists, r == 0), the transaction is aborted and that result
// is returned unchanged.
// Returns 0 on success, otherwise the error from the failing step
// (errno for a write_object failure).
int
dbns::insert (const chordID &key, DBT &data,
    u_int32_t auxdata, u_int32_t exptime)
{
  int r = 0;
  DB_TXN *t = NULL;
  r = dbfe_txn_begin (dbe, &t);
  assert (r == 0);

  adb_metadata_t oldmetadata;
  r = get_metadata (key, oldmetadata, t);
  if (r != DB_NOTFOUND) {
    dbfe_txn_abort (dbe, t);
    return r;
  }

  r = update_metadata (true, data.size, exptime, t);
  if (r) {
    dbfe_txn_abort (dbe, t);
    // Even if r == ENOSPC, we can't afford to blow a lot of time
    // here doing expiration.
    return r;
  }

  const char *err = "";
  int ret;
  if (exptime > timenow + expire_buffer) {
    // Only add to Merkle tree if this object is worth repairing.
    if (hasaux ()) {
      err = "mtree->insert aux";
      r = mtree->insert (key, auxdata, t);
    } else {
      err = "mtree->insert";
      r = mtree->insert (key, t);
      // insert may return DB_KEYEXIST in which case we need
      // not do any more work here.
    }
    if (r) {
      if (r != DB_KEYEXIST)
        warner ("dbns::insert", err, r);
      ret = dbfe_txn_abort (dbe, t);
      if (ret)
        warner ("dbns::insert", "abort/commit error", ret);
      return r;
    }
  }

  // write_object returns a signed value; it must be held in a signed
  // variable or the failure check below can never fire.
  int offset = write_object (key, data, exptime);
  if (offset < 0) {
    int saved_errno = errno;
    warn ("dbns::insert: write_object failed: %m\n");
    ret = dbfe_txn_abort (dbe, t);
    if (ret)
      warner ("dbns::insert", "abort/commit error", ret);
    return saved_errno;
  }

  adb_metadata_t md;
  md.size = data.size;
  md.auxdata = auxdata;
  md.expiration = exptime;
  md.offset = offset;

  str md_str = xdr2str (md);
  DBT metadata;
  str_to_dbt (md_str, &metadata);

  // Prep BDB objects.  Built immediately before use because id_to_dbt
  // hands out a shared static buffer.
  DBT skey;
  id_to_dbt (key, &skey);

  err = "metadatadb->put";
  r = metadatadb->put (metadatadb, t, &skey, &metadata, 0);
  if (r) {
    assert (r != DB_KEYEXIST); // Already checked.
    ret = dbfe_txn_abort (dbe, t);
  } else {
    ret = dbfe_txn_commit (dbe, t);
  }
  if (ret)
    warner ("dbns::insert", "abort/commit error", ret);
  return r;
}
// }}}

// {{{ dbns::lookup
// Reads the object and metadata stored under key.
// Returns 0 on success and DB_NOTFOUND on any read_object failure.
int
dbns::lookup (const chordID &key, str &data, adb_metadata_t &md)
{
  int r = 0;
  r = read_object (key, data, md);
  // read_object returns -1 on error, 0 otherwise.
  if (r) {
    // Treat all errors as not found.
    return DB_NOTFOUND;
  }
  return 0;
}
// }}}

// {{{ dbns::lookup_next
int
dbns::lookup_nextkey (const chordID &key, chordID &nextkey)
{
  int r = 0;
  DBT skey;
  id_to_dbt (key, &skey);
  DBT content;
  bzero (&content, sizeof (content));
  DBC *cursor;
  r = metadatadb->cursor (metadatadb, NULL, &cursor, 0);
  if (r) {
    warner ("dbns::lookup_nextkey", "cursor open", r);
    (void) cursor->c_close (cursor);
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?