⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 scramblequeue.cc

📁 COPE, the first practical network coding scheme, developed on top of Click
💻 CC
📖 第 1 页 / 共 4 页
字号:
  }  memcpy(eth_h->ether_shost, _myea.data(), 6);  eth_h->ether_type = htons(_enc_ethtype);  click_chatter("%s: combine() produces packet of length %u", id().cc(), new_p->length());  return new_p;}    void ScrambleQueue::push(int port, Packet *p_in){  _pushes++;  click_chatter("%u: sq push %u, logged_qsize %u", Timestamp::now().msec1(), _pushes, size());  if (next_i(_tail) != _head) {    WritablePacket *p = p_in->uniqueify();    click_ether *eth_p;    struct srpacket *srh;    eth_p = (click_ether *)p->data();    srh = (struct srpacket *)(eth_p + 1);    EtherAddress dst = EtherAddress(eth_p->ether_dhost);    if (CodingManager::get_qsize(dst) > _fair_queue_thresh) {      StringAccum sa_t;      sa_t << "packet dropped: queue to neighbor " << dst << " full";       click_chatter("%s %s", id().cc(), sa_t.c_str());      _drops++;      p_in->kill();           return;    }    uint16_t *x = _nbseq.findp(dst);    if (!x) {      _nbseq.insert(dst, 0);      x = _nbseq.findp(dst);    }    srh->set_nseq(*x);    srh->set_checksum();    *x = srh->nseq() + 1;    _sendmgr->do_events(false);    // need to generate a unique ID since can't count on ip_id to be unique    // do this only for packets originating at me    uint32_t prev_hop, next_hop;    prev_hop = next_hop = 0;    srh->find_surrounding_hops(prev_hop, next_hop, _myip.addr());    if (prev_hop == 0) {      // clobber ip id only if I generated the packet      struct click_ip *iph = p->ip_header();      click_chatter("clobbering ipid of incoming packet, old=%u, new=%u", ntohs(iph->ip_id), _gseq);      iph->ip_id = htons(_gseq++);      // need to recompute checksum since it might have changed      iph->ip_sum = 0;      unsigned hlen = iph->ip_hl << 2;      iph->ip_sum = click_in_cksum((const unsigned char *)iph, hlen);      srh->set_checksum();    }    // debugging, need to get this data before pushing to fullportqueue    // as p could be made null due to kill if fullportqueue is full    // this packet was not dropped    // 
maintain some statistics here about number of packets of each flow type    StringAccum sa;    sa << prev_hop << ":" << next_hop;    String s = sa.take_string();    if (_npackets.findp(s) == 0) {      int x = 1;      _npackets.insert(s, x);    } else {      int x = _npackets[s] + 1;      _npackets.insert(s, x);    }    const struct click_ip *ip_p1 = p->ip_header();    StringAccum sa_src;    sa_src << IPAddress(ip_p1->ip_src);    StringAccum sa_dst;    sa_dst << IPAddress(ip_p1->ip_dst);    click_chatter("%s: packet with SR (%u, %u), IP (%s, %s), dataseq %u, nseq %u, ipid %u pushed at %u, qsize %u", id().cc(), prev_hop, next_hop, sa_src.c_str(), sa_dst.c_str(), srh->data_seq(), srh->nseq(), ntohs(ip_p1->ip_id), Timestamp::now().msec1(), size());     // click_chatter("%s: 12345 srcipaddress:%s dstipaddress:%s ipid:%u pointer:%x", id().cc(), sa_src.c_str(), sa_dst.c_str(), ntohs(ip_p1->ip_id), (void*)p_in->data());     SimpleQueue::push(port, p);    CodingManager::add_virtual(p);    StringAccum sa_t;    if (p->length() > 200) {      sa_t << "tcp data packet pushed to ethdst = " << EtherAddress(eth_p->ether_dhost);      click_chatter("%s: %u %s", id().cc(), Timestamp::now().msec1(), sa_t.c_str());             sa_t.clear();    }    else {      sa_t << "tcp ack packet pushed to ethdst = " << EtherAddress(eth_p->ether_dhost);      click_chatter("%s: %u %s", id().cc(), Timestamp::now().msec1(), sa_t.c_str());             sa_t.clear();       }    if (!_empty_note.signal_active()) {      click_chatter("waking listeners");      _empty_note.wake_listeners();    }    if (size() == capacity()) {      _full_note.sleep_listeners();    }  } else {    // packet was dropped    click_chatter("%s: packet dropped: queue full", id().cc());     _drops++;    p_in->kill();  }  return;}/* ETHTYPES * Pureack * Purerecp * Ack + recp * Ack + data * Ack + Recp + Data * Recp + data */ WritablePacket* ScrambleQueue::add_header(WritablePacket *old_p, Packet *header, int iseth, int htype){  if 
(iseth) {    if (htype == 1) {             //adding ack header      struct click_ether *eth_h = (struct click_ether*)malloc(sizeof(struct click_ether));      memcpy(eth_h, (struct click_ether*)old_p->data(), sizeof(struct click_ether));      old_p->pull(sizeof(struct click_ether));      struct click_enc_ack *ack_h = (struct click_enc_ack*)header->data();      ack_h->_et_next = eth_h->ether_type;      eth_h->ether_type = htons(_ack_ethtype);      old_p->push(ack_h->get_hlen(ack_h->nentries()));      memcpy((struct click_enc_ack *)old_p->data(), ack_h, ack_h->get_hlen(ack_h->nentries()));      old_p->push(sizeof(struct click_ether));      memcpy((struct click_ether *)old_p->data(), eth_h, sizeof(struct click_ether));      free(eth_h);      click_chatter("%s added ack header", id().cc());      return old_p;    }    if (htype == 2) {           // adding reception reports      struct click_ether *eth_h = (struct click_ether*)malloc(sizeof(struct click_ether));      memcpy(eth_h, (struct click_ether*)old_p->data(), sizeof(struct click_ether));      old_p->pull(sizeof(struct click_ether));      struct click_enc_recp *recp_h = (struct click_enc_recp*)header->data();      recp_h->_et_next = eth_h->ether_type;      eth_h->ether_type = htons(_recp_ethtype);      old_p->push(recp_h->get_hlen(recp_h->nentries()));      memcpy((struct click_enc_recp *)old_p->data(), recp_h, recp_h->get_hlen(recp_h->nentries()));      old_p->push(sizeof(struct click_ether));      memcpy((struct click_ether *)old_p->data(), eth_h, sizeof(struct click_ether));      free(eth_h);      click_chatter("%s added reception reports", id().cc());      return old_p;    }  }  else {    if (old_p==NULL) {      old_p = Packet::make(sizeof(struct click_ether));    }    struct click_ether *eth_h = (struct click_ether*)old_p->data();    memcpy(eth_h->ether_shost, _myea.data(), 6*sizeof(uint8_t));     const EtherAddress dst = _aliases->get_random_neighbor(_myea);    memcpy(eth_h->ether_dhost, dst.data(), 
6*sizeof(uint8_t));     eth_h->ether_type = htons(_nothing_ethtype);  }  return old_p;}          WritablePacket*ScrambleQueue::handle_rexmits(int nfound, uint32_t now){  WritablePacket *new_p = 0;  if (nfound == 2) {    Packet *p1 = _sendmgr->take_rexmit_p1();    Packet *p2 = _sendmgr->take_rexmit_p2();    CodedActualPkts coded_actual_pkts;    coded_actual_pkts.push_back(p1);    coded_actual_pkts.push_back(p2);    // compute statistics    _totrexmitpkts += 2;    struct srpacket *srh = (struct srpacket *)(((struct click_ether *)p1->data()) + 1);    const struct click_ip *iph = p1->ip_header();    click_chatter("%s: %u rexmitting packet with ipsrc = %s, ipdst = %s, ipid = %u, iplen = %u", id().cc(), Timestamp::now().msec1(), IPAddress(iph->ip_src).s().c_str(), IPAddress(iph->ip_dst).s().c_str(), ntohs(iph->ip_id));    _totrexmitbytes += (srh->hlen_with_data() - srh->hlen_wo_data());    _totencodedbytes += (srh->hlen_with_data() - srh->hlen_wo_data());    uint32_t prev_hop, next_hop;    prev_hop = next_hop = 0;    srh->find_surrounding_hops(prev_hop, next_hop, _myip.addr());    StringAccum sa;    sa << prev_hop << ":" << next_hop;    String s = sa.take_string();    if (_nrexmits.findp(s) == 0) {      int x = 1;      _nrexmits.insert(s, x);    } else {      int x = _nrexmits[s] + 1;      _nrexmits.insert(s, x);    }    if (_nencoded.findp(s) == 0) {      int x = 1;      _nencoded.insert(s, x);    } else {      int x = _nencoded[s] + 1;      _nencoded.insert(s, x);    }    srh = (struct srpacket *)(((struct click_ether *)p2->data()) + 1);    iph = p2->ip_header();    click_chatter("%s: %u rexmitting packet with ipsrc = %s, ipdst = %s, ipid = %u, iplen = %u", id().cc(), Timestamp::now().msec1(), IPAddress(iph->ip_src).s().c_str(), IPAddress(iph->ip_dst).s().c_str(), ntohs(iph->ip_id));    _totrexmitbytes += (srh->hlen_with_data() - srh->hlen_wo_data());    _totencodedbytes += (srh->hlen_with_data() - srh->hlen_wo_data());    prev_hop = next_hop = 0;    
srh->find_surrounding_hops(prev_hop, next_hop, _myip.addr());    sa.clear();    sa << prev_hop << ":" << next_hop;    s = sa.take_string();    if (_nrexmits.findp(s) == 0) {      int x = 1;      _nrexmits.insert(s, x);    } else {      int x = _nrexmits[s] + 1;      _nrexmits.insert(s, x);    }    if (_nencoded.findp(s) == 0) {      int x = 1;      _nencoded.insert(s, x);    } else {      int x = _nencoded[s] + 1;      _nencoded.insert(s, x);    }    _totencodedpkts += 2;    // do not swap this order    // need to add_rexmitted_packet before encoding    // otherwise the kill inside encode might delete the packet    // basically need to think of Xorer::encode as consuming p1 and p2    // and returning a packet    _sendmgr->add_rexmitted_packet(p1, now);    _sendmgr->add_rexmitted_packet(p2, now);    // print out tcp statistics    // sq timestamp srcip srcport dstip dstport pktseq ackseq    const struct click_ip *ciph = p1->ip_header();    if (ciph->ip_p == IP_PROTO_TCP) {      const struct click_tcp *ctcph = p1->tcp_header();      StringAccum sa;      sa << "tcp sq " << now << " " << ciph->ip_src << " " << ntohs(ctcph->th_sport) << " " << ciph->ip_dst << " " << ntohs(ctcph->th_dport) << " " << ntohl(ctcph->th_seq) << " " << ntohl(ctcph->th_ack);      click_chatter("%s: %s %#0x", id().cc(), sa.c_str(), ctcph->th_flags);    } else {      click_chatter("%s: ip proto is %u", id().cc(), ciph->ip_p);    }    ciph = p2->ip_header();    if (ciph->ip_p == IP_PROTO_TCP) {      const struct click_tcp *ctcph = p1->tcp_header();      StringAccum sa;      sa << "tcp sq " << now << " " << ciph->ip_src << " " << ntohs(ctcph->th_sport) << " " << ciph->ip_dst << " " << ntohs(ctcph->th_dport) << " " << ntohl(ctcph->th_seq) << " " << ntohl(ctcph->th_ack);      click_chatter("%s: %s %#0x", id().cc(), sa.c_str(), ctcph->th_flags);    } else {      click_chatter("%s: ip proto is %u", id().cc(), ciph->ip_p);    }    int num_coded_bytes = 0;    new_p = 
CodingManager::combine_actual_pkts(&coded_actual_pkts, &num_coded_bytes, 1);    click_chatter("%s: pull() found 2 matching xor packets rexmit_p1 and rexmit_p2, qsize is %u, totencoded pkts, bytes is %u, %u", id().cc(), size(), _totencodedpkts, _totencodedbytes);    click_chatter("%s: pull() sending out encoded packet at %u (rexmit)", id().cc(), Timestamp::now().msec1());  }  if (nfound == 1) {    Packet *p1 = _sendmgr->take_rexmit_p1();    // statistics    _totrexmitpkts++;    struct srpacket *srh = (struct srpacket *)(((struct click_ether *)p1->data()) + 1);    const struct click_ip *iph = p1->ip_header();    click_chatter("%s: %u rexmitting packet with ipsrc = %s, ipdst = %s, ipid = %u, iplen = %u", id().cc(), Timestamp::now().msec1(), IPAddress(iph->ip_src).s().c_str(), IPAddress(iph->ip_dst).s().c_str(), ntohs(iph->ip_id));    _totrexmitbytes += (srh->hlen_with_data() - srh->hlen_wo_data());    uint32_t prev_hop, next_hop;    prev_hop = next_hop = 0;    srh->find_surrounding_hops(prev_hop, next_hop, _myip.addr());    StringAccum sa;    sa << prev_hop << ":" << next_hop;    String s = sa.take_string();    if (_nrexmits.findp(s) == 0) {      int x = 1;      _nrexmits.insert(s, x);    } else {      int x = _nrexmits[s] + 1;      _nrexmits.insert(s, x);    }    const struct click_ip *ciph = p1->ip_header();    if (ciph->ip_p == IP_PROTO_TCP) {      const struct click_tcp *ctcph = p1->tcp_header();      StringAccum sa;      sa << "tcp sq " << now << " " << ciph->ip_src << " " << ntohs(ctcph->th_sport) << " " << ciph->ip_dst << " " << ntohs(ctcph->th_dport) << " " << ntohl(ctcph->th_seq) << " " << ntohl(ctcph->th_ack);      click_chatter("%s: %s %#0x", id().cc(), sa.c_str(), ctcph->th_flags);    } else {      click_chatter("%s: ip proto is %u", id().cc(), ciph->ip_p);    }    // statistics    _totunencodedpkts++;    _totunencodedbytes += (srh->hlen_with_data() - srh->hlen_wo_data());    if (_nunencoded.findp(s) == 0) {      int x = 1;      _nunencoded.insert(s, x);   
 } else {      int x = _nunencoded[s] + 1;      _nunencoded.insert(s, x);    }    _sendmgr->add_rexmitted_packet(p1, now);    new_p = p1->uniqueify();    click_chatter("%s: pull() found no matching xor packet for rexmit_p1, qsize is %u", id().cc(), size());    struct click_ether *ep = (struct click_ether *)new_p->data();    struct srpacket *srhp = (struct srpacket *)(ep+1);    const struct click_ip *ipp = new_p->ip_header();    click_chatter("%s: pull() SRPKTHDR, IPLEN, PKTLEN %u, %u, %u", id().cc(), srhp->hlen_wo_data(), ntohs(ipp->ip_len), new_p->length());    click_chatter("%s: pull() sending out unencoded packet at %u (rexmit)", id().cc(), Timestamp::now().msec1());  }  return new_p;/*    Packet *p2 = find_xor_candidate(p1);    if (p2) {      // statistics      _totencodedpkts += 2;      // insert p1 into the nencoded table      _totencodedbytes += (srh->hlen_with_data() - srh->hlen_wo_data());      // click_chatter("sr hdr len w and wo data are %u and %u, totencoded is %u", srh->hlen_with_data(), srh->hlen_wo_data(), _totencodedbytes);      if (_nencoded.findp(s) == 0) {        int x = 1;        _nencoded.insert(s, x);      } else {        int x = _nencoded[s] + 1;        _nencoded.insert(s, x);      }      srh = (struct srpacket *)(((struct click_ether *)p2->data()) + 1);      _totencodedbytes += (srh->hlen_with_data() - srh->hlen_wo_data());

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -