📄 aggregateipflows.cc
字号:
output(0).push(p); // if not a fragment, know we don't have more fragments if (!was_fragment) return; } // assign aggregate annotation to other packets with the same IP ID for (Packet *p = first; p; p = p->next()) if (good_ip_header(p)->ip_id == want_ip_id) SET_AGGREGATE_ANNO(p, finfo->aggregate());}voidAggregateIPFlows::reap_map(Map &table, uint32_t timeout, uint32_t done_timeout){ timeout = _active_sec - timeout; done_timeout = _active_sec - done_timeout; int frag_timeout = _active_sec - _fragment_timeout; // free completed flows and emit fragments for (Map::iterator iter = table.begin(); iter; iter++) { HostPairInfo *hpinfo = &iter.value(); // fragments while (hpinfo->_fragment_head && hpinfo->_fragment_head->timestamp_anno().sec() < frag_timeout) assign_aggregate(table, hpinfo, frag_timeout); // can't delete any flows if there are fragments if (hpinfo->_fragment_head) continue; // completed flows FlowInfo **pprev = &hpinfo->_flows; FlowInfo *f = *pprev; while (f) { // circular comparison if (SEC_OLDER(f->_last_timestamp.sec(), (f->_flow_over == 3 ? done_timeout : timeout))) { notify(f->_aggregate, AggregateListener::DELETE_AGG, 0); *pprev = f->_next; delete_flowinfo(iter.key(), f); } else pprev = &f->_next; f = *pprev; } // XXX never free host pairs }}voidAggregateIPFlows::reap(){ if (_gc_sec) { reap_map(_tcp_map, _tcp_timeout, _tcp_done_timeout); reap_map(_udp_map, _udp_timeout, _udp_timeout); } _gc_sec = _active_sec + _gc_interval;}const click_ip *AggregateIPFlows::icmp_encapsulated_header(const Packet *p){ const click_icmp *icmph = p->icmp_header(); if (icmph && (icmph->icmp_type == ICMP_UNREACH || icmph->icmp_type == ICMP_TIMXCEED || icmph->icmp_type == ICMP_PARAMPROB || icmph->icmp_type == ICMP_SOURCEQUENCH || icmph->icmp_type == ICMP_REDIRECT)) { const click_ip *embedded_iph = reinterpret_cast<const click_ip *>(icmph + 1); unsigned embedded_hlen = embedded_iph->ip_hl << 2; if ((unsigned)p->transport_length() >= sizeof(click_icmp) + embedded_hlen && embedded_hlen >= sizeof(click_ip)) return embedded_iph; } return 0;}intAggregateIPFlows::relevant_timeout(const FlowInfo *f, const Map &m) const{ if (&m == &_udp_map) return _udp_timeout; else if (f->_flow_over == 3) return _tcp_done_timeout; else return _tcp_timeout;}// XXX timing when fragments are merged back in?AggregateIPFlows::FlowInfo *AggregateIPFlows::find_flow_info(Map &m, HostPairInfo *hpinfo, uint32_t ports, bool flipped, const Packet *p){ FlowInfo **pprev = &hpinfo->_flows; for (FlowInfo *finfo = *pprev; finfo; pprev = &finfo->_next, finfo = finfo->_next) if (finfo->_ports == ports) { // if this flow is actually dead (but has not yet been garbage // collected), then kill it for consistent semantics int age = p->timestamp_anno().sec() - finfo->_last_timestamp.sec(); // 4.Feb.2004 - Also start a new flow if the old flow closed off, // and we have a SYN. if ((age > _smallest_timeout && age > relevant_timeout(finfo, m)) || (finfo->_flow_over == 3 && p->ip_header()->ip_p == IP_PROTO_TCP && (p->tcp_header()->th_flags & TH_SYN))) { // old aggregate has died notify(finfo->aggregate(), AggregateListener::DELETE_AGG, 0); const click_ip *iph = good_ip_header(p); HostPair hp(iph->ip_src.s_addr, iph->ip_dst.s_addr); delete_flowinfo(hp, finfo, false); // make a new aggregate finfo->_aggregate = _next; _next++; finfo->_reverse = flipped; finfo->_flow_over = 0; if (stats()) stat_new_flow_hook(p, finfo); notify(finfo->aggregate(), AggregateListener::NEW_AGG, p); } // otherwise, move to the front of the list and return *pprev = finfo->_next; finfo->_next = hpinfo->_flows; hpinfo->_flows = finfo; return finfo; } // make and install new FlowInfo pair FlowInfo *finfo; if (stats()) { finfo = new StatFlowInfo(ports, hpinfo->_flows, _next); stat_new_flow_hook(p, finfo); } else finfo = new FlowInfo(ports, hpinfo->_flows, _next); finfo->_reverse = flipped; hpinfo->_flows = finfo; _next++; notify(finfo->aggregate(), AggregateListener::NEW_AGG, p); return finfo;}intAggregateIPFlows::handle_fragment(Packet *p, int paint, Map &table, HostPairInfo *hpinfo){ if (hpinfo->_fragment_head) hpinfo->_fragment_tail->set_next(p); else hpinfo->_fragment_head = p; hpinfo->_fragment_tail = p; p->set_next(0); SET_AGGREGATE_ANNO(p, 0); SET_PAINT_ANNO(p, paint); if (int p_sec = p->timestamp_anno().sec()) _active_sec = p_sec; // get rid of old fragments int frag_timeout = _active_sec - _fragment_timeout; Packet *head; while ((head = hpinfo->_fragment_head) && (head->timestamp_anno().sec() < frag_timeout || !IP_ISFRAG(good_ip_header(head)))) assign_aggregate(table, hpinfo, frag_timeout); return ACT_NONE;}intAggregateIPFlows::handle_packet(Packet *p){ const click_ip *iph = p->ip_header(); int paint = 0; // assign timestamp if no timestamp given if (!p->timestamp_anno()) { if (!_timestamp_warning) { click_chatter("%{element}: warning: packet received without timestamp", this); _timestamp_warning = true; } p->timestamp_anno().set_now(); } // extract encapsulated ICMP header if appropriate if (iph && iph->ip_p == IP_PROTO_ICMP && IP_FIRSTFRAG(iph) && _handle_icmp_errors) { iph = icmp_encapsulated_header(p); paint = 2; } // return if not a proper TCP/UDP packet if (!iph || (iph->ip_p != IP_PROTO_TCP && iph->ip_p != IP_PROTO_UDP) || (iph->ip_src.s_addr == 0 && iph->ip_dst.s_addr == 0)) return ACT_DROP; const uint8_t *udp_ptr = reinterpret_cast<const uint8_t *>(iph) + (iph->ip_hl << 2); if ((udp_ptr + sizeof(click_udp)) - p->data() > (int) p->length()) // packet not big enough return ACT_DROP; // find relevant FlowInfo Map &m = (iph->ip_p == IP_PROTO_TCP ? _tcp_map : _udp_map); HostPair hosts(iph->ip_src.s_addr, iph->ip_dst.s_addr); if (hosts.a != iph->ip_src.s_addr) paint ^= 1; HostPairInfo *hpinfo = m.findp_force(hosts); // check for fragment if (IP_ISFRAG(iph)) { if (IP_FIRSTFRAG(iph) || _fragments) return handle_fragment(p, paint, m, hpinfo); else return ACT_DROP; } else if (hpinfo->_fragment_head) return handle_fragment(p, paint, m, hpinfo); uint32_t ports = *(reinterpret_cast<const uint32_t *>(udp_ptr)); if (paint & 1) ports = flip_ports(ports); FlowInfo *finfo = find_flow_info(m, hpinfo, ports, paint & 1, p); if (!finfo) { click_chatter("out of memory!"); return ACT_DROP; } // mark packet with aggregate number and paint _active_sec = p->timestamp_anno().sec(); finfo->_last_timestamp = p->timestamp_anno(); SET_AGGREGATE_ANNO(p, finfo->aggregate()); if (finfo->reverse()) paint ^= 1; SET_PAINT_ANNO(p, paint); // packet emit hook packet_emit_hook(p, iph, finfo); return ACT_EMIT;}voidAggregateIPFlows::push(int, Packet *p){ int action = handle_packet(p); // GC if necessary if (_active_sec >= _gc_sec) reap(); if (action == ACT_EMIT) output(0).push(p); else if (action == ACT_DROP) checked_output_push(1, p);}Packet *AggregateIPFlows::pull(int){ Packet *p = input(0).pull(); int action = (p ? handle_packet(p) : ACT_NONE); // GC if necessary if (_active_sec >= _gc_sec) reap(); if (action == ACT_EMIT) return p; else if (action == ACT_DROP) checked_output_push(1, p); return 0;}enum { H_CLEAR };intAggregateIPFlows::write_handler(const String &, Element *e, void *thunk, ErrorHandler *){ AggregateIPFlows *af = static_cast<AggregateIPFlows *>(e); switch ((intptr_t)thunk) { case H_CLEAR: { int active_sec = af->_active_sec, gc_sec = af->_gc_sec; af->_active_sec = af->_gc_sec = 0x7FFFFFFF; af->reap(); af->_active_sec = active_sec, af->_gc_sec = gc_sec; return 0; } default: return -1; }}voidAggregateIPFlows::add_handlers(){ add_write_handler("clear", write_handler, (void *)H_CLEAR);}ELEMENT_REQUIRES(userlevel AggregateNotifier)EXPORT_ELEMENT(AggregateIPFlows)#include <click/bighashmap.cc>CLICK_ENDDECLS
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -