dbfe.c

来自「基于DHT的对等协议」· C语言 代码 · 共 449 行

C
449
字号
/* * dbfe.C * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License as * published by the Free Software Foundation; either version 2, or (at * your option) any later version. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 * USA * */#include <async.h>#include "dbfe.h"#ifdef DMALLOC#include "dmalloc.h"#endif#define CACHE_OPT "opt_cachesize"#define CREATE_OPT "opt_create"#define DBENV_OPT    "opt_dbenv"#define FLAG_OPT   "opt_flag"#define JOIN_OPT  "opt_join"#define PERM_OPT "opt_permissions"// Choose the best stablest txn protected level availablestatic const int isolated_read_flag = #if (DB_VERSION_MAJOR < 4) || \    ((DB_VERSION_MAJOR == 4) && (DB_VERSION_MINOR < 3))    DB_DIRTY_READ#elif ((DB_VERSION_MAJOR == 4) && (DB_VERSION_MINOR == 3))    DB_DEGREE_2#else     DB_READ_COMMITTED#endif     ;///////////////// static /////////////////////ref<dbImplInfo>dbGetImplInfo() {  ref<dbImplInfo> info = New refcounted<dbImplInfo>();  info->supportedOptions.push_back(CACHE_OPT);  info->supportedOptions.push_back(CREATE_OPT);  info->supportedOptions.push_back(DBENV_OPT);  info->supportedOptions.push_back(FLAG_OPT);  info->supportedOptions.push_back(JOIN_OPT);  info->supportedOptions.push_back(PERM_OPT);  return info;}//////////////////// dbOptions ///////////////////////dbOptions::dbOptions()  {}int verifyOption (const char *optionSig) {  vec<const char *> allowed = dbGetImplInfo()->supportedOptions;  for (unsigned int i=0; i < allowed.size(); i++)     if (memcmp(allowed[i], optionSig, strlen(allowed[i])) == 0) return 1;  return 0;}intdbOptions::addOption(const char *optionSig, long value) {  if (!verifyOption(optionSig)) return EINVAL;  optionRec optr;  optr.sig = optionSig;  optr.value = value;  options.push_back(optr);  return 0;}longdbOptions::getOption(const char *optionSig) {  for (unsigned int i=0; i < options.size(); i++)     if (memcmp(options[i].sig, optionSig, strlen(options[i].sig)) == 0) return options[i].value;  return -1;}//////////////////////dbEnumeration//////////////////////dbEnumeration::dbEnumeration (DB *db, DB_ENV *dbe) :  txnid (NULL),  dbe (dbe),  cursor (NULL){  int r = 0;  if (dbe) {    r = dbfe_txn_begin (dbe, &txnid);    if (r) {      const char *path (NULL);      dbe->get_home (dbe, &path);      fatal << "enumeration error for " << path << ": "	    << db_strerror (r) << "\n";    }  }  r = db->cursor(db, txnid, &cursor, isolated_read_flag);  if (r) {    const char *path (NULL);    dbe->get_home (dbe, &path);    fatal << "enumeration error for " << path << ": "          << db_strerror (r) << "\n";  }}dbEnumeration::~dbEnumeration() {  cursor->c_close (cursor);  if (txnid) {    dbfe_txn_commit (dbe, txnid);    txnid = NULL;  }}ptr<dbPair>dbEnumeration::getElement(u_int32_t flags, const str &startkey,    bool getdata){  DBT key;  bzero(&key, sizeof(key));  if (startkey) {    key.size = startkey.len ();    key.data = (void *) startkey.cstr ();  }  DBT data;  bzero(&data, sizeof(data));  if (!getdata)    data.flags = DB_DBT_PARTIAL;  int err = cursor->c_get(cursor, &key, &data, flags);  if (err) {    //    warn << "db error: " << db_strerror(err) << "\n";    return NULL;  }  str keyrec (static_cast<char *> (key.data), key.size);  str valrec = NULL;  if (getdata)     valrec = str (static_cast<char *> (data.data), data.size);  return New refcounted<dbPair>(keyrec, valrec);}ptr<dbPair>dbEnumeration::nextElement(bool getdata){  return getElement (DB_NEXT, NULL, getdata);}ptr<dbPair>dbEnumeration::prevElement(bool getdata){  return getElement (DB_PREV, NULL, getdata);}ptr<dbPair>dbEnumeration::nextElement(const str &startkey, bool getdata){  return getElement(DB_SET_RANGE, startkey, getdata);}ptr<dbPair>dbEnumeration::lastElement(bool getdata){  return getElement(DB_LAST, NULL, getdata);}ptr<dbPair>dbEnumeration::firstElement(bool getdata){  return getElement(DB_FIRST, NULL, getdata);}////////////////////// dbfe //////////////////////////////dbfe::dbfe () : dbe (NULL), db (NULL){}dbfe::~dbfe() {  closedb ();}int dbfe::opendb (const char *filename, dbOptions opts) {   int r = -1;  bool do_dbenv = false;  long use_dbenv = opts.getOption(DBENV_OPT);  if (filename && use_dbenv) do_dbenv = true;  long mode = opts.getOption(PERM_OPT);  if (mode == -1) mode = 0664;  long flags = opts.getOption(FLAG_OPT);  if (flags == -1) flags = DB_CREATE;  long join = opts.getOption(JOIN_OPT);  long create = opts.getOption(CREATE_OPT);  if (create == 0) flags |= DB_EXCL;  long cachesize = opts.getOption(CACHE_OPT);  if (cachesize == -1) cachesize = 1024;  /* KB */#if (DB_VERSION_MAJOR < 4) || \    ((DB_VERSION_MAJOR == 4) && (DB_VERSION_MINOR < 3))  // BerkeleyDB 4.3 introduces support for level 2 isolation.  // Until then, we need support for dirtier stuff, if requested.  flags |= DB_DIRTY_READ;#endif   if (do_dbenv) {    r = dbfe_initialize_dbenv (&dbe, filename, join >= 0, cachesize);    if (r){      warn << "dbe->open returned " << r << " which is " << db_strerror(r) << "\n";      return r;    }    r = dbfe_opendb (dbe, &db, "db", flags, mode);  } else {    r = dbfe_opendb (dbe, &db, filename, flags, mode);  }  return r;}voiddbfe::checkpoint (){  if (dbe)#if (DB_VERSION_MAJOR < 4)     txn_checkpoint (dbe, 0, 0, 0);#else     dbe->txn_checkpoint (dbe, 0, 0, 0);#endif}int dbfe::insert (const str &key, const str &data) {   DB_TXN *t = NULL;  int r = 0, tr = 0;  DBT skey, content;  bzero(&skey, sizeof(skey));  bzero(&content, sizeof(content));  content.size = data.len ();  content.data = (void *) (data.cstr ());  skey.size = key.len ();  skey.data = (void *) (key.cstr ());  if(dbe) {#if DB_VERSION_MAJOR >= 4    r = dbe->txn_begin(dbe, NULL, &t, 0);#else    r = txn_begin(dbe, NULL, &t, 0);#endif    if (r) return r;  }  r = db->put(db, t, &skey, &content, 0);  if (r) {    warn << "insert (put): db error: " << db_strerror(r) << "\n";    return r;  }  if(t) {#if DB_VERSION_MAJOR >= 4    tr = t->commit(t, 0);#else    tr = txn_commit(t, 0);#endif    if (!r) r = tr;    t = NULL;  }  if (r) warn << "insert: db error: " << db_strerror(r) << "\n";    return r;} str dbfe::lookup (const str &key) {   DBT skey, content;  bzero(&skey, sizeof(skey));  skey.size = key.len ();  skey.data = (void *) (key.cstr ());  bzero(&content, sizeof(content));  int r=0;    if ((r = db->get(db, NULL, &skey, &content, 0)) != 0) return NULL;  str ret (static_cast<char *> (content.data), content.size);  // warnx << "return " << content.size << "\n";  return ret;} int dbfe::del (const str &key) {  DB_TXN *t = NULL;  int err, terr;  DBT dkey;  bzero(&dkey, sizeof(dkey));  dkey.size = key.len ();  dkey.data = (void *) (key.cstr ());  if(dbe) {#if DB_VERSION_MAJOR >= 4    err = dbe->txn_begin(dbe, NULL, &t, 0);#else    err = txn_begin(dbe, NULL, &t, 0);#endif    if (err) return err;  }  err = db->del (db, t, &dkey, 0);  if(t) {#if DB_VERSION_MAJOR >= 4    terr = t->commit(t, 0);#else    terr = txn_commit (t, 0);#endif    if (!err) err = terr;    t = NULL;  }  return err;}int dbfe::closedb () {   int r;  r = db->close(db, 0);  if(dbe) dbe->close(dbe, 0);  return r;}ptr<dbEnumeration> dbfe::enumerate (){  return New refcounted<dbEnumeration>(db, dbe);}voiddbfe::sync () {  if (dbe)    return;  db->sync (db, 0L);}intdbfe::traverse (traverse_act_t cb){  /* Adapted from BerkeleyDB get_bulk example code */  DBC *dbcp;  DBT key, data, onedata;  DB_TXN *txnid = NULL;  size_t retklen, retdlen;  void *retkey, *retdata;  int ret, t_ret;  void *p;  bzero (&key, sizeof (key));  bzero (&data, sizeof (data));  bzero (&onedata, sizeof (onedata));  /* Review the database in 5MB chunks. */#define BUFFER_LENGTH (5 * 1024 * 1024)  if ((data.data = malloc(BUFFER_LENGTH)) == NULL) {    return (errno);  }  data.ulen = BUFFER_LENGTH;  data.flags = DB_DBT_USERMEM;  ret = dbfe_txn_begin (dbe, &txnid);  if (ret) {    db->err (db, ret, "DBE->txn_begin");    free (data.data);    return (ret);  }  /* Acquire a cursor for the database. */  if ((ret = db->cursor(db, txnid, &dbcp, isolated_read_flag)) != 0) {    db->err (db, ret, "DB->cursor");    dbfe_txn_abort (dbe, txnid);    free (data.data);    return (ret);  }  for (;;) {    /*     * Acquire the next set of key/data pairs.     */    if ((ret = dbcp->c_get (dbcp,	    &key, &data, DB_MULTIPLE_KEY | DB_NEXT)) != 0)    {      if (ret == DB_BUFFER_SMALL) {	/* Single key too big! Allow BDB to allocate memory. */	ret = dbcp->c_get (dbcp, &key, &onedata, DB_NEXT);	if (ret != 0) {	  if (ret != DB_NOTFOUND)	    db->err (db, ret, "DBcursor->c_get one");	  break;	}	str rk ((const char *) key.data, key.size);	str rd ((const char *) onedata.data, onedata.size);	(*cb) (rk, rd);	continue;      } else      if (ret != DB_NOTFOUND)	db->err (db, ret, "DBcursor->c_get bulk");      break;    }    for (DB_MULTIPLE_INIT (p, &data);;) {      DB_MULTIPLE_KEY_NEXT (p, &data, retkey, retklen, retdata, retdlen);      if (p == NULL)	break;      str rk ((char *) retkey, retklen);      str rd ((char *) retdata, retdlen);      (*cb) (rk, rd);    }  }  if ((t_ret = dbcp->c_close (dbcp)) != 0) {    db->err(db, ret, "DBcursor->close");    if (ret == 0)      ret = t_ret;  }  free (data.data);  if (ret != 0)    dbfe_txn_abort (dbe, txnid);  else    dbfe_txn_commit (dbe, txnid);  return (ret);}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?