📄 toipflowdumps.cc
字号:
tcp_seq_t s = ntohl(tcph->th_seq); if (!_have_first_seq[direction]) { _first_seq[direction] = s; _have_first_seq[direction] = true; } tcp_seq_t a = ntohl(tcph->th_ack); if (!(tcph->th_flags & TH_ACK)) a = _first_seq[!direction]; else if (!_have_first_seq[!direction]) { _first_seq[!direction] = a; _have_first_seq[!direction] = true; } _pkt[_npkt].th_seq = s - _first_seq[direction]; _pkt[_npkt].th_ack = a - _first_seq[!direction]; _pkt[_npkt].th_flags = tcph->th_flags; _pkt[_npkt].payload_len = ntohs(iph->ip_len) - (iph->ip_hl << 2) - (tcph->th_off << 2); // XXX check for correctness? if (_tcp_opt && tcph->th_off > (sizeof(click_tcp) >> 2) && (tcph->th_off != 8 || *(reinterpret_cast<const uint32_t *>(tcph + 1)) != htonl(0x0101080A) || (_tcp_opt & IPSummaryDump::DO_TCPOPT_TIMESTAMP))) store_opt(tcph, direction); if (_tcp_windows) _tcp_windows[_npkt] = tcph->th_win; } else _pkt[_npkt].payload_len = ntohs(iph->ip_len) - sizeof(click_udp); _npkt++; if (_packet_count < 0xFFFFFFFFU) _packet_count++; return 0;}intToIPFlowDumps::Flow::add_note(const String &s, ErrorHandler *errh){ if (_nnote >= NNOTE && output(errh) < 0) return -1; _note[_nnote].before_pkt = _npkt; _note[_nnote].pos = _note_text.length(); _note_text << s; _nnote++; _note_count++; return 0;}ToIPFlowDumps::ToIPFlowDumps() : Element(1, 0), _nnoagg(0), _nagg(0), _agg_notifier(0), _task(this), _gc_timer(gc_hook, this), _compress_child(-1){ for (int i = 0; i < NFLOWMAP; i++) _flowmap[i] = 0;}ToIPFlowDumps::~ToIPFlowDumps(){}StringToIPFlowDumps::output_pattern() const{ return (_gzip ? _filename_pattern + ".gz" : _filename_pattern);}voidToIPFlowDumps::notify_noutputs(int n){ set_noutputs(n < 1 ? 0 : 1);}intToIPFlowDumps::configure(Vector<String> &conf, ErrorHandler *errh){ Element *e = 0; bool absolute_time = false, absolute_seq = false, binary = false, all_tcp_opt = false, tcp_opt = false, tcp_window = false, ip_id = false, gzip = false; _mincount = 0; if (cp_va_parse(conf, this, errh, cpOptional, cpFilename, "output filename pattern", &_filename_pattern, cpKeywords, "OUTPUT_PATTERN", cpFilename, "output filename pattern", &_filename_pattern, "NOTIFIER", cpElement, "aggregate deletion notifier", &e, "ABSOLUTE_TIME", cpBool, "print absolute timestamps?", &absolute_time, "ABSOLUTE_SEQ", cpBool, "print absolute sequence numbers?", &absolute_seq, "BINARY", cpBool, "output binary records?", &binary, "ALL_TCP_OPT", cpBool, "output all TCP options?", &all_tcp_opt, "TCP_OPT", cpBool, "output TCP options?", &tcp_opt, "TCP_WINDOW", cpBool, "output TCP windows?", &tcp_window, "GZIP", cpBool, "gzip output files?", &gzip, "IP_ID", cpBool, "output IP IDs?", &ip_id, "MINCOUNT", cpUnsigned, "output flows with at least this many packets", &_mincount, cpEnd) < 0) return -1; if (!_filename_pattern) _filename_pattern = "-"; if (find(_filename_pattern, '%') == _filename_pattern.end()) errh->warning("OUTPUT_PATTERN has no %% escapes, so output files will get overwritten"); if (e && !(_agg_notifier = (AggregateNotifier *)e->cast("AggregateNotifier"))) return errh->error("%s is not an AggregateNotifier", e->id().cc()); _absolute_time = absolute_time; _absolute_seq = absolute_seq; _binary = binary; if (all_tcp_opt) _tcp_opt = IPSummaryDump::DO_TCPOPT_ALL_NOPAD; else if (tcp_opt) _tcp_opt = IPSummaryDump::DO_TCPOPT_MSS | IPSummaryDump::DO_TCPOPT_WSCALE | IPSummaryDump::DO_TCPOPT_SACK; else _tcp_opt = 0; _tcp_window = tcp_window; _ip_id = ip_id; _gzip = gzip; return 0;}extern "C" char **environ;intToIPFlowDumps::add_compressable(const String &filename, ErrorHandler *errh){ bool nowait = filename.length(); static int arg_space = -1; // append current filename if (filename) _compressables.push_back(filename); if (_compressables.size() < 10 && nowait) return 0; // wait for current compression child if (_compress_child >= 0) { int status; int retval = waitpid(_compress_child, &status, (nowait ? WNOHANG : 0)); if (retval == 0) return 0; else if (retval < 0) return errh->lerror(declaration(), "compressor: waitpid: %s;\ncancelling compression", strerror(errno)); else { _compress_child = -1; if (WIFSIGNALED(status)) return errh->lerror(declaration(), "compressor exited with signal %d;\ncancelling compression", WTERMSIG(status)); else if (!WIFEXITED(status) || WEXITSTATUS(status)) return errh->lerror(declaration(), "compressor did not exit normally;\ncancelling compression"); } } if (_compressables.size() == 0) return 0; // calculate maximum argument list size if (arg_space < 0) {#ifdef _SC_ARG_MAX arg_space = sysconf(_SC_ARG_MAX);#elif defined(ARG_MAX) arg_space = ARG_MAX;#else arg_space = 1024;#endif for (char **eptr = environ; *eptr; eptr++) arg_space -= strlen(*eptr) + 1; arg_space = (arg_space < 64 ? 1024 : arg_space - 32); } // fork child Vector<const char *> args; args.push_back("gzip"); args.push_back("-f"); int n = 0, my_arg_space = arg_space; for (int i = _compressables.size() - 1; i >= 0; i--, n++) { if (_compressables[i][0] == '-') // beware initial dashes _compressables[i] = "./" + _compressables[i]; my_arg_space -= _compressables[i].length() + 1; // not too long a line if (my_arg_space < 0) break; args.push_back(_compressables[i].cc()); } args.push_back((const char *) 0); if ((_compress_child = fork()) == 0) { if (execvp("gzip", (char * const *) &args[0]) < 0) { errh->lerror(declaration(), "gzip failed: %s", strerror(errno)); abort(); } } else if (_compress_child < 0) errh->lerror(declaration(), "fork failed: %s;\ncancelling compression", strerror(errno)); // remove old compressables _compressables.resize(_compressables.size() - n); // done if (n == 0) return errh->lerror(declaration(), "compressor failed: argument list too long; cancelling compression"); else return 0;}voidToIPFlowDumps::end_flow(Flow *f, ErrorHandler *errh){ if (f->npackets() >= _mincount) { f->output(errh); if (_gzip && f->filename() != "-") if (add_compressable(f->filename(), errh) < 0) _gzip = false; } else f->unlink(errh); delete f; _nflows--;}voidToIPFlowDumps::cleanup(CleanupStage){ ErrorHandler *errh = ErrorHandler::default_handler(); for (int i = 0; i < NFLOWMAP; i++) while (Flow *f = _flowmap[i]) { _flowmap[i] = f->next(); end_flow(f, errh); } if (_nnoagg > 0 && _nagg == 0) errh->lwarning(declaration(), "saw no packets with aggregate annotations"); while ((_compress_child >= 0 || _compressables.size()) && add_compressable("", errh) >= 0) /* nada */;}intToIPFlowDumps::initialize(ErrorHandler *errh){ if (input_is_pull(0) && noutputs() == 0) { ScheduleInfo::join_scheduler(this, &_task, errh); _signal = Notifier::upstream_empty_signal(this, 0, &_task); } if (_agg_notifier) _agg_notifier->add_listener(this); _gc_timer.initialize(this); return 0;}StringToIPFlowDumps::expand_filename(const Packet *pkt, ErrorHandler *errh) const{ const char *data = _filename_pattern.data(); int len = _filename_pattern.length(); StringAccum sa; for (int p = 0; p < len; p++) if (data[p] == '%') { p++; bool zero_pad = false; int field_width = -1; int precision = -1; if (p < len && data[p] == '0') zero_pad = true, p++; if (p < len && isdigit(data[p])) { field_width = data[p] - '0'; for (p++; p < len && isdigit(data[p]); p++) field_width = (field_width * 10) + data[p] - '0'; } if (p < len && data[p] == '.') { precision = 0; for (p++; p < len && isdigit(data[p]); p++) precision = (precision * 10) + data[p] - '0'; } StringAccum subsa; if (p >= len) errh->error("bad filename pattern"); else if (data[p] == 'n' || data[p] == 'x' || data[p] == 'X') { char format[3] = "%d"; if (data[p] != 'n') format[1] = data[p]; uint32_t value = AGGREGATE_ANNO(pkt); if (precision >= 0 && precision <= 3) value = (value >> ((3 - precision) * 8)) & 255; else if (precision >= 4 && precision <= 5) value = (value >> ((5 - precision) * 16)) & 65535; subsa.snprintf(20, format, value); } else if (data[p] == 's' && (precision < 0 || precision > 3)) subsa << IPAddress(pkt->ip_header()->ip_src); else if (data[p] == 's') subsa << ((ntohl(pkt->ip_header()->ip_src.s_addr) >> ((3 - precision) * 8)) & 255); else if (data[p] == 'd' && (precision < 0 || precision > 3)) subsa << IPAddress(pkt->ip_header()->ip_dst); else if (data[p] == 'd') subsa << ((ntohl(pkt->ip_header()->ip_dst.s_addr) >> ((3 - precision) * 8)) & 255); else if (data[p] == 'S') subsa << ntohs(pkt->tcp_header()->th_sport); else if (data[p] == 'D') subsa << ntohs(pkt->tcp_header()->th_dport); else if (data[p] == 'p') subsa << (pkt->ip_header()->ip_p == IP_PROTO_TCP ? 'T' : 'U'); else if (data[p] == '%') subsa << '%'; else errh->error("bad filename pattern `%%%c'", data[p]); if (field_width >= 0 && subsa.length() < field_width) for (int l = field_width - subsa.length(); l > 0; l--) sa << (zero_pad ? '0' : '_'); sa << subsa; } else sa << _filename_pattern[p]; return sa.take_string();}ToIPFlowDumps::Flow *ToIPFlowDumps::find_aggregate(uint32_t agg, const Packet *p){ if (agg == 0) return 0; int bucket = (agg & (NFLOWMAP - 1)); Flow *prev = 0, *f = _flowmap[bucket]; while (f && f->aggregate() != agg) { prev = f; f = f->next(); } if (f) /* nada */; else if (p && (f = new Flow(p, expand_filename(p, ErrorHandler::default_handler()), _absolute_time, _absolute_seq, _binary, _ip_id, _tcp_opt, _tcp_window))) { prev = f; _nflows++; } else return 0; if (prev) { prev->set_next(f->next()); f->set_next(_flowmap[bucket]); _flowmap[bucket] = f; } return f;}inline voidToIPFlowDumps::smaction(Packet *p){ if (Flow *f = find_aggregate(AGGREGATE_ANNO(p), p)) { _nagg++; f->add_pkt(p, ErrorHandler::default_handler()); } else _nnoagg++;}voidToIPFlowDumps::push(int, Packet *p){ smaction(p); checked_output_push(0, p);}Packet *ToIPFlowDumps::pull(int){ if (Packet *p = input(0).pull()) { smaction(p); return p; } else return 0;}boolToIPFlowDumps::run_task(){ Packet *p = input(0).pull(); if (p) { smaction(p); p->kill(); } else if (!_signal) return false; _task.fast_reschedule(); return p != 0;}voidToIPFlowDumps::add_note(uint32_t agg, const String &s, ErrorHandler *errh){ if (Flow *f = find_aggregate(agg, 0)) f->add_note(s, (errh ? errh : ErrorHandler::default_handler())); else if (errh) errh->warning("aggregate not found");}voidToIPFlowDumps::aggregate_notify(uint32_t agg, AggregateEvent event, const Packet *){ if (event == DELETE_AGG && find_aggregate(agg, 0)) { _gc_aggs.push_back(agg); _gc_aggs.push_back(click_jiffies()); if (!_gc_timer.scheduled()) _gc_timer.schedule_after_ms(250); }}voidToIPFlowDumps::gc_hook(Timer *t, void *thunk){ ToIPFlowDumps *td = static_cast<ToIPFlowDumps *>(thunk); uint32_t limit_jiff = click_jiffies() - (CLICK_HZ / 4); int i; for (i = 0; i < td->_gc_aggs.size() && SEQ_LEQ(td->_gc_aggs[i+1], limit_jiff); i += 2) if (Flow *f = td->find_aggregate(td->_gc_aggs[i], 0)) { int bucket = (f->aggregate() & (NFLOWMAP - 1)); assert(td->_flowmap[bucket] == f); td->_flowmap[bucket] = f->next(); td->end_flow(f, ErrorHandler::default_handler()); } if (i < td->_gc_aggs.size()) { td->_gc_aggs.erase(td->_gc_aggs.begin(), td->_gc_aggs.begin() + i); t->schedule_after_ms(250); }}enum { H_CLEAR };intToIPFlowDumps::write_handler(const String &, Element *e, void *thunk, ErrorHandler *errh){ ToIPFlowDumps *td = static_cast<ToIPFlowDumps *>(e); switch ((intptr_t)thunk) { case H_CLEAR: for (int i = 0; i < NFLOWMAP; i++) while (Flow *f = td->_flowmap[i]) { td->_flowmap[i] = f->next(); td->end_flow(f, errh); } return 0; default: return -1; }}voidToIPFlowDumps::add_handlers(){ add_write_handler("clear", write_handler, (void *)H_CLEAR);}CLICK_ENDDECLSELEMENT_REQUIRES(userlevel AggregateNotifier IPSummaryDump_TCP)EXPORT_ELEMENT(ToIPFlowDumps)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -