⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 toipflowdumps.cc

📁 COPE, the first practical network coding scheme, which is developed on Click
💻 CC
📖 第 1 页 / 共 2 页
字号:
	tcp_seq_t s = ntohl(tcph->th_seq);	if (!_have_first_seq[direction]) {	    _first_seq[direction] = s;	    _have_first_seq[direction] = true;	}	tcp_seq_t a = ntohl(tcph->th_ack);	if (!(tcph->th_flags & TH_ACK))	    a = _first_seq[!direction];	else if (!_have_first_seq[!direction]) {	    _first_seq[!direction] = a;	    _have_first_seq[!direction] = true;	}		_pkt[_npkt].th_seq = s - _first_seq[direction];	_pkt[_npkt].th_ack = a - _first_seq[!direction];	_pkt[_npkt].th_flags = tcph->th_flags;	_pkt[_npkt].payload_len = ntohs(iph->ip_len) - (iph->ip_hl << 2) - (tcph->th_off << 2); // XXX check for correctness?	if (_tcp_opt	    && tcph->th_off > (sizeof(click_tcp) >> 2)	    && (tcph->th_off != 8		|| *(reinterpret_cast<const uint32_t *>(tcph + 1)) != htonl(0x0101080A)		|| (_tcp_opt & IPSummaryDump::DO_TCPOPT_TIMESTAMP)))	    store_opt(tcph, direction);	if (_tcp_windows)	    _tcp_windows[_npkt] = tcph->th_win;	    } else	_pkt[_npkt].payload_len = ntohs(iph->ip_len) - sizeof(click_udp);        _npkt++;    if (_packet_count < 0xFFFFFFFFU)	_packet_count++;        return 0;}intToIPFlowDumps::Flow::add_note(const String &s, ErrorHandler *errh){    if (_nnote >= NNOTE && output(errh) < 0)	return -1;    _note[_nnote].before_pkt = _npkt;    _note[_nnote].pos = _note_text.length();    _note_text << s;        _nnote++;    _note_count++;    return 0;}ToIPFlowDumps::ToIPFlowDumps()    : Element(1, 0), _nnoagg(0), _nagg(0), _agg_notifier(0), _task(this),      _gc_timer(gc_hook, this), _compress_child(-1){    for (int i = 0; i < NFLOWMAP; i++)	_flowmap[i] = 0;}ToIPFlowDumps::~ToIPFlowDumps(){}StringToIPFlowDumps::output_pattern() const{    return (_gzip ? _filename_pattern + ".gz" : _filename_pattern);}voidToIPFlowDumps::notify_noutputs(int n){    set_noutputs(n < 1 ? 
0 : 1);}intToIPFlowDumps::configure(Vector<String> &conf, ErrorHandler *errh){    Element *e = 0;    bool absolute_time = false, absolute_seq = false, binary = false, all_tcp_opt = false, tcp_opt = false, tcp_window = false, ip_id = false, gzip = false;    _mincount = 0;        if (cp_va_parse(conf, this, errh,		    cpOptional,		    cpFilename, "output filename pattern", &_filename_pattern,		    cpKeywords,		    "OUTPUT_PATTERN", cpFilename, "output filename pattern", &_filename_pattern,		    "NOTIFIER", cpElement, "aggregate deletion notifier", &e,		    "ABSOLUTE_TIME", cpBool, "print absolute timestamps?", &absolute_time,		    "ABSOLUTE_SEQ", cpBool, "print absolute sequence numbers?", &absolute_seq,		    "BINARY", cpBool, "output binary records?", &binary,		    "ALL_TCP_OPT", cpBool, "output all TCP options?", &all_tcp_opt,		    "TCP_OPT", cpBool, "output TCP options?", &tcp_opt,		    "TCP_WINDOW", cpBool, "output TCP windows?", &tcp_window,		    "GZIP", cpBool, "gzip output files?", &gzip,		    "IP_ID", cpBool, "output IP IDs?", &ip_id,		    "MINCOUNT", cpUnsigned, "output flows with at least this many packets", &_mincount,		    cpEnd) < 0)	return -1;    if (!_filename_pattern)	_filename_pattern = "-";    if (find(_filename_pattern, '%') == _filename_pattern.end())	errh->warning("OUTPUT_PATTERN has no %% escapes, so output files will get overwritten");        if (e && !(_agg_notifier = (AggregateNotifier *)e->cast("AggregateNotifier")))	return errh->error("%s is not an AggregateNotifier", e->id().cc());    _absolute_time = absolute_time;    _absolute_seq = absolute_seq;    _binary = binary;    if (all_tcp_opt)	_tcp_opt = IPSummaryDump::DO_TCPOPT_ALL_NOPAD;    else if (tcp_opt)	_tcp_opt = IPSummaryDump::DO_TCPOPT_MSS | IPSummaryDump::DO_TCPOPT_WSCALE | IPSummaryDump::DO_TCPOPT_SACK;    else	_tcp_opt = 0;    _tcp_window = tcp_window;    _ip_id = ip_id;    _gzip = gzip;    return 0;}extern "C" char **environ;intToIPFlowDumps::add_compressable(const String 
&filename, ErrorHandler *errh){    bool nowait = filename.length();    static int arg_space = -1;    // append current filename    if (filename)	_compressables.push_back(filename);    if (_compressables.size() < 10 && nowait)	return 0;    // wait for current compression child    if (_compress_child >= 0) {	int status;	int retval = waitpid(_compress_child, &status, (nowait ? WNOHANG : 0));	if (retval == 0)	    return 0;	else if (retval < 0)	    return errh->lerror(declaration(), "compressor: waitpid: %s;\ncancelling compression", strerror(errno));	else {	    _compress_child = -1;	    if (WIFSIGNALED(status))		return errh->lerror(declaration(), "compressor exited with signal %d;\ncancelling compression", WTERMSIG(status));	    else if (!WIFEXITED(status) || WEXITSTATUS(status))		return errh->lerror(declaration(), "compressor did not exit normally;\ncancelling compression");	}    }    if (_compressables.size() == 0)	return 0;    // calculate maximum argument list size    if (arg_space < 0) {#ifdef _SC_ARG_MAX      arg_space = sysconf(_SC_ARG_MAX);#elif defined(ARG_MAX)      arg_space = ARG_MAX;#else      arg_space = 1024;#endif      for (char **eptr = environ; *eptr; eptr++)	arg_space -= strlen(*eptr) + 1;      arg_space = (arg_space < 64 ? 
1024 : arg_space - 32);    }    // fork child    Vector<const char *> args;    args.push_back("gzip");    args.push_back("-f");    int n = 0, my_arg_space = arg_space;    for (int i = _compressables.size() - 1; i >= 0; i--, n++) {	if (_compressables[i][0] == '-') // beware initial dashes	    _compressables[i] = "./" + _compressables[i];	my_arg_space -= _compressables[i].length() + 1; // not too long a line	if (my_arg_space < 0)	    break;	args.push_back(_compressables[i].cc());    }    args.push_back((const char *) 0);    if ((_compress_child = fork()) == 0) {	if (execvp("gzip", (char * const *) &args[0]) < 0) {	    errh->lerror(declaration(), "gzip failed: %s", strerror(errno));	    abort();	}    } else if (_compress_child < 0)	errh->lerror(declaration(), "fork failed: %s;\ncancelling compression", strerror(errno));    // remove old compressables    _compressables.resize(_compressables.size() - n);    // done    if (n == 0)	return errh->lerror(declaration(), "compressor failed: argument list too long; cancelling compression");    else	return 0;}voidToIPFlowDumps::end_flow(Flow *f, ErrorHandler *errh){    if (f->npackets() >= _mincount) {	f->output(errh);	if (_gzip && f->filename() != "-")	    if (add_compressable(f->filename(), errh) < 0)		_gzip = false;    } else	f->unlink(errh);    delete f;    _nflows--;}voidToIPFlowDumps::cleanup(CleanupStage){    ErrorHandler *errh = ErrorHandler::default_handler();    for (int i = 0; i < NFLOWMAP; i++)	while (Flow *f = _flowmap[i]) {	    _flowmap[i] = f->next();	    end_flow(f, errh);	}    if (_nnoagg > 0 && _nagg == 0)	errh->lwarning(declaration(), "saw no packets with aggregate annotations");    while ((_compress_child >= 0 || _compressables.size())	   && add_compressable("", errh) >= 0)	/* nada */;}intToIPFlowDumps::initialize(ErrorHandler *errh){    if (input_is_pull(0) && noutputs() == 0) {	ScheduleInfo::join_scheduler(this, &_task, errh);	_signal = Notifier::upstream_empty_signal(this, 0, &_task);    }    if 
(_agg_notifier)	_agg_notifier->add_listener(this);    _gc_timer.initialize(this);    return 0;}StringToIPFlowDumps::expand_filename(const Packet *pkt, ErrorHandler *errh) const{    const char *data = _filename_pattern.data();    int len = _filename_pattern.length();    StringAccum sa;        for (int p = 0; p < len; p++)	if (data[p] == '%') {	    p++;	    bool zero_pad = false;	    int field_width = -1;	    int precision = -1;	    if (p < len && data[p] == '0')		zero_pad = true, p++;	    if (p < len && isdigit(data[p])) {		field_width = data[p] - '0';		for (p++; p < len && isdigit(data[p]); p++)		    field_width = (field_width * 10) + data[p] - '0';	    }	    if (p < len && data[p] == '.') {		precision = 0;		for (p++; p < len && isdigit(data[p]); p++)		    precision = (precision * 10) + data[p] - '0';	    }	    	    StringAccum subsa;	    if (p >= len)		errh->error("bad filename pattern");	    else if (data[p] == 'n' || data[p] == 'x' || data[p] == 'X') {		char format[3] = "%d";		if (data[p] != 'n')		    format[1] = data[p];		uint32_t value = AGGREGATE_ANNO(pkt);		if (precision >= 0 && precision <= 3)		    value = (value >> ((3 - precision) * 8)) & 255;		else if (precision >= 4 && precision <= 5)		    value = (value >> ((5 - precision) * 16)) & 65535;		subsa.snprintf(20, format, value);	    } else if (data[p] == 's' && (precision < 0 || precision > 3))		subsa << IPAddress(pkt->ip_header()->ip_src);	    else if (data[p] == 's')		subsa << ((ntohl(pkt->ip_header()->ip_src.s_addr) >> ((3 - precision) * 8)) & 255);	    else if (data[p] == 'd' && (precision < 0 || precision > 3))		subsa << IPAddress(pkt->ip_header()->ip_dst);	    else if (data[p] == 'd')		subsa << ((ntohl(pkt->ip_header()->ip_dst.s_addr) >> ((3 - precision) * 8)) & 255);	    else if (data[p] == 'S')		subsa << ntohs(pkt->tcp_header()->th_sport);	    else if (data[p] == 'D')		subsa << ntohs(pkt->tcp_header()->th_dport);	    else if (data[p] == 'p')		subsa << (pkt->ip_header()->ip_p == IP_PROTO_TCP ? 
'T' : 'U');	    else if (data[p] == '%')		subsa << '%';	    else		errh->error("bad filename pattern `%%%c'", data[p]);	    	    if (field_width >= 0 && subsa.length() < field_width)		for (int l = field_width - subsa.length(); l > 0; l--)		    sa << (zero_pad ? '0' : '_');	    sa << subsa;	} else	    sa << _filename_pattern[p];    return sa.take_string();}ToIPFlowDumps::Flow *ToIPFlowDumps::find_aggregate(uint32_t agg, const Packet *p){    if (agg == 0)	return 0;        int bucket = (agg & (NFLOWMAP - 1));    Flow *prev = 0, *f = _flowmap[bucket];    while (f && f->aggregate() != agg) {	prev = f;	f = f->next();    }    if (f)	/* nada */;    else if (p && (f = new Flow(p, expand_filename(p, ErrorHandler::default_handler()), _absolute_time, _absolute_seq, _binary, _ip_id, _tcp_opt, _tcp_window))) {	prev = f;	_nflows++;    } else	return 0;    if (prev) {	prev->set_next(f->next());	f->set_next(_flowmap[bucket]);	_flowmap[bucket] = f;    }        return f;}inline voidToIPFlowDumps::smaction(Packet *p){    if (Flow *f = find_aggregate(AGGREGATE_ANNO(p), p)) {	_nagg++;	f->add_pkt(p, ErrorHandler::default_handler());    } else	_nnoagg++;}voidToIPFlowDumps::push(int, Packet *p){    smaction(p);    checked_output_push(0, p);}Packet *ToIPFlowDumps::pull(int){    if (Packet *p = input(0).pull()) {	smaction(p);	return p;    } else	return 0;}boolToIPFlowDumps::run_task(){    Packet *p = input(0).pull();    if (p) {	smaction(p);	p->kill();    } else if (!_signal)	return false;    _task.fast_reschedule();    return p != 0;}voidToIPFlowDumps::add_note(uint32_t agg, const String &s, ErrorHandler *errh){    if (Flow *f = find_aggregate(agg, 0))	f->add_note(s, (errh ? 
errh : ErrorHandler::default_handler()));    else if (errh)	errh->warning("aggregate not found");}voidToIPFlowDumps::aggregate_notify(uint32_t agg, AggregateEvent event, const Packet *){    if (event == DELETE_AGG && find_aggregate(agg, 0)) {	_gc_aggs.push_back(agg);	_gc_aggs.push_back(click_jiffies());	if (!_gc_timer.scheduled())	    _gc_timer.schedule_after_ms(250);    }}voidToIPFlowDumps::gc_hook(Timer *t, void *thunk){    ToIPFlowDumps *td = static_cast<ToIPFlowDumps *>(thunk);    uint32_t limit_jiff = click_jiffies() - (CLICK_HZ / 4);    int i;    for (i = 0; i < td->_gc_aggs.size() && SEQ_LEQ(td->_gc_aggs[i+1], limit_jiff); i += 2)	if (Flow *f = td->find_aggregate(td->_gc_aggs[i], 0)) {	    int bucket = (f->aggregate() & (NFLOWMAP - 1));	    assert(td->_flowmap[bucket] == f);	    td->_flowmap[bucket] = f->next();	    td->end_flow(f, ErrorHandler::default_handler());	}    if (i < td->_gc_aggs.size()) {	td->_gc_aggs.erase(td->_gc_aggs.begin(), td->_gc_aggs.begin() + i);	t->schedule_after_ms(250);    }}enum { H_CLEAR };intToIPFlowDumps::write_handler(const String &, Element *e, void *thunk, ErrorHandler *errh){    ToIPFlowDumps *td = static_cast<ToIPFlowDumps *>(e);    switch ((intptr_t)thunk) {      case H_CLEAR:	for (int i = 0; i < NFLOWMAP; i++)	    while (Flow *f = td->_flowmap[i]) {		td->_flowmap[i] = f->next();		td->end_flow(f, errh);	    }	return 0;      default:	return -1;    }}voidToIPFlowDumps::add_handlers(){    add_write_handler("clear", write_handler, (void *)H_CLEAR);}CLICK_ENDDECLSELEMENT_REQUIRES(userlevel AggregateNotifier IPSummaryDump_TCP)EXPORT_ELEMENT(ToIPFlowDumps)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -