📄 scramblequeue.cc
字号:
// click_chatter("sr hdr len w and wo data are %u and %u, totencoded is %u", srh->hlen_with_data(), srh->hlen_wo_data(), _totencodedbytes); prev_hop = next_hop = 0; srh->find_surrounding_hops(prev_hop, next_hop, _myip.addr()); sa.clear(); sa << prev_hop << ":" << next_hop; s = sa.take_string(); if (_nencoded.findp(s) == 0) { int x = 1; _nencoded.insert(s, x); } else { int x = _nencoded[s] + 1; _nencoded.insert(s, x); } // subtract 1 from _npackets table _npackets.insert(s, _npackets[s]-1); if (size() == capacity() - 1) { _full_note.wake_listeners(); } _sendmgr->add_rexmitted_packet(p1, now); _sendmgr->add_new_packet(p2, now); ciph = p2->ip_header(); if (ciph->ip_p == IP_PROTO_TCP) { const struct click_tcp *ctcph = p1->tcp_header(); StringAccum sa; sa << "tcp sq " << now << " " << ciph->ip_src << " " << ntohs(ctcph->th_sport) << " " << ciph->ip_dst << " " << ntohs(ctcph->th_dport) << " " << ntohl(ctcph->th_seq) << " " << ntohl(ctcph->th_ack); click_chatter("%s: %s %#0x", id().cc(), sa.c_str(), ctcph->th_flags); } else { click_chatter("%s: ip proto is %u", id().cc(), ciph->ip_p); } new_p = combine(p1, p2); click_chatter("%s: pull() found matching xor packet for rexmit_p1, qsize is %u, tot encoded pkts, bytes is %u, %u", id().cc(), size(), _totencodedpkts, _totencodedbytes); } else { // statistics _totunencodedpkts++; _totunencodedbytes += (srh->hlen_with_data() - srh->hlen_wo_data()); if (_nunencoded.findp(s) == 0) { int x = 1; _nunencoded.insert(s, x); } else { int x = _nunencoded[s] + 1; _nunencoded.insert(s, x); } _sendmgr->add_rexmitted_packet(p1, now); new_p = p1->uniqueify(); click_chatter("%s: pull() found no matching xor packet for rexmit_p1, qsize is %u", id().cc(), size()); struct click_ether *ep = (struct click_ether *)new_p->data(); struct srpacket *srhp = (struct srpacket *)(ep+1); const struct click_ip *ipp = new_p->ip_header(); click_chatter("%s: pull() SRPKTHDR, IPLEN, PKTLEN %u, %u, %u", id().cc(), srhp->hlen_wo_data(), ntohs(ipp->ip_len), 
new_p->length()); } }*/} WritablePacket*ScrambleQueue::handle_new_xmits(int port, uint32_t now){ WritablePacket *new_p = 0; Packet *p1 = SimpleQueue::pull(port); if (p1) { // statistics struct srpacket *srh = (struct srpacket *)(((struct click_ether *)p1->data()) + 1); uint32_t prev_hop, next_hop; prev_hop = next_hop = 0; srh->find_surrounding_hops(prev_hop, next_hop, _myip.addr()); StringAccum sa; sa << prev_hop << ":" << next_hop; String s = sa.take_string(); _npackets.insert(s, _npackets[s]-1); click_chatter("%s: pull() found packet p1", id().cc()); if (size() == capacity() - 1) { _full_note.wake_listeners(); } const struct click_ip *ciph = p1->ip_header(); if (ciph->ip_p == IP_PROTO_TCP) { const struct click_tcp *ctcph = p1->tcp_header(); StringAccum sa; sa << "tcp sq " << now << " " << ciph->ip_src << " " << ntohs(ctcph->th_sport) << " " << ciph->ip_dst << " " << ntohs(ctcph->th_dport) << " " << ntohl(ctcph->th_seq) << " " << ntohl(ctcph->th_ack); click_chatter("%s: %s %#0x", id().cc(), sa.c_str(), ctcph->th_flags); } else { click_chatter("%s: ip proto is %u", id().cc(), ciph->ip_p); } Packet *p2 = find_xor_candidate(p1); if (p2) { // statistics _totencodedpkts += 2; // insert p1 into _nencoded _totencodedbytes += (srh->hlen_with_data() - srh->hlen_wo_data()); if (_nencoded.findp(s) == 0) { int x = 1; _nencoded.insert(s, x); } else { int x = _nencoded[s] + 1; _nencoded.insert(s, x); } srh = (struct srpacket *)(((struct click_ether *)p2->data()) + 1); _totencodedbytes += (srh->hlen_with_data() - srh->hlen_wo_data()); prev_hop = next_hop = 0; srh->find_surrounding_hops(prev_hop, next_hop, _myip.addr()); sa.clear(); sa << prev_hop << ":" << next_hop; s = sa.take_string(); _npackets.insert(s, _npackets[s]-1); if (_nencoded.findp(s) == 0) { int x = 1; _nencoded.insert(s, x); } else { int x = _nencoded[s] + 1; _nencoded.insert(s, x); } click_chatter("%s: pull() found matching xor packet for p1, qsize is %u, totencoded pkts, bytes is %u, %u", id().cc(), size(), 
_totencodedpkts, _totencodedbytes); _sendmgr->add_new_packet(p1, now); _sendmgr->add_new_packet(p2, now); ciph = p2->ip_header(); if (ciph->ip_p == IP_PROTO_TCP) { const struct click_tcp *ctcph = p1->tcp_header(); StringAccum sa; sa << "tcp sq " << now << " " << ciph->ip_src << " " << ntohs(ctcph->th_sport) << " " << ciph->ip_dst << " " << ntohs(ctcph->th_dport) << " " << ntohl(ctcph->th_seq) << " " << ntohl(ctcph->th_ack); click_chatter("%s: %s %#0x", id().cc(), sa.c_str(), ctcph->th_flags); } else { click_chatter("%s: ip proto is %u", id().cc(), ciph->ip_p); } new_p = combine(p1, p2); } else { // statistics _totunencodedpkts++; _totunencodedbytes += (srh->hlen_with_data() - srh->hlen_wo_data()); if (_nunencoded.findp(s) == 0) { int x = 1; _nunencoded.insert(s, x); } else { int x = _nunencoded[s] + 1; _nunencoded.insert(s, x); } click_chatter("%s: pull() found no matching xor packet for p1, qsize is %u", id().cc(), size()); _sendmgr->add_new_packet(p1, now); new_p = p1->uniqueify(); struct click_ether *ep = (struct click_ether *)new_p->data(); struct srpacket *srhp = (struct srpacket *)(ep+1); const struct click_ip *ipp = new_p->ip_header(); click_chatter("%s: pull() SRPKTHDR, IPLEN, PKTLEN %u, %u, %u", id().cc(), srhp->hlen_wo_data(), ntohs(ipp->ip_len), new_p->length()); } } else { // _useless_pulls++; if (++_sleepiness >= SLEEPINESS_TRIGGER) { // update timer only needs to be called if you are going to sleep // otherwise, regular pulls will take care of sending whatever // rexmits/acks you need to send update_timer(); _empty_note.sleep_listeners(); } } return new_p;} Packet *ScrambleQueue::pull(int port){ _pulls++; // first look to see if we have any acks pending // if so, if they are more than some number, we just have a packet // containing acks uint32_t now = Timestamp::now().msec1(); click_chatter("%s: %u sq pull %u, useless pulls %u, sleepiness %u, qsize %u", id().cc(), now, _pulls, _useless_pulls, _sleepiness, size()); // HANDLING ACK PACKETS 
WritablePacket *ack_p = _recvmgr->get_acks(empty(), now); if (ack_p) { _sleepiness = 0; click_chatter("%s: pull() get_acks %#0x", id().cc(), ack_p); if (ack_p->length() > _max_ack_size) { click_chatter("%s: sending pure ack packet", id().cc()); WritablePacket *new_p = add_header(NULL, NULL, 0, 0); new_p = add_header(new_p, ack_p, 1, 1); _sleepiness = 0; ack_p->kill(); return new_p; } } click_chatter("%s here2", id().cc()); // HANDLING RECEPTION REPORTS WritablePacket *recp_p = _listenmgr->get_recps(empty(), now); if (recp_p) { _sleepiness = 0; click_chatter("%s: pull() got reception reports", id().cc()); if (recp_p->length() > _max_recp_size) { WritablePacket *new_p = add_header(NULL, NULL, 0, 0); new_p = add_header(new_p, recp_p, 1, 2); recp_p->kill(); if (ack_p) { new_p = add_header(new_p, ack_p, 1, 1); click_chatter("%s: sending packet with recp and ack header too", id().cc()); ack_p->kill(); } else { click_chatter("%s: sending packet with only recp header", id().cc()); } return new_p; } } int nfound = _sendmgr->do_events(true); if (nfound) { click_chatter("%s: sending rexmitted packet", id().cc()); _sleepiness = 0; WritablePacket *new_p = handle_rexmits(nfound, now); if (recp_p) { new_p = add_header(new_p, recp_p, 1, 2); recp_p->kill(); } if (ack_p) { new_p = add_header(new_p, ack_p, 1, 1); ack_p->kill(); } return new_p; } click_chatter("%s: trying to find a new packet to send", id().cc()); // AT THIS STAGE WE HAVE TO SEND THE PACKETS IN THE QUEUE int num_coded_pkts = 0; int num_coded_bytes = 0; WritablePacket *new_p = find_coding_candidates(port, &num_coded_pkts, &num_coded_bytes); if (num_coded_pkts > 1) { _totencodedpkts += num_coded_pkts; _totencodedbytes += num_coded_bytes; click_chatter("%s: pull() sending out encoded packet at %u (norexmit)", id().cc(), Timestamp::now().msec1()); } else if (num_coded_pkts == 1) { _totunencodedpkts++; _totunencodedbytes += num_coded_bytes; click_chatter("%s: pull() sending out unencoded packet at %u (norexmit)", 
id().cc(), Timestamp::now().msec1()); } if (!new_p) { if (ack_p || recp_p) { click_chatter("%s: no data, sending packet with only ack or recp headers", id().cc()); new_p = add_header(NULL, NULL, 0, 0); } } if (new_p) { _sleepiness = 0; click_chatter("%s: have some packet to transmit", id().cc()); if (recp_p) { new_p = add_header(new_p, recp_p, 1, 2); recp_p->kill(); } if (ack_p) { new_p = add_header(new_p, ack_p, 1, 1); ack_p->kill(); } click_chatter("%s sending out packet of length %d", id().cc(), new_p->length()); return new_p; } else { _useless_pulls++; if (++_sleepiness >= SLEEPINESS_TRIGGER) { // update timer only needs to be called if you are going to sleep // otherwise, regular pulls will take care of sending whatever // rexmits/acks you need to send update_timer(); _empty_note.sleep_listeners(); } } return NULL;}voidScrambleQueue::check_packet(Packet *p){ click_ether *eh = (click_ether *) p->data(); EtherAddress src = EtherAddress(eh->ether_shost); EtherAddress dst = EtherAddress(eh->ether_dhost); uint16_t ethtype = eh->ether_type; if (ethtype != ntohs(_recp_ethtype)) { click_chatter("no reception reports attached"); return; } click_chatter("has a reception report ethtype %u", ethtype); // has an recp header, try to separate that into another precpet struct click_enc_recp *recph = (struct click_enc_recp *)(eh + 1); // uint8_t hlen = recph->get_hlen(recph->nentries()); for (uint16_t i = 0; i < recph->nentries(); i++) { IPAddress src = IPAddress(recph->get_entry_src(i)); uint16_t ipid = recph->get_entry_id(i); uint8_t bmap = recph->get_entry_bmap(i); StringAccum sa; sa << "push() matched reception report for src " << src << ", ipid " << ipid << " src u32 " << recph->get_entry_src(i); click_chatter("%s: %s, bmap %x at %u", id().cc(), sa.c_str(), (uint32_t)bmap, Timestamp::now().msec1()); } return;} // if we don't have too many acks// run_timer(true)// if _rexmitp1 and _rexmitp2// xor_packets _rexmitp1 and _rexmitp2 and create new packet new_p (xor_packets will 
// clone the longer packet and xor with the shorter)
// foreach of _rexmitp1 and _rexmitp2
// find packetid
// update _nxmits in _sentpacks
// if _nxmits == MAX_XMITS
// if _timedout
// kill packet from _sentpacks
// else do nothing
// ELSE // NXMITS < max_xmits
// COMPUTE REXMITTIME, UPDATE _SENTPACKS AND _TIMER
// RETURN NEW_P
//
// IF _REXMITP1
// IF XOR_WORTHY(_REXMITP1)
// look for xor_candidate p2 in real q
// if found p2
// xor_packets _rexmitp1 and p2 to create new_p
// p2clone = clone(p2)
// compute packetid of p2clone and rexmit time of p2clone
// insert p2clone into _sentpacks with acked=0,nxmits=1 as well as timeout and rexmit events into _timer
// kill p2
// return new_p
// no more candidates, so just send out _rexmitp1 as below:
// p1clone = clone(_rexmitp1)
// find packetid of p1clone
// update _nxmits in _sentpacks
// if _nxmits == MAX_XMITS
// if _timedout
// kill from _sentpacks
// else do nothing
// else // nxmits is lesser
// compute _rexmittime, update _sentpacks and _timer
// return p1clone
//
// otherwise pop the front of the q and do all the usual stuff
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -