⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 lsd.c

📁 chord 源码 http://pdos.csail.mit.edu/chord/
💻 C
📖 第 1 页 / 共 2 页
字号:
/*
 *
 * Copyright (C) 2000 Frans Kaashoek (kaashoek@lcs.mit.edu)
 *                    Frank Dabek (fdabek@mit.edu)
 *
 *  Permission is hereby granted, free of charge, to any person obtaining
 *  a copy of this software and associated documentation files (the
 *  "Software"), to deal in the Software without restriction, including
 *  without limitation the rights to use, copy, modify, merge, publish,
 *  distribute, sublicense, and/or sell copies of the Software, and to
 *  permit persons to whom the Software is furnished to do so, subject to
 *  the following conditions:
 *
 *  The above copyright notice and this permission notice shall be
 *  included in all copies or substantial portions of the Software.
 *
 *  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 *  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 *  MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 *  NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
 *  LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
 *  OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 *  WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

#include <misc_utils.h>
#include <configurator.h>
#include <comm.h>
#include "chord.h"
#include "dhash.h"
#include "dhashgateway.h"
#include <sys/types.h>
#include "lsdctl_prot.h"
#include <location.h>
#include <locationtable.h>
#include <debruijn.h>
#include <fingerroute.h>
#include <fingerroutepns.h>
#include <recroute.h>
#include <accordion.h>
#include <modlogger.h>

// Shorthand loggers for this module at INFO and TRACE priority.
#define info  modlogger ("lsd", modlogger::INFO)
#define trace modlogger ("lsd", modlogger::TRACE)

// #define PROFILING

// When a process starts up profiling is not happening.  But by
// sending a SIGUSR1, profiling is turned on.  (Another SIGUSR1 turns
// it off.)  This allows specific, user-controlled periods of time to
// be profiled.  Program must be compiled with -pg for this to work.
#ifdef PROFILING
extern "C" {
  void moncontrol(int);
}
#endif

EXITFN (cleanup);

int vnodes = 1;
static char *logfname;
static char *tracefname;

// Socket paths, plus flags recording whether the corresponding listener
// was started (cleanup () only unlinks sockets whose flag is set).
static bool p2pstarted (false);
static str p2psocket;
static bool ctlstarted (false);
static str ctlsocket;
static str heartbeatfn;

ptr<chord> chordnode;
vec<ref<dhash> > dh;
int myport;

enum routing_mode_t {
  MODE_SUCC,
  MODE_CHORD,
  MODE_DEBRUIJN,
  MODE_PNS,
  MODE_PNSREC,
  MODE_CHORDREC,
  MODE_TCPPNSREC,
  MODE_ACCORDION,
} mode;

// One entry per routing mode: its enum value, a short name (cmdline),
// a human-readable description, and the callback that produces vnodes.
struct routing_mode_desc {
  routing_mode_t m;
  char *cmdline;
  char *desc;
  vnode_producer_t producer;
};

/* List of routing modes.  Please keep this in sync with the enum above. */
routing_mode_desc modes[] = {
  { MODE_SUCC, "successors", "use only successor lists",
    wrap (vnode::produce_vnode) },
  { MODE_CHORD, "chord", "use fingers and successors",
    wrap (fingerroute::produce_vnode) },
  { MODE_DEBRUIJN, "debruijn", "use debruijn routing",
    wrap (debruijn::produce_vnode) },
  { MODE_PNS, "pns", "use proximity neighbor selection",
    wrap (fingerroutepns::produce_vnode) },
  { MODE_PNSREC, "pnsrec", "g^2 pns recursive",
    wrap (recroute<fingerroutepns>::produce_vnode) },
  { MODE_CHORDREC, "chordrec", "recursive routing with plain finger tables",
    wrap (recroute<fingerroute>::produce_vnode) },
  { MODE_TCPPNSREC, "tcppnsrec", "g^2 pns recursive with data over tcp",
    wrap (recroute<fingerroutepns>::produce_vnode) },
  { MODE_ACCORDION, "accordion", "Accordion Routing",
    wrap (accordion::produce_vnode)},
};

void stats ();
void stop ();
void halt ();

// =====================================

static lsdctl_lsdparameters parameters;

// Copy the state of location l into the control-protocol record ni.
//
// The coordinate array is exported with one more element than the
// location has coordinates: the c.size () coordinates come first and the
// final slot carries c.ht.  All of them are converted to int32_t.
void
lsdctl_fillnodeinfo (lsdctl_nodeinfo &ni, ptr<location> l)
{
  ni.n = l->id ();
  ni.addr = l->address ();
  ni.vnode_num = l->vnode ();

  const Coord c = l->coords ();
  ni.coords.setsize (c.size () + 1);
  for (size_t j = 0; j < c.size (); j++)
    ni.coords[j] = (int32_t) c.coords[j];
  ni.coords[c.size ()] = (int32_t) c.ht;

  ni.a_lat = (u_int32_t) l->distance ();
  ni.a_var = (u_int32_t) l->a_var ();
  ni.nrpc = l->nrpc ();
  ni.pinned = chordnode->locations->pinned (l->id ());
  ni.alive = l->alive ();
  ni.dead_time = l->dead_time ();
}

// Callback installed on every control connection (see control_accept).
//
// s   - the server object the callback is attached to.
// sbp - the incoming request; when it is NULL the callback on s is
//       cleared and nothing else happens.
//
// Every recognised procedure replies exactly once; unknown procedure
// numbers are rejected with PROC_UNAVAIL.
void
lsdctl_dispatch (ptr<asrv> s, svccb *sbp)
{
  if (!sbp) {
    // Close the server
    s->setcb (NULL);
    return;
  }
  trace << "received lsdctl " << sbp->proc () << "\n";

  switch (sbp->proc ()) {
  case LSDCTL_NULL:
    sbp->reply (NULL);
    break;
  case LSDCTL_EXIT:
    // Reply first, then call halt ().
    sbp->reply (NULL);
    halt ();
    break;
  case LSDCTL_SETTRACELEVEL:
    {
      int *lvl = sbp->Xtmpl getarg<int> ();
      info << "Setting new maxprio to " << *lvl << "\n";
      modlogger::setmaxprio (*lvl); /* XXX should validate this value! */
      sbp->reply (NULL);
    }
    break;
  case LSDCTL_SETSTABILIZE:
    {
      // Note: this local `s' shadows the asrv parameter inside the braces.
      bool *s = sbp->Xtmpl getarg<bool> ();
      if (*s)
        chordnode->stabilize ();
      else
        chordnode->stop ();
      sbp->reply (s);
    }
    break;
  case LSDCTL_SETREPLICATE:
    {
      lsdctl_setreplicate_arg *a = sbp->Xtmpl getarg<lsdctl_setreplicate_arg> ();
      if (a->enable) {
        for (unsigned int i = 0; i < chordnode->num_vnodes (); i++)
          dh[i]->start (a->randomize);
      } else {
        for (unsigned int i = 0; i < chordnode->num_vnodes (); i++)
          dh[i]->stop ();
      }
      sbp->replyref (a->enable);
    }
    break;
  case LSDCTL_GETLOCTABLE:
    {
      // int *v = sbp->template getarg<int> ();
      // Ignore v
      ptr<lsdctl_nodeinfolist> nl = New refcounted<lsdctl_nodeinfolist> ();
      nl->nlist.setsize (chordnode->locations->size ());
      // Walk the location table from first_loc () via next_loc (),
      // filling one reply slot per location visited.
      ptr<location> l = chordnode->locations->first_loc ();
      int i = 0;
      while (l != NULL) {
        lsdctl_fillnodeinfo (nl->nlist[i], l);
        l = chordnode->locations->next_loc (l->id ());
        i++;
      }
      sbp->reply (nl);
    }
    break;
  case LSDCTL_GETRPCSTATS:
    {
      bool *clear = sbp->Xtmpl getarg<bool> ();

      ptr<lsdctl_rpcstatlist> sl = New refcounted<lsdctl_rpcstatlist> ();
      sl->stats.setsize (rpc_stats_tab.size ());

      // Snapshot every entry of rpc_stats_tab into the reply.
      rpcstats *s = rpc_stats_tab.first ();
      int i = 0;
      while (s) {
        sl->stats[i].key          = s->key;
        sl->stats[i].ncall        = s->ncall;
        sl->stats[i].nrexmit      = s->nrexmit;
        sl->stats[i].nreply       = s->nreply;
        sl->stats[i].call_bytes   = s->call_bytes;
        sl->stats[i].rexmit_bytes = s->rexmit_bytes;
        sl->stats[i].reply_bytes  = s->reply_bytes;
        sl->stats[i].latency_ewma = s->latency_ewma;
        s = rpc_stats_tab.next (s);
        i++;
      }

      // The interval reported is the time since the stats were last cleared.
      u_int64_t now = getusec ();
      sl->interval = now - rpc_stats_lastclear;
      if (*clear) {
        // Fetch the successor before removing/deleting the current entry.
        s = rpc_stats_tab.first ();
        while (s) {
          rpcstats *t = rpc_stats_tab.next (s);
          rpc_stats_tab.remove (s);
          delete s;
          s = t;
        }
        rpc_stats_tab.clear ();
        rpc_stats_lastclear = now;
      }

      sbp->reply (sl);
    }
    break;
  case LSDCTL_GETMYIDS:
    {
      // One nodeinfo record per local vnode, describing its own location.
      ptr<lsdctl_nodeinfolist> nl = New refcounted<lsdctl_nodeinfolist> ();
      size_t nv = chordnode->num_vnodes ();
      nl->nlist.setsize (nv);
      for (unsigned int i = 0; i < nv; i++) {
        ptr<vnode> v = chordnode->get_vnode (i);
        lsdctl_fillnodeinfo (nl->nlist[i], v->my_location ());
      }
      sbp->reply (nl);
    }
    break;
  case LSDCTL_GETDHASHSTATS:
    {
      lsdctl_getdhashstats_arg *arg = sbp->Xtmpl getarg<lsdctl_getdhashstats_arg> ();
      ptr<lsdctl_dhashstats> ds = New refcounted<lsdctl_dhashstats> ();
      // arg->vnode comes from the client.  Reject negative values and
      // anything past the end of dh (the container actually indexed);
      // an out-of-range request gets an empty stats list.
      if (arg->vnode >= 0
          && static_cast<size_t> (arg->vnode) < dh.size ()) {
        vec<dstat> stats = dh[arg->vnode]->stats ();
        ds->stats.setsize (stats.size ());
        for (unsigned int i = 0; i < stats.size (); i++) {
          ds->stats[i].desc = stats[i].desc;
          ds->stats[i].value = stats[i].value;
        }
      }
      sbp->reply (ds);
    }
    break;
  case LSDCTL_GETLSDPARAMETERS:
    sbp->reply (&parameters);
    break;
  default:
    sbp->reject (PROC_UNAVAIL);
    break;
  }
}

// =====================================

// Accept callback for the control socket: allocate a server for
// lsdctl_prog_1 on transport x and route its requests to lsdctl_dispatch.
void
control_accept (ref<axprt_stream> x)
{
  ptr<asrv> srv;
  srv = asrv::alloc (x, lsdctl_prog_1, NULL);
  srv->setcb (wrap (&lsdctl_dispatch, srv));
}

// Accept callback for client sockets: create a dhashgateway on transport
// x, bound to chordnode and the first dhash instance (dh[0]).
void
gateway_accept (ref<axprt_stream> x)
{
  // constructor of dhashgateway object calls mkref to maintain a
  // reference to itself until the program is gone.
  vNew refcounted<dhashgateway> (x, chordnode, dh[0]);
}

typedef callback<void, ref<axprt_stream> >::ptr acceptercb_t;

// Read-readiness callback for a listening socket (registered by
// client_listen).  Accepts one connection from lfd, wraps it in an
// axprt_stream and hands it to accepter.
//
// Transient accept () failures (the peer went away before we accepted,
// a signal interrupted the call, or nothing is pending) are logged and
// ignored; the listener stays registered, so the next connection is still
// served.  Any other failure is fatal and reports the real errno text.
static void
client_accept_socket (int lfd, acceptercb_t accepter)
{
  sockaddr_un sun;
  bzero (&sun, sizeof (sun));
  socklen_t sunlen = sizeof (sun);
  int fd = accept (lfd, reinterpret_cast<sockaddr *> (&sun), &sunlen);
  if (fd < 0) {
    if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK
        || errno == ECONNABORTED) {
      warn << "accept: " << strerror (errno) << "\n";
      return;
    }
    fatal << "accept: " << strerror (errno) << "\n";
  }

  // NOTE(review): 1024*1025 is kept verbatim; it looks like it may have
  // been meant as 1024*1024 -- confirm before changing.
  ref<axprt_stream> x = axprt_stream::alloc (fd, 1024*1025);
  accepter (x);
}

// Put fd into listening state (backlog 5) and arrange for
// client_accept_socket to run with accepter whenever fd is readable.
static void
client_listen (int fd, acceptercb_t accepter)
{
  if (listen (fd, 5) < 0) {
    // NOTE(review): fatal presumably does not return, which would make
    // the close () below unreachable -- confirm.
    fatal ("Error from listen: %m\n");
    close (fd);
  }
  else {
    fdcb (fd, selread, wrap (client_accept_socket, fd, accepter));
  }
}

// Remove the UNIX-domain socket files for whichever listeners were
// started.  Named in the EXITFN declaration near the top of the file.
static void
cleanup ()
{
  if (p2pstarted)
    unlink (p2psocket);
  if (ctlstarted)
    unlink (ctlsocket);
}

// Start the client-facing listeners: a UNIX-domain socket at p2psocket
// and a TCP socket.  The TCP port is myport + 1, or 0 when myport is 0.
// Both hand accepted connections to gateway_accept.  Failure to create
// either socket is fatal.
static void
startclntd()
{
  unlink (p2psocket);  // remove any stale socket file first
  int clntfd = unixsocket (p2psocket);
  if (clntfd < 0)
    fatal << "Error creating client socket (UNIX) " << strerror (errno) << "\n";
  client_listen (clntfd, wrap (gateway_accept));

  int port = (myport == 0) ? 0 : myport + 1;
  int tcp_clntfd = inetsocket (SOCK_STREAM, port);
  if (tcp_clntfd < 0)
    fatal << "Error creating client socket (TCP) " << strerror(errno) << "\n";
  client_listen (tcp_clntfd, wrap (gateway_accept));

  p2pstarted = true;
}

// Start the control listener: a UNIX-domain socket at ctlsocket whose
// connections are handed to control_accept.  Failure to create the
// socket is fatal.
static void
startcontroller ()
{
  unlink (ctlsocket);  // remove any stale socket file first
  int ctlfd = unixsocket (ctlsocket);
  if (ctlfd < 0)
    fatal << "Error creating control socket (UNIX) " << strerror (errno) << "\n";
  client_listen (ctlfd, wrap (control_accept));

  ctlstarted = true;
}

#ifdef PROFILING
// Flip the profiling state and pass the new state to moncontrol ().
// NOTE(review): the rest of this definition (and the matching #endif)
// lies beyond the visible excerpt and is left untouched.
void
toggle_profiling ()
{
  static int pfstate = 1;
  if (pfstate)
    warn << "Turning profiling off\n";
  else
    warn << "Turning profiling on\n";

  pfstate = !pfstate;
  moncontrol (pfstate ? 1 : 0);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -