adbd.c

来自「基于DHT的对等协议」· C语言 代码 · 共 1,538 行 · 第 1/3 页

C
1,538
字号
    return r;  }  // Implicit transaction  r = cursor->c_get (cursor, &skey, &content, DB_SET_RANGE);  // Loop around the ring if at end.  if (r == DB_NOTFOUND) {    bzero (&skey, sizeof (skey));    r = cursor->c_get (cursor, &skey, &content, DB_FIRST);  }  if (r) {    if (r != DB_NOTFOUND)      warner ("dbns::fetch", "get error", r);    (void) cursor->c_close (cursor);    return r;  }  nextkey = dbt_to_id(skey);  (void) cursor->c_close (cursor);  return 0;}// }}}// {{{ dbns::delintdbns::del (const chordID &key, u_int32_t auxdata){  // NB! This does not remove any actual data.  int r = 0;  adb_metadata_t metadata;  r = get_metadata (key, metadata);  if (r)    return r;  DBT skey;  id_to_dbt (key, &skey);  DB_TXN *t = NULL;  r = dbfe_txn_begin (dbe, &t);  assert (r == 0);  // Update min expiration lazily; we can't know if this is the  // last object with a particular expiration time.  r = update_metadata (false, metadata.size, 0, t);  if (r) {    dbfe_txn_abort (dbe, t);    return r;  }  r = metadatadb->del (metadatadb, t, &skey, 0);  if (r) {    if (r != DB_NOTFOUND)      warner ("dbns::remove", "metadatadb->del", r);    dbfe_txn_abort (dbe, t);    return r;  }  // Only attempt to update Merkle tree if object was present.  const char *err = "";  if (hasaux ()) {    err = "mtree->remove aux";    r = mtree->remove (key, auxdata, t);  } else {    err = "mtree->remove";    r = mtree->remove (key, t);  }  int ret = 0;  if (r && r != DB_NOTFOUND) {    warner ("dbns::remove", err, r);    ret = dbfe_txn_abort (dbe, t);  } else {    // Ignore any NOTFOUND errors since the Merkle    // key may have been removed by expiration.    r = 0;    ret = dbfe_txn_commit (dbe, t);  }  if (ret)    warner ("dbns::remove", "abort/commit error", ret);  return r;}// }}}// {{{ dbns::getkeysintdbns::getkeys (const chordID &start, size_t count, bool getaux, rpc_vec<adb_keyaux_t, RPC_INFINITY> &out){  int r = 0;  DBC *cursor;  r = metadatadb->cursor (metadatadb, NULL, &cursor, 0);  if (r) {    warner ("dbns::getkeys", "cursor open", r);    (void) cursor->c_close (cursor);    return r;  }  // Could possibly improve efficiency here by using SleepyCat's bulk reads  DBT key;  id_to_dbt (start, &key);  DBT data_template;  bzero (&data_template, sizeof (data_template));  DBT data = data_template;  u_int32_t limit = count;  if (count < 0)    limit = (u_int32_t) (asrvbufsize/(1.5*sizeof (adb_keyaux_t)));  // since we set a limit, we know the maximum amount we have to allocate  out.setsize (limit);  u_int32_t elements = 0;  r = cursor->c_get (cursor, &key, &data, DB_SET_RANGE);  while (elements < limit && !r) {    if (key.size == master_metadata.size && 	!memcmp (key.data, master_metadata.data, key.size))      goto getkeys_nextkey;    out[elements].key = dbt_to_id (key);    if (getaux) {      adb_metadata_t md;      if (!buf2xdr (md, data.data, data.size)) {	warnx << name << ": Bad metadata for " << out[elements].key << "\n";	goto getkeys_nextkey;      }      out[elements].auxdata = md.auxdata;    }    elements++;getkeys_nextkey:    bzero (&key, sizeof (key));    data = data_template;    r = cursor->c_get (cursor, &key, &data, DB_NEXT);  }  if (elements < limit) {    out.setsize (elements);  }  if (r && r != DB_NOTFOUND)    warner ("dbns::getkeys", "cursor get", r);  (void) cursor->c_close (cursor);  return r;}// }}}// {{{ dbns::expire_walk// Grab limit keys from time start to end into victims/victim_metadata.// Caller is responsible for freeing the data allocated into// the victims DBTs.intdbns::expire_walk (u_int32_t limit, u_int32_t start, u_int32_t end,    vec<DBT> &victims, vec<adb_metadata_t> &victim_metadata){  // Open a cursor in secondary database.  // Make sure it points to the first thing after 0.  // Keep reading and deleting until the thing's key is > t.  //   Accumulate entries into a vec, including the object size.  u_int32_t begin_time_data = htonl (start);  DBT begin_time; bzero (&begin_time, sizeof (begin_time));  begin_time.data = &begin_time_data;  begin_time.size = sizeof (begin_time_data);  DBT key; bzero (&key, sizeof (key));  key.flags = DB_DBT_MALLOC;  DBT content; bzero (&content, sizeof (content));  DBC *cursor = NULL;  int r = byexpiredb->cursor (byexpiredb, NULL, &cursor, 0);  if (r) {    warner ("dbns::expire_walk", "byexpiredb->cursor", r);    if (cursor)      cursor->c_close (cursor);    return r;  }  r = cursor->c_pget (cursor, &begin_time, &key, &content, DB_SET_RANGE);  while (!r) {    if (key.size == master_metadata.size && 	!memcmp (key.data, master_metadata.data, key.size))    {      free (key.data);    } else if (limit > 0 && victims.size () >= limit) {      free (key.data);      break;    } else {      adb_metadata_t md;      buf2xdr (md, content.data, content.size);      if (md.expiration < end) {	victims.push_back (key);	victim_metadata.push_back (md);      } else {	free (key.data);	break;      }    }    bzero (&key, sizeof (key));    key.flags = DB_DBT_MALLOC;    r = cursor->c_pget (cursor, &begin_time, &key, &content, DB_NEXT);  }  if (r && r != DB_NOTFOUND)    warner ("dbns::expire_walk", "byexpiredb cursor->c_pget", r);  (void) cursor->c_close (cursor);  return r;}// }}}// {{{ dbns::mtree_cleanervoiddbns::mtree_cleaner (){  expire_mtree ();  // Align the next run time so that everyone in the system runs  // at approximately the start of the next interval, with some jitter.  // This should reduce the number of spurious sync repairs.  u_int32_t next_interval =    (tsnow.tv_sec / expire_mtree_interval) * expire_mtree_interval +      expire_mtree_interval + (tsnow.tv_nsec % 10);  mtree_tcb = delaycb (next_interval - tsnow.tv_sec,      wrap (this, &dbns::mtree_cleaner));}// }}}// {{{ dbns::expire_mtree// Clean out expired keys from the local Merkle treeintdbns::expire_mtree (){  vec<DBT> victims;  vec<adb_metadata_t> victim_metadata;  u_int32_t now = time (NULL);  // Round to the next lowest expire_mtree_interval.  // This may compensate for the jitter scheduled in by mtree_cleaner.  now = (now / expire_mtree_interval) * expire_mtree_interval;  // Get all objects from the last time we did this until present  int r = expire_walk (0, last_mtree_time, now, victims, victim_metadata);  DB_TXN *t = NULL;  int retry_count = 0;  // Iterate over objects to be expired:  while (victims.size ()) {    retry_count = 0;retry:    t = NULL;    dbe->txn_begin (dbe, NULL, &t, 0);    DBT key = victims.pop_back ();    adb_metadata_t md = victim_metadata.pop_back ();    chordID id = dbt_to_id (key);    if (hasaux ()) {      r = mtree->remove (id, md.auxdata, t);    } else {      r = mtree->remove (id, t);    }    switch (r) {      case 0:	warnx ("%d.%06d ", int (tsnow.tv_sec), int (tsnow.tv_nsec/1000))	  << name << ": Expired mtree " << id << "\n";	dbfe_txn_commit (dbe, t);	break;      case DB_NOTFOUND:	// Okay to continue	dbfe_txn_abort (dbe, t);	break;      case DB_LOCK_DEADLOCK:	// Must immediately abort.	dbfe_txn_abort (dbe, t);	warner ("dbns::expire_mtree", "mtree remove", r);	if (retry_count < 10) {	  retry_count++;	  victims.push_back (key);	  victim_metadata.push_back (md);	  goto retry;	} else {	  // Give up on the whole thing.	  warnx << name << ": too many retries for " << id	        << "; aborting.\n";	  free (key.data);	  goto abort_cleanup;	}	break;      default:	warner ("dbns::expire_mtree", "mtree remove", r);	dbfe_txn_abort (dbe, t);	goto abort_cleanup;	break;    }    free (key.data);  }  last_mtree_time = now;  return r;abort_cleanup:  while (victims.size ()) {    DBT key = victims.pop_back ();    free (key.data);  }  // Leave last_mtree_time as old, make sure we get everything  // that we should have done this round.  return r;}// }}}// {{{ dbns::expire (u_int32_t, u_int32_t)intdbns::expire (u_int32_t limit, u_int32_t deadline){  if (deadline == 0)    deadline = time (NULL);  vec<DBT> victims;  vec<adb_metadata_t> victim_metadata;  int r = expire_walk (limit, 1, deadline, victims, victim_metadata);  u_int64_t victim_size = 0;  u_int32_t last_expire = 0;  // XXX Would it be okay to modify the database (and its  //     sibling databases) while the cursor is open?  // start a transaction  DB_TXN *parent = NULL;  dbfe_txn_begin (dbe, &parent);  u_int32_t txnsize = 0;  // Iterate over objects to be expired:  while (victims.size ()) {    DB_TXN *t = NULL;    dbe->txn_begin (dbe, parent, &t, 0);    DBT key = victims.pop_back ();    adb_metadata_t md = victim_metadata.pop_back ();    chordID id = dbt_to_id (key);    warnx ("%d.%06d ", int (tsnow.tv_sec), int (tsnow.tv_nsec/1000))      << name << ": Expiring " << id << "\n";    const char *err = "";    do {      err = "mtree->remove";      if (hasaux ()) {	r = mtree->remove (id, md.auxdata, t);      } else {	r = mtree->remove (id, t);      }      // Ignore error on mtree removals      err = "metadatadb->del";      r = metadatadb->del (metadatadb, t, &key, 0);      if (r) break;    } while (0);    free (key.data);    if (r) {      warner ("dbns::expire", err, r);      dbfe_txn_abort (dbe, t);    } else {      victim_size += md.size;      if (md.expiration > last_expire)	last_expire = md.expiration;      dbfe_txn_commit (dbe, t);    }    txnsize++;    if (txnsize > 1000) {      txnsize = 0;      r = update_metadata (false, victim_size, last_expire, parent);      victim_size = 0;      dbfe_txn_commit (dbe, parent);      dbfe_txn_begin (dbe, &parent);    }  }  // Update the metadata with size/time difference  r = update_metadata (false, victim_size, last_expire, parent);  dbfe_txn_commit (dbe, parent);  // Metadata is gone, now remove data.  expire_objects (last_expire);  return r;}// }}}// {{{ dbns::time2fnstrdbns::time2fn (u_int32_t exptime){  // Each bin holds 256 seconds worth of writes.  // The Dell PowerEdge SC1425s with Maxtor 6Y160M0 SATA disks   // write at 6MB/s with write caching disabled, for about 1.5GB files.  // The name of the file is the expiration time rounded up to the nearest  // multiple of 0xFF so the filename's time has past, the file can be unlinked.  static char subpath[20];  sprintf (subpath, "%04x/%08x",      ((exptime & 0xFFFF0000) >> 16), ((exptime + 0xFF) & 0xFFFFFF00));  str path = datapath << "/" << subpath;   return path;}// }}}// {{{ dbns::write_objectvoidmkpath (const char *path){  int len = strlen (path);  char *buf = New char[len];  const char *slash = path;  while ((slash = strchr(slash+1, '/')) != NULL) {    len = slash - path;    memcpy(buf, path, len);    buf[len] = 0;    if (mkdir(buf, 0777)) {      if (errno == EEXIST) {	struct stat st;	if (!stat(buf, &st) && S_ISDIR(st.st_mode))	  continue;      }      fatal ("mkpath at %s: %m", buf);    }  }  delete[] buf;}intdbns::write_object (const chordID &key, DBT &data, u_int32_t exptime){  // Cache file descriptors for commonly used files.  str fn = time2fn (exptime);  mkpath (fn);  int fd = open (fn, O_CREAT|O_WRONLY|O_APPEND, 0666);  if (fd < 0)    return fd;  struct stat sb;  if (fstat (fd, &sb) < 0)    return -1;  if (write (fd, data.data, data.size) != (int) data.size) {    int saved_errno = errno;    close (fd);    errno = saved_errno;    return -1;  }  close (fd);  return sb.st_size;}// }}}// {{{ dbns::read_objectintdbns::read_object (const chordID &key, str &data, adb_metadata_t &metadata){  // Get the metadata necessary to do the read.  int r = get_metadata (key, metadata);  if (r) {    if (r != DB_NOTFOUND)      warner ("dbns::read_object", "get_metadata", r);    return -1;  }  str fn = time2fn (metadata.expiration);  int fd = open (fn, O_RDONLY);  if (fd < 0) {    if (errno != ENOENT)      warn ("open: %m\n");    return -1;  }  if (lseek (fd, metadata.offset, SEEK_SET) < 0) {    warn ("lseek: %m\n");    close (fd);    return -1;  }  mstr raw (metadata.size);  char *buf = raw.cstr ();  u_int32_t left = metadata.size;  while (left > 0) {    int nread = read (fd, buf, left);    if (nread < 0) {      warn ("read: %m\n");      break;    } else if (nread == 0) {      warn << "EOF reading " << key << " from " << fn << "\n";      break;    } else {      left -= nread;      buf  += nread;    }  }  close (fd);  if (left == 0) {    data = raw;    return 0;  }  return -1;}// }}}// {{{ dbns::expire_objectsintdbns::expire_objects (u_int32_t exptime){  u_int32_t hightime = exptime >> 16;  DIR *datadir = opendir (datapath);  if (!datadir) {    warn ("opendir: %s %m\n", datapath.cstr ());    return -1;  }  struct dirent *dp = NULL;  // This code relies on file names making sense according to time2fn.  // If it were a little more suspicious, it might stat the files.  while ((dp = readdir (datadir)) != NULL) {    if (strlen (dp->d_name) != 4)      continue;    char *ep = NULL;    u_int32_t t = strtoul (dp->d_name, &ep, 16);    if (!ep || *ep != '\0')      continue;    assert ((t & 0xFFFF0000) == 0);    if (t > 0 && t <= hightime) {      str subdirpath = datapath << "/" << dp->d_name;      DIR *subdir = opendir (subdirpath);      if (!subdir) {	warn ("opendir: %s: %m\n", subdirpath.cstr ());	continue;      }      struct dirent *sdp = NULL;      while ((sdp = readdir (subdir)) != NULL) {	if (strlen (sdp->d_name) != 8)	  continue;	char *ep = NULL;	u_int32_t rt = strtoul (sdp->d_name, &ep, 16);	if (!ep || *ep != '\0')	  continue;

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?