dhblock_srv.c
来自「基于DHT的对等协议」· C语言 代码 · 共 319 行
C
319 行
#include <chord.h>
#include "dhash.h"
#include "dhash_common.h"
#include "dhashcli.h"

#include <locationtable.h>
#include <location.h>

#include <libadb.h>

#include <dhblock_srv.h>

#include <configurator.h>
#include <modlogger.h>

// Both loggers report under the same module tag.
#define info  modlogger ("dhblock_srv", modlogger::INFO)
#define trace modlogger ("dhblock_srv", modlogger::TRACE)

//
// Construction / destruction
//

// Builds a block server of content type `c` for `node`.
//   msock   - unix socket path used to reach the maintenance service
//             (see get_maint_aclnt).
//   dbsock, dbname, hasaux, t - forwarded unchanged to the adb constructor.
// All repair counters start at zero and no repair timer is scheduled until
// start () is called.
dhblock_srv::dhblock_srv (ptr<vnode> node,
                          ptr<dhashcli> cli,
                          dhash_ctype c,
                          str msock,
                          str dbsock,
                          str dbname,
                          bool hasaux,
                          ptr<chord_trigger_t> t) :
  repair_tcb (NULL),
  ctype (c),
  db (New refcounted<adb> (dbsock, dbname, hasaux, t)),
  maint (get_maint_aclnt (msock)),
  node (node),
  cli (cli),
  repair_read_bytes (0),
  repair_sent_bytes (0),
  repairs_completed (0),
  expired_repairs (0)
{
  warn << "opened " << dbsock << " with space " << dbname
       << (hasaux ? " (hasaux)\n" : "\n");
}

// Cancels the pending repair timer (if any) via stop ().
dhblock_srv::~dhblock_srv ()
{
  stop ();
}

// Returns the short name used to prefix this server's statistics:
// "chash", "noauth", "keyhash" or "append" depending on ctype, and
// "unknown" for any other value.
str
dhblock_srv::prefix () const
{
  const char *p = "unknown";
  switch (ctype) {
  case DHASH_CONTENTHASH:
    p = "chash";
    break;
  case DHASH_NOAUTH:
    p = "noauth";
    break;
  case DHASH_KEYHASH:
    p = "keyhash";
    break;
  case DHASH_APPEND:
    p = "append";
    break;
  }
  return p;
}

//
// Maintenance RPC helpers
//

// Connects to the unix socket `msock` and returns an RPC client for
// maint_program_1 on top of it.  A failed connect is fatal.
ptr<aclnt>
dhblock_srv::get_maint_aclnt (str msock)
{
  int fd = unixsocket_connect (msock);
  if (fd < 0)
    fatal ("get_maint_aclnt: Error connecting to %s: %m\n", msock.cstr ());
  make_async (fd);
  // NOTE(review): 1024*1025 is kept exactly as found; confirm whether the
  // extra 1024 bytes over 1 MiB are intentional.
  ptr<aclnt> c = aclnt::alloc (axprt_unix::alloc (fd, 1024*1025),
                               maint_program_1);
  return c;
}

// Sends MAINTPROC_INITSPACE describing this server's database (host,
// ctype, db socket/name, hasaux) together with `efrags` and `dfrags`.
// The reply is handled by maintinitcb, which owns and frees `res`.
void
dhblock_srv::maint_initspace (int efrags, int dfrags, ptr<chord_trigger_t> t)
{
  maint_dhashinfo_t dhi;
  node->my_location ()->fill_node (dhi.host);
  dhi.ctype  = ctype;
  dhi.dbsock = db->dbsock ();
  dhi.dbname = db->name ();
  dhi.hasaux = db->hasaux ();
  dhi.efrags = efrags;
  dhi.dfrags = dfrags;

  maint_status *res = New maint_status (MAINTPROC_OK);
  maint->call (MAINTPROC_INITSPACE, &dhi, res,
               wrap (this, &dhblock_srv::maintinitcb, t, res));
}

// Completion callback for maint_initspace: warns if either the RPC layer
// (`err`) or the remote status (`*res`) is non-zero, then frees `res`.
// `t` is not used here; presumably it is bound into the callback only to
// keep the trigger referenced until the RPC finishes -- TODO confirm.
void
dhblock_srv::maintinitcb (ptr<chord_trigger_t> t, maint_status *res,
                          clnt_stat err)
{
  if (err || *res)
    warn << "Maintenance initialization failed for " << db->name () << ": "
         << err << "/" << *res << "\n";
  delete res;
}

// Sends MAINTPROC_GETREPAIRS for this node and ctype, forwarding `thresh`,
// `count` and `start` unchanged.  `cbr` is invoked from maintgetrepairscb
// with the resulting list of repairs.
void
dhblock_srv::maint_getrepairs (int thresh, int count, chordID start,
                               cb_maintrepairs_t cbr)
{
  maint_getrepairsarg arg;
  bzero (&arg, sizeof (arg));
  node->my_location ()->fill_node (arg.host);
  arg.ctype  = ctype;
  arg.thresh = thresh;
  arg.count  = count;
  arg.start  = start;

  maint_getrepairsres *res = New maint_getrepairsres ();
  maint->call (MAINTPROC_GETREPAIRS, &arg, res,
               wrap (this, &dhblock_srv::maintgetrepairscb, res, cbr));
}

// Completion callback for maint_getrepairs.  On failure a warning is
// logged and `cbr` receives an empty vector; on success it receives a
// copy of every entry in res->repairs.  `res` is freed afterwards.
void
dhblock_srv::maintgetrepairscb (maint_getrepairsres *res,
                                cb_maintrepairs_t cbr,
                                clnt_stat err)
{
  vec<maint_repair_t> repairs;
  if (err || res->status) {
    warn << "Maintenance getrepairs failed for " << db->name () << ": "
         << err << "/" << res->status << "\n";
  } else {
    for (size_t i = 0; i < res->repairs.size (); i++)
      repairs.push_back (res->repairs[i]);
  }
  cbr (repairs);
  delete res;
}

// Builds a zeroed chord_node_wire carrying address word `a` and
// port/vnode word `b`, and returns the matching entry from the node's
// location table (created if it does not exist yet).
ptr<location>
dhblock_srv::maintloc2location (u_int32_t a, u_int32_t b)
{
  chord_node_wire x;
  bzero (&x, sizeof (x));
  x.machine_order_ipv4_addr = a;
  x.machine_order_port_vnnum = b;
  return node->locations->lookup_or_create (make_chord_node (x));
}

//
// Database access
//

// These 2 functions exist just so that we can have a generic adbcb.
// Unfortunately, we can't wrap superclass functions in subclasses.

// Stores `d` under key `k`; `cb` receives the translated dhash status.
void
dhblock_srv::db_store (chordID k, str d, cb_dhstat cb)
{
  db->store (k, d, wrap (this, &dhblock_srv::adbcb, cb));
}

// As above, additionally passing `aux` and `expire` through to adb::store.
void
dhblock_srv::db_store (chordID k, str d, u_int32_t aux, u_int32_t expire,
                       cb_dhstat cb)
{
  db->store (k, d, aux, expire, wrap (this, &dhblock_srv::adbcb, cb));
}

// Maps an adb_status onto the dhash status handed to `cb`:
//   ADB_OK -> DHASH_OK, ADB_ERR -> DHASH_DBERR,
//   ADB_DISKFULL -> DHASH_DISKFULL.
// Any other status aborts the process.
void
dhblock_srv::adbcb (cb_dhstat cb, adb_status astat)
{
  switch (astat) {
  case ADB_OK:
    cb (DHASH_OK);
    break;
  case ADB_ERR:
    cb (DHASH_DBERR);
    break;
  case ADB_DISKFULL:
    cb (DHASH_DISKFULL);
    break;
  default:
    // Don't expect adb::store and adb::remove to have other return codes.
    fatal << "Unexpected adb_status in dhblock_srv::adbcb: " << astat << "\n";
  }
}

//
// Lifecycle
//

// Schedules the first repair_timer call after dhash::reptm () (plus a
// random extra delay below dhash::reptm () when `randomize` is set).
// Does nothing if a timer is already pending.
void
dhblock_srv::start (bool randomize)
{
  int delay = 0;
  if (!repair_tcb) {
    // NOTE(review): assumes dhash::reptm () is non-zero; a zero period
    // would make the modulo below undefined.
    if (randomize)
      delay = random_getword () % dhash::reptm ();
    repair_tcb = delaycb (dhash::reptm () + delay,
                          wrap (this, &dhblock_srv::repair_timer));
  }
}

// Cancels the pending repair timer, if there is one.
void
dhblock_srv::stop ()
{
  if (repair_tcb) {
    // Outstanding repairs will continue
    timecb_remove (repair_tcb);
    repair_tcb = NULL;
  }
}

// Fetches key `k` from the local database; the result goes straight to `cb`.
void
dhblock_srv::fetch (chordID k, cb_fetch cb)
{
  db->fetch (k, cb);
}

//
// Statistics
//

// Appends the repair counters and queue sizes to `s`, each named
// "<prefix ()>.<counter>".
void
dhblock_srv::base_stats (vec<dstat> &s)
{
  str p = prefix ();
  s.push_back (dstat (p << ".repair_read_bytes", repair_read_bytes));
  s.push_back (dstat (p << ".repair_sent_bytes", repair_sent_bytes));
  s.push_back (dstat (p << ".repairs_completed", repairs_completed));
  s.push_back (dstat (p << ".expired_repairs", expired_repairs));
  s.push_back (dstat (p << ".repairs_queued", repairs_queued.size ()));
  s.push_back (dstat (p << ".repairs_inprogress", repairs_inprogress.size ()));
}

// Appends this server's statistics to `s`; this implementation only
// reports the base counters.
void
dhblock_srv::stats (vec<dstat> &s)
{
  base_stats (s);
}

//
// Repair management
//

// Describes one repair of block `key` towards location `w`.  `desc` is
// "<key> -><id of w>" and is the identity used by dhblock_srv to detect
// duplicate jobs.  `w` must be non-null.
repair_job::repair_job (blockID key, ptr<location> w, u_int32_t to) :
  key (key),
  where (w),
  // Built from the parameter rather than the `where` member so the result
  // does not depend on the members' declaration order.
  desc (strbuf () << key << " ->" << w->id ()),
  donecb (NULL),
  timeout (to)
{
}

// Installs the completion callback and replaces the timeout with `to`.
void
repair_job::setdonecb (cbv cb, u_int32_t to)
{
  donecb = cb;
  timeout = to;
}

// Begins the repair by calling execute ().  Timeout handling is not
// implemented; the original placeholder is kept below.
void
repair_job::start ()
{
  // if (to)
  //   delaycb (to, 0, XXXgiveup);
  execute ();
}

// Queues `job` unless it is null or a job with the same description is
// already queued or in progress, then tries to start queued work.
// Returns true iff the job was accepted.
bool
dhblock_srv::repair_add (ptr<repair_job> job)
{
  if (!job)
    return false;
  if (repairs_queued[job->desc] || repairs_inprogress[job->desc])
    return false;

  info << node->my_ID () << ": repair_add for " << job->desc << "\n";
  repair_q.push_back (job);
  repairs_queued.insert (job->desc);
  repair_flush_q ();
  return true;
}

// Number of jobs waiting in the queue (not counting those in progress).
u_int32_t
dhblock_srv::repair_qlength ()
{
  return repairs_queued.size ();
}

// Periodic tick: tops up the queue if it is below REPAIR_QUEUE_MAX,
// starts whatever can be started, and re-arms itself for
// dhash::reptm () later.
void
dhblock_srv::repair_timer ()
{
  if (repairs_queued.size () < REPAIR_QUEUE_MAX)
    generate_repair_jobs ();
  // Generation will finish asynchronously
  // REPAIR_QUEUE_MAX is a soft cap to try and
  // keep us from generating too many repairs; subclasses
  // are free to call repair_add to enqueue jobs whenever
  // they like.
  repair_flush_q ();
  repair_tcb = delaycb (dhash::reptm (),
                        wrap (this, &dhblock_srv::repair_timer));
}

// Completion hook bound into each started job: retires `desc` from the
// in-progress set, bumps the completed counter, logs progress, and then
// refills and flushes the queue as repair_timer does.
void
dhblock_srv::repair_done (str desc)
{
  repairs_inprogress.remove (desc);
  repairs_completed++;
  info << "completed repair of " << desc << "; "
       << repairs_inprogress.size () << " in progress, "
       << repairs_queued.size () << " in queue.\n";
  if (repairs_queued.size () < REPAIR_QUEUE_MAX)
    generate_repair_jobs ();
  repair_flush_q ();
}

// Starts queued jobs, oldest first, while fewer than
// REPAIR_OUTSTANDING_MAX are in progress.  Each started job moves from
// the queued set to the in-progress set and gets repair_done bound as
// its completion callback.
void
dhblock_srv::repair_flush_q ()
{
  while ((repairs_inprogress.size () < REPAIR_OUTSTANDING_MAX) &&
         (repair_q.size () > 0))
  {
    ptr<repair_job> job = repair_q.pop_front ();
    repairs_queued.remove (job->desc);
    assert (!repairs_inprogress[job->desc]);
    repairs_inprogress.insert (job->desc);
    job->setdonecb (wrap (this, &dhblock_srv::repair_done, job->desc));
    job->start ();
    trace << "dhblock_srv::repair_flush_q: started repair of "
          << job->desc << "\n";
  }
  // The queue and its description index must always stay in step.
  assert (repair_q.size () == repairs_queued.size ());
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?