📄 csfq.cc
字号:
/* forward the packet and relabel it */ if (alpha_/pktLabel < 1.0) hdr->setlabel(alpha_); } /* if buffer full, drop the packet */ if (qsizeCrt_ + pktSize > qsize_) { // forced drop dropPkt(flowId, pkt); #ifdef CSFQ_LOG logDroppedPacket1(flowId, now); #endif /* * call estAlpha with 1, as this packet would have been enqued * by CSFQ */ estAlpha(pktSize, pktLabel, now, 1); /* * decrease alpha_; the number of times alpha_ is decreased * During an interval of length kLink_ is bounded by kalpha_. * This is to avoid overcorrection. */ if (kalpha_-- >= 0) alpha_ *= 0.99; return; } /* enqueue the packet and estimate alpha_ */ qsizeCrt_ += pktSize; q_.enque(pkt); estAlpha(pktSize, pktLabel, now, 1); }Packet* CSFQ::deque(){ Packet *pkt = q_.deque(); hdr_ip* hip; hdr_cmn* hdr; int pktSize; if (pkt) { hip = hdr_ip::access(pkt); hdr = hdr_cmn::access(pkt); pktSize = hdr->size() << 3; qsizeCrt_ -= pktSize;#ifdef CSFQ_LOG#ifdef SRCID logDptPacket(getId(hip->src()), Scheduler::instance().clock(), pktSize);#else logDptPacket(hip->flowid(), Scheduler::instance().clock(), pktSize);#endif#endif } return (pkt);}/* compute estimated flow rate by using exponentially averaging */double CSFQ::estRate(int flowid, int pktSize, double arvTime){ double d = (arvTime - fs_[flowid].prevTime_)*1000000; double k = fs_[flowid].k_; if (d == 0.0) { fs_[flowid].size_ += pktSize; if (fs_[flowid].estRate_) return fs_[flowid].estRate_; else /* this is the first packet; just initialize the rate */ return (fs_[flowid].estRate_ = rateAlpha_/2); } else { pktSize += fs_[flowid].size_; fs_[flowid].size_ = 0; } fs_[flowid].prevTime_ = arvTime; fs_[flowid].estRate_ = (1. - exp(-d/k))*(double)pktSize/d + exp(-d/k)*fs_[flowid].estRate_; return fs_[flowid].estRate_;}/* estimate the link's alpha parameter */void CSFQ::estAlpha(int pktSize, double pktLabel, double arvTime, int enqueue) { float d = (arvTime - lastArv_)*1000000., w, rate = rate_/1000000.; double k = kLink_/1000000.; // set lastArv_ to the arrival time of the first packet if (lastArv_ == 0.) lastArv_ = arvTime; // account for packets received simultaneously pktSize_ += pktSize; if (enqueue) pktSizeE_ += pktSize; if (arvTime == lastArv_) return; // estimate the aggreagte arrival rate (rateTotal_) and the // aggregate forwarded (rateAlpha_) rates w = exp(-d/kLink_); rateAlpha_ = (1 - w)*(double)pktSizeE_/d + w*rateAlpha_; rateTotal_ = (1 - w)*(double)pktSize_/d + w*rateTotal_; lastArv_ = arvTime; pktSize_ = pktSizeE_ = 0; // compute the initial value of alpha_ if (alpha_ == 0.) { if (qsizeCrt_ < qsizeThresh_) { if (tmpAlpha_ < pktLabel) tmpAlpha_ = pktLabel; return; } if (alpha_ < tmpAlpha_) alpha_ = tmpAlpha_; if (alpha_ == 0.) alpha_ = rate / 2.; // arbitrary initialization tmpAlpha_ = 0.; } // update alpha_ if (rate <= rateTotal_) { // link congested if (!congested_) { congested_ = 1; lArv_ = arvTime; kalpha_ = KALPHA; } else { if (arvTime < lArv_ + k) return; lArv_ = arvTime; alpha_ *= rate/rateAlpha_; if (rate < alpha_) alpha_ = rate; } } else { // (rate < rateTotal_) => link uncongested if (congested_) { congested_ = 0; lArv_ = arvTime; tmpAlpha_ = 0; } else { if (arvTime < lArv_ + k) { if (tmpAlpha_ < pktLabel) tmpAlpha_ = pktLabel; } else { alpha_ = tmpAlpha_; lArv_ = arvTime; if (qsizeCrt_ < qsizeThresh_) alpha_ = 0.; else tmpAlpha_ = 0.; } } }#ifdef CSFQ_LOG printf("|%d %f %f %f\n", id_, arvTime, rateAlpha_, rateTotal_); #endif} #ifdef SRCID/* get identifier from source address */int CSFQ::getId(int src) { int i = src % MAXFLOWS; if (!hashid_[i]) { /* first packet of this flow */ fs_[i].weight_ = fs_[0].weight_; fs_[i].k_ = fs_[0].k_; hashid_[i] = src; } else if (hashid_[i] != src) panic1("Source addresses colision!\n"); return i;}#endif#ifdef PENALTY_BOXint CSFQ::isInHash(int idx, IdentHashTable *hashTable, int size) { return (hashTable[idx % size].valid_);}int CSFQ::insertInHash(int idx, double flowNormRate, IdentHashTable *hashTable, int size) { double now = Scheduler::instance().clock(); int i = idx % size; if (!hashTable[i].valid_) { hashTable[i].valid_ = TRUE; hashTable[i].estNormRate_ = flowNormRate; hashTable[i].prevTime_ = now; return TRUE; } else /* hash colision */ return FALSE; }void CSFQ::deleteFromHash(int idx, IdentHashTable *hashTable, int size) { hashTable[idx % size].valid_ = FALSE;}double CSFQ::updateNormRate(int idx, double pktLabel, IdentHashTable *hashTable, int size) { int i = idx % size; double u = pktLabel/alpha_; if (!hashTable[i].valid_) /* this is just a double-check */ return 0.; hashTable[i].estNormRate_ = (1 - BETA)*u + BETA*hashTable[i].estNormRate_; return hashTable[i].estNormRate_;}int CSFQ::recordDroppedPacket(int flowId){ droppedArray_[numDroppedPkts_] = flowId; numDroppedPkts_++; if (numDroppedPkts_ == DROPPED_ARRAY_SIZE) return TRUE; else return FALSE;}void CSFQ::choseToMonitor(){ /* do a sort of last received packets */ /* * since usually the array size is no larger * than 100, for simplicity we do buble sort ;-) */ int i, j, count = 0, temp; int ids[DROPPED_ARRAY_SIZE]; int cnt[DROPPED_ARRAY_SIZE]; for (i = 0; i < DROPPED_ARRAY_SIZE - 1; i++) { for (j = i; j < DROPPED_ARRAY_SIZE - 1; j++) { if (droppedArray_[i] < droppedArray_[j]) { temp = droppedArray_[j]; droppedArray_[j] = droppedArray_[i]; droppedArray_[i] = temp; } } } /* now count the number of dropped packets for every flow */ for (i = 0; i < DROPPED_ARRAY_SIZE - 1; ) { ids[count] = droppedArray_[i]; cnt[count] = 1; count++; for (j = i; j < DROPPED_ARRAY_SIZE - 1; j++) { if (droppedArray_[i] == droppedArray_[j]) cnt[count - 1]++; else break; } i = j; } /* * now decide what to monitor: monitor flows that have a higher * dropping rate than the average */ for (i = count - 1; i >= 0; i--) { if (isInHash(ids[i], monitorTable_, MONITOR_TABLE_SIZE) || isInHash(ids[i], punishTable_, PUNISH_TABLE_SIZE)) continue; if (cnt[i] >= DROPPED_ARRAY_SIZE/count || !monitorTable_[ids[i] % MONITOR_TABLE_SIZE].valid_) { deleteFromHash(ids[i], monitorTable_, MONITOR_TABLE_SIZE); insertInHash(ids[i], 1., monitorTable_, MONITOR_TABLE_SIZE); } }}void CSFQ::punishFlow(int flowId) { double rate = monitorTable_[flowId % MONITOR_TABLE_SIZE].estNormRate_; deleteFromHash(flowId, monitorTable_, MONITOR_TABLE_SIZE); insertInHash(flowId, rate, punishTable_, PUNISH_TABLE_SIZE); }int CSFQ::monitor(int flowId, double pktLabel){ if (isInHash(flowId, punishTable_, PUNISH_TABLE_SIZE)) { double rate = updateNormRate(flowId, pktLabel, punishTable_, PUNISH_TABLE_SIZE); if (rate < GOOD_THRESH) punishTable_[flowId % PUNISH_TABLE_SIZE].valid_ = 0; return TRUE; } if (isInHash(flowId, monitorTable_, MONITOR_TABLE_SIZE)) { double rate = updateNormRate(flowId, pktLabel, monitorTable_, MONITOR_TABLE_SIZE); if (rate > PUNISH_THRESH) { punishFlow(flowId); } } return FALSE;} #endifvoid CSFQ::dropPkt(int flowId, Packet *pkt){#ifdef PENALTY_BOX if (recordDroppedPacket(flowId)) { choseToMonitor(); numDroppedPkts_ = 0; }#endif drop(pkt);}void CSFQ::logDroppedPacket(int flowId, double ctime){ printf("@%d %d %f %d\n", flowId, id_, ctime, fs_[flowId].numDroped_++); }void CSFQ::logDroppedPacket1(int flowId, double ctime){ printf("1@%d %d %f %d\n", flowId, id_, ctime, fs_[flowId].numDroped_++); }void CSFQ::logArvPacket(int flowId, double ctime, int pktSize){ printf(">%d %d %f %d %d %d\n", flowId, id_, ctime, fs_[flowId].numArv_++, pktSize/8, qsizeCrt_);}void CSFQ::logDptPacket(int flowId, double ctime, int pktSize){ printf("<%d %d %f %d %d %d\n", flowId, id_, ctime, fs_[flowId].numDpt_++, pktSize/8, qsizeCrt_);}void CSFQ::logEstAlpha(double ctime){ printf("#%d %f %f %d\n", id_, ctime, alpha_, qsizeCrt_/8);}void CSFQ::logEstLabel(int flowId, double ctime, double label){ printf("=%d %d %f %f\n", flowId, id_, ctime, label);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -