lsd.c
来自「基于DHT的对等协议」· C语言 代码 · 共 846 行 · 第 1/2 页
C
846 行
/*
 *
 * Copyright (C) 2000 Frans Kaashoek (kaashoek@lcs.mit.edu)
 *                    Frank Dabek (fdabek@mit.edu)
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

#include <misc_utils.h>
#include <configurator.h>
#include <comm.h>
#include "chord.h"
#include "dhash.h"
#include "dhashgateway.h"
#include <sys/types.h>
#include <maint_prot.h>
#include "lsdctl_prot.h"
#include <location.h>
#include <locationtable.h>
#include <fingerroute.h>
#include <fingerroutepns.h>
#include <recroute.h>
#include <accordion.h>
#include <modlogger.h>

// Logging shorthands: both write to the "lsd" module logger.
#define info  modlogger ("lsd", modlogger::INFO)
#define trace modlogger ("lsd", modlogger::TRACE)

// #define PROFILING

// When a process starts up profiling is not happening.  But by
// sending a SIGUSR1, profiling is turned on.  (Another SIGUSR1 turns
// it off.)  This allows specific, user-controlled periods of time to
// be profiled.  Program must be compiled with -pg for this to work.

#ifdef PROFILING
extern "C" {
  void moncontrol(int);
}
#endif

EXITFN (cleanup);

// Number of virtual nodes; bounds the per-vnode loop in
// LSDCTL_GETDHASHSTATS.
int vnodes = 1;

static char *logfname;
static char *tracefname;

// Socket paths and "has been created" flags; cleanup () unlinks a
// path only when its flag is set.
static bool p2pstarted (false);
static str p2psocket;
static bool ctlstarted (false);
static str ctlsocket;

static str heartbeatfn;

ptr<chord> chordnode;
vec<ref<dhash> > dh;
int myport;

// Unix socket contacted for maintd's RPC statistics in
// LSDCTL_GETRPCSTATS.
str maintsock = "/tmp/maint-sock";

enum routing_mode_t {
  MODE_SUCC,
  MODE_CHORD,
  MODE_PNS,
  MODE_PNSREC,
  MODE_CHORDREC,
  MODE_TCPPNSREC,
  MODE_ACCORDION,
} mode;

// One row of the routing-mode table: the enum value, its command-line
// name, a human-readable description and the vnode producer callback.
struct routing_mode_desc {
  routing_mode_t m;
  const char *cmdline;
  const char *desc;
  vnode_producer_t producer;
};

/* List of routing modes.  Please keep this in sync with the enum above. */
routing_mode_desc modes[] = {
  { MODE_SUCC, "successors", "use only successor lists",
    wrap (vnode::produce_vnode) },
  { MODE_CHORD, "chord", "use fingers and successors",
    wrap (fingerroute::produce_vnode) },
  { MODE_PNS, "pns", "use proximity neighbor selection",
    wrap (fingerroutepns::produce_vnode) },
  { MODE_PNSREC, "pnsrec", "g^2 pns recursive",
    wrap (recroute<fingerroutepns>::produce_vnode) },
  { MODE_CHORDREC, "chordrec", "recursive routing with plain finger tables",
    wrap (recroute<fingerroute>::produce_vnode) },
  // NOTE(review): same producer as MODE_PNSREC; any TCP-specific
  // behaviour must be selected elsewhere -- confirm.
  { MODE_TCPPNSREC, "tcppnsrec", "g^2 pns recursive with data over tcp",
    wrap (recroute<fingerroutepns>::produce_vnode) },
  { MODE_ACCORDION, "accordion", "Accordion Routing",
    wrap (accordion::produce_vnode)},
};

void stats ();
void stop ();
void halt ();
void set_maint (bool enable);

// =====================================

// Returned by reference for LSDCTL_GETLSDPARAMETERS.
static lsdctl_lsdparameters parameters;

// Copy the fields of location `l' into the lsdctl wire structure `ni'.
// ni.coords receives c.size () + 1 entries: every coordinate component
// followed by c.ht, each cast to int32_t.  The pinned flag is looked up
// in chordnode->locations.
void
lsdctl_fillnodeinfo (lsdctl_nodeinfo &ni, ptr<location> l)
{
  ni.n = l->id ();
  ni.addr = l->address ();
  ni.vnode_num = l->vnode ();
  const Coord c = l->coords ();
  ni.coords.setsize (c.size () + 1);
  for (size_t j = 0; j < c.size (); j++)
    ni.coords[j] = (int32_t) c.coords[j];
  ni.coords[c.size ()] = (int32_t) c.ht;
  ni.a_lat = (u_int32_t) l->distance ();
  ni.a_var = (u_int32_t) l->a_var ();
  ni.nrpc = l->nrpc ();
  ni.pinned = chordnode->locations->pinned (l->id ());
  ni.alive = l->alive ();
  ni.dead_time = l->dead_time ();
}

// Second half of LSDCTL_GETRPCSTATS.  Appends one entry per element of
// rpc_stats_tab to `sl' (which may already hold entries filled in by
// the maintd RPC), records the time since rpc_stats_lastclear, clears
// the table if the caller's bool argument is set, and replies to `sbp'.
// `err' is not inspected: a failed maintd call still produces a reply
// containing the local statistics.
void
lsdctl_finishstats (svccb *sbp, ptr<lsdctl_rpcstatlist> sl, clnt_stat err)
{
  // Pull up our stats after RPC since maintd's stats may
  // take a while.  This should ensure that all the stats
  // are from about the same time interval.
  bool *clear = sbp->Xtmpl getarg<bool> ();

  rpcstats *s = rpc_stats_tab.first ();
  while (s) {
    lsdctl_rpcstat si;
    si.key          = s->key;
    si.ncall        = s->ncall;
    si.nrexmit      = s->nrexmit;
    si.nreply       = s->nreply;
    si.call_bytes   = s->call_bytes;
    si.rexmit_bytes = s->rexmit_bytes;
    si.reply_bytes  = s->reply_bytes;
    si.latency_ewma = s->latency_ewma;
    sl->stats.push_back (si);
    s = rpc_stats_tab.next (s);
  }

  u_int64_t now = getusec ();
  sl->interval = now - rpc_stats_lastclear;
  if (*clear) {
    // Fetch the successor before removing/deleting the current entry.
    s = rpc_stats_tab.first ();
    while (s) {
      rpcstats *t = rpc_stats_tab.next (s);
      rpc_stats_tab.remove (s);
      delete s;
      s = t;
    }
    rpc_stats_tab.clear ();
    rpc_stats_lastclear = now;
  }
  sbp->reply (sl);
}

// Dispatch callback for the lsdctl RPC program on server `s'.
// A NULL `sbp' makes the handler detach itself from `s'; otherwise the
// request is handled according to sbp->proc () and unknown procedures
// are rejected with PROC_UNAVAIL.
void
lsdctl_dispatch (ptr<asrv> s, svccb *sbp)
{
  if (!sbp) {
    // Close the server
    s->setcb (NULL);
    return;
  }
  trace << "received lsdctl " << sbp->proc () << "\n";

  switch (sbp->proc ()) {
  case LSDCTL_NULL:
    sbp->reply (NULL);
    break;
  case LSDCTL_EXIT:
    // Reply first, then shut down via halt ().
    sbp->reply (NULL);
    halt ();
    break;
  case LSDCTL_SETTRACELEVEL:
    {
      int *lvl = sbp->Xtmpl getarg<int> ();
      info << "Setting new maxprio to " << *lvl << "\n";
      modlogger::setmaxprio (*lvl); /* XXX should validate this value! */
      sbp->reply (NULL);
    }
    break;
  case LSDCTL_SETSTABILIZE:
    {
      // true: start stabilization; false: stop it.  The argument is
      // echoed back as the result.
      bool *s = sbp->Xtmpl getarg<bool> ();
      if (*s)
	chordnode->stabilize ();
      else
	chordnode->stop ();
      sbp->reply (s);
    }
    break;
  case LSDCTL_SETREPLICATE:
    {
      // Toggle maintenance and start/stop every vnode's dhash
      // instance; replies with the requested enable flag.
      lsdctl_setreplicate_arg *a =
	sbp->Xtmpl getarg<lsdctl_setreplicate_arg> ();
      if (a->enable) {
	set_maint (true);
	for (unsigned int i = 0; i < chordnode->num_vnodes (); i++)
	  dh[i]->start (a->randomize);
      } else {
	set_maint (false);
	for (unsigned int i = 0; i < chordnode->num_vnodes (); i++)
	  dh[i]->stop ();
      }
      sbp->replyref (a->enable);
    }
    break;
  case LSDCTL_GETLOCTABLE:
    {
      // int *v = sbp->template getarg<int> ();
      // Ignore v
      ptr<lsdctl_nodeinfolist> nl = New refcounted<lsdctl_nodeinfolist> ();
      nl->nlist.setsize (chordnode->locations->size ());
      // NOTE(review): assumes first_loc ()/next_loc () visit exactly
      // locations->size () entries; `i' is not bounds-checked here.
      ptr<location> l = chordnode->locations->first_loc ();
      int i = 0;
      while (l != NULL) {
	lsdctl_fillnodeinfo (nl->nlist[i], l);
	l = chordnode->locations->next_loc (l->id ());
	i++;
      }
      sbp->reply (nl);
    }
    break;
  case LSDCTL_GETRPCSTATS:
    {
      bool *clear = sbp->Xtmpl getarg<bool> ();
      ptr<lsdctl_rpcstatlist> sl = New refcounted<lsdctl_rpcstatlist> ();
      // Grab any stats from maintd, if available.
      int fd = unixsocket_connect (maintsock);
      if (fd >= 0) {
	// Forward the request to maintd; lsdctl_finishstats runs when
	// that call completes and sends the reply.
	ptr<aclnt> c = aclnt::alloc (axprt_unix::alloc (fd, 32*1024),
				     lsdctl_prog_1);
	c->call (LSDCTL_GETRPCSTATS, clear, sl,
		 wrap (&lsdctl_finishstats, sbp, sl));
      } else {
	// maintd not reachable: reply with local stats only.
	lsdctl_finishstats (sbp, sl, RPC_SUCCESS);
      }
    }
    break;
  case LSDCTL_GETMYIDS:
    {
      // One nodeinfo entry per local vnode.
      ptr<lsdctl_nodeinfolist> nl = New refcounted<lsdctl_nodeinfolist> ();
      size_t nv = chordnode->num_vnodes ();
      nl->nlist.setsize (nv);
      for (unsigned int i = 0; i < nv; i++) {
	ptr<vnode> v = chordnode->get_vnode (i);
	lsdctl_fillnodeinfo (nl->nlist[i], v->my_location ());
      }
      sbp->reply (nl);
    }
    break;
  case LSDCTL_GETDHASHSTATS:
    {
      lsdctl_getdhashstats_arg *arg =
	sbp->Xtmpl getarg<lsdctl_getdhashstats_arg> ();
      ptr<lsdctl_dhashstats> ds = New refcounted<lsdctl_dhashstats> ();
      for (int v = 0; v < vnodes; v++) {
	// Treat < 0 as a wildcard; otherwise only do particular vnode.
	if (arg->vnode >= 0 && arg->vnode != v)
	  continue;
	vec<dstat> stats = dh[v]->stats ();
	// Merge into ds->stats: values with an already-seen
	// description are summed, new descriptions are appended.
	for (unsigned int i = 0; i < stats.size (); i++) {
	  bool found = false;
	  for (unsigned int j = 0; j < ds->stats.size (); j++) {
	    if (ds->stats[j].desc == stats[i].desc) {
	      ds->stats[j].value += stats[i].value;
	      found = true;
	      break;
	    }
	  }
	  if (!found) {
	    lsdctl_stat stat;
	    stat.desc = stats[i].desc;
	    stat.value = stats[i].value;
	    ds->stats.push_back (stat);
	  }
	}
      }
      sbp->reply (ds);
    }
    break;
  case LSDCTL_GETLSDPARAMETERS:
    // Reply with the file-level `parameters' object.
    sbp->reply (&parameters);
    break;
  case LSDCTL_GETRPCMSTATS:
    {
      // chordnode->rpcmstats () writes its report into `ob', which
      // becomes the reply string.
      lsdctl_rpcmstats res;
      strbuf ob;
      chordnode->rpcmstats (ob);
      res.stats = ob;
      sbp->replyref (res);
    }
    break;
  default:
    sbp->reject (PROC_UNAVAIL);
    break;
  }
}

// =====================================

// Accept callback for the control socket: serve lsdctl_prog_1 on the
// new transport `x' with lsdctl_dispatch as handler.  The server
// pointer is bound into the callback itself.
void
control_accept (ref<axprt_stream> x)
{
  ptr<asrv> srv;
  srv = asrv::alloc (x, lsdctl_prog_1, NULL);
  srv->setcb (wrap (&lsdctl_dispatch, srv));
}

// Accept callback for client (gateway) sockets: creates a dhashgateway
// on `x' bound to chordnode and the first dhash instance, dh[0].
void
gateway_accept (ref<axprt_stream> x)
{
  // constructor of dhashgateway object calls mkref to maintain a
  // reference to itself until the program is gone.
  vNew refcounted<dhashgateway> (x, chordnode, dh[0]);
}

typedef callback<void, ref<axprt_stream> >::ptr acceptercb_t;

// Read-ready callback for listening socket `lfd': accept one
// connection, wrap it in an axprt_stream and pass it to `accepter'.
// Any accept () failure is treated as fatal.
static void
client_accept_socket (int lfd, acceptercb_t accepter)
{
  sockaddr_un sun;
  bzero (&sun, sizeof (sun));
  socklen_t sunlen = sizeof (sun);
  int fd = accept (lfd, reinterpret_cast<sockaddr *> (&sun), &sunlen);
  if (fd < 0)
    fatal ("EOF\n");

  ref<axprt_stream> x = axprt_stream::alloc (fd, 1024*1025);
  accepter (x);
}

// Put `fd' into listening state (backlog 5) and register
// client_accept_socket as its read callback, forwarding accepted
// connections to `accepter'.
static void
client_listen (int fd, acceptercb_t accepter)
{
  if (listen (fd, 5) < 0) {
    fatal ("Error from listen: %m\n");
    // NOTE(review): presumably unreachable if fatal does not return.
    close (fd);
  }
  else {
    fdcb (fd, selread, wrap (client_accept_socket, fd, accepter));
  }
}

// Exit hook (see EXITFN above): remove the socket files that were
// created by startclntd () / startcontroller ().
static void
cleanup ()
{
  if (p2pstarted)
    unlink (p2psocket);
  if (ctlstarted)
    unlink (ctlsocket);
}

// Start the client-facing gateway listeners: a Unix socket at
// p2psocket and a TCP socket on myport + 1 (or port 0 when myport is
// 0).  Both hand connections to gateway_accept.  Socket creation
// failures are fatal.
static void
startclntd()
{
  // Remove any stale socket file before creating a new one.
  unlink (p2psocket);
  int clntfd = unixsocket (p2psocket);
  if (clntfd < 0)
    fatal << "Error creating client socket (UNIX) "
	  << strerror (errno) << "\n";
  client_listen (clntfd, wrap (gateway_accept));

  int port = (myport == 0) ? 0 : myport + 1;
  int tcp_clntfd = inetsocket (SOCK_STREAM, port);
  if (tcp_clntfd < 0)
    fatal << "Error creating client socket (TCP) "
	  << strerror(errno) << "\n";
  client_listen (tcp_clntfd, wrap (gateway_accept));

  p2pstarted = true;
}

// Start the control listener: a Unix socket at ctlsocket whose
// connections are handed to control_accept.  Socket creation failure
// is fatal.
static void
startcontroller ()
{
  // Remove any stale socket file before creating a new one.
  unlink (ctlsocket);
  int ctlfd = unixsocket (ctlsocket);
  if (ctlfd < 0)
    fatal << "Error creating control socket (UNIX) "
	  << strerror (errno) << "\n";
  client_listen (ctlfd, wrap (control_accept));

  ctlstarted = true;
}

#ifdef PROFILING
void
toggle_profiling ()
{
  static int pfstate = 1;
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?