adbd.c
来自「基于DHT的对等协议」· C语言 代码 · 共 1,538 行 · 第 1/3 页
C
1,538 行
return r; } // Implicit transaction r = cursor->c_get (cursor, &skey, &content, DB_SET_RANGE); // Loop around the ring if at end. if (r == DB_NOTFOUND) { bzero (&skey, sizeof (skey)); r = cursor->c_get (cursor, &skey, &content, DB_FIRST); } if (r) { if (r != DB_NOTFOUND) warner ("dbns::fetch", "get error", r); (void) cursor->c_close (cursor); return r; } nextkey = dbt_to_id(skey); (void) cursor->c_close (cursor); return 0;}// }}}// {{{ dbns::delintdbns::del (const chordID &key, u_int32_t auxdata){ // NB! This does not remove any actual data. int r = 0; adb_metadata_t metadata; r = get_metadata (key, metadata); if (r) return r; DBT skey; id_to_dbt (key, &skey); DB_TXN *t = NULL; r = dbfe_txn_begin (dbe, &t); assert (r == 0); // Update min expiration lazily; we can't know if this is the // last object with a particular expiration time. r = update_metadata (false, metadata.size, 0, t); if (r) { dbfe_txn_abort (dbe, t); return r; } r = metadatadb->del (metadatadb, t, &skey, 0); if (r) { if (r != DB_NOTFOUND) warner ("dbns::remove", "metadatadb->del", r); dbfe_txn_abort (dbe, t); return r; } // Only attempt to update Merkle tree if object was present. const char *err = ""; if (hasaux ()) { err = "mtree->remove aux"; r = mtree->remove (key, auxdata, t); } else { err = "mtree->remove"; r = mtree->remove (key, t); } int ret = 0; if (r && r != DB_NOTFOUND) { warner ("dbns::remove", err, r); ret = dbfe_txn_abort (dbe, t); } else { // Ignore any NOTFOUND errors since the Merkle // key may have been removed by expiration. r = 0; ret = dbfe_txn_commit (dbe, t); } if (ret) warner ("dbns::remove", "abort/commit error", ret); return r;}// }}}// {{{ dbns::getkeysintdbns::getkeys (const chordID &start, size_t count, bool getaux, rpc_vec<adb_keyaux_t, RPC_INFINITY> &out){ int r = 0; DBC *cursor; r = metadatadb->cursor (metadatadb, NULL, &cursor, 0); if (r) { warner ("dbns::getkeys", "cursor open", r); (void) cursor->c_close (cursor); return r; } // Could possibly improve efficiency here by using SleepyCat's bulk reads DBT key; id_to_dbt (start, &key); DBT data_template; bzero (&data_template, sizeof (data_template)); DBT data = data_template; u_int32_t limit = count; if (count < 0) limit = (u_int32_t) (asrvbufsize/(1.5*sizeof (adb_keyaux_t))); // since we set a limit, we know the maximum amount we have to allocate out.setsize (limit); u_int32_t elements = 0; r = cursor->c_get (cursor, &key, &data, DB_SET_RANGE); while (elements < limit && !r) { if (key.size == master_metadata.size && !memcmp (key.data, master_metadata.data, key.size)) goto getkeys_nextkey; out[elements].key = dbt_to_id (key); if (getaux) { adb_metadata_t md; if (!buf2xdr (md, data.data, data.size)) { warnx << name << ": Bad metadata for " << out[elements].key << "\n"; goto getkeys_nextkey; } out[elements].auxdata = md.auxdata; } elements++;getkeys_nextkey: bzero (&key, sizeof (key)); data = data_template; r = cursor->c_get (cursor, &key, &data, DB_NEXT); } if (elements < limit) { out.setsize (elements); } if (r && r != DB_NOTFOUND) warner ("dbns::getkeys", "cursor get", r); (void) cursor->c_close (cursor); return r;}// }}}// {{{ dbns::expire_walk// Grab limit keys from time start to end into victims/victim_metadata.// Caller is responsible for freeing the data allocated into// the victims DBTs.intdbns::expire_walk (u_int32_t limit, u_int32_t start, u_int32_t end, vec<DBT> &victims, vec<adb_metadata_t> &victim_metadata){ // Open a cursor in secondary database. // Make sure it points to the first thing after 0. // Keep reading and deleting until the thing's key is > t. // Accumulate entries into a vec, including the object size. u_int32_t begin_time_data = htonl (start); DBT begin_time; bzero (&begin_time, sizeof (begin_time)); begin_time.data = &begin_time_data; begin_time.size = sizeof (begin_time_data); DBT key; bzero (&key, sizeof (key)); key.flags = DB_DBT_MALLOC; DBT content; bzero (&content, sizeof (content)); DBC *cursor = NULL; int r = byexpiredb->cursor (byexpiredb, NULL, &cursor, 0); if (r) { warner ("dbns::expire_walk", "byexpiredb->cursor", r); if (cursor) cursor->c_close (cursor); return r; } r = cursor->c_pget (cursor, &begin_time, &key, &content, DB_SET_RANGE); while (!r) { if (key.size == master_metadata.size && !memcmp (key.data, master_metadata.data, key.size)) { free (key.data); } else if (limit > 0 && victims.size () >= limit) { free (key.data); break; } else { adb_metadata_t md; buf2xdr (md, content.data, content.size); if (md.expiration < end) { victims.push_back (key); victim_metadata.push_back (md); } else { free (key.data); break; } } bzero (&key, sizeof (key)); key.flags = DB_DBT_MALLOC; r = cursor->c_pget (cursor, &begin_time, &key, &content, DB_NEXT); } if (r && r != DB_NOTFOUND) warner ("dbns::expire_walk", "byexpiredb cursor->c_pget", r); (void) cursor->c_close (cursor); return r;}// }}}// {{{ dbns::mtree_cleanervoiddbns::mtree_cleaner (){ expire_mtree (); // Align the next run time so that everyone in the system runs // at approximately the start of the next interval, with some jitter. // This should reduce the number of spurious sync repairs. u_int32_t next_interval = (tsnow.tv_sec / expire_mtree_interval) * expire_mtree_interval + expire_mtree_interval + (tsnow.tv_nsec % 10); mtree_tcb = delaycb (next_interval - tsnow.tv_sec, wrap (this, &dbns::mtree_cleaner));}// }}}// {{{ dbns::expire_mtree// Clean out expired keys from the local Merkle treeintdbns::expire_mtree (){ vec<DBT> victims; vec<adb_metadata_t> victim_metadata; u_int32_t now = time (NULL); // Round to the next lowest expire_mtree_interval. // This may compensate for the jitter scheduled in by mtree_cleaner. now = (now / expire_mtree_interval) * expire_mtree_interval; // Get all objects from the last time we did this until present int r = expire_walk (0, last_mtree_time, now, victims, victim_metadata); DB_TXN *t = NULL; int retry_count = 0; // Iterate over objects to be expired: while (victims.size ()) { retry_count = 0;retry: t = NULL; dbe->txn_begin (dbe, NULL, &t, 0); DBT key = victims.pop_back (); adb_metadata_t md = victim_metadata.pop_back (); chordID id = dbt_to_id (key); if (hasaux ()) { r = mtree->remove (id, md.auxdata, t); } else { r = mtree->remove (id, t); } switch (r) { case 0: warnx ("%d.%06d ", int (tsnow.tv_sec), int (tsnow.tv_nsec/1000)) << name << ": Expired mtree " << id << "\n"; dbfe_txn_commit (dbe, t); break; case DB_NOTFOUND: // Okay to continue dbfe_txn_abort (dbe, t); break; case DB_LOCK_DEADLOCK: // Must immediately abort. dbfe_txn_abort (dbe, t); warner ("dbns::expire_mtree", "mtree remove", r); if (retry_count < 10) { retry_count++; victims.push_back (key); victim_metadata.push_back (md); goto retry; } else { // Give up on the whole thing. warnx << name << ": too many retries for " << id << "; aborting.\n"; free (key.data); goto abort_cleanup; } break; default: warner ("dbns::expire_mtree", "mtree remove", r); dbfe_txn_abort (dbe, t); goto abort_cleanup; break; } free (key.data); } last_mtree_time = now; return r;abort_cleanup: while (victims.size ()) { DBT key = victims.pop_back (); free (key.data); } // Leave last_mtree_time as old, make sure we get everything // that we should have done this round. return r;}// }}}// {{{ dbns::expire (u_int32_t, u_int32_t)intdbns::expire (u_int32_t limit, u_int32_t deadline){ if (deadline == 0) deadline = time (NULL); vec<DBT> victims; vec<adb_metadata_t> victim_metadata; int r = expire_walk (limit, 1, deadline, victims, victim_metadata); u_int64_t victim_size = 0; u_int32_t last_expire = 0; // XXX Would it be okay to modify the database (and its // sibling databases) while the cursor is open? // start a transaction DB_TXN *parent = NULL; dbfe_txn_begin (dbe, &parent); u_int32_t txnsize = 0; // Iterate over objects to be expired: while (victims.size ()) { DB_TXN *t = NULL; dbe->txn_begin (dbe, parent, &t, 0); DBT key = victims.pop_back (); adb_metadata_t md = victim_metadata.pop_back (); chordID id = dbt_to_id (key); warnx ("%d.%06d ", int (tsnow.tv_sec), int (tsnow.tv_nsec/1000)) << name << ": Expiring " << id << "\n"; const char *err = ""; do { err = "mtree->remove"; if (hasaux ()) { r = mtree->remove (id, md.auxdata, t); } else { r = mtree->remove (id, t); } // Ignore error on mtree removals err = "metadatadb->del"; r = metadatadb->del (metadatadb, t, &key, 0); if (r) break; } while (0); free (key.data); if (r) { warner ("dbns::expire", err, r); dbfe_txn_abort (dbe, t); } else { victim_size += md.size; if (md.expiration > last_expire) last_expire = md.expiration; dbfe_txn_commit (dbe, t); } txnsize++; if (txnsize > 1000) { txnsize = 0; r = update_metadata (false, victim_size, last_expire, parent); victim_size = 0; dbfe_txn_commit (dbe, parent); dbfe_txn_begin (dbe, &parent); } } // Update the metadata with size/time difference r = update_metadata (false, victim_size, last_expire, parent); dbfe_txn_commit (dbe, parent); // Metadata is gone, now remove data. expire_objects (last_expire); return r;}// }}}// {{{ dbns::time2fnstrdbns::time2fn (u_int32_t exptime){ // Each bin holds 256 seconds worth of writes. // The Dell PowerEdge SC1425s with Maxtor 6Y160M0 SATA disks // write at 6MB/s with write caching disabled, for about 1.5GB files. // The name of the file is the expiration time rounded up to the nearest // multiple of 0xFF so the filename's time has past, the file can be unlinked. static char subpath[20]; sprintf (subpath, "%04x/%08x", ((exptime & 0xFFFF0000) >> 16), ((exptime + 0xFF) & 0xFFFFFF00)); str path = datapath << "/" << subpath; return path;}// }}}// {{{ dbns::write_objectvoidmkpath (const char *path){ int len = strlen (path); char *buf = New char[len]; const char *slash = path; while ((slash = strchr(slash+1, '/')) != NULL) { len = slash - path; memcpy(buf, path, len); buf[len] = 0; if (mkdir(buf, 0777)) { if (errno == EEXIST) { struct stat st; if (!stat(buf, &st) && S_ISDIR(st.st_mode)) continue; } fatal ("mkpath at %s: %m", buf); } } delete[] buf;}intdbns::write_object (const chordID &key, DBT &data, u_int32_t exptime){ // Cache file descriptors for commonly used files. str fn = time2fn (exptime); mkpath (fn); int fd = open (fn, O_CREAT|O_WRONLY|O_APPEND, 0666); if (fd < 0) return fd; struct stat sb; if (fstat (fd, &sb) < 0) return -1; if (write (fd, data.data, data.size) != (int) data.size) { int saved_errno = errno; close (fd); errno = saved_errno; return -1; } close (fd); return sb.st_size;}// }}}// {{{ dbns::read_objectintdbns::read_object (const chordID &key, str &data, adb_metadata_t &metadata){ // Get the metadata necessary to do the read. int r = get_metadata (key, metadata); if (r) { if (r != DB_NOTFOUND) warner ("dbns::read_object", "get_metadata", r); return -1; } str fn = time2fn (metadata.expiration); int fd = open (fn, O_RDONLY); if (fd < 0) { if (errno != ENOENT) warn ("open: %m\n"); return -1; } if (lseek (fd, metadata.offset, SEEK_SET) < 0) { warn ("lseek: %m\n"); close (fd); return -1; } mstr raw (metadata.size); char *buf = raw.cstr (); u_int32_t left = metadata.size; while (left > 0) { int nread = read (fd, buf, left); if (nread < 0) { warn ("read: %m\n"); break; } else if (nread == 0) { warn << "EOF reading " << key << " from " << fn << "\n"; break; } else { left -= nread; buf += nread; } } close (fd); if (left == 0) { data = raw; return 0; } return -1;}// }}}// {{{ dbns::expire_objectsintdbns::expire_objects (u_int32_t exptime){ u_int32_t hightime = exptime >> 16; DIR *datadir = opendir (datapath); if (!datadir) { warn ("opendir: %s %m\n", datapath.cstr ()); return -1; } struct dirent *dp = NULL; // This code relies on file names making sense according to time2fn. // If it were a little more suspicious, it might stat the files. while ((dp = readdir (datadir)) != NULL) { if (strlen (dp->d_name) != 4) continue; char *ep = NULL; u_int32_t t = strtoul (dp->d_name, &ep, 16); if (!ep || *ep != '\0') continue; assert ((t & 0xFFFF0000) == 0); if (t > 0 && t <= hightime) { str subdirpath = datapath << "/" << dp->d_name; DIR *subdir = opendir (subdirpath); if (!subdir) { warn ("opendir: %s: %m\n", subdirpath.cstr ()); continue; } struct dirent *sdp = NULL; while ((sdp = readdir (subdir)) != NULL) { if (strlen (sdp->d_name) != 8) continue; char *ep = NULL; u_int32_t rt = strtoul (sdp->d_name, &ep, 16); if (!ep || *ep != '\0') continue;
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?