📄 udt.cc
字号:
//// Author: Yunhong Gu, gu@lac.uic.edu//// Description: //// Assumption: This code does NOT process sequence number wrap, which will overflow after 2^31 packets.// But I assume that you won't run NS for that long time :)//// Last Update: 03/20/2006//#include <stdlib.h>#include <math.h>#include "ip.h"#include "udt.h"int hdr_udt::off_udt_;static class UDTHeaderClass : public PacketHeaderClass {public: UDTHeaderClass() : PacketHeaderClass("PacketHeader/UDT", sizeof(hdr_udt)) { bind_offset(&hdr_udt::off_udt_); }} class_udthdr;static class UdtClass : public TclClass {public: UdtClass() : TclClass("Agent/UDT") {} TclObject* create(int, const char*const*) { return (new UdtAgent()); }} class_udt;UdtAgent::UdtAgent(): Agent(PT_UDT),syn_timer_(this),ack_timer_(this),nak_timer_(this),exp_timer_(this),snd_timer_(this),syn_interval_(0.01),mtu_(1500),max_flow_window_(100000){ bind("mtu_", &mtu_); bind("max_flow_window_", &max_flow_window_); snd_loss_list_ = new SndLossList(max_flow_window_, 1 << 29, 1 << 30); rcv_loss_list_ = new RcvLossList(max_flow_window_, 1 << 29, 1 << 30); flow_window_size_ = 2; snd_interval_ = 0.000001; ack_interval_ = syn_interval_; nak_interval_ = syn_interval_; exp_interval_ = 1.01; nak_count_ = 0; dec_count_ = 0; snd_last_ack_ = 0; local_send_ = 0; local_loss_ = 0; local_ack_ = 0; snd_curr_seqno_ = -1; curr_max_seqno_ = 0; avg_nak_num_ = 2; dec_random_ = 2; loss_rate_limit_ = 0.01; loss_rate_ = 0; rtt_ = 1; rcv_interval_ = snd_interval_; rcv_last_ack_ = 0; rcv_last_ack_time_ = Scheduler::instance().clock(); rcv_last_ack2_ = 0; ack_seqno_ = -1; rcv_curr_seqno_ = -1; local_recv_ = 0; last_dec_seq_ = -1; last_delay_time_ = Scheduler::instance().clock(); last_dec_int_ = 1.0; slow_start_ = true; freeze_ = false; ack_timer_.resched(ack_interval_); nak_timer_.resched(nak_interval_);}UdtAgent::~UdtAgent(){}int UdtAgent::command(int argc, const char*const* argv){ return Agent::command(argc, argv);}void UdtAgent::recv(Packet *pkt, Handler*){ hdr_udt* udth = hdr_udt::access(pkt); double r; if (1 == udth->flag()) { switch (udth->type()) { case 2: sendCtrl(6, udth->ackseq()); if (udth->ack() > snd_last_ack_) { snd_last_ack_ = udth->ack(); snd_loss_list_->remove((int)snd_last_ack_); } else break; snd_timer_.resched(0); if (rtt_ == syn_interval_) rtt_ = udth->rtt() / 1000000.0; else rtt_ = rtt_ * 0.875 + udth->rtt() / 1000000.0 * 0.125; if (slow_start_) flow_window_size_ = snd_last_ack_; else if (udth->lrecv() > 0) flow_window_size_ = int(ceil(flow_window_size_ * 0.875 + udth->lrecv() * (rtt_ + syn_interval_) * 0.125)); if (flow_window_size_ > max_flow_window_) { slow_start_ = false; flow_window_size_ = max_flow_window_; } bandwidth_ = int(bandwidth_ * 0.875 + udth->bandwidth() * 0.125); exp_timer_.resched(exp_interval_); rateControl(); if (snd_interval_ > rtt_) { snd_interval_ = rtt_; snd_timer_.resched(0); } break; case 3: slow_start_ = false; last_dec_int_ = snd_interval_; if ((udth->loss()[0] & 0x7FFFFFFF) > last_dec_seq_) { freeze_ = true; snd_interval_ = snd_interval_ * 1.125; avg_nak_num_ = 1 + int(ceil(double(avg_nak_num_) * 0.875 + double(nak_count_) * 0.125)); dec_random_ = int(rand() * double(avg_nak_num_) / (RAND_MAX + 1.0)) + int(ceil(avg_nak_num_/5.0)); nak_count_ = 1; last_dec_seq_ = snd_curr_seqno_; } else if (0 == (++ nak_count_ % dec_random_)) { snd_interval_ = snd_interval_ * 1.125; last_dec_seq_ = snd_curr_seqno_; } if (snd_interval_ > rtt_) snd_interval_ = rtt_; local_loss_ ++; for (int i = 0, n = udth->losslen(); i < n; ++ i) { if ((udth->loss()[i] & 0x80000000) && ((udth->loss()[i] & 0x7FFFFFFF) >= snd_last_ack_)) { snd_loss_list_->insert(udth->loss()[i] & 0x7FFFFFFF, udth->loss()[i + 1]); ++ i; } else if (udth->loss()[i] >= snd_last_ack_) { snd_loss_list_->insert(udth->loss()[i], udth->loss()[i]); } } exp_timer_.resched(exp_interval_); snd_timer_.resched(0); break; case 4:/* if (slow_start_) slow_start_ = false; last_dec_int_ = snd_interval_; snd_interval_ = snd_interval_ * 1.125; last_dec_seq_ = snd_curr_seqno_; nak_count_ = -16; dec_count_ = 1;*/ break; case 6: { int ack; double rtt = ack_window_.acknowledge(udth->ackseq(), ack); if (rtt > 0) { time_window_.ack2arrival(rtt);// if ((time_window_.getdelaytrend()) && (Scheduler::instance().clock() - last_delay_time_ > 2 * rtt_))// sendCtrl(4); if (rtt_ == syn_interval_) rtt_ = rtt; else rtt_ = rtt_ * 0.875 + rtt * 0.125; nak_interval_ = rtt_; if (nak_interval_ < syn_interval_) nak_interval_ = syn_interval_; if (rcv_last_ack2_ < ack) rcv_last_ack2_ = ack; } break; } default: break; } Packet::free(pkt); return; } time_window_.pktarrival(); if (0 == udth->seqno() % 16) time_window_.probe1arrival(); else if (1 == udth->seqno() % 16) time_window_.probe2arrival(); local_recv_ ++; int offset = udth->seqno() - rcv_last_ack_; if (offset < 0) { Packet::free(pkt); return; } if (udth->seqno() > rcv_curr_seqno_ + 1) { int c; if (rcv_curr_seqno_ + 1 == udth->seqno() - 1) c = 1; else c = 2; int* loss = new int[c]; if (c == 2) { loss[0] = (rcv_curr_seqno_ + 1) | 0x80000000; loss[1] = udth->seqno() - 1; } else loss[0] = rcv_curr_seqno_ + 1; sendCtrl(3, c, loss); delete [] loss; } if (udth->seqno() > rcv_curr_seqno_) { rcv_curr_seqno_ = udth->seqno(); } else { rcv_loss_list_->remove(udth->seqno()); } Packet::free(pkt); return;}void UdtAgent::sendmsg(int nbytes, const char* /*flags*/){ if (curr_max_seqno_ == snd_curr_seqno_ + 1) exp_timer_.resched(exp_interval_); curr_max_seqno_ += nbytes/1468; snd_timer_.resched(0);}void UdtAgent::sendCtrl(int pkttype, int lparam, int* rparam){ Packet* p; hdr_udt* udth; hdr_cmn* ch; int ack; switch (pkttype) { case 2: if (rcv_loss_list_->getLossLength() == 0) ack = rcv_curr_seqno_ + 1; else ack = rcv_loss_list_->getFirstLostSeq(); if (ack > rcv_last_ack_) { rcv_last_ack_ = ack; } else if (Scheduler::instance().clock() - rcv_last_ack_time_ <= 2 * rtt_) { ack_timer_.resched(ack_interval_); break; } if (rcv_last_ack_ > rcv_last_ack2_) { p = allocpkt(40); udth = hdr_udt::access(p); udth->flag() = 1; udth->type() = 2; udth->lrecv() = time_window_.getpktspeed(); udth->bandwidth() = time_window_.getbandwidth(); udth->rtt() = int(rtt_ * 1000000.0); ack_seqno_ ++; udth->ackseq() = ack_seqno_; udth->ack() = rcv_last_ack_; ch = hdr_cmn::access(p); ch->size() = 40; Agent::send(p, 0); ack_window_.store(ack_seqno_, rcv_last_ack_); rcv_last_ack_time_ = Scheduler::instance().clock(); } ack_timer_.resched(ack_interval_); break; case 3: if (rparam != NULL) { p = allocpkt(32 + lparam * 4); udth = hdr_udt::access(p); udth->flag() = 1; udth->type() = 3; udth->losslen() = lparam; memcpy(udth->loss(), rparam, lparam * 4); ch = hdr_cmn::access(p); ch->size() = 32 + lparam * 4; Agent::send(p, 0); } else if (rcv_loss_list_->getLossLength() > 0) { int losslen; int* loss = new int[MAX_LOSS_LEN]; rcv_loss_list_->getLossArray(loss, &losslen, MAX_LOSS_LEN, rtt_); if (losslen > 0) { p = allocpkt(32 + losslen); udth = hdr_udt::access(p); udth->flag() = 1; udth->type() = 3; udth->losslen() = losslen; memcpy(udth->loss(), loss, MAX_LOSS_LEN); ch = hdr_cmn::access(p); ch->size() = 32 + losslen; Agent::send(p, 0); } delete [] loss; } nak_timer_.resched(nak_interval_); break; case 4: p = allocpkt(32); udth = hdr_udt::access(p); udth->flag() = 1; udth->type() = 4; ch = hdr_cmn::access(p); ch->size() = 32; Agent::send(p, 0); last_delay_time_ = Scheduler::instance().clock(); break; case 6: p = allocpkt(32); udth = hdr_udt::access(p); udth->flag() = 1; udth->type() = 6; udth->ackseq() = lparam; ch = hdr_cmn::access(p); ch->size() = 32; Agent::send(p, 0); break; }}void UdtAgent::sendData(){ bool probe = false; if (snd_last_ack_ == curr_max_seqno_) snd_timer_.resched(snd_interval_); int nextseqno; if (snd_loss_list_->getLossLength() > 0) { nextseqno = snd_loss_list_->getLostSeq(); } else if (snd_curr_seqno_ - snd_last_ack_ < flow_window_size_) { nextseqno = ++ snd_curr_seqno_; if (0 == nextseqno % 16) probe = true; } else {/* if (freeze_) { snd_timer_.resched(syn_interval_ + snd_interval_); freeze_ = false; } else snd_timer_.resched(snd_interval_);*/ return; } Packet* p; p = allocpkt(mtu_); hdr_udt* udth = hdr_udt::access(p); udth->flag() = 0; udth->seqno() = nextseqno; hdr_cmn* ch = hdr_cmn::access(p); ch->size() = mtu_; Agent::send(p, 0); local_send_ ++; if (probe) { snd_timer_.resched(0); return; } if (freeze_) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -