📄 adbd.c
字号:
if (dx < vbs.d.size ()) { out[elements].key = curkey; out[elements].auxdata = vbs.d[dx].auxdata; elements++; } } else { warner ("dbns::getkeyson", "XDR unmarshalling failed", 0); } bzero (&key, sizeof (key)); bzero (&data, sizeof (data)); r = cursor->c_get (cursor, &key, &data, DB_NEXT); if (r == DB_NOTFOUND) r = cursor->c_get (cursor, &key, &data, DB_FIRST); } if (r && r != DB_NOTFOUND) warner ("dbns::getkeyson", "cursor get", r); if( elements < limit ) { out.setsize(elements); } (void) cursor->c_close (cursor); return r;}// }}}// {{{ dbns::updateoneintdbns::updateone (const chordID &key, const adb_bsinfo_t &bsinfo, bool present, DB_TXN *t){ int r; str key_str = id_to_str (key); DBT skey; str_to_dbt (key_str, &skey); DBT data; bzero (&data, sizeof (data)); adb_vbsinfo_t vbs; // get, with a write lock. r = bsdb->get (bsdb, t, &skey, &data, DB_RMW); if (r && r != DB_NOTFOUND) { warner ("dbns::update", "get error", r); return r; } // append/change if (r == DB_NOTFOUND) { if (present) { vbs.d.push_back (); vbs.d.back().n = bsinfo.n; vbs.d.back().auxdata = bsinfo.auxdata; } else { // Nothing to do, go home. return 0; } } else { if (!buf2xdr (vbs, data.data, data.size)) { warner ("dbns::update", "XDR unmarshalling failed", 0); // Nuke the old corrupt, soft state data. vbs.d.clear (); } // Find index of bsinfo in vbs. size_t dx; for (dx = 0; dx < vbs.d.size (); dx++) if (memcmp (&vbs.d[dx].n, &bsinfo.n, sizeof (bsinfo.n)) == 0) break; if (present) { if (dx < vbs.d.size ()) vbs.d[dx].auxdata = bsinfo.auxdata; else { vbs.d.push_back (); vbs.d.back().n = bsinfo.n; vbs.d.back().auxdata = bsinfo.auxdata; } } else { if (dx < vbs.d.size ()) { vbs.d[dx] = vbs.d.back (); vbs.d.pop_back (); } else { // No need to re-write to db; nothing changed. return 0; } } } // put str vbsout = xdr2str (vbs); str_to_dbt (vbsout, &data); r = bsdb->put (bsdb, t, &skey, &data, 0); if (r) warner ("dbns::update", "put error", r); return r;}// }}}// {{{ dbns::updateintdbns::update (const chordID &key, const adb_bsinfo_t &bsinfo, bool present){ int r; DB_TXN *t = NULL; dbns_txn_begin (dbe, &t); r = updateone (key, bsinfo, present, t); if (r) dbns_txn_abort (dbe, t); else dbns_txn_commit (dbe, t); return r;}// }}}// {{{ dbns::updatebatchintdbns::updatebatch (rpc_vec<adb_updatearg, RPC_INFINITY> &uargs){ int r (0); DB_TXN *t = NULL; dbns_txn_begin (dbe, &t); for (size_t i = 0; i < uargs.size (); i++) { r = updateone (uargs[i].key, uargs[i].bsinfo, uargs[i].present, t); if (r) { dbns_txn_abort (dbe, t); break; } } if (!r) { dbns_txn_commit (dbe, t); } return r;}// }}}// {{{ dbns::getinfointdbns::getinfo (const chordID &key, rpc_vec<adb_vnodeid, RPC_INFINITY> &out){ int r; str key_str = id_to_str (key); DBT skey; str_to_dbt (key_str, &skey); DBT data; bzero (&data, sizeof (data)); adb_vbsinfo_t vbs; // get r = bsdb->get (bsdb, NULL, &skey, &data, 0); if (r && r != DB_NOTFOUND) { warner ("dbns::update", "get error", r); return r; } if (r == DB_NOTFOUND) { return 0; } else { if (!buf2xdr (vbs, data.data, data.size)) { warner ("dbns::update", "XDR unmarshalling failed", 0); // Nuke the old corrupt, soft state data. vbs.d.clear (); return 0; } for (size_t i = 0; i < vbs.d.size (); i++) out.push_back (vbs.d[i].n); } return 0;}// }}}// }}}// {{{ DB Managerclass dbmanager { str dbpath; ihash<str, dbns, &dbns::name, &dbns::hlink> dbs; void dbcloser (dbns *db);public: dbmanager (str); ~dbmanager (); const str &getdbpath () { return dbpath; }; dbns *get (const str &n) { return dbs[n]; }; dbns *createdb (const str &n, bool aux);};dbmanager::dbmanager (str p) : dbpath (p){ if (p[p.len () - 1] != '/') dbpath = strbuf () << p << "/"; struct stat sb; if (stat (dbpath, &sb) < 0) { if (errno == ENOENT) { if (mkdir (dbpath, 0755) < 0) fatal ("dbmanager::dbmanager: mkdir (%s): %m\n", dbpath.cstr ()); } else { fatal ("dbmanager::dbmanager: stat (%s): %m\n", dbpath.cstr ()); } } else { if (!S_ISDIR (sb.st_mode)) fatal ("dbmanager::dbmanager: %s is not a directory\n", dbpath.cstr ()); if (access (dbpath, W_OK) < 0) fatal ("dbmanager::manager: access (%s, W_OK): %m\n", dbpath.cstr ()); }}dbmanager::~dbmanager (){ dbs.traverse (wrap (this, &dbmanager::dbcloser)); dbs.clear ();}voiddbmanager::dbcloser (dbns *db){ if (db) { warn << "Closing db " << db->name << "\n"; db->sync (); } delete db; db = NULL;}dbns *dbmanager::createdb (const str &n, bool aux){ dbns *db = dbs[n]; if (!db) { db = New dbns (dbpath, n, aux); dbs.insert (db); } return db;}// }}}// {{{ Shutdown functionsEXITFN (cleanup);static voidcleanup (){ if (dbstarted) { if (clntfd >= 0) { fdcb (clntfd, selread, NULL); close (clntfd); } unlink (dbsock); }}static voidhalt (){ warn << "Exiting on command...\n"; delete dbm; exit (0);}// }}}// {{{ RPC execution// {{{ do_initspacevoiddo_initspace (dbmanager *dbm, svccb *sbp){ adb_initspacearg *arg = sbp->Xtmpl getarg<adb_initspacearg> (); adb_status stat (ADB_OK); dbns *db = dbm->get (arg->name); if (db) { sbp->replyref (stat); return; } db = dbm->createdb (arg->name, arg->hasaux); stat = (db ? ADB_OK : ADB_ERR); sbp->replyref (stat);}// }}}// {{{ do_storevoiddo_store (dbmanager *dbm, svccb *sbp){ adb_storearg *arg = sbp->Xtmpl getarg<adb_storearg> (); dbns *db = dbm->get (arg->name); if (!db) { sbp->replyref (ADB_ERR); return; } DBT data; bzero (&data, sizeof (data)); data.data = arg->data.base (); data.size = arg->data.size (); DBT auxdatadbt; bzero (&auxdatadbt, sizeof (auxdatadbt)); auxdatadbt.size = sizeof (u_int32_t); u_int32_t nauxdata = htonl (arg->auxdata); auxdatadbt.data = &nauxdata; int r = db->insert (arg->key, data, auxdatadbt); sbp->replyref ((r == 0) ? ADB_OK : ADB_ERR);}// }}}// {{{ do_fetchvoiddo_fetch (dbmanager *dbm, svccb *sbp){ adb_fetcharg *arg = sbp->Xtmpl getarg<adb_fetcharg> (); adb_fetchres res (ADB_OK); dbns *db = dbm->get (arg->name); if (!db) { res.set_status (ADB_ERR); sbp->replyref (res); return; } str data; int r = db->lookup (arg->key, data); if (r) { res.set_status ((r == DB_NOTFOUND ? ADB_NOTFOUND : ADB_ERR)); } else { res.resok->key = arg->key; res.resok->data = data; } sbp->replyref (res);}// }}}// {{{ do_getkeysvoiddo_getkeys (dbmanager *dbm, svccb *sbp){ adb_getkeysarg *arg = sbp->Xtmpl getarg<adb_getkeysarg> (); adb_getkeysres res (ADB_OK); dbns *db = dbm->get (arg->name); if (!db) { res.set_status (ADB_ERR); sbp->replyref (res); return; } // Gets up to 128 keys at a time. res.resok->hasaux = arg->getaux; int r = db->getkeys (arg->start, 128, arg->getaux, res.resok->keyaux); res.resok->complete = (r == DB_NOTFOUND); if (r && r != DB_NOTFOUND) res.set_status (ADB_ERR); sbp->replyref (res);}// }}}// {{{ do_deletevoiddo_delete (dbmanager *dbm, svccb *sbp){ adb_deletearg *arg = sbp->Xtmpl getarg<adb_deletearg> (); dbns *db = dbm->get (arg->name); if (!db) { sbp->replyref (ADB_ERR); return; } int r = db->del (arg->key); sbp->replyref ((r == 0) ? ADB_OK : ADB_NOTFOUND);}// }}}// {{{ do_getblockrangevoiddo_getblockrange (dbmanager *dbm, svccb *sbp){ adb_getblockrangearg *arg = sbp->Xtmpl getarg<adb_getblockrangearg> (); adb_getblockrangeres *res = sbp->Xtmpl getres<adb_getblockrangeres> (); dbns *db = dbm->get (arg->name); if (!db) { res->status = ADB_ERR; sbp->reply (res); return; } int r (0); res->status = ADB_OK; if (arg->extant >= 0) { r = db->getblockrange_extant (arg->start, arg->stop, arg->extant, arg->count, res->blocks); } else { r = db->getblockrange_all (arg->start, arg->stop, arg->count, res->blocks); } if (r) { if (r == DB_NOTFOUND) { res->status = ADB_COMPLETE; } else { res->status = ADB_ERR; res->blocks.clear (); } } sbp->reply (res);}// }}}// {{{ do_getkeysonvoiddo_getkeyson (dbmanager *dbm, svccb *sbp){ adb_getkeysonarg *arg = sbp->Xtmpl getarg<adb_getkeysonarg> (); adb_getkeysres *res = sbp->Xtmpl getres<adb_getkeysres> (); res->set_status (ADB_OK); dbns *db = dbm->get (arg->name); if (!db) { res->set_status (ADB_ERR); sbp->reply (res); return; } res->resok->hasaux = db->hasaux (); int r = db->getkeyson (arg->who, arg->start, arg->stop, 128, res->resok->keyaux); res->resok->complete = (r == DB_NOTFOUND); if (r && r != DB_NOTFOUND) res->set_status (ADB_ERR); sbp->reply (res);}// }}}// {{{ do_updatevoiddo_update (dbmanager *dbm, svccb *sbp){ adb_updatearg *arg = sbp->Xtmpl getarg<adb_updatearg> (); adb_status res (ADB_OK); dbns *db = dbm->get (arg->name); if (!db) { sbp->replyref (ADB_ERR); return; } int r = db->update (arg->key, arg->bsinfo, arg->present); if (r) res = ADB_ERR; sbp->replyref (res);}// }}}// {{{ do_updatebatchvoiddo_updatebatch (dbmanager *dbm, svccb *sbp){ adb_updatebatcharg *args = sbp->Xtmpl getarg<adb_updatebatcharg> (); adb_status res (ADB_OK); if (!args->args.size ()) { sbp->replyref (res); return; } dbns *db = dbm->get (args->args[0].name); // assert: all args have the same namespace because batching // is done by the adb object in libadb.C and each adb obj // serves one namespace if (!db) { sbp->replyref (ADB_ERR); return; } int r = db->updatebatch (args->args); if (r) res = ADB_ERR; sbp->replyref (res);}// }}}// {{{ do_getinfovoiddo_getinfo (dbmanager *dbm, svccb *sbp){ adb_getinfoarg *arg = sbp->Xtmpl getarg<adb_getinfoarg> (); adb_getinfores res; res.status = ADB_OK; dbns *db = dbm->get (arg->name); if (!db) { sbp->replyref (ADB_ERR); return; } int r = db->getinfo (arg->key, res.nlist); if (r) { res.status = ADB_ERR; res.nlist.clear (); } sbp->replyref (res);}// }}}// }}}// {{{ RPC accept and dispatchvoiddispatch (ref<axprt_stream> s, ptr<asrv> a, dbmanager *dbm, svccb *sbp){ if (sbp == NULL) { warn << "EOF from client\n"; a = NULL; return; } switch (sbp->proc ()) { case ADBPROC_INITSPACE: do_initspace (dbm, sbp); break; case ADBPROC_STORE: do_store (dbm, sbp); break; case ADBPROC_FETCH: do_fetch (dbm, sbp); break; case ADBPROC_GETKEYS: do_getkeys (dbm, sbp); break; case ADBPROC_DELETE: do_delete (dbm, sbp); break; case ADBPROC_GETBLOCKRANGE: do_getblockrange (dbm, sbp); break; case ADBPROC_GETKEYSON: do_getkeyson (dbm, sbp); break; case ADBPROC_UPDATE: do_update (dbm, sbp); break; case ADBPROC_UPDATEBATCH: do_updatebatch (dbm, sbp); break; case ADBPROC_GETINFO: do_getinfo (dbm, sbp); break; default: fatal << "unknown procedure: " << sbp->proc () << "\n"; } return;}static void accept_cb (int lfd, dbmanager *dbm);static voidlisten_unix (str sock_name, dbmanager *dbm){ unlink (sock_name); clntfd = unixsocket (sock_name); if (clntfd < 0) fatal << "Error creating socket (UNIX)" << strerror (errno) << "\n"; if (listen (clntfd, 128) < 0) { fatal ("Error from listen: %m\n"); close (clntfd); } else { fdcb (clntfd, selread, wrap (accept_cb, clntfd, dbm)); } dbstarted = true;}static void accept_cb (int lfd, dbmanager *dbm){ sockaddr_un sun; bzero (&sun, sizeof (sun)); socklen_t sunlen = sizeof (sun); int fd = accept (lfd, reinterpret_cast<sockaddr *> (&sun), &sunlen); if (fd < 0) fatal ("EOF\n"); ref<axprt_stream> x = axprt_stream::alloc (fd, asrvbufsize); ptr<asrv> a = asrv::alloc (x, adb_program_1); a->setcb (wrap (dispatch, x, a, dbm));}// }}}voidusage (){ warnx << "Usage: adbd -d db -S sock [-D]\n"; exit (0);}int main (int argc, char **argv){ setprogname (argv[0]); char ch; str db_name = "/var/tmp/db"; dbsock = "/tmp/db-sock"; bool do_daemonize (false); while ((ch = getopt (argc, argv, "Dd:S:"))!=-1) switch (ch) { case 'D': do_daemonize = true; break; case 'd': db_name = optarg; break; case 'S': dbsock = optarg; break; default: usage (); } if (do_daemonize) { warn << "starting daemonized\n"; daemonize (); } warn << "db path: " << db_name << "\n"; warn << "db socket: " << dbsock << "\n"; dbm = New dbmanager(db_name); sigcb (SIGHUP, wrap (&halt)); sigcb (SIGINT, wrap (&halt)); sigcb (SIGTERM, wrap (&halt)); //setup the asrv listen_unix (dbsock, dbm); amain ();}// vim: foldmethod=marker
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -