⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 adbd.c

📁 chord 源码 http://pdos.csail.mit.edu/chord/
💻 C
📖 第 1 页 / 共 2 页
字号:
      if (dx < vbs.d.size ()) {	out[elements].key = curkey;	out[elements].auxdata = vbs.d[dx].auxdata;	elements++;      }    } else {      warner ("dbns::getkeyson", "XDR unmarshalling failed", 0);    }     bzero (&key, sizeof (key));    bzero (&data, sizeof (data));    r = cursor->c_get (cursor, &key, &data, DB_NEXT);    if (r == DB_NOTFOUND)      r = cursor->c_get (cursor, &key, &data, DB_FIRST);  }  if (r && r != DB_NOTFOUND)    warner ("dbns::getkeyson", "cursor get", r);  if( elements < limit ) {    out.setsize(elements);  }  (void) cursor->c_close (cursor);  return r;}// }}}// {{{ dbns::updateoneintdbns::updateone (const chordID &key, const adb_bsinfo_t &bsinfo, bool present,	         DB_TXN *t){  int r;  str key_str = id_to_str (key);  DBT skey;  str_to_dbt (key_str, &skey);  DBT data; bzero (&data, sizeof (data));  adb_vbsinfo_t vbs;  // get, with a write lock.  r = bsdb->get (bsdb, t, &skey, &data, DB_RMW);  if (r && r != DB_NOTFOUND) {    warner ("dbns::update", "get error", r);    return r;  }  // append/change  if (r == DB_NOTFOUND) {    if (present) {      vbs.d.push_back ();      vbs.d.back().n = bsinfo.n;      vbs.d.back().auxdata = bsinfo.auxdata;    } else {      // Nothing to do, go home.      return 0;    }  } else {    if (!buf2xdr (vbs, data.data, data.size)) {      warner ("dbns::update", "XDR unmarshalling failed", 0);      // Nuke the old corrupt, soft state data.      vbs.d.clear ();    }    // Find index of bsinfo in vbs.    size_t dx;     for (dx = 0; dx < vbs.d.size (); dx++)       if (memcmp (&vbs.d[dx].n, &bsinfo.n, sizeof (bsinfo.n)) == 0) break;    if (present) {      if (dx < vbs.d.size ())	vbs.d[dx].auxdata = bsinfo.auxdata;      else {	vbs.d.push_back ();	vbs.d.back().n = bsinfo.n;	vbs.d.back().auxdata = bsinfo.auxdata;      }    } else {      if (dx < vbs.d.size ()) {	vbs.d[dx] = vbs.d.back ();	vbs.d.pop_back ();      } else {	// No need to re-write to db; nothing changed.	return 0;      }    }  }  // put  str vbsout = xdr2str (vbs);  str_to_dbt (vbsout, &data);  r = bsdb->put (bsdb, t, &skey, &data, 0);  if (r)    warner ("dbns::update", "put error", r);  return r;}// }}}// {{{ dbns::updateintdbns::update (const chordID &key, const adb_bsinfo_t &bsinfo, bool present){  int r;  DB_TXN *t = NULL;  dbns_txn_begin (dbe, &t);  r = updateone (key, bsinfo, present, t);  if (r)    dbns_txn_abort (dbe, t);  else    dbns_txn_commit (dbe, t);  return r;}// }}}// {{{ dbns::updatebatchintdbns::updatebatch (rpc_vec<adb_updatearg, RPC_INFINITY> &uargs){  int r (0);  DB_TXN *t = NULL;  dbns_txn_begin (dbe, &t);  for (size_t i = 0; i < uargs.size (); i++) {    r = updateone (uargs[i].key, uargs[i].bsinfo, uargs[i].present, t);    if (r) {      dbns_txn_abort (dbe, t);      break;    }  }  if (!r) {    dbns_txn_commit (dbe, t);  }  return r;}// }}}// {{{ dbns::getinfointdbns::getinfo (const chordID &key, rpc_vec<adb_vnodeid, RPC_INFINITY> &out){  int r;  str key_str = id_to_str (key);  DBT skey;  str_to_dbt (key_str, &skey);  DBT data; bzero (&data, sizeof (data));  adb_vbsinfo_t vbs;  // get  r = bsdb->get (bsdb, NULL, &skey, &data, 0);  if (r && r != DB_NOTFOUND) {    warner ("dbns::update", "get error", r);    return r;  }  if (r == DB_NOTFOUND) {    return 0;  } else {    if (!buf2xdr (vbs, data.data, data.size)) {      warner ("dbns::update", "XDR unmarshalling failed", 0);      // Nuke the old corrupt, soft state data.      vbs.d.clear ();      return 0;    }    for (size_t i = 0; i < vbs.d.size (); i++)      out.push_back (vbs.d[i].n);  }  return 0;}// }}}// }}}// {{{ DB Managerclass dbmanager {  str dbpath;  ihash<str, dbns, &dbns::name, &dbns::hlink> dbs;  void dbcloser (dbns *db);public:  dbmanager (str);  ~dbmanager ();  const str &getdbpath () { return dbpath; };  dbns *get (const str &n) { return dbs[n]; };  dbns *createdb (const str &n, bool aux);};dbmanager::dbmanager (str p) :  dbpath (p){  if (p[p.len () - 1] != '/')    dbpath = strbuf () << p << "/";  struct stat sb;  if (stat (dbpath, &sb) < 0) {    if (errno == ENOENT) {      if (mkdir (dbpath, 0755) < 0)	fatal ("dbmanager::dbmanager: mkdir (%s): %m\n", dbpath.cstr ());    } else {      fatal ("dbmanager::dbmanager: stat (%s): %m\n", dbpath.cstr ());    }  } else {    if (!S_ISDIR (sb.st_mode))       fatal ("dbmanager::dbmanager: %s is not a directory\n", dbpath.cstr ());    if (access (dbpath, W_OK) < 0)      fatal ("dbmanager::manager: access (%s, W_OK): %m\n", dbpath.cstr ());  }}dbmanager::~dbmanager (){  dbs.traverse (wrap (this, &dbmanager::dbcloser));  dbs.clear ();}voiddbmanager::dbcloser (dbns *db){  if (db) {    warn << "Closing db " << db->name << "\n";    db->sync ();  }  delete db;  db = NULL;}dbns *dbmanager::createdb (const str &n, bool aux){  dbns *db = dbs[n];  if (!db) {    db = New dbns (dbpath, n, aux);    dbs.insert (db);  }  return db;}// }}}// {{{ Shutdown functionsEXITFN (cleanup);static voidcleanup (){  if (dbstarted) {    if (clntfd >= 0) {      fdcb (clntfd, selread, NULL);      close (clntfd);    }    unlink (dbsock);  }}static voidhalt (){  warn << "Exiting on command...\n";  delete dbm;  exit (0);}// }}}// {{{ RPC execution// {{{ do_initspacevoiddo_initspace (dbmanager *dbm, svccb *sbp){  adb_initspacearg *arg = sbp->Xtmpl getarg<adb_initspacearg> ();  adb_status stat (ADB_OK);  dbns *db = dbm->get (arg->name);  if (db) {    sbp->replyref (stat);     return;  }  db = dbm->createdb (arg->name, arg->hasaux);  stat = (db ? ADB_OK : ADB_ERR);  sbp->replyref (stat);}// }}}// {{{ do_storevoiddo_store (dbmanager *dbm, svccb *sbp){  adb_storearg *arg = sbp->Xtmpl getarg<adb_storearg> ();  dbns *db = dbm->get (arg->name);  if (!db) {    sbp->replyref (ADB_ERR);    return;  }  DBT data;  bzero (&data, sizeof (data));  data.data = arg->data.base ();  data.size = arg->data.size ();  DBT auxdatadbt;  bzero (&auxdatadbt, sizeof (auxdatadbt));  auxdatadbt.size = sizeof (u_int32_t);  u_int32_t nauxdata = htonl (arg->auxdata);  auxdatadbt.data = &nauxdata;   int r = db->insert (arg->key, data, auxdatadbt);  sbp->replyref ((r == 0) ? ADB_OK : ADB_ERR);}// }}}// {{{ do_fetchvoiddo_fetch (dbmanager *dbm, svccb *sbp){  adb_fetcharg *arg = sbp->Xtmpl getarg<adb_fetcharg> ();  adb_fetchres res (ADB_OK);   dbns *db = dbm->get (arg->name);  if (!db) {    res.set_status (ADB_ERR);    sbp->replyref (res);    return;  }   str data;   int r = db->lookup (arg->key, data);  if (r) {    res.set_status ((r == DB_NOTFOUND ? ADB_NOTFOUND : ADB_ERR));  } else {    res.resok->key = arg->key;    res.resok->data = data;  }  sbp->replyref (res);}// }}}// {{{ do_getkeysvoiddo_getkeys (dbmanager *dbm, svccb *sbp){  adb_getkeysarg *arg = sbp->Xtmpl getarg<adb_getkeysarg> ();  adb_getkeysres res (ADB_OK);  dbns *db = dbm->get (arg->name);  if (!db) {    res.set_status (ADB_ERR);    sbp->replyref (res);    return;  }  // Gets up to 128 keys at a time.  res.resok->hasaux = arg->getaux;  int r = db->getkeys (arg->start, 128, arg->getaux, res.resok->keyaux);  res.resok->complete = (r == DB_NOTFOUND);  if (r && r != DB_NOTFOUND)     res.set_status (ADB_ERR);  sbp->replyref (res);}// }}}// {{{ do_deletevoiddo_delete (dbmanager *dbm, svccb *sbp){  adb_deletearg *arg = sbp->Xtmpl getarg<adb_deletearg> ();  dbns *db = dbm->get (arg->name);  if (!db) {    sbp->replyref (ADB_ERR);    return;  }  int r = db->del (arg->key);  sbp->replyref ((r == 0) ? ADB_OK : ADB_NOTFOUND);}// }}}// {{{ do_getblockrangevoiddo_getblockrange (dbmanager *dbm, svccb *sbp){  adb_getblockrangearg *arg = sbp->Xtmpl getarg<adb_getblockrangearg> ();  adb_getblockrangeres *res = sbp->Xtmpl getres<adb_getblockrangeres> ();  dbns *db = dbm->get (arg->name);  if (!db) {    res->status = ADB_ERR;    sbp->reply (res);    return;  }  int r (0);  res->status = ADB_OK;  if (arg->extant >= 0) {    r = db->getblockrange_extant (arg->start, arg->stop,	  arg->extant, arg->count, res->blocks);  } else {    r = db->getblockrange_all (arg->start, arg->stop,	  arg->count, res->blocks);  }  if (r) {    if (r == DB_NOTFOUND) {      res->status = ADB_COMPLETE;    } else {      res->status = ADB_ERR;      res->blocks.clear ();    }  }  sbp->reply (res);}// }}}// {{{ do_getkeysonvoiddo_getkeyson (dbmanager *dbm, svccb *sbp){  adb_getkeysonarg *arg = sbp->Xtmpl getarg<adb_getkeysonarg> ();  adb_getkeysres *res = sbp->Xtmpl getres<adb_getkeysres> ();  res->set_status (ADB_OK);  dbns *db = dbm->get (arg->name);  if (!db) {    res->set_status (ADB_ERR);    sbp->reply (res);    return;  }  res->resok->hasaux = db->hasaux ();  int r = db->getkeyson (arg->who, arg->start, arg->stop, 128, 			 res->resok->keyaux);  res->resok->complete = (r == DB_NOTFOUND);  if (r && r != DB_NOTFOUND)     res->set_status (ADB_ERR);  sbp->reply (res);}// }}}// {{{ do_updatevoiddo_update (dbmanager *dbm, svccb *sbp){  adb_updatearg *arg = sbp->Xtmpl getarg<adb_updatearg> ();  adb_status res (ADB_OK);  dbns *db = dbm->get (arg->name);  if (!db) {    sbp->replyref (ADB_ERR);    return;  }  int r = db->update (arg->key, arg->bsinfo, arg->present);  if (r)    res = ADB_ERR;  sbp->replyref (res);}// }}}// {{{ do_updatebatchvoiddo_updatebatch (dbmanager *dbm, svccb *sbp){  adb_updatebatcharg *args = sbp->Xtmpl getarg<adb_updatebatcharg> ();  adb_status res (ADB_OK);  if (!args->args.size ()) {    sbp->replyref (res);    return;  }  dbns *db = dbm->get (args->args[0].name);  // assert: all args have the same namespace because batching  //         is done by the adb object in libadb.C and each adb obj  //         serves one namespace  if (!db) {    sbp->replyref (ADB_ERR);    return;  }  int r = db->updatebatch (args->args);  if (r)    res = ADB_ERR;  sbp->replyref (res);}// }}}// {{{ do_getinfovoiddo_getinfo (dbmanager *dbm, svccb *sbp){  adb_getinfoarg *arg = sbp->Xtmpl getarg<adb_getinfoarg> ();  adb_getinfores res;  res.status = ADB_OK;  dbns *db = dbm->get (arg->name);  if (!db) {    sbp->replyref (ADB_ERR);    return;  }  int r = db->getinfo (arg->key, res.nlist);  if (r) {    res.status = ADB_ERR;    res.nlist.clear ();  }  sbp->replyref (res);}// }}}// }}}// {{{ RPC accept and dispatchvoiddispatch (ref<axprt_stream> s, ptr<asrv> a, dbmanager *dbm, svccb *sbp){  if (sbp == NULL) {    warn << "EOF from client\n";    a = NULL;    return;  }  switch (sbp->proc ()) {  case ADBPROC_INITSPACE:    do_initspace (dbm, sbp);    break;  case ADBPROC_STORE:    do_store (dbm, sbp);    break;  case ADBPROC_FETCH:    do_fetch (dbm, sbp);    break;  case ADBPROC_GETKEYS:    do_getkeys (dbm, sbp);    break;  case ADBPROC_DELETE:    do_delete (dbm, sbp);    break;  case ADBPROC_GETBLOCKRANGE:    do_getblockrange (dbm, sbp);    break;  case ADBPROC_GETKEYSON:    do_getkeyson (dbm, sbp);    break;  case ADBPROC_UPDATE:    do_update (dbm, sbp);    break;  case ADBPROC_UPDATEBATCH:    do_updatebatch (dbm, sbp);    break;  case ADBPROC_GETINFO:    do_getinfo (dbm, sbp);    break;  default:    fatal << "unknown procedure: " << sbp->proc () << "\n";  }    return;}static void accept_cb (int lfd, dbmanager *dbm);static voidlisten_unix (str sock_name, dbmanager *dbm){  unlink (sock_name);  clntfd = unixsocket (sock_name);  if (clntfd < 0)     fatal << "Error creating socket (UNIX)" << strerror (errno) << "\n";    if (listen (clntfd, 128) < 0) {    fatal ("Error from listen: %m\n");    close (clntfd);  } else {    fdcb (clntfd, selread, wrap (accept_cb, clntfd, dbm));  }  dbstarted = true;}static void accept_cb (int lfd, dbmanager *dbm){  sockaddr_un sun;  bzero (&sun, sizeof (sun));  socklen_t sunlen = sizeof (sun);  int fd = accept (lfd, reinterpret_cast<sockaddr *> (&sun), &sunlen);  if (fd < 0)    fatal ("EOF\n");  ref<axprt_stream> x = axprt_stream::alloc (fd, asrvbufsize);  ptr<asrv> a = asrv::alloc (x, adb_program_1);  a->setcb (wrap (dispatch, x, a, dbm));}// }}}voidusage (){  warnx << "Usage: adbd -d db -S sock [-D]\n";  exit (0);}int main (int argc, char **argv){  setprogname (argv[0]);  char ch;  str db_name = "/var/tmp/db";  dbsock = "/tmp/db-sock";  bool do_daemonize (false);  while ((ch = getopt (argc, argv, "Dd:S:"))!=-1)    switch (ch) {    case 'D':      do_daemonize = true;      break;    case 'd':      db_name = optarg;      break;    case 'S':      dbsock = optarg;      break;    default:      usage ();    }   if (do_daemonize) {    warn << "starting daemonized\n";    daemonize ();  }  warn << "db path: " << db_name << "\n";  warn << "db socket: " << dbsock << "\n";  dbm = New dbmanager(db_name);  sigcb (SIGHUP, wrap (&halt));  sigcb (SIGINT, wrap (&halt));  sigcb (SIGTERM, wrap (&halt));  //setup the asrv  listen_unix (dbsock, dbm);  amain ();}// vim: foldmethod=marker

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -