📄 rap.cc
字号:
if (!anyack()) { flags_ |= RF_ANYACK; ipg_ = srtt_; } if (LossDetection(RAP_ACK_BASED, ackHeader)) LossHandler(); // XXX We only stop by sequence number when we are in // "counting sequence number" mode. -- haoboy if (counting_pkt() && (ackHeader->seqno_ >= curseq_)) finish();}//----------------------------------------------------------------------// RapAgent::timeout// Called when a timer fires.//// "type" is the type of Timeout event//----------------------------------------------------------------------void RapAgent::timeout(int type){ if (type == RAP_IPG_TIMEOUT) IpgTimeout(); else if (type == RAP_RTT_TIMEOUT) RttTimeout(); else assert(FALSE);}//----------------------------------------------------------------------// RapAgent::IpgTimeout// Called when the ipgTimer_ fires.//----------------------------------------------------------------------void RapAgent::IpgTimeout(){ double waitPeriod; // Time before next transmission Debug(debugEnable_, logfile_, "%.3f: IPG Timeout.\n", Scheduler::instance().clock()); if (LossDetection(RAP_TIMER_BASED)) LossHandler(); else if (!counting_pkt()) { if (app_) { int nbytes; AppData* data = app_->get_data(nbytes); // Missing data in application. What should we do?? // For now, simply schedule the next SendPacket(). // If the application has nothing to send, it'll stop // the rap agent later on. if (data != NULL) { SendPacket(nbytes, data); dctr_++; } } else { // If RAP doesn't have application, just go ahead and // send packet SendPacket(size_); dctr_++; } } else if (seqno_ < curseq_) { SendPacket(size_); dctr_++; } // XXX If we only bound IPG in DecreaseIpg(), the thresholding will // happen immediately because DecreaseIpg() isn't called immediately. // So we do it here. if (fixIpg_ != 0) ipg_ = fixIpg_; if (useFineGrain_) waitPeriod = frtt_ / xrtt_ * ipg_; else waitPeriod = ipg_; // By this point, we may have been stopped by applications above // Thus, do not reschedule a timer if we are stopped. if (!is_stopped()) ipgTimer_.resched(waitPeriod + Random::uniform(overhead_));} //----------------------------------------------------------------------// RapAgent::RttTimeout// Called when the rttTimer_ fires.// Decrease IPG. Restart rttTimer_.//----------------------------------------------------------------------void RapAgent::RttTimeout(){ Debug(debugEnable_, logfile_, "%.3f: RTT Timeout.\n", Scheduler::instance().clock()); // During the past srtt_, we are supposed to send out srtt_/ipg_ // packets. If we sent less than that, we may not increase rate if (100*dctr_*(ipg_/srtt_) >= dpthresh_) DecreaseIpg(); // Additive increase in rate else Debug(debugEnable_, logfile_, "- %f Cannot increase rate due to insufficient data.\n", Scheduler::instance().clock()); dctr_ = 0; double debugIpg = ipg_ + overhead_ / 2; Debug(debugEnable_, logfile_, "- ipg decreased at %.3f to %f\n", Scheduler::instance().clock(), debugIpg); rttTimer_.resched(srtt_);}//----------------------------------------------------------------------// RapAgent::LossDetection// Called in ipgTimeout (RAP_TIMER_BASED) // or in RecvAck (RAP_ACK_BASED).//// Returns:// TRUE if loss detected, FALSE otherwise.//// "ackHeader" is the RAP header of the received Ack (PT_RAP_ACK).//----------------------------------------------------------------------static double currentTime;static hdr_rap *ackHdr;static RapAgent *rapAgent; static int numLosses;int EqualStatus(void *i1, void *i2){ return (((TransHistoryEntry *) i1)->status == ((TransHistoryEntry *) i2)->status);}void DestroyTransHistoryEntry(int item){ TransHistoryEntry *entry = (TransHistoryEntry *) item; Debug(rapAgent->GetDebugFlag(), rapAgent->GetLogfile(), "- purged seq num %d\n", entry->seqno); delete entry;}void TimerLostPacket(int item){ TransHistoryEntry *entry = (TransHistoryEntry *) item; if ((entry->departureTime + rapAgent->GetTimeout()) <= currentTime) { // ~ Packets lost in RAP session rapAgent->IncrementLossCount(); // Ignore cluster losses if (entry->status != RAP_INACTIVE) { assert(entry->status == RAP_SENT); numLosses++; Debug(rapAgent->GetDebugFlag(), rapAgent->GetLogfile(), "- timerlost seq num %d , last sent %d\n", entry->seqno, rapAgent->GetSeqno()); } entry->status = RAP_PURGED; }}void AckLostPacket(int item){ TransHistoryEntry *entry = (TransHistoryEntry *) item; int seqno, lastRecv, lastMiss, prevRecv; seqno = entry->seqno; lastRecv = ackHdr->lastRecv; lastMiss = ackHdr->lastMiss; prevRecv = ackHdr->prevRecv; if (seqno <= lastRecv) { if ((seqno > lastMiss) || (seqno == prevRecv)) entry->status = RAP_PURGED; // Was Received, now purge else if ((lastRecv - seqno) >= 3) { // ~ Packets lost in RAP session rapAgent->IncrementLossCount(); if (entry->status != RAP_INACTIVE) { assert(entry->status == RAP_SENT); numLosses++; Debug(rapAgent->GetDebugFlag(), rapAgent->GetLogfile(), "- acklost seqno %d , last sent %d\n", seqno, rapAgent->GetSeqno()); } // Was Lost, purge from history entry->status = RAP_PURGED; } }}int RapAgent::LossDetection(RapLossType type, hdr_rap *ackHeader){ TransHistoryEntry key(0, RAP_PURGED); currentTime = key.departureTime; ackHdr = ackHeader; rapAgent = this; numLosses = 0; switch(type) { case RAP_TIMER_BASED: transmissionHistory_.Mapcar(TimerLostPacket); break; case RAP_ACK_BASED: transmissionHistory_.Mapcar(AckLostPacket); break; default: assert(FALSE); } Debug(debugEnable_, logfile_, "- %d losses detected\n", numLosses); Debug(debugEnable_, logfile_, "- history size %d\n", transmissionHistory_.Size()); transmissionHistory_.Purge((void *) &key, EqualStatus, // Purge PURGED packets DestroyTransHistoryEntry); Debug(debugEnable_, logfile_, "- history size %d\n", transmissionHistory_.Size()); if (numLosses) return TRUE; else return FALSE;}//----------------------------------------------------------------------// RapAgent::LossHandler// Called when loss detected.// Increase IPG. Mark packets INACTIVE. Reschedule rttTimer_.//----------------------------------------------------------------------void MarkInactive(int item){ TransHistoryEntry *entry = (TransHistoryEntry *) item; entry->status = RAP_INACTIVE;}void RapAgent::LossHandler(){ IncreaseIpg(); // Multiplicative decrease in rate double debugIpg = ipg_ + overhead_ / 2; Debug(debugEnable_, logfile_, "- ipg increased at %.3f to %f\n", Scheduler::instance().clock(), debugIpg); transmissionHistory_.Mapcar(MarkInactive); Debug(debugEnable_, logfile_, "- window full packets marked inactive\n"); rttTimer_.resched(srtt_);}//----------------------------------------------------------------------// RapAgent::SendAck// Create an ack packet, set fields, send the packet out.//// "seqNum" is the sequence number of the packet being acked.//----------------------------------------------------------------------void RapAgent::SendAck(int seqNum){ type_ = PT_RAP_ACK; Packet* pkt = allocpkt(); // Create a new packet hdr_rap* hdr = (hdr_rap*) pkt->access(off_rap_); // Access header hdr->seqno() = seqNum; hdr->flags() = RH_ACK; hdr->lastRecv = lastRecv_; hdr->lastMiss = lastMiss_; hdr->prevRecv = prevRecv_; hdr_cmn *ch = (hdr_cmn*)pkt->access(off_cmn_); ch->size() = rap_base_hdr_size_; send(pkt, 0); Debug(debugEnable_, logfile_, "- ack sent %u [%u %u %u]\n", seqNum, lastRecv_, lastMiss_, prevRecv_);}//----------------------------------------------------------------------// RapSinkAgent::UpdateLastHole// Update the last hole in sequence number space at the receiver.// // "seqNum" is the sequence number of the data packet received.//----------------------------------------------------------------------void RapAgent::UpdateLastHole(int seqNum){ assert(seqNum > 0); if (seqNum > (lastRecv_ + 1)) { prevRecv_ = lastRecv_; lastRecv_ = seqNum; lastMiss_ = seqNum - 1; return; } if (seqNum == (lastRecv_ + 1)) { lastRecv_ = seqNum; return; } if ((lastMiss_ < seqNum) && (seqNum <= lastRecv_)) // Duplicate return; if (seqNum == lastMiss_) { if ((prevRecv_ + 1) == seqNum) // Hole filled prevRecv_ = lastMiss_ = 0; else lastMiss_--; return; } if ((prevRecv_ < seqNum) && (seqNum < lastMiss_)) { prevRecv_ = seqNum; return; } assert(seqNum <= prevRecv_); // Pretty late...}// take pkt countvoid RapAgent::advanceby(int delta){ flags_ |= RF_COUNTPKT; curseq_ = delta; start();}void RapAgent::finish(){ stop(); Tcl::instance().evalf("%s done", this->name());}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -