📄 lsd.c
字号:
/*
 *
 * Copyright (C) 2000 Frans Kaashoek (kaashoek@lcs.mit.edu)
 *                    Frank Dabek (fdabek@mit.edu)
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

#include <misc_utils.h>
#include <configurator.h>
#include <comm.h>
#include "chord.h"
#include "dhash.h"
#include "dhashgateway.h"
#include <sys/types.h>
#include "lsdctl_prot.h"
#include <location.h>
#include <locationtable.h>
#include <debruijn.h>
#include <fingerroute.h>
#include <fingerroutepns.h>
#include <recroute.h>
#include <accordion.h>
#include <modlogger.h>

#define info modlogger ("lsd", modlogger::INFO)
#define trace modlogger ("lsd", modlogger::TRACE)

// #define PROFILING

// When a process starts up profiling is not happening. But by
// sending a SIGUSR1, profiling is turned on. (Another SIGUSR1 turns
// it off.) This allows specific, user-controlled periods of time to
// be profiled. Program must be compiled with -pg for this to work.
#ifdef PROFILING
extern "C" {
  void moncontrol(int);
}
#endif

EXITFN (cleanup);

int vnodes = 1;
static char *logfname;
static char *tracefname;

// Each *started flag is set once the matching socket has been created;
// cleanup () uses the flags to decide which socket paths to unlink.
static bool p2pstarted (false);
static str p2psocket;
static bool ctlstarted (false);
static str ctlsocket;
static str heartbeatfn;

ptr<chord> chordnode;
vec<ref<dhash> > dh;
int myport;

enum routing_mode_t {
  MODE_SUCC,
  MODE_CHORD,
  MODE_DEBRUIJN,
  MODE_PNS,
  MODE_PNSREC,
  MODE_CHORDREC,
  MODE_TCPPNSREC,
  MODE_ACCORDION,
} mode;

// One entry of the routing mode table: the mode, its command-line
// name, a human-readable description, and the vnode factory to use.
struct routing_mode_desc {
  routing_mode_t m;
  char *cmdline;
  char *desc;
  vnode_producer_t producer;
};

/* List of routing modes. Please keep this in sync with the enum above. */
routing_mode_desc modes[] = {
  { MODE_SUCC, "successors", "use only successor lists",
    wrap (vnode::produce_vnode) },
  { MODE_CHORD, "chord", "use fingers and successors",
    wrap (fingerroute::produce_vnode) },
  { MODE_DEBRUIJN, "debruijn", "use debruijn routing",
    wrap (debruijn::produce_vnode) },
  { MODE_PNS, "pns", "use proximity neighbor selection",
    wrap (fingerroutepns::produce_vnode) },
  { MODE_PNSREC, "pnsrec", "g^2 pns recursive",
    wrap (recroute<fingerroutepns>::produce_vnode) },
  { MODE_CHORDREC, "chordrec", "recursive routing with plain finger tables",
    wrap (recroute<fingerroute>::produce_vnode) },
  { MODE_TCPPNSREC, "tcppnsrec", "g^2 pns recursive with data over tcp",
    wrap (recroute<fingerroutepns>::produce_vnode) },
  { MODE_ACCORDION, "accordion", "Accordion Routing",
    wrap (accordion::produce_vnode)},
};

void stats ();
void stop ();
void halt ();

// =====================================

static lsdctl_lsdparameters parameters;

// Copy the fields of location l into the RPC structure ni.
//
// ni.coords is sized to c.size () + 1: the first c.size () slots hold
// the coordinates and the extra trailing slot holds c.ht.  The pinned
// flag is looked up in chordnode->locations by the location's id.
void
lsdctl_fillnodeinfo (lsdctl_nodeinfo &ni, ptr<location> l)
{
  ni.n = l->id ();
  ni.addr = l->address ();
  ni.vnode_num = l->vnode ();

  const Coord c = l->coords ();
  ni.coords.setsize (c.size () + 1);
  for (size_t j = 0; j < c.size (); j++)
    ni.coords[j] = (int32_t) c.coords[j];
  ni.coords[c.size ()] = (int32_t) c.ht;

  ni.a_lat = (u_int32_t) l->distance ();
  ni.a_var = (u_int32_t) l->a_var ();
  ni.nrpc = l->nrpc ();
  ni.pinned = chordnode->locations->pinned (l->id ());
  ni.alive = l->alive ();
  ni.dead_time = l->dead_time ();
}

// Dispatch one request received on a control connection.
//
// s   - the server object the request arrived on.
// sbp - the request; a NULL sbp causes the server's callback to be
//       cleared and nothing else to happen.
//
// Every recognised procedure replies exactly once; unknown procedure
// numbers are rejected with PROC_UNAVAIL.
void
lsdctl_dispatch (ptr<asrv> s, svccb *sbp)
{
  if (!sbp) {
    // Close the server
    s->setcb (NULL);
    return;
  }
  trace << "received lsdctl " << sbp->proc () << "\n";

  switch (sbp->proc ()) {
  case LSDCTL_NULL:
    sbp->reply (NULL);
    break;
  case LSDCTL_EXIT:
    // Reply before halting so the caller gets an answer.
    sbp->reply (NULL);
    halt ();
    break;
  case LSDCTL_SETTRACELEVEL:
    {
      int *lvl = sbp->Xtmpl getarg<int> ();
      info << "Setting new maxprio to " << *lvl << "\n";
      modlogger::setmaxprio (*lvl); /* XXX should validate this value! */
      sbp->reply (NULL);
    }
    break;
  case LSDCTL_SETSTABILIZE:
    {
      // Named `enable' rather than `s' so it does not shadow the
      // asrv parameter.
      bool *enable = sbp->Xtmpl getarg<bool> ();
      if (*enable)
	chordnode->stabilize ();
      else
	chordnode->stop ();
      sbp->reply (enable);
    }
    break;
  case LSDCTL_SETREPLICATE:
    {
      lsdctl_setreplicate_arg *a =
	sbp->Xtmpl getarg<lsdctl_setreplicate_arg> ();
      if (a->enable) {
	for (unsigned int i = 0; i < chordnode->num_vnodes (); i++)
	  dh[i]->start (a->randomize);
      } else {
	for (unsigned int i = 0; i < chordnode->num_vnodes (); i++)
	  dh[i]->stop ();
      }
      sbp->replyref (a->enable);
    }
    break;
  case LSDCTL_GETLOCTABLE:
    {
      // int *v = sbp->template getarg<int> ();
      // Ignore v
      ptr<lsdctl_nodeinfolist> nl = New refcounted<lsdctl_nodeinfolist> ();
      nl->nlist.setsize (chordnode->locations->size ());
      // Walk the location table from first_loc () via next_loc ().
      ptr<location> l = chordnode->locations->first_loc ();
      int i = 0;
      while (l != NULL) {
	lsdctl_fillnodeinfo (nl->nlist[i], l);
	l = chordnode->locations->next_loc (l->id ());
	i++;
      }
      sbp->reply (nl);
    }
    break;
  case LSDCTL_GETRPCSTATS:
    {
      bool *clear = sbp->Xtmpl getarg<bool> ();

      ptr<lsdctl_rpcstatlist> sl = New refcounted<lsdctl_rpcstatlist> ();
      sl->stats.setsize (rpc_stats_tab.size ());

      // Snapshot every entry of rpc_stats_tab into the reply.
      rpcstats *st = rpc_stats_tab.first ();
      int i = 0;
      while (st) {
	sl->stats[i].key = st->key;
	sl->stats[i].ncall = st->ncall;
	sl->stats[i].nrexmit = st->nrexmit;
	sl->stats[i].nreply = st->nreply;
	sl->stats[i].call_bytes = st->call_bytes;
	sl->stats[i].rexmit_bytes = st->rexmit_bytes;
	sl->stats[i].reply_bytes = st->reply_bytes;
	sl->stats[i].latency_ewma = st->latency_ewma;
	st = rpc_stats_tab.next (st);
	i++;
      }

      u_int64_t now = getusec ();
      sl->interval = now - rpc_stats_lastclear;

      if (*clear) {
	st = rpc_stats_tab.first ();
	while (st) {
	  // Fetch the successor before removing the current entry.
	  rpcstats *t = rpc_stats_tab.next (st);
	  rpc_stats_tab.remove (st);
	  delete st;
	  st = t;
	}
	rpc_stats_tab.clear ();
	rpc_stats_lastclear = now;
      }
      sbp->reply (sl);
    }
    break;
  case LSDCTL_GETMYIDS:
    {
      ptr<lsdctl_nodeinfolist> nl = New refcounted<lsdctl_nodeinfolist> ();
      size_t nv = chordnode->num_vnodes ();
      nl->nlist.setsize (nv);
      for (unsigned int i = 0; i < nv; i++) {
	ptr<vnode> v = chordnode->get_vnode (i);
	lsdctl_fillnodeinfo (nl->nlist[i], v->my_location ());
      }
      sbp->reply (nl);
    }
    break;
  case LSDCTL_GETDHASHSTATS:
    {
      lsdctl_getdhashstats_arg *arg =
	sbp->Xtmpl getarg<lsdctl_getdhashstats_arg> ();
      ptr<lsdctl_dhashstats> ds = New refcounted<lsdctl_dhashstats> ();
      // The index comes from the client: reject negative values and
      // anything beyond what dh actually holds before indexing.  An
      // out-of-range request gets an empty statistics list.
      if (arg->vnode >= 0
	  && arg->vnode < vnodes
	  && (size_t) arg->vnode < dh.size ()) {
	// Named `dstats' so it does not shadow the file-level stats ().
	vec<dstat> dstats = dh[arg->vnode]->stats ();
	ds->stats.setsize (dstats.size ());
	for (unsigned int i = 0; i < dstats.size (); i++) {
	  ds->stats[i].desc = dstats[i].desc;
	  ds->stats[i].value = dstats[i].value;
	}
      }
      sbp->reply (ds);
    }
    break;
  case LSDCTL_GETLSDPARAMETERS:
    sbp->reply (&parameters);
    break;
  default:
    sbp->reject (PROC_UNAVAIL);
    break;
  }
}

// =====================================

// Accept callback for the control socket: allocate an asrv for
// lsdctl_prog_1 on transport x and route its requests to
// lsdctl_dispatch.
void
control_accept (ref<axprt_stream> x)
{
  ptr<asrv> srv;
  srv = asrv::alloc (x, lsdctl_prog_1, NULL);
  srv->setcb (wrap (&lsdctl_dispatch, srv));
}

// Accept callback for the client (gateway) sockets.  The gateway is
// always bound to the first dhash instance, dh[0].
void
gateway_accept (ref<axprt_stream> x)
{
  // constructor of dhashgateway object calls mkref to maintain a
  // reference to itself until the program is gone.
  vNew refcounted<dhashgateway> (x, chordnode, dh[0]);
}

typedef callback<void, ref<axprt_stream> >::ptr acceptercb_t;

// Read-readiness callback for listening descriptor lfd: accept one
// connection, wrap it in an axprt_stream and hand it to accepter.
//
// Transient accept () failures are logged and ignored so the callback
// can simply fire again later; any other failure remains fatal.
static void
client_accept_socket (int lfd, acceptercb_t accepter)
{
  sockaddr_un sun;
  bzero (&sun, sizeof (sun));
  socklen_t sunlen = sizeof (sun);
  int fd = accept (lfd, reinterpret_cast<sockaddr *> (&sun), &sunlen);
  if (fd < 0) {
    if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK
	|| errno == ECONNABORTED) {
      warn << "accept: " << strerror (errno) << "\n";
      return;
    }
    fatal ("EOF\n");
  }

  ref<axprt_stream> x = axprt_stream::alloc (fd, 1024*1025);
  accepter (x);
}

// Put fd into listening state (backlog 5) and register
// client_accept_socket as its read callback, bound to accepter.
static void
client_listen (int fd, acceptercb_t accepter)
{
  if (listen (fd, 5) < 0) {
    fatal ("Error from listen: %m\n");
    // NOTE(review): presumably unreachable if fatal () terminates the
    // process -- confirm before relying on this close.
    close (fd);
  }
  else {
    fdcb (fd, selread, wrap (client_accept_socket, fd, accepter));
  }
}

// Unlink the socket paths of the listeners that were started.
// Named in the EXITFN () declaration near the top of the file.
static void
cleanup ()
{
  if (p2pstarted)
    unlink (p2psocket);
  if (ctlstarted)
    unlink (ctlsocket);
}

// Start the client listeners: a UNIX socket at p2psocket and a TCP
// socket.  The TCP port is myport + 1, or 0 when myport is 0.  Both
// listeners hand their connections to gateway_accept.
static void
startclntd()
{
  // Remove any existing file at the socket path before creating it.
  unlink (p2psocket);
  int clntfd = unixsocket (p2psocket);
  if (clntfd < 0)
    fatal << "Error creating client socket (UNIX) "
	  << strerror (errno) << "\n";
  client_listen (clntfd, wrap (gateway_accept));

  int port = (myport == 0) ? 0 : myport + 1;
  int tcp_clntfd = inetsocket (SOCK_STREAM, port);
  if (tcp_clntfd < 0)
    fatal << "Error creating client socket (TCP) "
	  << strerror(errno) << "\n";
  client_listen (tcp_clntfd, wrap (gateway_accept));

  p2pstarted = true;
}

// Start the control listener: a UNIX socket at ctlsocket whose
// connections are handed to control_accept.
static void
startcontroller ()
{
  // Remove any existing file at the socket path before creating it.
  unlink (ctlsocket);
  int ctlfd = unixsocket (ctlsocket);
  if (ctlfd < 0)
    fatal << "Error creating control socket (UNIX) "
	  << strerror (errno) << "\n";
  client_listen (ctlfd, wrap (control_accept));

  ctlstarted = true;
}

#ifdef PROFILING
void
toggle_profiling ()
{
  static int pfstate = 1;
  if (pfstate)
    warn << "Turning profiling off\n";
  else
    warn << "Turning profiling on\n";
  pfstate = !pfstate;
  moncontrol (pfstate ? 1 : 0);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -