📄 aggpktcounter.cc
字号:
// -*- c-basic-offset: 4 -*-/* * aggpktcounter.{cc,hh} -- element counts packets per packet number and * aggregate annotation * Eddie Kohler * * Copyright (c) 2002 International Computer Science Institute * * Permission is hereby granted, free of charge, to any person obtaining a * copy of this software and associated documentation files (the "Software"), * to deal in the Software without restriction, subject to the conditions * listed in the Click LICENSE file. These conditions include: you must * preserve this copyright notice, and you cannot mention the copyright * holders in advertising related to the Software without their permission. * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This * notice is a summary of the Click LICENSE file; the license in that file is * legally binding. */#include <click/config.h>#include "aggpktcounter.hh"#include <click/confparse.hh>#include <click/error.hh>#include <click/packet_anno.hh>#include <click/router.hh>#include <click/straccum.hh>#include <unistd.h>#include <sys/stat.h>#include <sys/types.h>#include <sys/wait.h>#include <fcntl.h>CLICK_DECLSAggregatePacketCounter::Flow::Flow(uint32_t aggregate, int columns) : _aggregate(aggregate), _next(0), _counts(new Vector<uint32_t>[columns]){}voidAggregatePacketCounter::Flow::add(uint32_t packetno, int column){ if (_counts[column].size() <= (int) packetno) _counts[column].resize(packetno + 1, 0); _counts[column].at_u(packetno)++;}AggregatePacketCounter::packetctr_tAggregatePacketCounter::Flow::column_count(int column) const{ packetctr_t count = 0; for (const uint32_t *v = _counts[column].begin(); v < _counts[column].end(); v++) count += *v; return count;}voidAggregatePacketCounter::Flow::received(Vector<uint32_t> &v, const AggregatePacketCounter *apc) const{ int n = 0; for (int port = 0; port < apc->noutputs(); port++) if (_counts[port].size() > n) n = _counts[port].size(); for (int packetno = 0; packetno < n; packetno++) for (int port = 0; port < apc->noutputs(); port++) if (packetno < _counts[port].size() && _counts[port].at_u(packetno)) { v.push_back(packetno); break; }}voidAggregatePacketCounter::Flow::undelivered(Vector<uint32_t> &undelivered, const AggregatePacketCounter *apc) const{ assert(apc->noutputs() >= 2); int packetno; int min_n = (_counts[0].size() < _counts[1].size() ? _counts[0].size() : _counts[1].size()); for (packetno = 0; packetno < min_n; packetno++) if (_counts[0].at_u(packetno) > _counts[1].at_u(packetno)) undelivered.push_back(packetno); for (; packetno < _counts[0].size(); packetno++) if (_counts[0].at_u(packetno)) undelivered.push_back(packetno);}AggregatePacketCounter::AggregatePacketCounter(){ for (int i = 0; i < NFLOWMAP; i++) _flowmap[i] = 0;}AggregatePacketCounter::~AggregatePacketCounter(){}voidAggregatePacketCounter::notify_ninputs(int n){ set_ninputs(n < 1 ? 1 : n); set_noutputs(n < 1 ? 1 : n);}intAggregatePacketCounter::configure(Vector<String> &conf, ErrorHandler *errh){ Element *e = 0; _packetno = 0; if (cp_va_parse(conf, this, errh, cpKeywords, "NOTIFIER", cpElement, "aggregate deletion notifier", &e, "PACKETNO", cpInteger, "packet number annotation (-1..1)", &_packetno, cpEnd) < 0) return -1; if (_packetno > 1) return errh->error("'PACKETNO' cannot be bigger than 1"); /*if (e && !(_agg_notifier = (AggregateNotifier *)e->cast("AggregateNotifier"))) return errh->error("%s is not an AggregateNotifier", e->id().cc()); */ return 0;}intAggregatePacketCounter::initialize(ErrorHandler *){ _total_flows = _total_packets = 0; //if (_agg_notifier) //_agg_notifier->add_listener(this); //_gc_timer.initialize(this); return 0;}voidAggregatePacketCounter::end_flow(Flow *f, ErrorHandler *){ /* if (f->npackets() >= _mincount) { f->output(errh); if (_gzip && f->filename() != "-") if (add_compressable(f->filename(), errh) < 0) _gzip = false; } else f->unlink(errh);*/ delete f;}voidAggregatePacketCounter::cleanup(CleanupStage){ ErrorHandler *errh = ErrorHandler::default_handler(); for (int i = 0; i < NFLOWMAP; i++) while (Flow *f = _flowmap[i]) { _flowmap[i] = f->next(); end_flow(f, errh); } if (_total_packets > 0 && _total_flows == 0) errh->lwarning(declaration(), "saw no packets with aggregate annotations");}AggregatePacketCounter::Flow *AggregatePacketCounter::find_flow(uint32_t agg){ if (agg == 0) return 0; int bucket = (agg & (NFLOWMAP - 1)); Flow *prev = 0, *f = _flowmap[bucket]; while (f && f->aggregate() != agg) { prev = f; f = f->next(); } if (f) /* nada */; else if ((f = new Flow(agg, ninputs()))) { prev = f; _total_flows++; } else return 0; if (prev) { prev->set_next(f->next()); f->set_next(_flowmap[bucket]); _flowmap[bucket] = f; } return f;}inline voidAggregatePacketCounter::smaction(int port, Packet *p){ _total_packets++; if (Flow *f = find_flow(AGGREGATE_ANNO(p))) { if (_packetno >= 0) f->add(PACKET_NUMBER_ANNO(p, _packetno), port); else f->add(0, port); }}voidAggregatePacketCounter::push(int port, Packet *p){ smaction(port, p); output(port).push(p);}Packet *AggregatePacketCounter::pull(int port){ if (Packet *p = input(port).pull()) { smaction(port, p); return p; } else return 0;}/*voidAggregatePacketCounter::aggregate_notify(uint32_t agg, AggregateEvent event, const Packet *){ if (event == DELETE_AGG && find_aggregate(agg, 0)) { _gc_aggs.push_back(agg); _gc_aggs.push_back(click_jiffies()); if (!_gc_timer.scheduled()) _gc_timer.schedule_after_ms(250); }}voidAggregatePacketCounter::gc_hook(Timer *t, void *thunk){ AggregatePacketCounter *td = static_cast<AggregatePacketCounter *>(thunk); uint32_t limit_jiff = click_jiffies() - (CLICK_HZ / 4); int i; for (i = 0; i < td->_gc_aggs.size() && SEQ_LEQ(td->_gc_aggs[i+1], limit_jiff); i += 2) if (Flow *f = td->find_aggregate(td->_gc_aggs[i], 0)) { int bucket = (f->aggregate() & (NFLOWMAP - 1)); assert(td->_flowmap[bucket] == f); td->_flowmap[bucket] = f->next(); td->end_flow(f, ErrorHandler::default_handler()); } if (i < td->_gc_aggs.size()) { td->_gc_aggs.erase(td->_gc_aggs.begin(), td->_gc_aggs.begin() + i); t->schedule_after_ms(250); }}*/enum { H_CLEAR, H_COUNT };StringAggregatePacketCounter::read_handler(Element *e, void *thunk){ AggregatePacketCounter *ac = static_cast<AggregatePacketCounter *>(e); switch ((intptr_t)thunk) { case H_COUNT: { packetctr_t count = 0; for (int i = 0; i < NFLOWMAP; i++) for (const Flow *f = ac->_flowmap[i]; f; f = f->next()) for (int col = 0; col < ac->ninputs(); col++) count += f->column_count(col); return String(count) + "\n"; } default: return "<error>\n"; }}intAggregatePacketCounter::write_handler(const String &, Element *e, void *thunk, ErrorHandler *errh){ AggregatePacketCounter *td = static_cast<AggregatePacketCounter *>(e); switch ((intptr_t)thunk) { case H_CLEAR: for (int i = 0; i < NFLOWMAP; i++) while (Flow *f = td->_flowmap[i]) { td->_flowmap[i] = f->next(); td->end_flow(f, errh); } return 0; default: return -1; }}StringAggregatePacketCounter::flow_handler(uint32_t aggregate, FlowFunc func){ Vector<uint32_t> v; if (Flow *f = find_flow(aggregate)) (f->*func)(v, this); StringAccum sa; for (int i = 0; i < v.size(); i++) sa << v[i] << '\n'; return sa.take_string();}intAggregatePacketCounter::thing_read_handler(int, String& s, Element* e, const Handler* h, ErrorHandler* errh){ uint32_t aggregate; if (!s) aggregate = 0; else if (!cp_unsigned(cp_uncomment(s), &aggregate)) return errh->error("argument should be aggregate number"); FlowFunc ff = (h->thunk() ? &Flow::undelivered : &Flow::received); AggregatePacketCounter *apc = static_cast<AggregatePacketCounter *>(e); s = apc->flow_handler(aggregate, ff); return 0;}voidAggregatePacketCounter::add_handlers(){ add_write_handler("clear", write_handler, (void *)H_CLEAR); add_read_handler("count", read_handler, (void *)H_COUNT); set_handler("received", Handler::OP_READ | Handler::READ_PARAM, thing_read_handler, 0); set_handler("undelivered", Handler::OP_READ | Handler::READ_PARAM, thing_read_handler, (void*) 1);}CLICK_ENDDECLSELEMENT_REQUIRES(userlevel)EXPORT_ELEMENT(AggregatePacketCounter)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -