dhblock_chash_srv.c
来自「基于DHT的对等协议」· C语言 代码 · 共 305 行
C
305 行
#include <chord.h>
#include <dhash.h>
#include <dhash_common.h>
#include <dhashcli.h>
#include <location.h>
#include <libadb.h>

#include <dhblock_chash.h>
#include <dhblock_chash_srv.h>

#include <configurator.h>
#include <locationtable.h>
#include <ida.h>

#include <modlogger.h>
// Module-scoped loggers; every diagnostic in this file goes through these.
#define warning modlogger ("dhblock_chash", modlogger::WARNING)
#define info    modlogger ("dhblock_chash", modlogger::INFO)
#define trace   modlogger ("dhblock_chash", modlogger::TRACE)

#ifdef DMALLOC
#include <dmalloc.h>
#endif

// Repair job for a block this node is responsible for.
//
// execute () proceeds as follows:
//   1. Check the cache db and send a fragment if the block is present.
//   2. Otherwise retrieve the block.
//   3. Cache it and send a fragment.
struct rjchash : public repair_job {
  rjchash (blockID key, ptr<location> s, ptr<location> w,
	   ptr<dhblock_chash_srv> bsrv);

  // Optional source to fetch the block from; NULL when none was supplied.
  ptr<location> src;
  const ptr<dhblock_chash_srv> bsrv;

  void execute ();

  // Async callbacks
  void cache_check_cb (adb_status stat, adb_fetchdata_t obj);
  void retrieve_cb (dhash_stat err, ptr<dhash_block> b, route r);
  void send_frag (str block, u_int32_t expiration);
  void local_store_cb (dhash_stat stat);
  void send_frag_cb (dhash_stat err, bool present, u_int32_t sz);
};

// Repair job for a block this node is not responsible for (a pmaint
// repair): the local object is simply transferred to the destination.
struct rjchashsend : public repair_job {
  rjchashsend (blockID key, ptr<location> w,
	       ptr<dhblock_chash_srv> bsrv);

  const ptr<dhblock_chash_srv> bsrv;

  void execute ();

  // Async callback
  void send_frag_cb (dhash_stat err, bool present, u_int32_t sz);
};

// Builds the content-hash block server on top of dhblock_srv, opens the
// "ccache" database used to cache blocks fetched for repairs, and
// initializes maintenance with the configured efrag/dfrag counts.
dhblock_chash_srv::dhblock_chash_srv (ptr<vnode> node,
				      ptr<dhashcli> cli,
				      str msock,
				      str dbsock,
				      str dbname,
				      ptr<chord_trigger_t> t) :
  dhblock_srv (node, cli, DHASH_CONTENTHASH, msock,
	       dbsock, dbname, false, t),
  last_repair (node->my_pred ()->id ()),
  maint_pending (false),
  cache_hits (0),
  cache_misses (0),
  cache_db (NULL)
{
  cache_db = New refcounted<adb> (dbsock, "ccache", false, t);
  maint_initspace (dhblock_chash::num_efrags (),
		   dhblock_chash::num_dfrags (), t);
}

dhblock_chash_srv::~dhblock_chash_srv ()
{
}

// Appends the base statistics followed by the repair-cache hit and miss
// counters to s.
void
dhblock_chash_srv::stats (vec<dstat> &s)
{
  str p = prefix ();
  base_stats (s);
  s.push_back (dstat (p << ".repair_cache_hits", cache_hits));
  s.push_back (dstat (p << ".repair_cache_misses", cache_misses));
}

// Stores d under key with the given expiration, reporting completion via
// cb, and logs the write together with a hash of the data.
void
dhblock_chash_srv::store (chordID key, str d, u_int32_t expire, cb_dhstat cb)
{
  const char *action;
  // The else branch ("R") is deliberately unreachable for now; it is kept
  // as a placeholder for the case the condition below cannot yet detect.
  if (1) { // without maintaining our own merkle tree, we can't know
    action = "N"; // New
    db_store (key, d, 0, expire, cb);
  } else {
    action = "R";
    cb (DHASH_OK);
  }
  bigint h = compute_hash (d.cstr (), d.len ());
  info << "db write: " << node->my_ID ()
       << " " << action << " " << key << " " << d.len ()
       << " " << h
       << "\n";
}

// Requests the next batch of repairs from maintenance, unless a request is
// already outstanding.  Results are delivered to maintqueue ().
void
dhblock_chash_srv::generate_repair_jobs ()
{
  if (maint_pending)
    return;

  // Use last_repair to handle continuations
  // But be sure to restart after predecessor changes.
  if (!between (node->my_pred ()->id (), node->my_ID (), last_repair))
    last_repair = node->my_pred ()->id ();

  maint_pending = true;
  u_int32_t frags = dhblock_chash::num_efrags ();
  maint_getrepairs (frags, REPAIR_QUEUE_MAX - repair_qlength (),
		    incID (last_repair),
		    wrap (this, &dhblock_chash_srv::maintqueue));
}

// Callback for maint_getrepairs (): turns each reported repair into a
// repair job and queues it.
void
dhblock_chash_srv::maintqueue (const vec<maint_repair_t> &repairs)
{
  maint_pending = false;
  for (size_t i = 0; i < repairs.size (); i++) {
    blockID key (repairs[i].id, DHASH_CONTENTHASH);
    ptr<location> f = NULL;
    if (repairs[i].src_ipv4_addr > 0) {
      // Only Passing Tone (or global maint) will provide a source.
      // We should assert num_dfrags == 1 here, probably.
      f = maintloc2location (repairs[i].src_ipv4_addr,
			     repairs[i].src_port_vnnum);
    }
    ptr<location> w = maintloc2location (
	repairs[i].dst_ipv4_addr,
	repairs[i].dst_port_vnnum);
    ptr<repair_job> job (NULL);
    if (repairs[i].responsible) {
      job = New refcounted<rjchash> (key, f, w, mkref (this));
      // Remember how far we got so the next request continues from here.
      last_repair = repairs[i].id;
    } else {
      // This is a pmaint repair job; just transfer our local object.
      job = New refcounted<rjchashsend> (key, w, mkref (this));
    }
    repair_add (job);
  }
  // Reset when no new repairs are sent.
  if (!repairs.size ())
    last_repair = node->my_pred ()->id ();
}

//
// Repair Logic Implementation
//
rjchash::rjchash (blockID key, ptr<location> s, ptr<location> w,
		  ptr<dhblock_chash_srv> bsrv) :
  repair_job (key, w),
  src (s),
  bsrv (bsrv)
{
}

// Starts the repair by looking the block up in the repair cache.
void
rjchash::execute ()
{
  bsrv->cache_db->fetch (key.ID,
			 wrap (mkref (this), &rjchash::cache_check_cb));
}

// Cache lookup result: on a hit, send a fragment straight from the cached
// data; on a miss, retrieve the block, directing the retrieval at src when
// a source was supplied.
void
rjchash::cache_check_cb (adb_status stat, adb_fetchdata_t obj)
{
  if (stat == ADB_OK) {
    bsrv->cache_hits++;
    assert (key.ID == obj.id);
    send_frag (obj.data, obj.expiration);
  } else {
    bsrv->cache_misses++;
    ptr<chordID> id (NULL);
    int options = 0;
    if (src) {
      id = New refcounted<chordID> (src->id ());
      options = DHASHCLIENT_GUESS_SUPPLIED|DHASHCLIENT_SKIP_LOOKUP;
    }
    bsrv->cli->retrieve (key,
			 wrap (mkref (this), &rjchash::retrieve_cb),
			 options, id);
  }
}

static void cache_store_cb (adb_status stat);

// Retrieval result: on success, account for the bytes read, cache the
// block and send a fragment of it.  Failures are only traced.
void
rjchash::retrieve_cb (dhash_stat err, ptr<dhash_block> b, route r)
{
  if (err) {
    trace << "retrieve missing block " << key << " failed: " << err << "\n";
    // We'll probably try again later.
    // XXX maybe make sure we don't try too "often"?
  } else {
    assert (b);
    bsrv->repair_read_bytes += b->data.len ();
    bsrv->cache_db->store (key.ID, b->data, 0, b->expiration,
			   wrap (cache_store_cb));
    send_frag (b->data, b->expiration);
  }
}

// Completion callback for the repair-cache store; a failure is logged and
// otherwise ignored.
static void
cache_store_cb (adb_status stat)
{
  // Logged through the module logger, like every other diagnostic here.
  if (stat != ADB_OK)
    warning << "store error caching block: " << stat << "\n";
}

// Generates a fragment of block and delivers it to the repair destination.
void
rjchash::send_frag (str block, u_int32_t expiration)
{
  // Expired blocks are counted but still sent.
  if (expiration < static_cast<u_int32_t> (timenow))
    bsrv->expired_repairs++;

  // Cap the fragment parameter at the configured number of dfrags.
  u_long m = Ida::optimal_dfrag (block.len (), dhblock::dhash_mtu ());
  if (m > dhblock_chash::num_dfrags ())
    m = dhblock_chash::num_dfrags ();
  str frag = Ida::gen_frag (m, block);

  // If the block will be sent to us, just store it instead of sending
  // an RPC to ourselves.  This will happen a lot, so the optimization
  // is probably worth it.
  if (where == bsrv->node->my_location ()) {
    bsrv->store (key.ID, frag, expiration,
		 wrap (mkref (this), &rjchash::local_store_cb));
  } else {
    bsrv->cli->sendblock (where, key, frag, expiration,
			  wrap (mkref (this), &rjchash::send_frag_cb));
  }
}

// Completion callback for a fragment stored locally.
void
rjchash::local_store_cb (dhash_stat stat)
{
  if (stat != DHASH_OK) {
    warning << "dhblock_chash_srv database store error: "
	    << stat << "\n";
  } else {
    info << "repair: " << bsrv->node->my_ID ()
	 << " sent " << key << " to " << where->id () << " (me).\n";
  }
}

// Completion callback for a fragment sent to a remote node; on success the
// bytes sent are added to the server's repair_sent_bytes counter.
void
rjchash::send_frag_cb (dhash_stat err, bool present, u_int32_t sz)
{
  strbuf x;
  x << "repair: " << bsrv->node->my_ID ();
  if (!err) {
    bsrv->repair_sent_bytes += sz;
    x << " sent ";
  } else {
    x << " error sending ";
  }
  x << key << " to " << where->id ();
  if (err)
    x << " (" << err << ")";
  info << x << ".\n";
}

//
// Transfer-only (pmaint) repair job implementation
//
rjchashsend::rjchashsend (blockID key, ptr<location> w,
			  ptr<dhblock_chash_srv> bsrv) :
  repair_job (key, w),
  bsrv (bsrv)
{
}

// Sends the block identified by key to the destination, handing the send
// call the server itself rather than fragment data.
void
rjchashsend::execute ()
{
  bsrv->cli->sendblock (where, key, bsrv,
			wrap (mkref (this), &rjchashsend::send_frag_cb));
}

// Completion callback for the transfer; logs the outcome and, on success,
// accounts for the bytes sent.
void
rjchashsend::send_frag_cb (dhash_stat err, bool present, u_int32_t sz)
{
  strbuf x;
  x << "grepair: " << bsrv->node->my_ID ();
  if (!err) {
    // Remove this fragment/replica; it was transferred successfully.
    // bsrv->db->remove (key.ID, wrap (XXX));
    bsrv->repair_sent_bytes += sz;
    x << " sent ";
  } else {
    x << " error sending ";
  }
  x << key << " to " << where->id ();
  if (err)
    x << " (" << err << ")";
  info << x << ".\n";
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?