📄 scramblequeue.cc
字号:
} memcpy(eth_h->ether_shost, _myea.data(), 6); eth_h->ether_type = htons(_enc_ethtype); click_chatter("%s: combine() produces packet of length %u", id().cc(), new_p->length()); return new_p;} void ScrambleQueue::push(int port, Packet *p_in){ _pushes++; click_chatter("%u: sq push %u, logged_qsize %u", Timestamp::now().msec1(), _pushes, size()); if (next_i(_tail) != _head) { WritablePacket *p = p_in->uniqueify(); click_ether *eth_p; struct srpacket *srh; eth_p = (click_ether *)p->data(); srh = (struct srpacket *)(eth_p + 1); EtherAddress dst = EtherAddress(eth_p->ether_dhost); if (CodingManager::get_qsize(dst) > _fair_queue_thresh) { StringAccum sa_t; sa_t << "packet dropped: queue to neighbor " << dst << " full"; click_chatter("%s %s", id().cc(), sa_t.c_str()); _drops++; p_in->kill(); return; } uint16_t *x = _nbseq.findp(dst); if (!x) { _nbseq.insert(dst, 0); x = _nbseq.findp(dst); } srh->set_nseq(*x); srh->set_checksum(); *x = srh->nseq() + 1; _sendmgr->do_events(false); // need to generate a unique ID since can't count on ip_id to be unique // do this only for packets originating at me uint32_t prev_hop, next_hop; prev_hop = next_hop = 0; srh->find_surrounding_hops(prev_hop, next_hop, _myip.addr()); if (prev_hop == 0) { // clobber ip id only if I generated the packet struct click_ip *iph = p->ip_header(); click_chatter("clobbering ipid of incoming packet, old=%u, new=%u", ntohs(iph->ip_id), _gseq); iph->ip_id = htons(_gseq++); // need to recompute checksum since it might have changed iph->ip_sum = 0; unsigned hlen = iph->ip_hl << 2; iph->ip_sum = click_in_cksum((const unsigned char *)iph, hlen); srh->set_checksum(); } // debugging, need to get this data before pushing to fullportqueue // as p could be made null due to kill if fullportqueue is full // this packet was not dropped // maintain some statistics here about number of packets of each flow type StringAccum sa; sa << prev_hop << ":" << next_hop; String s = sa.take_string(); if (_npackets.findp(s) == 0) { int x = 1; _npackets.insert(s, x); } else { int x = _npackets[s] + 1; _npackets.insert(s, x); } const struct click_ip *ip_p1 = p->ip_header(); StringAccum sa_src; sa_src << IPAddress(ip_p1->ip_src); StringAccum sa_dst; sa_dst << IPAddress(ip_p1->ip_dst); click_chatter("%s: packet with SR (%u, %u), IP (%s, %s), dataseq %u, nseq %u, ipid %u pushed at %u, qsize %u", id().cc(), prev_hop, next_hop, sa_src.c_str(), sa_dst.c_str(), srh->data_seq(), srh->nseq(), ntohs(ip_p1->ip_id), Timestamp::now().msec1(), size()); // click_chatter("%s: 12345 srcipaddress:%s dstipaddress:%s ipid:%u pointer:%x", id().cc(), sa_src.c_str(), sa_dst.c_str(), ntohs(ip_p1->ip_id), (void*)p_in->data()); SimpleQueue::push(port, p); CodingManager::add_virtual(p); StringAccum sa_t; if (p->length() > 200) { sa_t << "tcp data packet pushed to ethdst = " << EtherAddress(eth_p->ether_dhost); click_chatter("%s: %u %s", id().cc(), Timestamp::now().msec1(), sa_t.c_str()); sa_t.clear(); } else { sa_t << "tcp ack packet pushed to ethdst = " << EtherAddress(eth_p->ether_dhost); click_chatter("%s: %u %s", id().cc(), Timestamp::now().msec1(), sa_t.c_str()); sa_t.clear(); } if (!_empty_note.signal_active()) { click_chatter("waking listeners"); _empty_note.wake_listeners(); } if (size() == capacity()) { _full_note.sleep_listeners(); } } else { // packet was dropped click_chatter("%s: packet dropped: queue full", id().cc()); _drops++; p_in->kill(); } return;}/* ETHTYPES * Pureack * Purerecp * Ack + recp * Ack + data * Ack + Recp + Data * Recp + data */ WritablePacket* ScrambleQueue::add_header(WritablePacket *old_p, Packet *header, int iseth, int htype){ if (iseth) { if (htype == 1) { //adding ack header struct click_ether *eth_h = (struct click_ether*)malloc(sizeof(struct click_ether)); memcpy(eth_h, (struct click_ether*)old_p->data(), sizeof(struct click_ether)); old_p->pull(sizeof(struct click_ether)); struct click_enc_ack *ack_h = (struct click_enc_ack*)header->data(); ack_h->_et_next = eth_h->ether_type; eth_h->ether_type = htons(_ack_ethtype); old_p->push(ack_h->get_hlen(ack_h->nentries())); memcpy((struct click_enc_ack *)old_p->data(), ack_h, ack_h->get_hlen(ack_h->nentries())); old_p->push(sizeof(struct click_ether)); memcpy((struct click_ether *)old_p->data(), eth_h, sizeof(struct click_ether)); free(eth_h); click_chatter("%s added ack header", id().cc()); return old_p; } if (htype == 2) { // adding reception reports struct click_ether *eth_h = (struct click_ether*)malloc(sizeof(struct click_ether)); memcpy(eth_h, (struct click_ether*)old_p->data(), sizeof(struct click_ether)); old_p->pull(sizeof(struct click_ether)); struct click_enc_recp *recp_h = (struct click_enc_recp*)header->data(); recp_h->_et_next = eth_h->ether_type; eth_h->ether_type = htons(_recp_ethtype); old_p->push(recp_h->get_hlen(recp_h->nentries())); memcpy((struct click_enc_recp *)old_p->data(), recp_h, recp_h->get_hlen(recp_h->nentries())); old_p->push(sizeof(struct click_ether)); memcpy((struct click_ether *)old_p->data(), eth_h, sizeof(struct click_ether)); free(eth_h); click_chatter("%s added reception reports", id().cc()); return old_p; } } else { if (old_p==NULL) { old_p = Packet::make(sizeof(struct click_ether)); } struct click_ether *eth_h = (struct click_ether*)old_p->data(); memcpy(eth_h->ether_shost, _myea.data(), 6*sizeof(uint8_t)); const EtherAddress dst = _aliases->get_random_neighbor(_myea); memcpy(eth_h->ether_dhost, dst.data(), 6*sizeof(uint8_t)); eth_h->ether_type = htons(_nothing_ethtype); } return old_p;} WritablePacket*ScrambleQueue::handle_rexmits(int nfound, uint32_t now){ WritablePacket *new_p = 0; if (nfound == 2) { Packet *p1 = _sendmgr->take_rexmit_p1(); Packet *p2 = _sendmgr->take_rexmit_p2(); CodedActualPkts coded_actual_pkts; coded_actual_pkts.push_back(p1); coded_actual_pkts.push_back(p2); // compute statistics _totrexmitpkts += 2; struct srpacket *srh = (struct srpacket *)(((struct click_ether *)p1->data()) + 1); const struct click_ip *iph = p1->ip_header(); click_chatter("%s: %u rexmitting packet with ipsrc = %s, ipdst = %s, ipid = %u, iplen = %u", id().cc(), Timestamp::now().msec1(), IPAddress(iph->ip_src).s().c_str(), IPAddress(iph->ip_dst).s().c_str(), ntohs(iph->ip_id)); _totrexmitbytes += (srh->hlen_with_data() - srh->hlen_wo_data()); _totencodedbytes += (srh->hlen_with_data() - srh->hlen_wo_data()); uint32_t prev_hop, next_hop; prev_hop = next_hop = 0; srh->find_surrounding_hops(prev_hop, next_hop, _myip.addr()); StringAccum sa; sa << prev_hop << ":" << next_hop; String s = sa.take_string(); if (_nrexmits.findp(s) == 0) { int x = 1; _nrexmits.insert(s, x); } else { int x = _nrexmits[s] + 1; _nrexmits.insert(s, x); } if (_nencoded.findp(s) == 0) { int x = 1; _nencoded.insert(s, x); } else { int x = _nencoded[s] + 1; _nencoded.insert(s, x); } srh = (struct srpacket *)(((struct click_ether *)p2->data()) + 1); iph = p2->ip_header(); click_chatter("%s: %u rexmitting packet with ipsrc = %s, ipdst = %s, ipid = %u, iplen = %u", id().cc(), Timestamp::now().msec1(), IPAddress(iph->ip_src).s().c_str(), IPAddress(iph->ip_dst).s().c_str(), ntohs(iph->ip_id)); _totrexmitbytes += (srh->hlen_with_data() - srh->hlen_wo_data()); _totencodedbytes += (srh->hlen_with_data() - srh->hlen_wo_data()); prev_hop = next_hop = 0; srh->find_surrounding_hops(prev_hop, next_hop, _myip.addr()); sa.clear(); sa << prev_hop << ":" << next_hop; s = sa.take_string(); if (_nrexmits.findp(s) == 0) { int x = 1; _nrexmits.insert(s, x); } else { int x = _nrexmits[s] + 1; _nrexmits.insert(s, x); } if (_nencoded.findp(s) == 0) { int x = 1; _nencoded.insert(s, x); } else { int x = _nencoded[s] + 1; _nencoded.insert(s, x); } _totencodedpkts += 2; // do not swap this order // need to add_rexmitted_packet before encoding // otherwise the kill inside encode might delete the packet // basically need to think of Xorer::encode as consuming p1 and p2 // and returning a packet _sendmgr->add_rexmitted_packet(p1, now); _sendmgr->add_rexmitted_packet(p2, now); // print out tcp statistics // sq timestamp srcip srcport dstip dstport pktseq ackseq const struct click_ip *ciph = p1->ip_header(); if (ciph->ip_p == IP_PROTO_TCP) { const struct click_tcp *ctcph = p1->tcp_header(); StringAccum sa; sa << "tcp sq " << now << " " << ciph->ip_src << " " << ntohs(ctcph->th_sport) << " " << ciph->ip_dst << " " << ntohs(ctcph->th_dport) << " " << ntohl(ctcph->th_seq) << " " << ntohl(ctcph->th_ack); click_chatter("%s: %s %#0x", id().cc(), sa.c_str(), ctcph->th_flags); } else { click_chatter("%s: ip proto is %u", id().cc(), ciph->ip_p); } ciph = p2->ip_header(); if (ciph->ip_p == IP_PROTO_TCP) { const struct click_tcp *ctcph = p1->tcp_header(); StringAccum sa; sa << "tcp sq " << now << " " << ciph->ip_src << " " << ntohs(ctcph->th_sport) << " " << ciph->ip_dst << " " << ntohs(ctcph->th_dport) << " " << ntohl(ctcph->th_seq) << " " << ntohl(ctcph->th_ack); click_chatter("%s: %s %#0x", id().cc(), sa.c_str(), ctcph->th_flags); } else { click_chatter("%s: ip proto is %u", id().cc(), ciph->ip_p); } int num_coded_bytes = 0; new_p = CodingManager::combine_actual_pkts(&coded_actual_pkts, &num_coded_bytes, 1); click_chatter("%s: pull() found 2 matching xor packets rexmit_p1 and rexmit_p2, qsize is %u, totencoded pkts, bytes is %u, %u", id().cc(), size(), _totencodedpkts, _totencodedbytes); click_chatter("%s: pull() sending out encoded packet at %u (rexmit)", id().cc(), Timestamp::now().msec1()); } if (nfound == 1) { Packet *p1 = _sendmgr->take_rexmit_p1(); // statistics _totrexmitpkts++; struct srpacket *srh = (struct srpacket *)(((struct click_ether *)p1->data()) + 1); const struct click_ip *iph = p1->ip_header(); click_chatter("%s: %u rexmitting packet with ipsrc = %s, ipdst = %s, ipid = %u, iplen = %u", id().cc(), Timestamp::now().msec1(), IPAddress(iph->ip_src).s().c_str(), IPAddress(iph->ip_dst).s().c_str(), ntohs(iph->ip_id)); _totrexmitbytes += (srh->hlen_with_data() - srh->hlen_wo_data()); uint32_t prev_hop, next_hop; prev_hop = next_hop = 0; srh->find_surrounding_hops(prev_hop, next_hop, _myip.addr()); StringAccum sa; sa << prev_hop << ":" << next_hop; String s = sa.take_string(); if (_nrexmits.findp(s) == 0) { int x = 1; _nrexmits.insert(s, x); } else { int x = _nrexmits[s] + 1; _nrexmits.insert(s, x); } const struct click_ip *ciph = p1->ip_header(); if (ciph->ip_p == IP_PROTO_TCP) { const struct click_tcp *ctcph = p1->tcp_header(); StringAccum sa; sa << "tcp sq " << now << " " << ciph->ip_src << " " << ntohs(ctcph->th_sport) << " " << ciph->ip_dst << " " << ntohs(ctcph->th_dport) << " " << ntohl(ctcph->th_seq) << " " << ntohl(ctcph->th_ack); click_chatter("%s: %s %#0x", id().cc(), sa.c_str(), ctcph->th_flags); } else { click_chatter("%s: ip proto is %u", id().cc(), ciph->ip_p); } // statistics _totunencodedpkts++; _totunencodedbytes += (srh->hlen_with_data() - srh->hlen_wo_data()); if (_nunencoded.findp(s) == 0) { int x = 1; _nunencoded.insert(s, x); } else { int x = _nunencoded[s] + 1; _nunencoded.insert(s, x); } _sendmgr->add_rexmitted_packet(p1, now); new_p = p1->uniqueify(); click_chatter("%s: pull() found no matching xor packet for rexmit_p1, qsize is %u", id().cc(), size()); struct click_ether *ep = (struct click_ether *)new_p->data(); struct srpacket *srhp = (struct srpacket *)(ep+1); const struct click_ip *ipp = new_p->ip_header(); click_chatter("%s: pull() SRPKTHDR, IPLEN, PKTLEN %u, %u, %u", id().cc(), srhp->hlen_wo_data(), ntohs(ipp->ip_len), new_p->length()); click_chatter("%s: pull() sending out unencoded packet at %u (rexmit)", id().cc(), Timestamp::now().msec1()); } return new_p;/* Packet *p2 = find_xor_candidate(p1); if (p2) { // statistics _totencodedpkts += 2; // insert p1 into the nencoded table _totencodedbytes += (srh->hlen_with_data() - srh->hlen_wo_data()); // click_chatter("sr hdr len w and wo data are %u and %u, totencoded is %u", srh->hlen_with_data(), srh->hlen_wo_data(), _totencodedbytes); if (_nencoded.findp(s) == 0) { int x = 1; _nencoded.insert(s, x); } else { int x = _nencoded[s] + 1; _nencoded.insert(s, x); } srh = (struct srpacket *)(((struct click_ether *)p2->data()) + 1); _totencodedbytes += (srh->hlen_with_data() - srh->hlen_wo_data());
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -