lsd.c

来自「基于DHT的对等协议」· C语言 代码 · 共 846 行 · 第 1/2 页

C
846
字号
/*
 *
 * Copyright (C) 2000 Frans Kaashoek (kaashoek@lcs.mit.edu)
 *                    Frank Dabek (fdabek@mit.edu)
 *
 *  Permission is hereby granted, free of charge, to any person obtaining
 *  a copy of this software and associated documentation files (the
 *  "Software"), to deal in the Software without restriction, including
 *  without limitation the rights to use, copy, modify, merge, publish,
 *  distribute, sublicense, and/or sell copies of the Software, and to
 *  permit persons to whom the Software is furnished to do so, subject to
 *  the following conditions:
 *
 *  The above copyright notice and this permission notice shall be
 *  included in all copies or substantial portions of the Software.
 *
 *  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 *  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 *  MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 *  NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
 *  LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
 *  OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 *  WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

#include <misc_utils.h>
#include <configurator.h>
#include <comm.h>
#include "chord.h"
#include "dhash.h"
#include "dhashgateway.h"
#include <sys/types.h>

#include <maint_prot.h>
#include "lsdctl_prot.h"

#include <location.h>
#include <locationtable.h>

#include <fingerroute.h>
#include <fingerroutepns.h>
#include <recroute.h>
#include <accordion.h>

#include <modlogger.h>
// Shorthand loggers: both tag their output with the module name "lsd".
#define info  modlogger ("lsd", modlogger::INFO)
#define trace modlogger ("lsd", modlogger::TRACE)

// #define PROFILING

// When a process starts up profiling is not happening.  But by
// sending a SIGUSR1, profiling is turned on.  (Another SIGUSR1 turns
// it off.)  This allows specific, user-controlled periods of time to
// be profiled.
Program must be compiled with -pg for this to work.#ifdef PROFILING extern "C" {  void moncontrol(int);}#endifEXITFN (cleanup);int vnodes = 1;static char *logfname;static char *tracefname;static bool p2pstarted (false);static str p2psocket;static bool ctlstarted (false);static str ctlsocket;static str heartbeatfn;ptr<chord> chordnode;vec<ref<dhash> > dh;int myport;str maintsock = "/tmp/maint-sock"; enum routing_mode_t {  MODE_SUCC,  MODE_CHORD,  MODE_PNS,  MODE_PNSREC,  MODE_CHORDREC,  MODE_TCPPNSREC,  MODE_ACCORDION,} mode;struct routing_mode_desc {  routing_mode_t m;  const char *cmdline;  const char *desc;  vnode_producer_t producer;};					       /* List of routing modes.  Please keep this in sync with the enum above. */routing_mode_desc modes[] = {  { MODE_SUCC, "successors", "use only successor lists",    wrap (vnode::produce_vnode) },  { MODE_CHORD, "chord", "use fingers and successors",    wrap (fingerroute::produce_vnode) },  { MODE_PNS, "pns", "use proximity neighbor selection",    wrap (fingerroutepns::produce_vnode) },  { MODE_PNSREC, "pnsrec", "g^2 pns recursive",    wrap (recroute<fingerroutepns>::produce_vnode) },  { MODE_CHORDREC, "chordrec", "recursive routing with plain finger tables",    wrap (recroute<fingerroute>::produce_vnode) },  { MODE_TCPPNSREC, "tcppnsrec", "g^2 pns recursive with data over tcp",    wrap (recroute<fingerroutepns>::produce_vnode) },  { MODE_ACCORDION, "accordion", "Accordion Routing",    wrap (accordion::produce_vnode)},};void stats ();void stop ();void halt ();void set_maint (bool enable);// =====================================static lsdctl_lsdparameters parameters;voidlsdctl_fillnodeinfo (lsdctl_nodeinfo &ni, ptr<location> l){  ni.n = l->id ();  ni.addr = l->address ();  ni.vnode_num = l->vnode ();  const Coord c = l->coords ();  ni.coords.setsize (c.size () + 1);  for (size_t j = 0; j < c.size (); j++)    ni.coords[j] = (int32_t) c.coords[j];  ni.coords[c.size ()] = (int32_t) c.ht;  ni.a_lat = (u_int32_t) l->distance 
();  ni.a_var = (u_int32_t) l->a_var ();  ni.nrpc = l->nrpc ();  ni.pinned = chordnode->locations->pinned (l->id ());  ni.alive = l->alive ();  ni.dead_time = l->dead_time ();}voidlsdctl_finishstats (svccb *sbp, ptr<lsdctl_rpcstatlist> sl, clnt_stat err){  // Pull up our stats after RPC since maintd's stats may  // take a while.  This should ensure that all the stats  // are from about the same time interval.  bool *clear = sbp->Xtmpl getarg<bool> ();  rpcstats *s = rpc_stats_tab.first ();  while (s) {    lsdctl_rpcstat si;    si.key          = s->key;    si.ncall        = s->ncall;    si.nrexmit      = s->nrexmit;    si.nreply       = s->nreply;    si.call_bytes   = s->call_bytes;    si.rexmit_bytes = s->rexmit_bytes;    si.reply_bytes  = s->reply_bytes;    si.latency_ewma = s->latency_ewma;    sl->stats.push_back (si);    s = rpc_stats_tab.next (s);  }    u_int64_t now = getusec ();  sl->interval = now - rpc_stats_lastclear;  if (*clear) {    s = rpc_stats_tab.first ();    while (s) {      rpcstats *t = rpc_stats_tab.next (s);      rpc_stats_tab.remove (s);      delete s;      s = t;    }    rpc_stats_tab.clear ();    rpc_stats_lastclear = now;  }  sbp->reply (sl);}voidlsdctl_dispatch (ptr<asrv> s, svccb *sbp){  if (!sbp) {    // Close the server    s->setcb (NULL);    return;  }  trace << "received lsdctl " << sbp->proc () << "\n";  switch (sbp->proc ()) {  case LSDCTL_NULL:    sbp->reply (NULL);    break;  case LSDCTL_EXIT:    sbp->reply (NULL);    halt ();    break;  case LSDCTL_SETTRACELEVEL:    {      int *lvl = sbp->Xtmpl getarg<int> ();      info << "Setting new maxprio to " << *lvl << "\n";      modlogger::setmaxprio (*lvl); /* XXX should validate this value! 
*/      sbp->reply (NULL);    }    break;  case LSDCTL_SETSTABILIZE:    {      bool *s = sbp->Xtmpl getarg<bool> ();      if (*s)	chordnode->stabilize ();      else	chordnode->stop ();      sbp->reply (s);    }    break;  case LSDCTL_SETREPLICATE:    {      lsdctl_setreplicate_arg *a = sbp->Xtmpl getarg<lsdctl_setreplicate_arg> ();      if (a->enable) {	set_maint (true);	for (unsigned int i = 0; i < chordnode->num_vnodes (); i++)	  dh[i]->start (a->randomize);      } else {	set_maint (false);	for (unsigned int i = 0; i < chordnode->num_vnodes (); i++)	  dh[i]->stop ();      }      sbp->replyref (a->enable);    }    break;  case LSDCTL_GETLOCTABLE:    {      // int *v = sbp->template getarg<int> ();      // Ignore v      ptr<lsdctl_nodeinfolist> nl = New refcounted<lsdctl_nodeinfolist> ();      nl->nlist.setsize (chordnode->locations->size ());      ptr<location> l = chordnode->locations->first_loc ();      int i = 0;      while (l != NULL) {	lsdctl_fillnodeinfo (nl->nlist[i], l);	l = chordnode->locations->next_loc (l->id ());	i++;      }      sbp->reply (nl);    }    break;  case LSDCTL_GETRPCSTATS:    {      bool *clear = sbp->Xtmpl getarg<bool> ();      ptr<lsdctl_rpcstatlist> sl = New refcounted<lsdctl_rpcstatlist> ();      // Grab any stats from maintd, if available.      
int fd = unixsocket_connect (maintsock);      if (fd >= 0) {	ptr<aclnt> c = aclnt::alloc (axprt_unix::alloc (fd, 32*1024),	    lsdctl_prog_1);	c->call (LSDCTL_GETRPCSTATS, clear, sl,	    wrap (&lsdctl_finishstats, sbp, sl));      } else {	lsdctl_finishstats (sbp, sl, RPC_SUCCESS);      }    }    break;  case LSDCTL_GETMYIDS:    {      ptr<lsdctl_nodeinfolist> nl = New refcounted<lsdctl_nodeinfolist> ();      size_t nv = chordnode->num_vnodes ();      nl->nlist.setsize (nv);      for (unsigned int i = 0; i < nv; i++) {	ptr<vnode> v = chordnode->get_vnode (i);	lsdctl_fillnodeinfo (nl->nlist[i], v->my_location ());      }      sbp->reply (nl);    }    break;  case LSDCTL_GETDHASHSTATS:    {      lsdctl_getdhashstats_arg *arg = sbp->Xtmpl getarg<lsdctl_getdhashstats_arg> ();      ptr<lsdctl_dhashstats> ds = New refcounted<lsdctl_dhashstats> ();      for (int v = 0; v < vnodes; v++) {	// Treat < 0 as a wildcard; otherwise only do particular vnode.	if (arg->vnode >= 0 && arg->vnode != v)	  continue;	vec<dstat> stats = dh[v]->stats ();	for (unsigned int i = 0; i < stats.size (); i++) {	  bool found = false;	  for (unsigned int j = 0; j < ds->stats.size (); j++) {	    if (ds->stats[j].desc == stats[i].desc) {	      ds->stats[j].value += stats[i].value;	      found = true;	      break;	    }	  }	  if (!found) {	    lsdctl_stat stat;	    stat.desc = stats[i].desc;	    stat.value = stats[i].value;	    ds->stats.push_back (stat);	  }	}      }      sbp->reply (ds);    }    break;  case LSDCTL_GETLSDPARAMETERS:    sbp->reply (&parameters);    break;  case LSDCTL_GETRPCMSTATS:    {      lsdctl_rpcmstats res;      strbuf ob;      chordnode->rpcmstats (ob);      res.stats = ob;      sbp->replyref (res);    }    break;  default:    sbp->reject (PROC_UNAVAIL);    break;  }}// =====================================voidcontrol_accept (ref<axprt_stream> x){  ptr<asrv> srv;  srv = asrv::alloc (x, lsdctl_prog_1, NULL);  srv->setcb (wrap (&lsdctl_dispatch, srv));}voidgateway_accept 
(ref<axprt_stream> x){  // constructor of dhashgateway object calls mkref to maintain a  // reference to itself until the program is gone.  vNew refcounted<dhashgateway> (x, chordnode, dh[0]);}typedef callback<void, ref<axprt_stream> >::ptr acceptercb_t;static voidclient_accept_socket (int lfd, acceptercb_t accepter){  sockaddr_un sun;  bzero (&sun, sizeof (sun));  socklen_t sunlen = sizeof (sun);  int fd = accept (lfd, reinterpret_cast<sockaddr *> (&sun), &sunlen);  if (fd < 0)    fatal ("EOF\n");  ref<axprt_stream> x = axprt_stream::alloc (fd, 1024*1025);  accepter (x);}static voidclient_listen (int fd, acceptercb_t accepter){  if (listen (fd, 5) < 0) {    fatal ("Error from listen: %m\n");    close (fd);  }  else {    fdcb (fd, selread, wrap (client_accept_socket, fd, accepter));  }}static voidcleanup (){  if (p2pstarted)    unlink (p2psocket);  if (ctlstarted)    unlink (ctlsocket);}static voidstartclntd(){  unlink (p2psocket);  int clntfd = unixsocket (p2psocket);  if (clntfd < 0)     fatal << "Error creating client socket (UNIX)" << strerror (errno) << "\n";  client_listen (clntfd, wrap (gateway_accept));  int port = (myport == 0) ? 0 : myport + 1;   int tcp_clntfd = inetsocket (SOCK_STREAM, port);  if (tcp_clntfd < 0)    fatal << "Error creating client socket (TCP) " << strerror(errno) << "\n";  client_listen (tcp_clntfd, wrap (gateway_accept));  p2pstarted = true;}static voidstartcontroller (){  unlink (ctlsocket);  int ctlfd = unixsocket (ctlsocket);  if (ctlfd < 0)    fatal << "Error creating control socket (UNIX)" << strerror (errno) << "\n";  client_listen (ctlfd, wrap (control_accept));  ctlstarted = true;}#ifdef PROFILINGvoidtoggle_profiling (){  static int pfstate = 1;

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?