maintd.c
来自「基于DHT的对等协议」· C语言 代码 · 共 493 行
C
493 行
#include <arpc.h>#include <comm.h>#include <modlogger.h>#include <misc_utils.h>#include <dhash_types.h>#include <dhash_common.h>#include <merkle_sync_prot.h>#include <merkle.h>#include <lsdctl_prot.h>#include <maint_prot.h>#include "maint_policy.h"// {{{ Globalsstatic const char *ctlsock;static const char *logfname;static const char *localdatapath;static vec<ptr<maintainer> > maintainers;enum maint_mode_t { MAINT_CARBONITE, MAINT_PASSINGTONE,} maint_mode;struct maint_mode_desc { maint_mode_t m; const char *cmdline; maintainer_producer_t producer;} maint_modes[] = { { MAINT_CARBONITE, "carbonite", &carbonite::produce_maintainer }, { MAINT_PASSINGTONE, "passingtone", &passingtone::produce_maintainer }};enum sync_mode_t { SYNC_MERKLE, SYNC_TIME} sync_mode;struct sync_mode_desc { sync_mode_t m; const char *cmdline; syncer_producer_t producer;} sync_modes[] = { { SYNC_MERKLE, "merkle", &merkle_sync::produce_syncer }};// }}}// {{{ General utility functionstemplate<class VS, class S>static Sselect_mode (const char *arg, const VS *modes, int nmodes){ int i; S m = modes[0].m; for (i = 0; i < nmodes; i++) { if (strcmp (arg, modes[i].cmdline) == 0) { m = modes[i].m; break; } } if (i == nmodes) { strbuf s; for (i = 0; i < nmodes; i++) s << " " << modes[i].cmdline; fatal << "allowed modes are" << s << "\n"; } return m;}static voidhalt (){ warn << "Exiting on command.\n"; while (maintainers.size ()) { maintainers.pop_back (); } exit (0);}EXITFN (cleanup);static voidcleanup (){ unlink (ctlsock);}voidstart_logs (){ static int logfd (-1); // XXX please don't call setlogfd or change errfd anywhere else... if (logfname) { if (logfd >= 0) close (logfd); logfd = open (logfname, O_RDWR | O_CREAT, 0666); if (logfd < 0) fatal << "Could not open log file " << logfd << " for appending.\n"; lseek (logfd, 0, SEEK_END); errfd = logfd; modlogger::setlogfd (logfd); }}static voidusage () { warnx << "Usage: " << progname << "\t[-C maintd-ctlsock]\n" << "\t[-d localdatapath]\n" << "\t[-D]\n" << "\t[-L logfilename]\n" << "\t[-m maintmode]\n" << "\t[-s syncmode]\n" << "\t[-t]\n"; exit (1);}// }}}// {{{ Remote-side RPC handlingstatic void srvaccept (int fd);static void sync_dispatch (ptr<asrv> srv, svccb *sbp);static voidinit_remote_server (const net_address &addr){ in_addr laddr; inet_aton (addr.hostname, &laddr); int tcpfd = inetsocket (SOCK_STREAM, addr.port-1, ntohl (laddr.s_addr)); if (tcpfd < 0) fatal ("binding TCP addr %s port %d: %m\n", addr.hostname.cstr (), addr.port-1); int ret = listen (tcpfd, 5); if (ret < 0) fatal ("listen (%d, 5): %m\n", tcpfd); fdcb (tcpfd, selread, wrap (&srvaccept, tcpfd)); warn << "Listening for sync requests on " << addr.hostname << ":" << addr.port-1 << ".\n";}static voidsrvaccept (int lfd){ sockaddr_un sun; bzero (&sun, sizeof (sun)); socklen_t sunlen = sizeof (sun); int fd = accept (lfd, reinterpret_cast<sockaddr *> (&sun), &sunlen); if (fd < 0) { warn ("accept: unexpected EOF: %m\n"); return; } ref<axprt_stream> c = axprt_stream::alloc (fd, 1024*1025); // XXX should accept for whatever the chosen syncmode wants. ptr<asrv> s = asrv::alloc (c, merklesync_program_1); s->setcb (wrap (&sync_dispatch, s));}static voidsync_dispatch (ptr<asrv> srv, svccb *sbp){ if (!sbp) { srv->setcb (NULL); srv = NULL; return; } bool ok = false; syncdest_t *discrim = sbp->Xtmpl getarg<syncdest_t> (); for (size_t i = 0; i < maintainers.size (); i++) { // maintainer takes some time to get local db info from adbd; // must check for valid localtree before dispatching if (maintainers[i]->sync->sync_program ().progno == sbp->prog () && maintainers[i]->sync->ctype == discrim->ctype && maintainers[i]->host.vnode_num == (int) discrim->vnode && maintainers[i]->localtree () != NULL) { ok = true; ptr<merkle_tree> t = maintainers[i]->localtree (); maintainers[i]->sync->dispatch (t, sbp); break; } } if (!ok) sbp->reject (PROC_UNAVAIL);}// }}}// {{{ Control-side RPC executionvoiddo_setmaint (svccb *sbp){ maint_setmaintarg *arg = sbp->Xtmpl getarg<maint_setmaintarg> (); for (unsigned int i = 0; i < maintainers.size (); i++) { if (arg->enable) { maintainers[i]->start (arg->delay, arg->randomize); } else { maintainers[i]->stop (); } } sbp->replyref (arg->enable);}void do_initspace_cb (svccb *sbp);voiddo_initspace (svccb *sbp){ maint_dhashinfo_t *arg = sbp->Xtmpl getarg<maint_dhashinfo_t> (); maint_status res (MAINTPROC_OK); chord_node host = arg->host; dhash_ctype ctype = arg->ctype; // Check that we don't already have a maintainer for this host/ctype for (unsigned int i = 0; i < maintainers.size (); i++) { if (maintainers[i]->host.r.hostname == host.r.hostname && maintainers[i]->host.r.port == host.r.port && maintainers[i]->host.vnode_num == host.vnode_num && maintainers[i]->ctype == ctype) { res = MAINTPROC_ERR; sbp->replyref (res); return; } } ptr<syncer> s = sync_modes[sync_mode].producer (ctype); ptr<maintainer> m = maint_modes[maint_mode].producer (localdatapath, arg, s, wrap (&do_initspace_cb, sbp)); maintainers.push_back (m);}voiddo_initspace_cb (svccb *sbp){ maint_status res (MAINTPROC_OK); sbp->replyref (res);}voiddo_listen (svccb *sbp){ static bool initialized (false); maint_status res (MAINTPROC_OK); net_address *addr = sbp->Xtmpl getarg<net_address> (); if (!initialized) { init_remote_server (*addr); initialized = true; } else { res = MAINTPROC_ERR; } sbp->replyref (res);}voiddo_getrepairs (svccb *sbp){ maint_getrepairsarg *arg = sbp->Xtmpl getarg<maint_getrepairsarg> (); maint_getrepairsres *res = sbp->Xtmpl getres<maint_getrepairsres> (); res->status = MAINTPROC_OK; chord_node host = arg->host; dhash_ctype ctype = arg->ctype; ptr<maintainer> m = NULL; for (unsigned int i = 0; i < maintainers.size (); i++) { if (maintainers[i]->host.r.hostname == host.r.hostname && maintainers[i]->host.r.port == host.r.port && maintainers[i]->host.vnode_num == host.vnode_num && maintainers[i]->ctype == ctype) { m = maintainers[i]; } } if (!m) { res->status = MAINTPROC_ERR; sbp->reply (res); return; } m->getrepairs (arg->start, arg->thresh, arg->count, res->repairs); sbp->reply (res);}// }}}// {{{ Control-side RPC accept and dispatchstatic void accept_cb (int lfd);void dispatch_maint (ref<axprt_stream> s, ptr<asrv> a, svccb *sbp);void dispatch_lsdctl (ref<axprt_stream> s, ptr<asrv> a, svccb *sbp);static voidlisten_unix (str sock_name){ unlink (sock_name); int clntfd = unixsocket (sock_name); if (clntfd < 0) fatal << "Error creating socket (UNIX)" << strerror (errno) << "\n"; if (listen (clntfd, 128) < 0) { fatal ("Error from listen: %m\n"); close (clntfd); } else { fdcb (clntfd, selread, wrap (accept_cb, clntfd)); }}static void accept_cb (int lfd){ sockaddr_un sun; bzero (&sun, sizeof (sun)); socklen_t sunlen = sizeof (sun); int fd = accept (lfd, reinterpret_cast<sockaddr *> (&sun), &sunlen); if (fd < 0) fatal ("EOF\n"); ref<axprt_stream> x = axprt_stream::alloc (fd, 1024*1025); ptr<asrv> a = asrv::alloc (x, maint_program_1); a->setcb (wrap (&dispatch_maint, x, a)); // Handle lsdctl just for the stats functionality. ptr<asrv> b = asrv::alloc (x, lsdctl_prog_1); b->setcb (wrap (&dispatch_lsdctl, x, b));}voiddispatch_lsdctl (ref<axprt_stream> x, ptr<asrv> a, svccb *sbp){ if (sbp == NULL) { a->setcb (NULL); return; } switch (sbp->proc ()) { case LSDCTL_NULL: sbp->reply (NULL); break; case LSDCTL_GETRPCSTATS: { // Code copied from lsd/lsd.C bool *clear = sbp->Xtmpl getarg<bool> (); ptr<lsdctl_rpcstatlist> sl = New refcounted<lsdctl_rpcstatlist> (); sl->stats.setsize (rpc_stats_tab.size ()); rpcstats *s = rpc_stats_tab.first (); int i = 0; while (s) { sl->stats[i].key = s->key; sl->stats[i].ncall = s->ncall; sl->stats[i].nrexmit = s->nrexmit; sl->stats[i].nreply = s->nreply; sl->stats[i].call_bytes = s->call_bytes; sl->stats[i].rexmit_bytes = s->rexmit_bytes; sl->stats[i].reply_bytes = s->reply_bytes; sl->stats[i].latency_ewma = s->latency_ewma; s = rpc_stats_tab.next (s); i++; } u_int64_t now = getusec (); sl->interval = now - rpc_stats_lastclear; if (*clear) { s = rpc_stats_tab.first (); while (s) { rpcstats *t = rpc_stats_tab.next (s); rpc_stats_tab.remove (s); delete s; s = t; } rpc_stats_tab.clear (); rpc_stats_lastclear = now; } sbp->reply (sl); } break; default: sbp->reject (PROC_UNAVAIL); break; }}voiddispatch_maint (ref<axprt_stream> s, ptr<asrv> a, svccb *sbp){ if (sbp == NULL) { warn << "EOF from client\n"; a->setcb (NULL); return; } switch (sbp->proc ()) { case MAINTPROC_NULL: sbp->reply (NULL); break; case MAINTPROC_SETMAINT: do_setmaint (sbp); break; case MAINTPROC_INITSPACE: do_initspace (sbp); break; case MAINTPROC_LISTEN: do_listen (sbp); break; case MAINTPROC_GETREPAIRS: do_getrepairs (sbp); break; default: warn << "unknown procedure: " << sbp->proc () << "\n"; sbp->reject (PROC_UNAVAIL); } return;}// }}}int main (int argc, char **argv) { char ch; setprogname (argv[0]); random_init (); bool do_daemonize = false; localdatapath = "./maintdata/"; ctlsock = "/tmp/maint-sock"; maint_mode = MAINT_PASSINGTONE; sync_mode = SYNC_MERKLE; while ((ch = getopt (argc, argv, "C:d:DL:m:s:t"))!=-1) switch (ch) { case 'C': ctlsock = optarg; break; case 'd': localdatapath = optarg; break; case 'D': do_daemonize = true; break; case 'L': logfname = optarg; break; case 'm': maint_mode = select_mode<maint_mode_desc, maint_mode_t> (optarg, maint_modes, sizeof (maint_modes)/sizeof (maint_modes[0])); break; case 's': sync_mode = select_mode<sync_mode_desc, sync_mode_t> (optarg, sync_modes, sizeof (sync_modes)/sizeof (sync_modes[0])); break; case 't': modlogger::setmaxprio (modlogger::TRACE); break; default: usage (); break; } if (do_daemonize) { daemonize (); logfname = NULL; } start_logs (); warn << "Starting up " << maint_modes[maint_mode].cmdline << " maintenance, syncing with " << sync_modes[sync_mode].cmdline << ".\n"; { struct stat sb; if (stat (localdatapath, &sb) < 0) { if (errno != ENOENT || (mkdir (localdatapath, 0755) < 0 && errno != EEXIST)) fatal ("%s: %m\n", localdatapath); if (stat (localdatapath, &sb) < 0) fatal ("stat (%s): %m\n", localdatapath); warn << "Created " << localdatapath << " for maintenance state.\n"; } if (!S_ISDIR (sb.st_mode)) fatal ("%s: not a directory\n", localdatapath); } listen_unix (ctlsock); sigcb (SIGHUP, wrap (&start_logs)); sigcb (SIGINT, wrap (&halt)); sigcb (SIGTERM, wrap (&halt)); amain ();}// vim: foldmethod=marker
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?