📄 codingmanager.cc
字号:
#include <click/config.h>
#include <click/confparse.hh>
#include <clicknet/ether.h>
#include <clicknet/ip.h>
#include <clicknet/udp.h>
#include <clicknet/tcp.h>
#include <click/ipaddress.hh>
#include <click/etheraddress.hh>
#include <click/error.hh>
#include <click/straccum.hh>
#include <click/timestamp.hh>
#include <click/string.hh>
#include <elements/wifi/sr/srpacket.hh>
#include <elements/wifi/sr/path.hh>
#include "codingmanager.hh"
CLICK_DECLS

// Builds a coding manager with coding disabled, a decoding-probability
// threshold of 0.8, an ack/data length cut-off of 200 bytes and at most
// 8 packets per coded transmission.
CodingManager::CodingManager()
  : SimpleQueue(),
    _missed_coding_opp(0),
    _fix_enc_dst(false),
    _enc_dst(),
    _enc_ethtype(0),
    _myip(),
    _aliases(0),
    _guess_mgr(0),
    _listenmgr(0),
    _sendmgr(0),
    _enable_coding(false),
    _dec_prob_thresh(0.8),
    _ack_pk_len(200),
    _max_coded_pkts(8)
{
  click_chatter("init coding manager");
}

CodingManager::~CodingManager()
{
}

/*
void*
CodingManager::cast(const char *n) {
  if (strcmp(n, "CodingManager") == 0)
    return (CodingManager*)this;
  return SimpleQueue::cast(n);
}
*/

/*
int
CodingManager::enable_coding(const String &arg, Element *e, void *, ErrorHandler *errh)
{
  CodingManager *cm = static_cast<CodingManager *>(e);
  bool x;
  if (!cp_bool(arg, &x))
    return errh->error("`enable_coding' must be a boolean");
  cm->set_enable_coding(x);
  if (x)
    click_chatter("%{element}: coding enabled", cm);
  else
    click_chatter("%{element}: coding disabled", cm);
  return 0;
}
*/

// Records packet p in the per-neighbor virtual queue that matches its
// Ethernet destination. Packets shorter than _ack_pk_len go to the ack map,
// all others to the data map. Does nothing while coding is disabled.
// NOTE(review): assumes p starts with an Ethernet header and has its IP
// header annotation set -- confirm against the callers.
void
CodingManager::add_virtual(Packet *p)
{
  if (!_enable_coding) {
    return;
  }

  struct click_ether *eth_p = (click_ether *)p->data();
  EtherAddress dst = EtherAddress(eth_p->ether_dhost);

  // Get IP header information
  const struct click_ip *ip_h = p->ip_header();
  IPAddress src = IPAddress(ip_h->ip_src);
  uint16_t ipid = ntohs(ip_h->ip_id);

  StringAccum sa;
  sa << "Adding packet to virtual queue with src " << src
     << " ipid " << ipid
     << " headed to " << dst;
  click_chatter("%s %s", id().cc(), sa.c_str());

  // now decide which queue to go to
  NbrQueue *vqueue;
  uint32_t pklen = p->length();
  if (pklen < _ack_pk_len) {
    click_chatter("is an ack packet");
    vqueue = _ackpk_map.find(dst);
    if (vqueue == NULL) {
      click_chatter("first time to destination creating queue");
      vqueue = new NbrQueue;
      _ackpk_map.insert(dst, vqueue);
    }
  } else {
    click_chatter("is a data packet");
    vqueue = _dtpk_map.find(dst);
    if (vqueue == NULL) {
      click_chatter("first time data packet");
      vqueue = new NbrQueue;
      _dtpk_map.insert(dst, vqueue);
    }
  }

  PacketState *ps = new PacketState(dst, src, ipid);
  if (pklen < _ack_pk_len)
    ps->_data = false;
  vqueue->push_back(ps);

  uint32_t now = Timestamp::now().msec1();
  click_chatter("%u push queue size %d diversity: %d",
                now, SimpleQueue::size(), _dtpk_map.size());
  return;
}

// Find the position of a particular packet in the queue.
// Returns the zero-based index of the first entry of _queue that compares
// equal to *ps, or -1 when there is none.
int
CodingManager::find_position(PacketState *ps)
{
  NbrQueue::iterator x = _queue.begin();
  if (!x)
    return -1;

  int idx = 0;
  while (x < _queue.end()) {
    if ((*(*x)) == (*ps))
      return idx;
    idx++;
    x++;
  }
  return -1;
}

// Returns a freshly allocated packet holding a byte-for-byte copy of p's
// data, or NULL when the allocation fails. Only the data bytes are copied.
WritablePacket*
CodingManager::copy_pkt(Packet *p)
{
  WritablePacket *new_p = Packet::make(p->length());
  // Packet::make() can fail; never hand a null packet to memcpy.
  if (!new_p)
    return NULL;
  memcpy((void *)new_p->data(), (void *)p->data(), p->length());
  return new_p;
}

// Returns the combined number of data and ack entries queued in the virtual
// queues for neighbor dst, and logs that number.
uint32_t
CodingManager::get_qsize(EtherAddress dst)
{
  uint32_t size = 0;

  NbrQueue *queue_t = _dtpk_map.find(dst);
  if (queue_t != NULL) {
    size += queue_t->size();
  }

  queue_t = _ackpk_map.find(dst);
  if (queue_t != NULL) {
    size += queue_t->size();
  }

  StringAccum sa_t;
  sa_t << "Queue to neighbor " << dst << " is " << size;
  click_chatter("%s %s", id().cc(), sa_t.c_str());
  return size;
}

// Logs the current length of every per-neighbor data queue, followed by
// every per-neighbor ack queue, each line stamped with the same time.
void
CodingManager::log_tcpqsize()
{
  uint32_t now = Timestamp::now().msec1();
  NbrQueueMap::iterator dt_x = _dtpk_map.begin();
  NbrQueueMap::iterator ack_x = _ackpk_map.begin();
  NbrQueue *queue;

  while (dt_x != _dtpk_map.end()) {
    queue = dt_x.value();
    StringAccum sa_t;
    sa_t << now << " Destination " << dt_x.key()
         << " TCP data packet Queue size " << queue->size();
    click_chatter("%s", sa_t.c_str());
    sa_t.clear();
    dt_x++;
  }

  while (ack_x != _ackpk_map.end()) {
    queue = ack_x.value();
    StringAccum sa_t;
    sa_t << now << " Destination " << ack_x.key()
         << " TCP ack packet Queue size " << queue->size();
    click_chatter("%s", sa_t.c_str());
    sa_t.clear();
    ack_x++;
  }
}

//Codes normal packets
// Pulls the next packet to transmit.
//
// With coding disabled the head of the real queue is pulled, logged,
// registered with _sendmgr and returned uncoded.
//
// With coding enabled the head of the real queue is inspected, its entry is
// dropped from the matching virtual queue, and the heads of the other
// per-neighbor virtual queues (data first, then acks, each scan starting at
// a random queue) are added while is_coding_worthy() accepts them and fewer
// than _max_coded_pkts packets have been selected. A lone packet is sent
// uncoded; otherwise the selection is handed to combine_pkts().
//
// port        - port passed through to SimpleQueue::pull().
// num_packets - out: number of packets represented by the returned packet.
// num_bytes   - out: payload byte count (set by combine_pkts() when coding).
// Returns the packet to send, or NULL when the queue is empty.
WritablePacket*
CodingManager::find_coding_candidates(int port, int *num_packets, int *num_bytes)
{
  uint32_t now = Timestamp::now().msec1();
  *num_packets = *num_bytes = 0;

  if (!_enable_coding) {
    Packet *p = SimpleQueue::pull(port);
    if (p) {
      click_chatter("%u pull queue size %d", now, (SimpleQueue::size()));
      *num_packets = 1;

      struct click_ether *eth_h = (struct click_ether *)p->data();
      const struct click_ip *ip_h = p->ip_header();
      struct srpacket *srh = (struct srpacket *)(((struct click_ether *)p->data()) + 1);
      *num_bytes = srh->hlen_with_data() - srh->hlen_wo_data();

      StringAccum sa_t;
      sa_t << "sending packet with ethdst = " << EtherAddress(eth_h->ether_dhost)
           << " pktdst = " << EtherAddress(eth_h->ether_dhost)
           << " ipsrc = " << IPAddress(ip_h->ip_src)
           << " ipid = " << ntohs(ip_h->ip_id)
           << " coded = 0";
      click_chatter("%s: %u %s", id().cc(), Timestamp::now().msec1(), sa_t.c_str());
      sa_t.clear();

      // Same cut-off as the virtual queues: strictly shorter than
      // _ack_pk_len is an ack, anything else is data.
      if (p->length() >= _ack_pk_len) {
        sa_t << "tcp data packet pulled to ethdst = " << EtherAddress(eth_h->ether_dhost);
        click_chatter("%s: %u %s", id().cc(), Timestamp::now().msec1(), sa_t.c_str());
        sa_t.clear();
      } else {
        sa_t << "tcp ack packet pulled to ethdst = " << EtherAddress(eth_h->ether_dhost);
        click_chatter("%s: %u %s", id().cc(), Timestamp::now().msec1(), sa_t.c_str());
        sa_t.clear();
      }

      _sendmgr->add_new_packet(p, Timestamp::now().msec1());
      WritablePacket *new_p = p->uniqueify();
      return new_p;
    }
    return NULL;
  }

  if (SimpleQueue::size() == 0)
    return NULL;
  click_chatter("%u pull queue size %d", now, (SimpleQueue::size()+1));

  // Find and delete the first packet from the virtual queue
  log_tcpqsize();

  // Peek at the head of the real queue: dequeue it, read its headers, then
  // put it straight back at the front.
  Packet *p_head = SimpleQueue::deq();
  struct click_ether *eth_p_head = (click_ether *)p_head->data();
  EtherAddress dst_head = EtherAddress(eth_p_head->ether_dhost);

  // Get IP header information
  const struct click_ip *ip_h_head = p_head->ip_header();
  IPAddress src_head = IPAddress(ip_h_head->ip_src);
  uint16_t ipid_head = ntohs(ip_h_head->ip_id);

  PacketState *ps_head = new PacketState(dst_head, src_head, ipid_head);
  if (p_head->length() < _ack_pk_len)
    ps_head->_data = false;
  SimpleQueue::lifo_enq(p_head);

  NbrList todeldt;
  NbrList todelack;
  NbrQueue *queue_t;

  // The head may have no virtual queue (add_virtual() skips packets that
  // arrive while coding is disabled), so the lookup result must be checked
  // before it is used.
  // NOTE(review): the PacketState erased here was allocated by add_virtual()
  // and is not freed; whether another structure still refers to it cannot
  // be told from this file alone -- confirm ownership before deleting it.
  if (ps_head->_data) {
    queue_t = _dtpk_map.find(ps_head->_nbr);
    if (queue_t) {
      if (queue_t->size())
        queue_t->erase(queue_t->begin());
      if (!queue_t->size())
        todeldt.push_back(ps_head->_nbr);
    }
  } else {
    queue_t = _ackpk_map.find(ps_head->_nbr);
    if (queue_t) {
      if (queue_t->size())
        queue_t->erase(queue_t->begin());
      if (!queue_t->size())
        todelack.push_back(ps_head->_nbr);
    }
  }

  CodedPktList *coded_pkts = new CodedPktList;
  coded_pkts->push_back(ps_head);
  int coded_pkt_cnt = 1;

  //Now look for other packets to code with
  NbrQueueMap::iterator dt_x = _dtpk_map.begin();
  NbrQueueMap::iterator ack_x = _ackpk_map.begin();

  // Start with a random queue
  int jumps = 0;
  if (_dtpk_map.size()) {
    jumps = random() % _dtpk_map.size();
    for (int x = 0; x < jumps; x++) {
      if (dt_x == _dtpk_map.end()) {
        dt_x = _dtpk_map.begin();
      }
      dt_x++;
    }
  }
  if (_ackpk_map.size()) {
    jumps = random() % _ackpk_map.size();
    for (int x = 0; x < jumps; x++) {
      if (ack_x == _ackpk_map.end()) {
        ack_x = _ackpk_map.begin();
      }
      ack_x++;
    }
  }

  //First data packets
  click_chatter("no of queues %d", _dtpk_map.size());
  NbrQueue *queue;
  NbrQueue::iterator head;
  for (int x = 0; x < _dtpk_map.size(); x++) {
    // Wrap around so that every queue is visited once from the random start.
    if (dt_x == _dtpk_map.end()) {
      dt_x = _dtpk_map.begin();
    }
    StringAccum sa_t;
    sa_t << "Looking in data Nbrqueue for " << (dt_x.key());
    click_chatter("coding data packets: %s", sa_t.c_str());

    queue = dt_x.value();
    head = queue->begin();
    if ((head < queue->end()) && (is_coding_worthy(coded_pkts, (*head)))) {
      click_chatter("found a coding worthy data packet");
      coded_pkts->push_back((*head));
      coded_pkt_cnt++;
      head = queue->erase(head);
      if (head >= queue->end()) {
        todeldt.push_back(dt_x.key());
      }
      if (coded_pkt_cnt >= _max_coded_pkts)
        break;
    }
    dt_x++;
  }

  //Ack packets
  if (coded_pkt_cnt < _max_coded_pkts) {
    for (int x = 0; x < _ackpk_map.size(); x++) {
      if (ack_x == _ackpk_map.end()) {
        ack_x = _ackpk_map.begin();
      }
      StringAccum sa_t;
      sa_t << "Looking in ack Nbrqueue for " << (ack_x.key());
      click_chatter("coding ack packets: %s", sa_t.c_str());

      queue = ack_x.value();
      head = queue->begin();
      if ((head < queue->end()) && (is_coding_worthy(coded_pkts, (*head)))) {
        click_chatter("found a coding worthy ack packet");
        coded_pkts->push_back((*head));
        coded_pkt_cnt++;
        head = queue->erase(head);
        if (head >= queue->end()) {
          todelack.push_back(ack_x.key());
        }
        if (coded_pkt_cnt >= _max_coded_pkts)
          break;
      }
      ack_x++;
    }
  }

  *num_packets = coded_pkt_cnt;
  if ((_dtpk_map.size() > 1) && (coded_pkt_cnt < 2)) {
    _missed_coding_opp++;
  }

  //delete queues both data and ack which have become empty
  // The queues were allocated with new in add_virtual() and the map holds
  // the only pointer to them, so free each one before dropping its entry.
  NbrList::iterator todeldtx = todeldt.begin();
  while (todeldtx < todeldt.end()) {
    NbrQueue *empty_q = _dtpk_map.find((*todeldtx));
    if (empty_q)
      delete empty_q;
    _dtpk_map.remove((*todeldtx));
    todeldtx++;
  }
  NbrList::iterator todelackx = todelack.begin();
  while (todelackx < todelack.end()) {
    NbrQueue *empty_q = _ackpk_map.find((*todelackx));
    if (empty_q)
      delete empty_q;
    _ackpk_map.remove((*todelackx));
    todelackx++;
  }

  if (coded_pkts->size() == 1) {
    Packet *p = SimpleQueue::pull(port);
    // _listenmgr->add_new_packet(p, now);
    _sendmgr->add_new_packet(p, now);

    struct click_ether *eth_h = (struct click_ether *)p->data();
    const struct click_ip *ip_h = p->ip_header();
    struct srpacket *sr_h = (struct srpacket *)(eth_h + 1);
    *num_bytes += (sr_h->hlen_with_data() - sr_h->hlen_wo_data());

    StringAccum sa_t;
    sa_t << "sending packet with ethdst = " << EtherAddress(eth_h->ether_dhost)
         << " pktdst = " << EtherAddress(eth_h->ether_dhost)
         << " ipsrc = " << IPAddress(ip_h->ip_src)
         << " ipid = " << ntohs(ip_h->ip_id)
         << " coded = 0";
    click_chatter("%s: %u %s", id().cc(), Timestamp::now().msec1(), sa_t.c_str());
    sa_t.clear();

    // Nothing was coded: the list and the head's state were created in this
    // call and are not handed on, so release them here.
    delete ps_head;
    delete coded_pkts;

    WritablePacket *new_p = p->uniqueify();
    return new_p;
  }

  //Make the packet
  // NOTE(review): coded_pkts and its entries are presumably consumed by
  // combine_pkts() -- confirm, its body is not fully visible here.
  WritablePacket *new_p = combine_pkts(coded_pkts, num_bytes);
  return new_p;
}

// Removes the first entry of _queue that compares equal to *ps, if any.
void
CodingManager::del_pkt(PacketState *ps)
{
  NbrQueue::iterator x = _queue.begin();
  if (!x)
    return;

  while (x < _queue.end()) {
    if ((*(*x)) == (*ps)) {
      click_chatter("erasing a packet from virtual queue");
      x = _queue.erase(x);
      break;
    }
    x++;
  }
}

// Decides whether ps may be added to the packets already chosen for coding.
//
// ps is rejected when it is headed to the same neighbor as a chosen packet,
// when adding it would push a chosen packet's decoding probability below
// _dec_prob_thresh, or when its own accumulated decoding probability falls
// below that threshold. On acceptance ps->_dec_prob is set and the
// _dec_prob of every chosen packet is scaled by the probability that
// _guess_mgr reports for the pair.
bool
CodingManager::is_coding_worthy(CodedPktList *coded_pkts, PacketState *ps)
{
  double dec_prob = 1.0;

  CodedPktList::iterator x = coded_pkts->begin();
  while (x < coded_pkts->end()) {
    StringAccum sa;
    sa << "checking coding worthiness of " << (*x)->_src << " " << (*x)->_ipid
       << " and " << ps->_src << " " << ps->_ipid;
    click_chatter("%s %s", id().cc(), sa.c_str());

    if ((*x)->_nbr == ps->_nbr)
      return false;
    // if (!_guess_mgr->check_presence((*x), ps)) return 0;

    double dec_prob_t = (*x)->_dec_prob * _guess_mgr->get_prob((*x), ps);
    StringAccum sa_t;
    sa_t << "Decoding probability for " << (*x)->_nbr << " is " << dec_prob_t;
    click_chatter("%s %s", id().cc(), sa_t.c_str());
    if (dec_prob_t < _dec_prob_thresh)
      return false;
    sa_t.clear();

    dec_prob *= _guess_mgr->get_prob(ps, (*x));
    sa_t << "Decoding probability for " << ps->_nbr << " is " << dec_prob;
    click_chatter("%s %s", id().cc(), sa_t.c_str());
    if (dec_prob < _dec_prob_thresh)
      return false;
    x++;
  }

  // Accepted: commit the probabilities that were only tested above.
  ps->_dec_prob = dec_prob;
  x = coded_pkts->begin();
  while (x < coded_pkts->end()) {
    (*x)->_dec_prob *= _guess_mgr->get_prob((*x), ps);
    x++;
  }
  return true;
}

WritablePacket*
CodingManager::combine_pkts(CodedPktList *coded_pkts, int *num_bytes)
{
  *num_bytes = 0;
  CodedActualPkts coded_actual_pkts;
  CodedPktList::iterator itr = coded_pkts->begin();
  while (itr < coded_pkts->end()) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -