📄 adbd.c
字号:
#include <async.h>
#include <arpc.h>
#include <wmstr.h>
#include <ihash.h>
#include <sha1.h>
#include <adb_prot.h>
#include <id_utils.h>
#include <dbfe.h>

// {{{ Globals
static bool dbstarted (false);
static str dbsock;
static int clntfd (-1);
static const u_int32_t asrvbufsize (1024*1025);

class dbmanager;
static dbmanager *dbm;
// }}}

// {{{ DB key conversion
/*
 * Render a chordID as a fixed-width database key: the textual form of the
 * ID, left-padded with '0' to 2*sha1::hashsize characters so that keys of
 * different magnitude still sort correctly.
 */
str
id_to_str (const chordID &key)
{
  str keystr = strbuf () << key;
  strbuf padkeystr;
  for (int pad = 2*sha1::hashsize - keystr.len (); pad > 0; pad--)
    padkeystr << "0";
  padkeystr << keystr;
  assert (padkeystr.tosuio ()->resid () == 2*sha1::hashsize);
  return padkeystr;
}

/*
 * Fill in *d so that it refers to the bytes of s.  No copy is made: d->data
 * aliases s's buffer, so s must outlive every use of *d.
 */
inline void
str_to_dbt (const str &s, DBT *d)
{
  bzero (d, sizeof (*d));
  d->size = s.len ();
  d->data = (void *) s.cstr ();
}

/*
 * Parse the bytes of a DBT back into a chordID.  A key that does not parse
 * is reported through `fatal'.
 */
inline chordID
dbt_to_id (const DBT &dbt)
{
  str c (static_cast<char *> (dbt.data), dbt.size);
  chordID id;
  if (!str2chordID (c, id))
    fatal << "Invalid chordID as database key: " << c << "\n";
  return id;
}
// }}}

// {{{ DB for Namespace
/* For each namespace (e.g. vnode + ctype, and shared cache),
 * we potentially need to store several things:
 *   1. chordID -> data.  20 bytes -> potentially large bytes
 *   2. chordID -> auxdata (e.g. noauth hash). 20 bytes -> 4 bytes-ish
 *   3. BSM data (chordID -> [vnodeid, ...]. 20 bytes -> 6*16 bytes-ish
 *                vnodeid -> [chordID, ...])
 *      with potentially the ability to (3) ordered by len([vnodeid...]).
 * This object encapsulates all that stuff.
 */

// {{{ getnumreplicas: Secondary key extractor for bsm data
/*
 * Callback passed to DB->associate for bsdx.  The secondary key is the
 * number of entries in the unmarshalled adb_vbsinfo_t, stored in network
 * byte order.  Returns 0 on success and -1 if the primary data cannot be
 * unmarshalled or the key buffer cannot be allocated.
 */
static int
getnumreplicas (DB *sdb, const DBT *pkey, const DBT *pdata, DBT *skey)
{
  adb_vbsinfo_t vbs;
  if (!buf2xdr (vbs, pdata->data, pdata->size)) {
    hexdump hd (pdata->data, pdata->size);
    warn << "getnumreplicas: unable to unmarshal pdata.\n" << hd << "\n";
    return -1;
  }

  // Allocate only after unmarshalling succeeded so that the failure path
  // above cannot leak the key buffer.
  u_int32_t *numreps = (u_int32_t *) malloc (sizeof (*numreps));
  if (!numreps) {
    warn << "getnumreplicas: out of memory\n";
    return -1;
  }
  *numreps = htonl (vbs.d.size ());

  // this flag should mean that berkeley db will free this memory when it's
  // done with it.
  skey->flags = DB_DBT_APPMALLOC;
  skey->data = numreps;
  skey->size = sizeof (*numreps);
  return 0;
}
// }}}

// {{{ BerkeleyDB transaction API versioning wrappers
/* Begin a top-level transaction in dbe; the handle is stored in *t. */
static inline int
dbns_txn_begin (DB_ENV *dbe, DB_TXN **t)
{
  int r;
#if DB_VERSION_MAJOR >= 4
  r = dbe->txn_begin (dbe, NULL, t, 0);
#else
  r = txn_begin (dbe, NULL, t, 0);
#endif
  return r;
}

/* Abort transaction t. */
static inline int
dbns_txn_abort (DB_ENV *dbe, DB_TXN *t)
{
  int r;
#if DB_VERSION_MAJOR >= 4
  r = t->abort (t);
#else
  r = txn_abort (t, 0);
#endif
  return r;
}

/* Commit transaction t. */
static inline int
dbns_txn_commit (DB_ENV *dbe, DB_TXN *t)
{
  int r;
#if DB_VERSION_MAJOR >= 4
  r = t->commit (t, 0);
#else
  r = txn_commit (t, 0);
#endif
  return r;
}
// }}}

/*
 * One namespace: a Berkeley DB environment holding the primary data
 * database, an optional auxdata database, the block-status database (bsdb)
 * and its secondary index (bsdx).
 */
class dbns {
  friend class dbmanager;

  str name;
  ihash_entry<dbns> hlink;

  DB_ENV *dbe;

  DB *datadb;
  DB *auxdatadb;   // NULL when the namespace was opened without aux
  DB *bsdb;
  DB *bsdx;        // secondary of bsdb, keyed by getnumreplicas
  // XXX should have a instance-level lock that governs datadb/auxdatadb.

  // Private updater, for different txn handling
  int updateone (const chordID &key, const adb_bsinfo_t &bsinfo,
                 bool present, DB_TXN *t);

public:
  dbns (const str &dbpath, const str &name, bool aux);
  ~dbns ();

  void warner (const char *method, const char *desc, int r);

  void sync ();

  bool hasaux () { return auxdatadb != NULL; };

  // Primary data management
  int insert (const chordID &key, const str data, u_int32_t auxdata);
  int insert (const chordID &key, DBT &data, DBT &auxdata);
  int lookup (const chordID &key, str &data);
  int del (const chordID &key);
  int getkeys (const chordID &start, size_t count, bool getaux,
               rpc_vec<adb_keyaux_t, RPC_INFINITY> &out);

  // Block status information
  int getblockrange_all (const chordID &start, const chordID &stop,
                         int count, rpc_vec<adb_bsloc_t, RPC_INFINITY> &out);
  int getblockrange_extant (const chordID &start, const chordID &stop,
                            int extant, int count,
                            rpc_vec<adb_bsloc_t, RPC_INFINITY> &out);
  int getkeyson (const adb_vnodeid &n, const chordID &start,
                 const chordID &stop, int count,
                 rpc_vec<adb_keyaux_t, RPC_INFINITY> &out);
  int update (const chordID &block, const adb_bsinfo_t &bsinfo, bool present);
  int updatebatch (rpc_vec<adb_updatearg, RPC_INFINITY> &uargs);
  int getinfo (const chordID &block,
               rpc_vec<adb_vnodeid, RPC_INFINITY> &out);
};

// {{{ dbns::dbns
/*
 * Open (creating if necessary) the environment at dbpath + name and the
 * databases inside it.  dbpath must end in '/'.  The auxdata database is
 * only opened when aux is true.  Any open failure is reported via `fatal'.
 */
dbns::dbns (const str &dbpath, const str &name, bool aux) :
  name (name),
  dbe (NULL),
  datadb (NULL),
  auxdatadb (NULL),
  bsdb (NULL),
  bsdx (NULL)
{
#define DBNS_ERRCHECK(desc) \
  if (r) { \
    fatal << desc << " returned " << r << ": " << db_strerror (r) << "\n"; \
    return; \
  }
  assert (dbpath[dbpath.len () - 1] == '/');
  strbuf fullpath ("%s%s", dbpath.cstr (), name.cstr ());

  int r = -1;
  r = dbfe_initialize_dbenv (&dbe, fullpath, false, 1024);
  DBNS_ERRCHECK ("dbe->open");

  r = dbfe_opendb (dbe, &datadb, "db", DB_CREATE, 0);
  DBNS_ERRCHECK ("datadb->open");
  if (aux) {
    r = dbfe_opendb (dbe, &auxdatadb, "auxdb", DB_CREATE, 0);
    DBNS_ERRCHECK ("auxdatadb->open");
  }

  r = dbfe_opendb (dbe, &bsdb, "bsdb", DB_CREATE, 0);
  DBNS_ERRCHECK ("bsdb->open");
  r = dbfe_opendb (dbe, &bsdx, "bsdx", DB_CREATE, 0, /* dups = */ true);
  DBNS_ERRCHECK ("bsdx->open");
  r = bsdb->associate (bsdb, NULL, bsdx, getnumreplicas, DB_AUTO_COMMIT);
  DBNS_ERRCHECK ("bsdb->associate (bsdx)");

  warn << "dbns::dbns (" << dbpath << ", " << name << ", " << aux << ")\n";
}
// }}}

// {{{ dbns::~dbns
/* Close every open database handle and then the environment. */
dbns::~dbns ()
{
#define DBNS_DBCLOSE(x) \
  if (x) { \
    (void) x->close (x, 0); x = NULL; \
  }

  // Start with main databases
  DBNS_DBCLOSE(datadb);
  DBNS_DBCLOSE(auxdatadb);
  // Close secondary before the primary
  DBNS_DBCLOSE(bsdx);
  DBNS_DBCLOSE(bsdb);
  // Shut down the environment
  DBNS_DBCLOSE(dbe);
#undef DBNS_DBCLOSE
  warn << "dbns::~dbns (" << name << ")\n";
}
// }}}

// {{{ dbns::warner
/*
 * Log a timestamped warning of the form
 *   <sec>.<usec> : <method>: <desc>: <db_strerror (r)>
 */
void
dbns::warner (const char *method, const char *desc, int r)
{
  timespec ts;
  strbuf t;
  clock_gettime (CLOCK_REALTIME, &ts);
  t.fmt ("%d.%06d ", int (ts.tv_sec), int (ts.tv_nsec/1000));
  warn << t << ": " << method << ": " << desc << ": "
       << db_strerror (r) << "\n";
}
// }}}

// {{{ dbns::sync
/* Request a transaction checkpoint on the environment. */
void
dbns::sync ()
{
#if (DB_VERSION_MAJOR < 4)
  txn_checkpoint (dbe, 0, 0, 0);
#else
  dbe->txn_checkpoint (dbe, 0, 0, 0);
#endif
}
// }}}

// {{{ dbns::insert (chordID, DBT, DBT)
/*
 * Store data (and, when this namespace has an auxdata database, auxdata)
 * under key.  With auxdata both puts happen in one explicit transaction:
 * either both are committed or the transaction is aborted.
 *
 * Returns 0 on success, otherwise the Berkeley DB error of the first
 * operation that failed.
 */
int
dbns::insert (const chordID &key, DBT &data, DBT &auxdata)
{
  int r = 0;
  str key_str = id_to_str (key);
  DBT skey;
  str_to_dbt (key_str, &skey);

  if (!auxdatadb) {
    // Use implicit transaction
    r = datadb->put (datadb, NULL, &skey, &data, DB_AUTO_COMMIT);
    if (r)
      warner ("dbns::insert", "put error", r);
    return r;
  }

  // To keep auxdata in sync, use an explicit transaction
  DB_TXN *t = NULL;
  r = dbns_txn_begin (dbe, &t);
  if (r) {
    warner ("dbns::insert", "txn begin error", r);
    return r;
  }

  r = datadb->put (datadb, t, &skey, &data, 0);
  if (r) {
    warner ("dbns::insert", "data put error", r);
  } else {
    r = auxdatadb->put (auxdatadb, t, &skey, &auxdata, 0);
    if (r)
      warner ("dbns::insert", "auxdata put error", r);
  }

  if (r) {
    // Abort exactly once and never touch t again afterwards.  Keep the
    // put error as the return value so the caller sees the failure even
    // when the abort itself succeeds.
    int ar = dbns_txn_abort (dbe, t);
    if (ar)
      warner ("dbns::insert", "abort/commit error", ar);
    return r;
  }

  r = dbns_txn_commit (dbe, t);
  if (r)
    warner ("dbns::insert", "abort/commit error", r);
  return r;
}
// }}}

// {{{ dbns::insert (chordID, str, u_int32_t)
/*
 * Convenience overload: wrap data and auxdata (converted to network byte
 * order) in DBTs and hand them to the DBT-based insert.
 */
int
dbns::insert (const chordID &key, const str data, u_int32_t auxdata)
{
  DBT datadbt;
  str_to_dbt (data, &datadbt);

  u_int32_t nauxdata = htonl (auxdata);
  DBT auxdatadbt;
  bzero (&auxdatadbt, sizeof (auxdatadbt));
  auxdatadbt.size = sizeof (nauxdata);
  auxdatadbt.data = &nauxdata;

  // Must pass the DBTs: passing (data, auxdata) would select this very
  // overload again and recurse without end.
  return insert (key, datadbt, auxdatadbt);
}
// }}}

// {{{ dbns::lookup
/*
 * Fetch the data stored under key into `data'.  Returns 0 on success or
 * the Berkeley DB error (DB_NOTFOUND is returned without logging).
 */
int
dbns::lookup (const chordID &key, str &data)
{
  int r = 0;

  str key_str = id_to_str (key);
  DBT skey;
  str_to_dbt (key_str, &skey);

  DBT content;
  bzero (&content, sizeof (content));

  // Implicit transaction
  r = datadb->get (datadb, NULL, &skey, &content, 0);
  if (r) {
    if (r != DB_NOTFOUND)
      warner ("dbns::lookup", "get error", r);
    return r;
  }
  data.setbuf ((const char *) (content.data), content.size);
  return 0;
}
// }}}

// {{{ dbns::del
/*
 * Remove key from the primary data database.  Returns the Berkeley DB
 * result; DB_NOTFOUND is returned without logging.
 *
 * NOTE(review): only datadb is touched here; any auxdatadb entry for the
 * key is left in place -- confirm that is intended.
 */
int
dbns::del (const chordID &key)
{
  int r = 0;
  str key_str = id_to_str (key);
  DBT skey;
  str_to_dbt (key_str, &skey);
  // Implicit transaction
  r = datadb->del (datadb, NULL, &skey, DB_AUTO_COMMIT);
  if (r && r != DB_NOTFOUND)
    warner ("dbns::del", "del error", r);
  return r;
}
// }}}

// {{{ dbns::getkeys
/*
 * Collect up to `count' keys, in database order, starting at the first key
 * >= start.  Iterates auxdatadb when it exists, datadb otherwise.  When
 * getaux is set and an auxdata value was read, it is returned (host byte
 * order) alongside each key.
 *
 * A count that is "negative" (i.e. does not fit a non-negative int once
 * converted to size_t) selects a default limit derived from asrvbufsize.
 *
 * Returns the last cursor status: 0 if the limit was reached, DB_NOTFOUND
 * if the database ended first, or another Berkeley DB error.
 */
int
dbns::getkeys (const chordID &start, size_t count, bool getaux,
               rpc_vec<adb_keyaux_t, RPC_INFINITY> &out)
{
  int r = 0;
  DBC *cursor = NULL;
  // Fully serialized reads of auxdatadb
  if (auxdatadb)
    r = auxdatadb->cursor (auxdatadb, NULL, &cursor, 0);
  else
    r = datadb->cursor (datadb, NULL, &cursor, 0);

  if (r) {
    // No cursor was created, so there is nothing to close.
    warner ("dbns::getkeys", "cursor open", r);
    return r;
  }

  // Could possibly improve efficiency here by using SleepyCat's bulk reads
  str key_str = id_to_str (start);
  DBT key;
  str_to_dbt (key_str, &key);
  DBT data_template;
  bzero (&data_template, sizeof (data_template));
  // If DB_DBT_PARTIAL and data.dlen == 0, no data is read.
  if (!auxdatadb || !getaux)
    data_template.flags = DB_DBT_PARTIAL;
  DBT data = data_template;

  // count is unsigned, so a caller's negative "no limit" value arrives as
  // a huge number; `count < 0' can never be true.  Detect that case by
  // range instead of sizing the reply for ~4G elements.
  u_int32_t limit = count;
  if (count > (size_t) 0x7fffffff)
    limit = (u_int32_t) (asrvbufsize/(1.5*sizeof (adb_keyaux_t)));

  // since we set a limit, we know the maximum amount we have to allocate
  out.setsize (limit);
  u_int32_t elements = 0;

  r = cursor->c_get (cursor, &key, &data, DB_SET_RANGE);
  while (elements < limit && !r) {
    out[elements].key = dbt_to_id (key);
    if (getaux) {
      // Only read the payload when one was actually fetched; with
      // DB_DBT_PARTIAL (no auxdatadb) there are no bytes behind data.data.
      if (data.data && data.size >= sizeof (u_int32_t))
        out[elements].auxdata = ntohl (*(u_int32_t *) data.data);
      else
        out[elements].auxdata = 0;
    }
    elements++;

    bzero (&key, sizeof (key));
    data = data_template;
    r = cursor->c_get (cursor, &key, &data, DB_NEXT);
  }

  if (elements < limit)
    out.setsize (elements);

  if (r && r != DB_NOTFOUND)
    warner ("dbns::getkeys", "cursor get", r);
  (void) cursor->c_close (cursor);

  return r;
}
// }}}

// {{{ dbns::getblockrange
/*
 * Collect block-status records from bsdb for keys in (start, stop] order
 * around the ring, beginning at the first key >= start and wrapping to the
 * first key in the database when the end is reached.  At most `count'
 * records are returned; a negative count selects a default derived from
 * asrvbufsize.
 *
 * Returns 0 if the limit was reached, DB_NOTFOUND when the range was
 * exhausted, or another Berkeley DB error.
 */
int
dbns::getblockrange_all (const chordID &start, const chordID &stop,
                         int count, rpc_vec<adb_bsloc_t, RPC_INFINITY> &out)
{
  int r = 0;
  DBC *cursor = NULL;
  r = bsdb->cursor (bsdb, NULL, &cursor, 0);
  if (r) {
    // No cursor was created, so there is nothing to close.
    warner ("dbns::getblockrange_all", "cursor open", r);
    return r;
  }

  chordID cur = start;
  str key_str = id_to_str (start);
  DBT key;
  str_to_dbt (key_str, &key);
  DBT data;
  bzero (&data, sizeof (data));

  u_int32_t limit = count;
  if (count < 0)
    limit = (asrvbufsize/(sizeof (adb_bsloc_t)/2));

  r = cursor->c_get (cursor, &key, &data, DB_SET_RANGE);
  // Loop around the ring if at end.
  if (r == DB_NOTFOUND) {
    bzero (&key, sizeof (key));
    r = cursor->c_get (cursor, &key, &data, DB_FIRST);
  }

  // since we set a limit, we know the maximum amount we have to allocate
  out.setsize (limit);
  u_int32_t elements = 0;
  while (!r && elements < limit) {
    adb_vbsinfo_t vbs;
    chordID k = dbt_to_id (key);
    // Stop as soon as the cursor leaves the (cur, stop] arc.
    if (!betweenrightincl (cur, stop, k)) {
      r = DB_NOTFOUND;
      break;
    }
    cur = k;
    if (buf2xdr (vbs, data.data, data.size)) {
      out[elements].block = k;
      out[elements].hosts.setsize (vbs.d.size ());
      // explicit deep copy
      for (u_int32_t i = 0; i < vbs.d.size (); i++) {
        out[elements].hosts[i].n = vbs.d[i].n;
        out[elements].hosts[i].auxdata = vbs.d[i].auxdata;
      }
      elements++;
    } else {
      warner ("dbns::getblockrange_all", "XDR unmarshalling failed", 0);
    }

    bzero (&key, sizeof (key));
    bzero (&data, sizeof (data));
    r = cursor->c_get (cursor, &key, &data, DB_NEXT);
    if (r == DB_NOTFOUND)
      r = cursor->c_get (cursor, &key, &data, DB_FIRST);
  }

  if (r && r != DB_NOTFOUND)
    warner ("dbns::getblockrange_all", "cursor get", r);

  if (elements < limit)
    out.setsize (elements);

  (void) cursor->c_close (cursor);
  return r;
}

/*
 * Like getblockrange_all, but walks the bsdx secondary index and returns
 * only records whose secondary key equals `extant' (stored in network byte
 * order, matching getnumreplicas).
 *
 * Returns 0 if the limit was reached, DB_NOTFOUND when the range was
 * exhausted, or another Berkeley DB error.
 */
int
dbns::getblockrange_extant (const chordID &start, const chordID &stop,
                            int extant, int count,
                            rpc_vec<adb_bsloc_t, RPC_INFINITY> &out)
{
  // XXX maybe think of a way to improve the degree of abstraction.
  //     for cursors.
  int r = 0;
  DBC *cursor = NULL;
  r = bsdx->cursor (bsdx, NULL, &cursor, 0);
  if (r) {
    // No cursor was created, so there is nothing to close.
    warner ("dbns::getblockrange_extant", "cursor open", r);
    return r;
  }

  extant = htonl (extant);
  DBT ekey;
  bzero (&ekey, sizeof (ekey));
  ekey.data = &extant;
  ekey.size = sizeof (extant);

  str key_str = id_to_str (start);
  DBT key;
  str_to_dbt (key_str, &key);
  chordID cur = start;

  DBT data;
  bzero (&data, sizeof (data));

  u_int32_t limit = count;
  if (count < 0)
    limit = (asrvbufsize/(sizeof (adb_bsloc_t)/2));

  // Find only those who have extant as requested
  // find the one with the primary key greater or equal to
  // the start key
  r = cursor->c_pget (cursor, &ekey, &key, &data, DB_GET_BOTH_RANGE);
  // Also must remember to start to loop around the ring
  if (r == DB_NOTFOUND) {
    bzero (&key, sizeof (key));
    // I guess it didn't work.  Just try any old key with this extant
    r = cursor->c_pget (cursor, &ekey, &key, &data, DB_SET);
  }

  // since we set a limit, we know the maximum amount we have to allocate
  out.setsize (limit);
  u_int32_t elements = 0;
  while (!r && elements < limit) {
    adb_vbsinfo_t vbs;
    chordID k = dbt_to_id (key);
    // NOTE: this assumes that the keys are stored in order of the
    // primary keys in the secondary db.  Also assumes the DB_GET_BOTH_RANGE
    // call above works as expected.
    if (!betweenrightincl (cur, stop, k)) {
      r = DB_NOTFOUND;
      break;
    }
    cur = k;
    if (buf2xdr (vbs, data.data, data.size)) {
      out[elements].block = k;
      out[elements].hosts.setsize (vbs.d.size ());
      // explicit deep copy
      for (u_int32_t i = 0; i < vbs.d.size (); i++) {
        out[elements].hosts[i].n = vbs.d[i].n;
        out[elements].hosts[i].auxdata = vbs.d[i].auxdata;
      }
      elements++;
    } else {
      warner ("dbns::getblockrange_extant", "XDR unmarshalling failed", 0);
    }

    bzero (&key, sizeof (key));
    bzero (&data, sizeof (data));
    r = cursor->c_pget (cursor, &ekey, &key, &data, DB_NEXT_DUP);
    if (r == DB_NOTFOUND)
      r = cursor->c_pget (cursor, &ekey, &key, &data, DB_SET);
  }

  if (r && r != DB_NOTFOUND)
    warner ("dbns::getblockrange_extant", "cursor get", r);

  if (elements < limit)
    out.setsize (elements);

  (void) cursor->c_close (cursor);
  return r;
}
// }}}

// {{{ dbns::getkeyson
// NOTE(review): the code of this function is reproduced unchanged.  Its
// cursor-open error path calls c_close on a cursor that was never
// initialized; that should be reviewed together with the rest of the body.
int
dbns::getkeyson (const adb_vnodeid &n, const chordID &start,
                 const chordID &stop, int count,
                 rpc_vec<adb_keyaux_t, RPC_INFINITY> &out)
{
  int r = 0;
  DBC *cursor;
  // Fully serialized reads of bsdb
  r = bsdb->cursor (bsdb, NULL, &cursor, 0);
  if (r) {
    warner ("dbns::getkeyson", "cursor open", r);
    (void) cursor->c_close (cursor);
    return r;
  }

  chordID cur = start;
  // Could possibly improve efficiency here by using SleepyCat's bulk reads
  str key_str = id_to_str (start);
  DBT key;
  str_to_dbt (key_str, &key);

  DBT data;
  bzero (&data, sizeof (data));

  u_int32_t limit = count;
  if (count < 0)
    limit = (u_int32_t) (asrvbufsize/(1.5*sizeof (adb_keyaux_t)));

  // Position the cursor if possible
  r = cursor->c_get (cursor, &key, &data, DB_SET_RANGE);
  // Also must remember to start to loop around the ring
  if (r == DB_NOTFOUND) {
    bzero (&key, sizeof (key));
    r = cursor->c_get (cursor, &key, &data, DB_FIRST);
  }

  // since we set a limit, we know the maximum amount we have to allocate
  out.setsize( limit );
  u_int32_t elements = 0;

  // Each adb_keyaux_t is 24ish bytes; leave some slack
  while (!r && elements < limit) {
    adb_vbsinfo_t vbs;
    chordID curkey = dbt_to_id (key);
    if (!betweenrightincl (cur, stop, curkey)) {
      r = DB_NOTFOUND;
      break;
    }
    cur = curkey;
    if (buf2xdr (vbs, data.data, data.size)) {
      size_t dx;
      for (dx = 0; dx < vbs.d.size (); dx++) {
        if (memcmp (&vbs.d[dx].n, &n, sizeof (n)) == 0)
          break;
      }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -