📄 transmittedqueue.cc
字号:
/*
 * transmittedqueue.{cc,hh} -- stores all forwarded packets,
 * decodes received packets and resends sent packets if necessary
 * Wenjun Hu
 *
 * Copyright (c) 2005 University of Cambridge
 * Copyright (c) 2005 Massachusetts Institute of Technology
 *
 * Permission is hereby granted, free of charge, to any person obtaining a
 * copy of this software and associated documentation files (the "Software"),
 * to deal in the Software without restriction, subject to the conditions
 * listed in the Click LICENSE file. These conditions include: you must
 * preserve this copyright notice, and you cannot mention the copyright
 * holders in advertising related to the Software without their permission.
 * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
 * notice is a summary of the Click LICENSE file; the license in that file is
 * legally binding.
 */

#include <click/config.h>
#include <click/confparse.hh>
#include <clicknet/ether.h>
#include <clicknet/ip.h>
#include <click/error.hh>
#include <click/glue.hh>
#include <click/straccum.hh>
#include <click/timestamp.hh>
#include <click/packet_anno.hh>
#include <elements/wifi/sr/srpacket.hh>
#include <elements/wifi/sr/path.hh>
#include "scrambler.hh"
#include "transmittedqueue.hh"
#include "encodedheader.hh"
CLICK_DECLS

/*
 * Construct the element with all statistics counters zeroed, an unset IP
 * address, a timeout of 60000 and a granularity of 50.  Both values are
 * used as milliseconds by run_timer() (compared against Timestamp msec1()
 * and passed to schedule_after_ms()).
 */
TransmittedQueue::TransmittedQueue()
  : Element(2,2),
    _decoded(0),
    _added(0),
    _removed(0),
    _overheard(0),
    _undecodable(0),
    _expired(0),
    _dupe(0),
    _unacked(0),
    _retxed(0),
    _ip(),
    _timeout(60000),
    _granularity(50),
    _timer(this)
{
}

/*
 * NOTE(review): the PerUnitEntries buckets allocated in initialize() are
 * not released here -- confirm whether that is handled elsewhere.
 */
TransmittedQueue::~TransmittedQueue()
{
}

/*
 * Return this element when asked for the class name "TransmittedQueue",
 * and 0 for any other name.
 */
void *
TransmittedQueue::cast(const char *n)
{
  if (strcmp(n, "TransmittedQueue") == 0)
    return (TransmittedQueue *)this;
  else
    return 0;
}

/*
 * Parse the element configuration.
 *
 * Keywords:
 *   IP       IP address of this node (required)
 *   TIMEOUT  unsigned timeout, stored in _timeout
 *   GRAN     unsigned granularity, stored in _granularity
 *
 * Returns the result of cp_va_parse(), or the result of errh->error() when
 * IP is missing or GRAN is zero.
 */
int
TransmittedQueue::configure(Vector<String> &conf, ErrorHandler *errh)
{
  int res = cp_va_parse(conf, this, errh,
			cpKeywords,
			"IP", cpIPAddress, "IP address", &_ip,
			cpOptional,
			cpKeywords,
			"TIMEOUT", cpUnsigned, "timeout", &_timeout,
			"GRAN", cpUnsigned, "granularity", &_granularity,
			cpEnd);

  if (!_ip)
    return errh->error("IP not specified");

  // initialize() divides by _granularity, so a zero value must be
  // rejected here instead of crashing there.
  if (!_granularity)
    return errh->error("GRAN must be greater than zero");

  return res;
}

/*
 * Allocate the removal buckets, initialise the timer and reset all state.
 * (_timeout/_granularity) * 3 + 1 buckets are created.  The timer is not
 * scheduled here.  Always returns 0.
 */
int
TransmittedQueue::initialize(ErrorHandler *)
{
  // this is a silly implementation, but at least it works for now
  // a better implementation would use associative vectors
  // to keep track of times and vectors of packet entries
  for (int i = 0; i <= _timeout/_granularity * 3; i++) {
    PerUnitEntries* entries = new PerUnitEntries;
    _removal_entries.push_back(entries);
  }

  _timer.initialize(this);
  //_timer.schedule_after_ms(_timeout); -- wait till first packet is added to the storage

  reset();

  return 0;
}

/*
 * Timer callback.  Once at least _timeout ms have passed since
 * _start_time, the first bucket is cleared and moved to the back of
 * _removal_entries, and _start_time advances by _granularity.  The timer
 * is rescheduled after _granularity ms while _added - _removed > 0.
 */
void
TransmittedQueue::run_timer()
{
  int now = Timestamp::now().msec1();

  if (now - _start_time >= _timeout) {
    PerUnitEntries* v = _removal_entries[0];
    clear_entries(v);
    _removal_entries.erase(_removal_entries.begin());
    _removal_entries.push_back(v); // to reuse the memory space
    _start_time += _granularity;

    //debugging
    //click_chatter("%s: start time changed to %d", id().cc(), _start_time);
  }

  if (_added - _removed > 0)
    _timer.schedule_after_ms(_granularity);
}

/*
void
TransmittedQueue::add_await_ack_entry(uint32_t nexthop, uint32_t next2hop, uint32_t seq)
{
  //mark the corresponding packet
  uint32_t packetkey = (next2hop)?next2hop:nexthop;
  Per2hopNbQ **nbq = _queues.findp(packetkey);
  if (!nbq || !(*nbq)) {
    click_chatter("%s: missing packet: nexthop %s, dataseq %d", id().cc(), IPAddress(nexthop).s().cc(), seq);
    return;
  }
  WritablePacket *mp = (WritablePacket *)((*nbq)->findp(seq));
  if (!mp) {
    click_chatter("%s: missing packet: nexthop %s, dataseq %d", id().cc(), IPAddress(nexthop).s().cc(), seq);
    return;
  }
  SET_SEND_ERR_ANNO(mp, 1);

  // add an entry to keep track of acks too
  NbEntries** nbq = _await_acks.findp(nexthop);
  if (!nbq || !(*nbq)) {
    ///***check again
    NbEntries *new_q = new NbEntries;
    nbq = &new_q;
    _await_acks.insert(nexthop, new_q);
  }
  PacketEntry *pe = new PacketEntry(next2hop, seq);
  (*nbq)->insert(seq, pe);
}
*/

/*
 * Clear every removal bucket, restart the clock at the current time and
 * zero all statistics counters.
 */
void
TransmittedQueue::reset()
{
  for (int i = 0; i < _removal_entries.size(); i++)
    clear_entries(_removal_entries[i]);

  _start_time = Timestamp::now().msec1();

  _decoded = 0;
  _added = 0;
  _removed = 0;
  _overheard = 0;
  _undecodable = 0;
  _expired = 0;
  _dupe = 0;
  _unacked = 0;
  _retxed = 0;
}

/*
 * Process and empty one removal bucket.
 *
 * For each PacketEntry in v, the stored packet is looked up in the
 * per-neighbour queue _queues[pe->_nb] under the key built from the
 * entry's source address followed by its sequence number.  A packet
 * without the SEND_ERR annotation is removed from the queue and killed
 * (counted in _expired and _removed); a packet with the annotation is
 * left in place and counted in _unacked.  A per-neighbour queue that
 * becomes empty is removed from _queues and deleted.  The PacketEntry
 * itself is always deleted, and v is cleared at the end.
 */
void
//TransmittedQueue::clear_entries(Vector<PacketEntry*>* v)
TransmittedQueue::clear_entries(PerUnitEntries* v)
{
  for (int i = 0; i < v->size(); i++) {
    PacketEntry *pe = (*v)[i];
    uint32_t pe_nb = pe->_nb;
    uint16_t pe_seq = pe->_seq;
    uint32_t pe_src = pe->_src;

    // clear the packets
    Per2hopNbQ **nbq = _queues.findp(pe_nb);
    if (nbq && (*nbq)) {
      // Keep our own pointer to the queue: nbq points into _queues and
      // must not be read once the entry has been removed below.
      Per2hopNbQ *q = *nbq;

      StringAccum newkey;
      newkey << IPAddress(pe_src) << pe_seq;
      String keystring = newkey.take_string();

      // findp() yields a null pointer when the key is absent, so check it
      // before dereferencing.  The lookup is repeated rather than stored
      // because the queue's value type is declared in the header.
      Packet *p = 0;
      if (q->findp(keystring))
	p = *(q->findp(keystring));

      if (p) {
	if (!SEND_ERR_ANNO(p)) {
	  // only delete the packet if there's no need to retransmit it
	  // otherwise the ack/retransmit manager will take care of it
	  //(*nbq)->remove(pe_seq);
	  q->remove(keystring);
	  p->kill();
	  _expired++;
	  _removed++;

	  // debugging
	  //click_chatter("%s: packets removed %d, remaining %d", id().cc(), _removed, (*nbq)->size());

	  if (q->size() == 0) {
	    _queues.remove(pe_nb);
	    delete q;
	  }
	} else {
	  _unacked++;
	}
      }
    }

    delete pe; // the index info to the packet is deleted irrespective of the packet
  }

  v->clear();
}

/*
 * Packet entry point.  On port 0 a writable copy of the packet is made and
 * the next hop and the hop after it are read from the packet's source
 * route (the srpacket header following the Ethernet header); the copy is
 * filed under the 2-hop neighbour if there is one, otherwise under the
 * next hop.
 *
 * NOTE(review), to be confirmed against the full function:
 *  - p->clone() is dereferenced before any null check;
 *  - on the out-of-memory return, p is neither killed nor forwarded;
 *  - route_p[i+1] is read without checking i+1 < route_p.size();
 *  - nbq is left pointing at the block-scoped new_q after the block ends.
 */
void
TransmittedQueue::push(int port, Packet* p)
{
  if (port == 0) {
    //debugging
    //click_chatter("\n%{element}: push port 0 - hope to add packet to the storage\n", this);

    // adds a copy of the packet to the queue
    WritablePacket *p_copy = p->clone()->uniqueify();
    if (!p_copy) {
      click_chatter("%{element}: might have run out of memory\n", this);
      return;
    }

/* moved to decoding part
    // we need to decrement ttl
    // because the actual (sent) packet's ttl is decremented before encoding is done
    // code copied from DecIPTTL::simple_action
    click_ip *ip_copy = p_copy->ip_header();
    assert(ip_copy);
    if (ip_copy->ip_ttl <= 1) {
      // dont bother keeping the packet
      output(0).push(p);
    } else {
      ip_copy->ip_ttl--;
      unsigned long sum = (~ntohs(ip_copy->ip_sum) & 0xFFFF) + 0xFEFF;
      ip_copy->ip_sum = ~htons(sum + (sum >> 16));
    }

    // we don't need the ethernet header
    //p_copy->pull(sizeof(struct click_ether));
    */

    // extract the expected next hop and hop after the next
    struct click_ether *eth = (struct click_ether *)p_copy->data();
    struct srpacket *srh = (struct srpacket *)(eth + 1);
    Path route_p = srh->get_path();
    uint32_t src_ip = route_p[0].addr();
    uint32_t next_2hop = 0;
    uint32_t nexthop = 0;
    for (int i = 0; i<route_p.size(); i++) {
      if (route_p[i].addr()==_ip.addr()) {
	nexthop = route_p[i+1]; // if the packet ends up here, there's at least a next hop along the path
	next_2hop = ((i+2)<route_p.size())?route_p[i+2].addr():0;
	break;
      }
    }

    uint32_t key = (next_2hop)?next_2hop:nexthop;
    //if (key) {
    //debugging
    //click_chatter("%{element}: looking for the 2-hop neighbour storage for %d\n", this, next_2hop);
    Per2hopNbQ** nbq = _queues.findp(key);
    if (!nbq || !(*nbq)) {
      ///***check again
      Per2hopNbQ *new_q = new Per2hopNbQ;
      nbq = &new_q;
      _queues.insert(key, new_q);
    }

    //uint32_t seq = srh->data_seq();
    click_ip *ip_copy = p_copy->ip_header();
    uint16_t seq = ntohs(ip_copy->ip_id);
    StringAccum newkey;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -