📄 lsd.c
字号:
}#endifvoidstats () {#ifdef PROFILING toggle_profiling ();#else warn << "STATS:\n"; chordnode->stats (); for (unsigned int i = 0 ; i < chordnode->num_vnodes (); i++) dh[i]->print_stats (); strbuf x; chordnode->print (x); warnx << x;#endif}voidstop (){ chordnode->stop (); for (unsigned int i = 0 ; i < chordnode->num_vnodes (); i++) dh[i]->stop ();}voidhalt (){ for (unsigned int i = 0; i < chordnode->num_vnodes (); i++) { ptr<vnode> v = chordnode->get_vnode (i); warnx << "Exiting on command " << v->my_location ()->id () << "\n"; } info << "stopping.\n"; chordnode = NULL; exit (0);}voidstart_logs (){ static int tracefd (-1); static int logfd (-1); // XXX please don't call setlogfd or change errfd anywhere else... if (tracefname) { if (tracefd >= 0) close (tracefd); tracefd = open (tracefname, O_WRONLY|O_APPEND|O_CREAT, 0666); if (tracefd < 0) fatal << "Couldn't open trace file " << tracefname << " for append.\n"; modlogger::setlogfd (tracefd); } if (logfname) { if (logfd >= 0) close (logfd); logfd = open (logfname, O_RDWR | O_CREAT, 0666); if (logfd < 0) fatal << "Could not open log file " << logfname << " for appending.\n"; lseek (logfd, 0, SEEK_END); errfd = logfd; }}static voiddo_heartbeat (str fn){ struct stat sb; if (stat(fn, &sb)) { int fd = open (fn, O_WRONLY | O_CREAT, 0644); if (fd < 0) { warn ("heartbeat failed: open %m\n"); } else { close (fd); } } else { if (utimes (fn, NULL) < 0) warn ("heartbeat failed: utimes %m\n"); } delaycb (60, wrap (&do_heartbeat, fn));}static voidusage (){ warnx << "Usage: " << progname << " -j hostname:port -p port\n" "\t[-d <dbprefix>]\n" "\t[-v <number of vnodes>]\n" "\t[-S <sock>]\n" "\t[-C <ctlsock>]\n" "\t[-l <locally bound IP>]\n" "\t[-m [chord|debruijn]]\n" "\t[-b <debruijn logbase>]\n" "\t[-s <server select mode>]\n" "\t[-L <warn/fatal/panic output file name>]\n" "\t[-T <trace file name (aka new log)>]\n" "\t[-O <config file>]\n" ; exit (1);}void finish_start();int ch;int ss_mode = -1;int lbase = 1;// ensure enough room for fingers and successors.int max_loccache = 0;int max_vnodes = 1;str wellknownhost;int wellknownport = 0;int nreplica = 0;bool replicate = true;bool do_daemonize = false;str db_name = "/tmp/db-sock";str my_name;char *cffile = NULL;intmain (int argc, char **argv){#ifdef PROFILING toggle_profiling (); // turn profiling off#endif setprogname (argv[0]); mp_clearscrub (); // sfsconst_init (); random_init (); sigcb(SIGUSR1, wrap (&stats)); sigcb(SIGUSR2, wrap (&stop)); sigcb(SIGHUP, wrap (&start_logs)); sigcb(SIGINT, wrap (&halt)); sigcb(SIGTERM, wrap (&halt)); int nmodes = sizeof (modes)/sizeof(modes[0]); myport = 0; my_name = my_addr (); p2psocket = "/tmp/chord-sock"; ctlsocket = "/tmp/lsdctl-sock"; mode = MODE_CHORD; while ((ch = getopt (argc, argv, "b:C:d:fFH:j:l:L:M:m:n:O:Pp:rS:s:T:tv:D"))!=-1) switch (ch) { case 'b': lbase = atoi (optarg); break; case 'C': ctlsocket = optarg; break; case 'D': do_daemonize = true; break; case 'd': db_name = optarg; break; case 'f': warnx << "-f mode is no longer supported... using -F.\n"; case 'F': mode = MODE_CHORD; break; case 'H': heartbeatfn = optarg; break; case 'j': { char *bs_port = strchr (optarg, ':'); if (!bs_port) usage (); char *sep = bs_port; *bs_port = 0; bs_port++; if (inet_addr (optarg) == INADDR_NONE) { //yep, this blocks struct hostent *h = gethostbyname (optarg); if (!h) { warn << "Invalid address or hostname: " << optarg << "\n"; usage (); } struct in_addr *ptr = (struct in_addr *)h->h_addr; wellknownhost = inet_ntoa (*ptr); } else wellknownhost = optarg; wellknownport = atoi (bs_port); *sep = ':'; // restore optarg for argv printing later. break; } case 'L': logfname = optarg; break; case 'l': if (inet_addr (optarg) == INADDR_NONE) fatal << "must specify bind address in dotted decimal form\n"; my_name = optarg; break; case 'M': max_loccache = atoi (optarg); break; case 'm': { int i; for (i = 0; i < nmodes; i++) { if (strcmp (optarg, modes[i].cmdline) == 0) { mode = modes[i].m; break; } } if (i == nmodes) { strbuf s; for (i = 0; i < nmodes; i++) s << " " << modes[i].cmdline; fatal << "allowed modes are" << s << "\n"; } } break; case 'n': nreplica = atoi (optarg); break; case 'O': cffile = optarg; break; case 'P': fatal << "mode PROX no longer supported\n"; break; case 'p': myport = atoi (optarg); break; case 'r': replicate = false; break; case 'S': p2psocket = optarg; break; case 's': ss_mode = atoi(optarg); break; case 't': modlogger::setmaxprio (modlogger::TRACE); break; case 'T': tracefname = optarg; break; case 'v': vnodes = atoi (optarg); break; default: usage (); break; } if (wellknownport == 0) usage (); if (do_daemonize) { daemonize (); logfname = tracefname = NULL; } start_logs (); if (cffile) { bool ok = Configurator::only ().parse (cffile); assert (ok); } if (mode == MODE_TCPPNSREC) { // HACK set global indicator variable. // Storage is defined in dhash/client.C. extern bool dhash_tcp_transfers; dhash_tcp_transfers = true; Configurator::only ().set_str ("chord.rpc_mode", "tcp"); } Configurator::only ().get_int ("chord.max_vnodes", max_vnodes); if (vnodes > max_vnodes) { warn << "Requested vnodes (" << vnodes << ") more than maximum allowed (" << max_vnodes << ")\n"; usage (); } if (!max_loccache) Configurator::only ().get_int ("locationtable.maxcache", max_loccache); // Override cf file stuff max_loccache = max_loccache * (vnodes + 1); if (ss_mode >= 0) { Configurator::only ().set_int ("dhashcli.order_successors", ((ss_mode & 1) ? 1 : 0)); Configurator::only ().set_int ("chord.greedy_lookup", ((ss_mode & 2) ? 1 : 0)); Configurator::only ().set_int ("chord.find_succlist_shaving", ((ss_mode & 4) ? 1 : 0)); } if (lbase != 1) { if (mode != MODE_DEBRUIJN) warnx << "logbase " << lbase << " only supported in debruijn\n"; Configurator::only ().set_int ("debruijn.logbase", lbase); } if (!replicate) Configurator::only ().set_int ("dhash.start_maintenance", 0); Configurator::only ().dump (); { strbuf x = strbuf ("starting: "); for (int i = 0; i < argc; i++) { x << argv[i] << " "; } x << "\n"; info << x; } assert (mode == modes[mode].m); chordnode = New refcounted<chord> (my_name, myport, modes[mode].producer, vnodes, max_loccache); for (int i = 0; i < vnodes; i++) { ptr<vnode> v = chordnode->get_vnode (i); dh.push_back (dhash::produce_dhash (v, db_name, wrap(&finish_start))); } info << "starting amain.\n"; amain ();}int dhashes_finished = 0;void finish_start (){ dhashes_finished++; info << "DHash " << dhashes_finished << " is ready.\n"; if( dhashes_finished != vnodes ) { return; } chordnode->startchord (); chordnode->join (wellknownhost, wellknownport); time_t now = time (NULL); warn << "lsd starting up at " << ctime ((const time_t *)&now); warn << " running with options: \n"; warn << " IP/port: " << my_name << ":" << myport << "\n"; warn << " vnodes: " << vnodes << "\n"; warn << " lookup_mode: " << mode << "\n"; warn << " ss_mode: " << ss_mode << "\n"; // Initialize for use by LSDCTL_GETLSDPARAMETERS parameters.nvnodes = vnodes; parameters.adbdsock = db_name; Configurator::only ().get_int ("dhash.efrags", parameters.efrags); Configurator::only ().get_int ("dhash.dfrags", parameters.dfrags); Configurator::only ().get_int ("dhash.replica", parameters.nreplica); parameters.addr.hostname = my_name; parameters.addr.port = myport; // chord->get_port (); if (heartbeatfn) delaycb (0, wrap (&do_heartbeat, heartbeatfn)); if (p2psocket) startclntd(); if (ctlsocket) startcontroller ();}// This is needed to instantiate recursive routing classes.#include <recroute.C>template class recroute<fingerroutepns>;template class recroute<fingerroute>;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -