maint_policy.t
来自「基于DHT的对等协议」· T 代码 · 共 824 行 · 第 1/2 页
T
824 行
// -*-c++-*-// vim: filetype=cpp foldmethod=marker#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <db.h>#include <arpc.h>#include <chord_prot.h>#include <comm.h>#include <merkle.h>#include <merkle_tree_bdb.h>#include <id_utils.h>#include <location.h>#include <libadb.h>#include <rpclib.h>#include <maint_prot.h>#include "maint_policy.h"// {{{ Bandwidth tracker// See comm.[Ch]static inline rpcstats *getstats (int progno, int procno){ str key = strbuf ("%d:%d", progno, procno); rpcstats *stats = rpc_stats_tab[key]; if (!stats) { stats = New rpcstats (key); rpc_stats_tab.insert (stats); } return stats;}static voidtrack_aclnt (aclnt_acct_t a){ rpcstats *stats = getstats (a.progno, a.procno); switch (a.dir) { case ACCT_SEND: stats->ncall++; stats->call_bytes += a.len; break; case ACCT_RECV: stats->nreply++; stats->reply_bytes += a.len; break; default: fatal ("Unknown RPC direction: %d", a.dir); break; }}// }}}// {{{ locationcc - TCP connection managementvec<locationcc *> locationcc::allocated;locationcc::locationcc (const chord_node &n) : n (n), tcpc (NULL), x (NULL), reapcaller (NULL){ allocated.push_back (this);}ptr<locationcc>locationcc::alloc (const chord_node &n){ for (size_t i = 0; i < allocated.size (); i++) { if (allocated[i]->n.x == n.x) return mkref (allocated[i]); } return New refcounted<locationcc> (n);}voidlocationcc::aclntmaker (const rpc_program *p, locationcc::aclntcb cb){ ptr<aclnt> c (NULL); if (x && !x->ateof ()) { c = aclnt::alloc (x, *p); c->set_acct_hook (wrap (&track_aclnt)); } cb (c);}TAMED voidlocationcc::get_stream_aclnt (const rpc_program &p, callback<void, ptr<aclnt> >::ref cb){ VARS { int fd (-1); } if (x && x->ateof ()) { if (reapcaller) timecb_remove (reapcaller); reapcaller = NULL; x = NULL; } if (x) { assert (!x->ateof ()); assert (reapcaller); timecb_remove (reapcaller); reapcaller = NULL; ptr<aclnt> c = aclnt::alloc (x, p); if (c) c->set_acct_hook (wrap (&track_aclnt)); cb (c); } else { assert (!reapcaller); aclntcbs.push_back (wrap (this, &locationcc::aclntmaker, &p, cb)); if (tcpc) return; // Our cb will be called when tcpc finishes. BLOCK { tcpc = tcpconnect (n.r.hostname, n.r.port-1, // LAME CONVENTION @(fd)); } tcpc = NULL; if (fd < 0) { warn << "locationcc: connect to " << n << " failed: " << strerror (errno) << "\n"; assert (x == NULL); } else { x = axprt_stream::alloc (fd); } while (aclntcbs.size ()) { cbv cb = aclntcbs.pop_back (); cb (); } } if (x) { // XXX Should dynamically calculate delay to be just // longer than the time between the start of two rounds. reapcaller = delaycb (1200, wrap (this, &locationcc::reaper)); }}voidlocationcc::fill_ipportvn (u_int32_t &a, u_int32_t &b){ sockaddr_in saddr; bzero (&saddr, sizeof(sockaddr_in)); // saddr.sin_family = AF_INET; inet_aton (n.r.hostname.cstr (), &saddr.sin_addr); saddr.sin_port = htons (n.r.port); /* saddr fields are in network byte order */ a = ntohl (saddr.sin_addr.s_addr); b = (ntohs (saddr.sin_port) << 16) | n.vnode_num;}voidlocationcc::reaper () { reapcaller = NULL; // Forget about the axprt. This will cause the connection // to close as soon as any outstanding aclnt's lose their last ref. x = NULL;}locationcc::~locationcc () { if (reapcaller) { timecb_remove (reapcaller); reapcaller = NULL; } for (size_t i = 0; i < allocated.size (); i++) { if (allocated[i] == this) { allocated[i] = allocated.back (); allocated.pop_back (); break; } }}// }}}// {{{ maintainer - base maintainer classstatic str ctype2ext (dhash_ctype c) { switch (c) { case DHASH_CONTENTHASH: return "c"; break; case DHASH_KEYHASH: return "k"; break; case DHASH_NOAUTH: return "n"; break; default: fatal << "bad ctype\n"; }}const u_int32_t maintainer::default_delay = 300;maintainer::maintainer (str path, maint_dhashinfo_t *hostinfo, ptr<syncer> s) : host (hostinfo->host), ctype (hostinfo->ctype), sync (s), efrags (hostinfo->efrags), dfrags (hostinfo->dfrags), db (New refcounted<adb> (hostinfo->dbsock, hostinfo->dbname, hostinfo->hasaux)), private_path (path), gm (New refcounted<maint_global> (this)), running (false), in_progress (false), delay (default_delay), mainttimer (NULL), ltree (NULL){}maintainer::~maintainer () { stop ();}voidmaintainer::start (u_int32_t d, bool randomize){ int jitter = 0; delay = d; if (mainttimer || in_progress) return; running = true; if (randomize) jitter = random_getword () % delay; mainttimer = delaycb (delay + jitter, wrap (this, &maintainer::start_helper));}void maintainer::start_helper (){ mainttimer = NULL; // Helper is needed for delaycb since run_cycle is TAMED. run_cycle (wrap (this, &maintainer::restart, delay));}voidmaintainer::restart (u_int32_t d){ if (running) start (d);}voidmaintainer::stop (){ if (mainttimer) { timecb_remove (mainttimer); mainttimer = NULL; } // If we are in-progress, make sure we don't restart later. // We do not have control to stop an in-process sync. running = false;}TAMED voidmaintainer::run_cycle (cbv cb){ in_progress = true; // Run local and global cycle in parallel BLOCK { local_maint_cycle (@()); gm->next (@()); } in_progress = false; delaycb (0, cb);}TAMED voidmaintainer::local_maint_cycle (cbv cb){ BLOCK { update_neighbors (@()); } if (preds.size () > 0) { BLOCK { process_neighbors (preds, succs, @()); } } (cb) ();}TAMED voidmaintainer::update_neighbors (cbv cb){ VARS { ptr<chordID> id; chord_nodelistres *slist; chord_nodelistres *plist; clnt_stat e1, e2; bool changed (false); } id = New refcounted<chordID> (host.x); slist = New chord_nodelistres (); plist = New chord_nodelistres (); BLOCK { doRPC (host, chord_program_1, CHORDPROC_GETPREDLIST, id, plist, @(e1)); doRPC (host, chord_program_1, CHORDPROC_GETSUCCLIST, id, slist, @(e2)); } if (e1 || e2) { warn << "my local node is down.\n"; changed = true; goto updateOUT; } // Predecessor and predecessor list (for range). { size_t sz = plist->resok->nlist.size (); vec<ptr<locationcc> > npreds; for (size_t i = 1; i < sz; i++) { chord_node n = make_chord_node (plist->resok->nlist[i]); ptr<locationcc> s = locationcc::alloc (n); npreds.push_back (s); } if (npreds.size () != preds.size ()) changed = true; else { for (size_t i = 0; i < preds.size (); i++) { if (npreds[i]->id () != preds[i]->id ()) { changed = true; break; } } } preds = npreds; } // Successor { size_t sz = slist->resok->nlist.size (); vec<ptr<locationcc> > nsuccs; for (size_t i = 1; i < sz; i++) { chord_node n = make_chord_node (slist->resok->nlist[i]); ptr<locationcc> s = locationcc::alloc (n); nsuccs.push_back (s); } if (nsuccs.size () != succs.size ()) changed = true; else { for (size_t i = 0; i < succs.size (); i++) { if (nsuccs[i]->id () != succs[i]->id ()) { changed = true; break; } } } succs = nsuccs; }updateOUT: stable = !changed; if (slist) delete slist; if (plist) delete plist; cb ();}TAMED voidmaintainer::process_neighbors ( const vec<ptr<locationcc> > &preds, const vec<ptr<locationcc> > &succs, cbv cb){ // See Carbonite, Passing Tone or other implementations. cb ();}size_tmaintainer::get_global_repairs (size_t max, rpc_vec<maint_repair_t, RPC_INFINITY> &repairs){ size_t added = 0; ptr<locationcc> hostcc = locationcc::alloc (host); while (gm->maintqueue.size () && repairs.size () < max) { maint_repair_t repair; repair.responsible = false; repair.id = gm->maintqueue.pop_front (); hostcc->fill_ipportvn (repair.src_ipv4_addr, repair.src_port_vnnum); ptr<locationcc> dst = gm->maintdest.pop_front (); dst->fill_ipportvn (repair.dst_ipv4_addr, repair.dst_port_vnnum); repairs.push_back (repair); added++; } if (added || gm->maintqueue.size ()) warn << host << " added " << added << " gm repairs; " << gm->maintqueue.size () << " remaining.\n"; return added;}// }}}// {{{ Carboniteref<maintainer> carbonite::produce_maintainer (str path, maint_dhashinfo_t *hostinfo, ptr<syncer> s, cbv cb){ return New refcounted<carbonite> (path, hostinfo, s, cb);}carbonite::carbonite (str path, maint_dhashinfo_t *hostinfo, ptr<syncer> s, cbv cb) : maintainer (path, hostinfo, s){ db->getspaceinfo (wrap (this, &carbonite::init_ltree, cb));}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?