dhblock_replicated_srv.c
来自「基于DHT的对等协议」· C语言 代码 · 共 300 行
C
300 行
#include <chord.h>
#include <dhash.h>
#include <dhash_common.h>
#include "dhashcli.h"

#include <configurator.h>
#include <location.h>

#include "dhash_store.h"
#include "dhblock_replicated.h"
#include "dhblock_replicated_srv.h"

#include <modlogger.h>
#define warning modlogger ("dhblock_replicated", modlogger::WARNING)
#define info modlogger ("dhblock_replicated", modlogger::INFO)
#define trace modlogger ("dhblock_replicated", modlogger::TRACE)

#ifdef DMALLOC
#include <dmalloc.h>
#endif

// Map a block ID to the key it is stored under in the DB: the same
// value with its low 32 bits zeroed.
chordID
dhblock_replicated_srv::id_to_dbkey (chordID key)
{
  // Shift the low 32 bits out and back in again so they end up cleared.
  key = key >> 32;
  key = key << 32;
  // Some high bits must survive, otherwise the DB key would be zero.
  assert (key > 0);
  return key;
}

// Build a key whose high bits come from id_to_dbkey (key) and whose
// low bits are OR-ed in from `hash`.
chordID
dhblock_replicated_srv::idaux_to_mkey (chordID key, u_int32_t hash)
{
  return id_to_dbkey (key) | hash;
}

// Construct the server on top of dhblock_srv.  Repair scanning starts
// at the current predecessor's ID, and the maintenance space is
// initialised with num_replica () for both of its count arguments.
dhblock_replicated_srv::dhblock_replicated_srv (ptr<vnode> node,
                                                ptr<dhashcli> cli,
                                                dhash_ctype ctype,
                                                str msock,
                                                str dbsock,
                                                str dbname,
                                                ptr<chord_trigger_t> t)
  : dhblock_srv (node, cli, ctype, msock, dbsock, dbname, true, t),
    last_repair (node->my_pred ()->id ()),
    maint_pending (false),
    stale_repairs (0)
{
  maint_initspace (dhblock_replicated::num_replica (),
                   dhblock_replicated::num_replica (),
                   t);
}

// Append this server's statistics to `s`: the base-class stats followed
// by the stale-repair counter.
void
dhblock_replicated_srv::stats (vec<dstat> &s)
{
  str stat_prefix = prefix ();
  base_stats (s);
  s.push_back (dstat (stat_prefix << ".stale_repairs", stale_repairs));
}

// Fetch the object stored under the DB key derived from `k`; the result
// is delivered to `cb` by the database.
void
dhblock_replicated_srv::fetch (chordID k, cb_fetch cb)
{
  db->fetch (id_to_dbkey (k), cb);
}

// Store `new_data` under the DB key derived from `key`.
//
// Writers to the same DB key are serialized in arrival order.  The
// first writer registers an (empty) waiter queue and starts its fetch
// right away; later writers park their fetch on that queue and are
// released one at a time by finish_store ().  What "updating" actually
// means is left to the subclass via real_store ().
void
dhblock_replicated_srv::store (chordID key, str new_data,
                               u_int32_t expiration, cb_dhstat cb)
{
  chordID db_key = id_to_dbkey (key);

  if (_paused_stores[db_key] == NULL) {
    // Nobody is writing this key yet.  The queue registered here must
    // be cleared out (finish_store) before cb is called, on every path.
    vec<cbv> *pending = New vec<cbv> ();
    _paused_stores.insert (db_key, pending);
    db->fetch (db_key,
               wrap (this,
                     &dhblock_replicated_srv::store_after_fetch_cb,
                     new_data, expiration, cb));
    return;
  }

  // A store for this key is already in flight: queue our fetch behind it.
  vec<cbv> *pending = *(_paused_stores[db_key]);
  pending->push_back (wrap (db, &adb::fetch, db_key, false,
                            wrap (this,
                                  &dhblock_replicated_srv::store_after_fetch_cb,
                                  new_data, expiration, cb)));
}

// Continuation of store () once the current DB contents are known.
// A DB error ends the store with DHASH_DBERR; a missing object is
// treated as empty existing data.
void
dhblock_replicated_srv::store_after_fetch_cb (str new_data,
                                              u_int32_t expiration,
                                              cb_dhstat cb,
                                              adb_status err,
                                              adb_fetchdata_t obj)
{
  if (err == ADB_ERR) {
    // Release the next waiter before reporting the failure.
    finish_store (obj.id);
    cb (DHASH_DBERR);
    return;
  }

  if (err == ADB_NOTFOUND)
    obj.data = "";

  // The subclass performs the actual update.
  real_store (obj.id, obj.data, new_data, expiration,
              wrap (this, &dhblock_replicated_srv::store_after_rstore_cb,
                    obj.id, cb));
}

// Called when the subclass has finished real_store (): let the next
// writer for `dbkey` proceed, then report `stat` to the caller.
void
dhblock_replicated_srv::store_after_rstore_cb (chordID dbkey,
                                               cb_dhstat cb,
                                               dhash_stat stat)
{
  finish_store (dbkey);
  cb (stat);
}

// Mark the in-flight store for `key` as done.  If other writers are
// queued, start the oldest one; otherwise drop the queue entry and free
// the queue.
void
dhblock_replicated_srv::finish_store (chordID key)
{
  assert (_paused_stores[key] != NULL);
  vec<cbv> *pending = *(_paused_stores[key]);

  if (pending->size () == 0) {
    _paused_stores.remove (key);
    delete pending;
    return;
  }

  cbv next = pending->pop_front ();
  (*next) ();
}

// Ask maintenance for a new batch of repairs, unless a request is
// already outstanding.  The answer arrives in maintqueue ().
void
dhblock_replicated_srv::generate_repair_jobs ()
{
  if (maint_pending)
    return;

  // last_repair is the continuation point from the previous batch, but
  // restart from the predecessor whenever it falls outside
  // (predecessor, me), e.g. after the predecessor changed.
  if (!between (node->my_pred ()->id (), node->my_ID (), last_repair))
    last_repair = node->my_pred ()->id ();

  maint_pending = true;
  chordID range_start = id_to_dbkey (last_repair) | 0xFFFFFFFF;

  // Get anything that isn't replicated efrags times (if Carbonite).
  // Expect that we'll be told who to fetch from.
  u_int32_t replicas = dhblock_replicated::num_replica ();
  maint_getrepairs (replicas, REPAIR_QUEUE_MAX - repair_qlength (),
                    range_start,
                    wrap (this, &dhblock_replicated_srv::maintqueue));
}

// Receive a batch of repairs from maintenance and queue one rjrep job
// per entry.  last_repair advances past entries we are responsible for
// and is reset to the predecessor when the batch is empty.
void
dhblock_replicated_srv::maintqueue (const vec<maint_repair_t> &repairs)
{
  maint_pending = false;

  for (size_t i = 0; i < repairs.size (); i++) {
    const maint_repair_t &r = repairs[i];

    ptr<location> from = maintloc2location (r.src_ipv4_addr,
                                            r.src_port_vnnum);
    ptr<location> to = maintloc2location (r.dst_ipv4_addr,
                                          r.dst_port_vnnum);
    blockID key (r.id, ctype);

    // The same kind of job is built whether or not we are responsible.
    //
    // XXX: When not responsible there is no need to reverse the job,
    //      even if stale, so normally it should pretend to be an
    //      already-reversed job:
    //        New refcounted<rjrep> (key, from, to, mkref (this), true);
    //      However, for now, don't do anything special and cause our
    //      own copy to get updated since we don't delete our stale
    //      copy after insertion.
    ptr<repair_job> job = New refcounted<rjrep> (key, from, to,
                                                 mkref (this));
    if (r.responsible)
      last_repair = r.id;

    repair_add (job);
  }

  // Reset when no new repairs are sent.
  if (repairs.size () == 0)
    last_repair = node->my_pred ()->id ();
}

//
// Repair Logic Implementation
//

// A repair job that refreshes the local copy of `key` from `s` (when
// given) and then pushes it to `w`.  `rev` marks a job that was itself
// created in reaction to a stale result.
rjrep::rjrep (blockID key, ptr<location> s, ptr<location> w,
              ptr<dhblock_replicated_srv> bsrv, bool rev)
  : repair_job (key, w),
    src (s),
    bsrv (bsrv),
    reversed (rev)
{
  // With a known source, describe the job as "key src->dst".
  if (src)
    desc = strbuf () << key << " " << src->id () << "->" << where->id ();
}

// Start the job by updating the local copy of the object first (which
// may turn out to be unnecessary).
void
rjrep::execute ()
{
  if (!src) {
    // No source hint: do an ordinary retrieve.
    bsrv->cli->retrieve (key,
                         wrap (mkref (this), &rjrep::repair_retrieve_cb));
    return;
  }

  if (src == bsrv->node->my_location ()) {
    // We have the object; go straight to sendblock.
    delaycb (0, wrap (mkref (this), &rjrep::storecb, DHASH_OK));
    return;
  }

  // Retrieve from the suggested source, skipping the lookup.
  ptr<chordID> guess = New refcounted<chordID> (src->id ());
  bsrv->cli->retrieve (key,
                       wrap (mkref (this), &rjrep::repair_retrieve_cb),
                       DHASHCLIENT_GUESS_SUPPLIED|DHASHCLIENT_SKIP_LOOKUP,
                       guess);
}

// Retrieve finished: on failure log and give up; otherwise account for
// the bytes read and store the block locally (continues in storecb).
void
rjrep::repair_retrieve_cb (dhash_stat err, ptr<dhash_block> b, route r)
{
  if (err) {
    info << "rjrep (" << key<< "): retrieve during repair failed: " << err << "\n";
    return;
  }

  if (b->expiration < static_cast<u_int32_t> (timenow))
    bsrv->expired_repairs++;
  bsrv->repair_read_bytes += b->data.len ();

  bsrv->store (key.ID, b->data, b->expiration,
               wrap (mkref (this), &rjrep::storecb));
}

// Local store finished (or was skipped).  Handle a stale result, then
// forward the local copy to the destination unless that is ourselves.
void
rjrep::storecb (dhash_stat err)
{
  if (err) {
    if (src && !reversed && err == DHASH_STALE) {
      // Wait, it appears that I have newer data locally!
      // Better inform the original guy, in addition to what
      // else I was planning to do anyway.
      trace << bsrv->node->my_ID () << ": repair fetch from " << src->id ()
            << " was stale.\n";
      ptr<repair_job> job = New refcounted<rjrep> (key,
                                                   bsrv->node->my_location (),
                                                   src, bsrv, true);
      bsrv->repair_add (job);
    } else {
      info << "rjrep (" << key << ") storecb err: " << err << "\n";
    }
  }

  // Nothing to send when the destination is this node.
  if (where == bsrv->node->my_location ())
    return;

  trace << bsrv->node->my_ID () << ": repair sending " << key
        << " to " << where->id () << "\n";
  // Send a copy of the updated local copy to its destination
  bsrv->cli->sendblock (where, key, bsrv,
                        wrap (mkref (this), &rjrep::repair_send_cb));
}

// sendblock finished.  Success and DHASH_STALE both count towards the
// bytes sent; DHASH_STALE additionally bumps stale_repairs and, for a
// non-reversed job, schedules a reversed job in the other direction.
void
rjrep::repair_send_cb (dhash_stat err, bool something, u_int32_t sz)
{
  const bool stale = (err == DHASH_STALE);

  if (!err)
    bsrv->repair_sent_bytes += sz;
  if (stale) {
    bsrv->repair_sent_bytes += sz;
    bsrv->stale_repairs++;
  }

  if (err && !stale) {
    info << "rjrep (" << key << ") error sending block: " << err << "\n";
  } else if (!reversed && stale) {
    // Whoops, newer stuff on the remote.  Better update
    // myself and the guy I got it from.
    trace << bsrv->node->my_ID () << ": repair store to " << where->id ()
          << " was stale.\n";
    ptr<repair_job> job (NULL);
    if (src)
      job = New refcounted<rjrep> (key, where, src, bsrv, true);
    else
      job = New refcounted<rjrep> (key, where, bsrv->node->my_location (),
                                   bsrv, true);
    bsrv->repair_add (job);
  }
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?