📄 meter.cc
字号:
fdListIter_t iter; fd_set rset, wset; fd_sets_t fds; struct timeval tv; int cnt = 0; int stop = 0; eventVec_t retEvents; Event *e = NULL; try { // fill the fd set FD_ZERO(&fds.rset); FD_ZERO(&fds.wset); for (iter = fdList.begin(); iter != fdList.end(); iter++) { if ((iter->first.mode == FD_RD) || (iter->first.mode == FD_RW)) { FD_SET(iter->first.fd, &fds.rset); } if ((iter->first.mode == FD_WT) || (iter->first.mode == FD_RW)) { FD_SET(iter->first.fd, &fds.wset); } } fds.max = fdList.begin()->first.fd; // register a timer for ctrlcomm (only online capturing) if (enableCtrl) { int t = comm->getTimeout(); if (t > 0) { evnt->addEvent(new CtrlCommTimerEvent(t, t * 1000)); } } // start threads (if threading is configured) expt->run(); proc->run(); clss->run();#ifdef DEBUG log->dlog(ch,"------- meter is running -------");#endif do { if (onlineCap) { // select rset = fds.rset; wset = fds.wset; tv = evnt->getNextEventTime(); //cerr << "timeout: " << tv.tv_sec*1e6+tv.tv_usec << endl; // note: under most unix the minimal sleep time of select is // 10ms which means an event may be executed 10ms after expiration! if ((cnt = select(fds.max+1, &rset, &wset, NULL, &tv)) < 0) { if (errno != EINTR) { throw Error("select error: %s", strerror(errno)); } } // check FD events if (cnt > 0) { if (FD_ISSET( s_sigpipe[0], &rset)) { // handle sig action char c; if (read(s_sigpipe[0], &c, 1) > 0) { switch (c) { case 'S': stop = 1; break; case 'D': cerr << *this; break; case 'A': // next event // check Event Scheduler events e = evnt->getNextEvent(); if (e != NULL) { // FIXME hack if (e->getType() == CTRLCOMM_TIMER) { comm->handleFDEvent(&retEvents, NULL, NULL, &fds); } else { handleEvent(e, &fds); } // reschedule the event evnt->reschedNextEvent(e); e = NULL; } break; default: throw Error("unknown signal"); } //} else { //throw Error("sigpipe read error"); } } else { // check meter components if (!classThread) { clss->handleFDEvent(&retEvents, &rset, &wset, &fds); } if (enableCtrl) { comm->handleFDEvent(&retEvents, &rset, &wset, &fds); } } } } else { // offline mode if (!classThread) { // offline threaded is probably broken... // current time is set eccording to packet timestamp from trace int ret = clss->handleFDEvent(&retEvents, NULL, NULL, NULL); if (ret < 0) { cout << "End of capture file reached" << endl; if (capFiles.size() > 0) { cout << *clss; // close current tap clss->clearTaps(); // open new tap string tt = conf->getValue("TapType", "CLASSIFIER"); string sz = conf->getValue("SnapSize", "CLASSIFIER"); int snapsize = sz.empty() ? DEF_SNAPSIZE : atoi(sz.c_str()); int bsize = conf->getValue("RcvBufSize", "CLASSIFIER").empty() ? 65535 : atoi(conf->getValue("RcvBufSize", "CLASSIFIER").c_str()); string dev = capFiles.front(); NetTap *nett; if (tt == "erf") {#ifdef HAVE_ERF if (conf->isTrue("ERFLegacy", "MAIN")) { nett = new NetTapERF(dev, onlineCap, conf->isFalse("NoPromiscInt", "MAIN"), snapsize, 1, bsize, 1); } else { nett = new NetTapERF(dev, onlineCap, conf->isFalse("NoPromiscInt", "MAIN"), snapsize, 1, bsize); }#else throw Error("No support for ERF");#endif } else { /* pcap by default */#ifdef ENABLE_THREADS nett = new NetTapPcap(dev, onlineCap, conf->isFalse("NoPromiscInt", "MAIN"), snapsize, !conf->isTrue("Thread", "CLASSIFIER"), bsize);#else nett = new NetTapPcap(dev, onlineCap, conf->isFalse("NoPromiscInt", "MAIN"), snapsize, 1, bsize);#endif } clss->registerTap(nett); // remove current file from list capFiles.pop_front(); } else { stop = 1; } } } // execute all due events evnt->getNextEventTime(); char c; while (read(s_sigpipe[0], &c, 1) > 0) { switch (c) { case 'S': stop = 1; break; case 'D': cerr << *this; break; case 'A': // check Event Scheduler events e = evnt->getNextEvent(); if (e != NULL) { handleEvent(e, &fds); // reschedule the event evnt->reschedNextEvent(e); e = NULL; } break; default: throw Error("unknown signal"); } //} else { //throw Error("sigpipe read error"); evnt->getNextEventTime(); } } if (!pprocThread) { proc->handleFDEvent(&retEvents, NULL,NULL, NULL); } if (!expThread) { expt->handleFDEvent(&retEvents, NULL, NULL, NULL); } // schedule events if (retEvents.size() > 0) { for (eventVecIter_t iter = retEvents.begin(); iter != retEvents.end(); iter++) { evnt->addEvent(*iter); } retEvents.clear(); } } while (!stop); // wait for packet processor to handle all remaining packets (if threaded) proc->waitUntilDone(); // do export for all rules that have no export interval (for capture files and on ctrl-c) ruleDB_t rules = rulm->getRules(); for (ruleDBIter_t iter = rules.begin(); iter != rules.end(); iter++) { if ((*iter)->getIntervals()->empty()) { FlowRecord *rec; // retrieve flow data via evaluation module(s) for that rule rec = proc->exportRule((*iter)->getUId(), (*iter)->getRuleName()); // final flow record rec->setFinal(1); // export flow records directly expt->exportFlowRecord(rec, ""); saveDelete(rec); } } // finally do all push export events in the queue while((e = evnt->getNextEvent()) != NULL) { if (e->getType() == PUSH_EXPORT) { // indicate this is the final record (bit of a hack) ((PushExportEvent *) e)->setFinal(1); handleEvent(e, &fds); if (!expThread) { expt->handleFDEvent(&retEvents, NULL, NULL, NULL); } } saveDelete(e); } // wait for exporter to handle all remaining exports (if threaded) expt->waitUntilDone(); log->log(ch,"meter going down on Ctrl-C" );#ifdef DEBUG log->dlog(ch,"------- shutdown -------" );#endif if (!onlineCap) { // print the classifier stats cout << *clss; } } catch (Error &err) { if (log.get()) { // Logger might not be available yet log->elog(ch, err); } // in case an exception happens between get and reschedule event if (e != NULL) { saveDelete(e); } throw err; }}/* ------------------------- dump ------------------------- */void Meter::dump(ostream &os){ /* FIXME to be implemented */ os << "dump" << endl;}/* ------------------------- operator<< ------------------------- */ostream& operator<<(ostream &os, Meter &obj){ obj.dump(os); return os;}/* ------------------------ signal handler ---------------------- */void Meter::sigint_handler(int i){ char c = 'S'; write(s_sigpipe[1], &c,1);}void Meter::sigusr1_handler(int i){ char c = 'D'; write(s_sigpipe[1], &c,1);}void Meter::exit_fct(void){ unlink(NETMATE_LOCK_FILE.c_str());}void Meter::sigalarm_handler(int i){ g_timeout = 1;}/* -------------------- alreadyRunning -------------------- */int Meter::alreadyRunning(){ FILE *file; char cmd[128]; struct stat stats; int status, oldPid; // do we have a lock file ? if (stat(NETMATE_LOCK_FILE.c_str(), &stats ) == 0) { // read process ID from lock file file = fopen(NETMATE_LOCK_FILE.c_str(), "rt" ); if (file == NULL) { throw Error("cannot open old pidfile '%s' for reading: %s\n", NETMATE_LOCK_FILE.c_str(), strerror(errno)); } fscanf(file, "%d\n", &oldPid); fclose(file); // check if process still exists sprintf( cmd, "ps %d > /dev/null", oldPid ); status = system(cmd); // if yes, do not start a new meter if (status == 0) { return 1; } // pid file but no meter process ->meter must have crashed // remove (old) pid file and proceed unlink(NETMATE_LOCK_FILE.c_str()); } // no lock file and no running meter process // write new lock file and continue file = fopen(NETMATE_LOCK_FILE.c_str(), "wt" ); if (file == NULL) { throw Error("cannot open pidfile '%s' for writing: %s\n", NETMATE_LOCK_FILE.c_str(), strerror(errno)); } fprintf(file, "%d\n", getpid()); fclose(file); return 0;}/* ------------------------- main() ------------------------- */// Log functions are not used before the logger is initializedint main(int argc, char *argv[]){ try { // start up the netmate (this blocks until Ctrl-C !) cout << NETMATE_VERSION << endl;#ifdef DEBUG cout << NETMATE_OPTIONS << endl;#endif auto_ptr<Meter> meter(new Meter(argc, argv)); cout << "Up and running." << endl; // going into main loop meter->run(); // shut down the meter cout << "Terminating netmate." << endl; } catch (Error &e) { cerr << "Terminating netmate on error: " << e.getError() << endl; exit(1); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -