📄 media-app.cc
		/* debug */
		if (bs2 >= MAXBKOFF_)
			debug("WARNING: MAX No of backoff Reached, "
			      "bs1: %d, bs2: %d\n", bs1, bs2);

		/* Check for adding condition */
		//if ((bs1 > SMOOTHFACTOR_) && (bs2 > SMOOTHFACTOR_) &&
		//    (layers < MAX_LAYER)) {
		if ((bs1 > SMOOTHFACTOR_) && (bs2 > SMOOTHFACTOR_)) {
			// Check if all layers are already playing.
			// Assume all streams have the same # of layers:
			// MAX_LAYER
			assert(layers <= num_layer_);
			// XXX Only limit the rate when we have all layers
			// playing. There should be a better way to limit the
			// transmission rate earlier! Note that we need RAP to
			// fix its IPG as soon as we fix the rate here. Thus,
			// RAP should do that in its IpgTimeout()
			// instead of DecreaseIpg(). See rap.cc.
			if (layers == num_layer_) {
#if 0
				if (rate < num_layer_*LAYERBW_)
					panic("ERROR: rate: %.2f is less than "
					      "MAX BW for all %d layers!\n",
					      rate, layers);
#endif
				// Ask RAP to fix the rate at MAX_LAYER*LAYERBW
				rap()->FixIpg((double)seg_size_ /
					      (double)(num_layer_*LAYERBW_));
				// Mux the bandwidth evenly among layers
				return output(size, layers - 1);
			}

			// Calculate the first packet offset in this new layer
			int off_start = (int)floor((poffset_ + MWM(srtt)) /
						   seg_size_) * seg_size_;
			// XXX Does the application have data between
			// off_start_ and off_start_+MWM(srtt)??
			// XXX If the computed offset falls behind, we just
			// continue to send.
			if (data_[layers].start() <= off_start) {
				// Set LayerOffset[newlayer] =
				//   poffset_ + MWM(srtt) * n:
				// - n times roundtrip time of data, LET n BE 1
				// Round this offset to a whole segment
				data_[layers].set_start(off_start);
				data_[layers].set_datasize(seg_size_);
			}

			// Make sure that all corresponding data in lower
			// layers have been sent out, i.e., the last byte of
			// the current segment of the new layer should be less
			// than the last byte of all lower layers
			if (data_[layers].end() > data_[layers-1].start())
				// XXX Do not send anything if we don't have
				// data!! Otherwise we'll dramatically increase
				// the sending rate of lower layers.
				return NULL;

			// return output(size, layers-1);
			sending_[layers] = 1;
			AppData *res = output(size, layers);
			if (res == NULL) {
				// Drop the newly added layer because we
				// don't have data
				sending_[layers] = 0;
				// However, do prefetching in case we'll add
				// it again later
				int st = (int)floor((data_[layers].start() +
					pref_srtt_*LAYERBW_) /
					seg_size_ + 0.5) * seg_size_;
				int et = (int)floor((data_[layers].end() +
					pref_srtt_*LAYERBW_) /
					seg_size_ + 0.5) * seg_size_;
				if (et > pref_[layers]) {
					pref_[layers] = et;
					MediaSegment s(st, et);
					check_availability(layers, s);
				}
				for (i = 0; i < layers; i++)
					if (buffer_[i] < MWM(srtt)) {
						res = output(size, i);
						if (res != NULL)
							break;
					}
			} else {
				/* LAYERBW_*srtt; should we drain this? */
				drained_[layers] = 0;
				debug("sending Just ADDED layer %d, t: %.2f\n",
				      layers, now);
			}
			return res;
		}

		/*
		 * Find out which next step is closer.
		 * Second cond is for the cases where totbufs2 becomes
		 * saturated.
		 */
		scenario = 0;	// Initial value
		if ((totbufs1 <= totbufs2) &&
		    (totbufs1 > TotalBuf(layers, buffer_))) {
			/* go for next scenario 1 with bs1 backoffs */
			scenario = 1;
		} else {
			/* go for next scenario 2 with bs2 backoffs */
			scenario = 2;
		}

		/* decide which layer needs more data */
		if (scenario == 1) {
			for (l = 0; l < layers; l++) {
				if (buffer_[l] >= optbufs1[l])
					continue;
				//if (buffer_[l] < optbufs1[l]) {
				if ((l > 0) &&
				    (buffer_[l-1] <= buffer_[l] + seg_size_))
					idx = l - 1;
				else
					idx = l;
				// debug("Cs1:sending layer %d to fill buffer, "
				//       "t: %.2f\n", idx, now);
				return output(size, idx);
			}
		} else if (scenario == 2) {
			l = 0;
			done = 0;
			while ((l < layers) && (!done)) {
				if (TotalBuf(layers, buffer_) >= totbufs2) {
					done++;
				} else {
					if (buffer_[l] < min(optbufs2[l], optbufs1[l])) {
						if ((l > 0) &&
						    (buffer_[l-1] <= buffer_[l] + seg_size_))
							idx = l - 1;
						else
							idx = l;
						// debug("Cs2:sending layer %d to fill "
						//       "buffer, t: %.2f\n", idx, now);
						return output(size, idx);
					}
					l++;
				}
			} /* while */
		} else
			panic("# ERROR: Unknown scenario: %d !!\n", scenario);

		/* special cases when we get out of this for loop */
		if (scenario == 1) {
			panic("# Should not reach here, totbuf: %.2f, "
			      "totbufs1: %.2f, layers: %d\n",
			      TotalBuf(layers, buffer_), totbufs1, layers);
		}
		if (scenario == 2) {
			/*
			 * This is the point where we have already satisfied
			 * the buffer requirement for the next scenario 1,
			 * i.e. the MIN() value,
			 * so we relax that and shoot for optbufs2[l].
			 */
			/*
			 * If scenario 2, repeat the while loop without the min
			 * cond; we have already satisfied the condition for
			 * the next scenario 1.
			 */
			l = 0;
			while (l < layers) {
				if (buffer_[l] < optbufs2[l]) {
					if ((l > 0) &&
					    (buffer_[l-1] <= buffer_[l] + seg_size_))
						idx = l - 1;
					else
						idx = l;
					// debug("Cs22:sending layer %d to fill "
					//       "buffer, t: %.2f\n", idx, now);
					return output(size, idx);
				}
				l++;
			} /* while */
		}
		panic("# Oops, should not reach here, bs1: %d, bs2: %d, "
		      "scen: %d, totbufs1: %.2f, totbufs2: %.2f, "
		      "totbufavail: %.2f\n", bs1, bs2, scenario,
		      totbufs1, totbufs2, TotalBuf(layers, buffer_));
		/* NEVER REACH HERE */
	} else {	/* rate < layers*LAYERBW_ */
		/*******************
		 ** Draining phase **
		 *******************/
		/*
		debug("-->> DRAINING, layers: %d rate: %.3f, avgrate: %.3f, "
		      "srtt: %.3f, slope: %.3f\n",
		      layers, rate, avgrate_, srtt, seg_size_/srtt);
		*/
		/*
		 * At the beginning of a new drain phase OR
		 * another drop in rate during a draining phase OR
		 * dec of slope during a draining phase that results in
		 * a new drop
		 */
		/*
		 * 1) The highest priority action at this point is to ensure
		 * all surviving layers have a min amount of buffering; if not,
		 * try to fill that layer.
		 */
		double lowest = buffer_[0];
		int lowix = 0;
		for (i = 0; i < layers; i++) {
			if (lowest > buffer_[i]) {
				lowest = buffer_[i];
				lowix = i;
			}
		}
		if (lowest < MWM(srtt)) {
			last_depart = now;
			// debug("A':sending layer %d, below MWM in Drain "
			//       "t: %.2f\n", lowix, now);
			return output(size, lowix);
		}

		if ((nextAdjPoint < 0) ||	/* first draining phase */
		    (flag >= 0) ||		/* after a filling phase */
		    (now >= nextAdjPoint) ||	/* end of the curr interval */
		    ((rate < last_rate) && (flag < 0)) ||  /* new backoff */
		    (AllZero(tosend, layers))) {	/* all pkts are sent */
			/* start of a new interval */
			/*
			 * XXX, should update the nextAdjPoint diff for
			 * diff cases
			 */
			nextAdjPoint = now + srtt;
			bufToDrain = LAYERBW_*layers - rate;
			/*
			 * Calculate optimal dist. of bufToDrain across all
			 * layers. FinalDrainArray[] is the output,
			 * FinalBuffer[] is the final state.
			 */
			if (bufToDrain <= 0)
				panic("# ERROR: bufToDrain: %.2f\n", bufToDrain);
			DrainPacket(bufToDrain, FinalDrainArray, layers,
				    rate, srtt, FinalBuffer);
			for (l = 0; l < MAX_LAYER; l++) {
				tosend[l] = 0;
			}
			for (l = 0; l < layers; l++) {
				tosend[l] = srtt*LAYERBW_ - FinalDrainArray[l];
				// Correct for numerical error
				if (fabs(tosend[l]) < QA_EPSILON)
					tosend[l] = 0.0;
			}
			/*
			 * XXX, not sure if this is the best thing;
			 * we might only increase it
			 */
			tosendPtr = 0;

			/* debug only */
			if ((bufToDrain <= 0) ||
			    AllZero(FinalDrainArray, layers) ||
			    AllZero(tosend, layers)) {
				debug("# Error: bufToDrain: %.2f, %d layers, "
				      "srtt: %.2f\n", bufToDrain, layers, srtt);
				for (l = 0; l < layers; l++)
					debug("# FinalDrainArray[%d]: %.2f, "
					      "tosend[%d]: %.2f\n",
					      l, FinalDrainArray[l], l, tosend[l]);
				/*
				Tcl::instance().eval("[Test instance] flush-trace");
				abort();
				*/
			}
			/*******/
		}

		flag = -1;
		last_rate = rate;
		done = 0;
		cnt = 1;
		while ((!done) && (cnt <= layers)) {
			if (tosend[tosendPtr] > 0) {
				if ((tosendPtr > 0) &&
				    (buffer_[tosendPtr-1] <= buffer_[tosendPtr] + seg_size_))
					idx = tosendPtr - 1;
				else
					idx = tosendPtr;
				tosend[tosendPtr] -= seg_size_;
				if (tosend[tosendPtr] < 0)
					tosend[tosendPtr] = 0;
				return output(size, idx);
			}
			cnt++;
			tosendPtr = (tosendPtr + 1) % layers;
		}

		// XXX End of Drain Phase
		// For now, send a chunk from the base layer. Modify it later!!
		return output(size, 0);
	} /* if (rate >= layers*LAYERBW_) */

	panic("# QA::get_data() reached the end.\n");
\n"); /*NOTREACHED*/ return NULL;}//-----------------------------------------//-------------- misc routine//------------------------------------------// return 1 is all first "len" element of "arr" are zero// and 0 otherwiseint QA::AllZero(double *arr, int len){ int i; for (i=0; i<len; i++) if (arr[i] != 0.0) // debug("-- arr[%d}: %f\n", i, arr[i]); return 0; return 1;}//// Calculate accumulative amount of buffering for the lowest "n" layers//double QA::TotalBuf(int n, double *buffer){ double totbuf = 0.0; int i; for(i=0; i<n; i++) totbuf += buffer[i]; return totbuf;}// Update buffer_ information for a given layer// Get an output data packet from applications aboveAppData* QA::output(int& size, int layer){ int i; assert((sending_[layer] == 1) || (startTime_ == -1)); // In order to send out a segment, all corresponding segments of // the lower layers must have been sent out if (layer > 0) if (data_[layer-1].start() <= data_[layer].start()) return output(size, layer-1); // Get and output the data at the current data pointer MediaRequest q(MEDIAREQ_GETSEG); q.set_name(page_); q.set_layer(layer); q.set_st(data_[layer].start()); q.set_datasize(seg_size_); q.set_app(this); AppData* res = target()->get_data(size, &q); assert(res != NULL); HttpMediaData *p = (HttpMediaData *)res; if (p->datasize() <= 0) { // When the data is not available: // Should NOT advance sending data pointer because // if this is a cache which is downloading from a slow // link, it is possible that the requested data will // become available in the near future!! // We have already sent out the last segment of the base layer, // now we are requested for the segment beyond the last one // in the base layer. In this case, consider the transmission // is complete and tear down the connection. if (p->is_finished()) { rap()->stop(); // XXX Shouldn't this be done inside mcache/mserver?? Tcl::instance().evalf("%s finish-stream %s", target()->name(), name()); } else if (!p->is_last()) { // If we coulnd't find anything within q, move data // pointer forward to skip holes. MediaSegment tmp(q.et(), q.et()+seg_size_); check_layers(p->layer(), tmp); // If we can, advance. Otherwise wait for // lower layers to advance first. if (tmp.datasize() > 0) { assert(tmp.datasize() <= seg_size_); data_[p->layer()].set_start(tmp.start()); data_[p->layer()].set_end(tmp.end()); } } delete p; return NULL; } // Set current data pointer to the right ones // If available data is more than seg_size_, only // advance data pointer by seg_size_. If less data // is available, only advance data by the amount // of available data. // // XXX Currently the cache above does NOT pack data // from discontinugous blocks into one packet. May // need to do that later. // if (p->is_last())// data_[p->layer()].set_last(); assert((p->datasize() > 0) && (p->datasize() <= seg_size_)); // XXX Before we move data pointer forward, make sure we don't violate // layer ordering rules. Note we only need to check end_ because // start_ is p->et() which is guaranteed to be valid MediaSegment tmp(p->et(), p->et()+seg_size_); check_layers(p->layer(), tmp); if (tmp.datasize() > 0) { assert(tmp.datasize() <= seg_size_); data_[p->layer()].set_start(tmp.start()); data_[p->layer()].set_end(tmp.end()); } else { // Print error messages, do not send anything and wait for // next time so that hopefully lower layers will already // have advanced. 
fprintf(stderr, "# ERROR We cannot advance pointers for " "segment (%d %d)\n", tmp.start(), tmp.end()); for (i = 0; i < layer; i++) fprintf(stderr, "Layer %d, data ptr (%d %d) \n", i, data_[i].start(), data_[i].end()); delete p; return NULL; } // Let me know that we've sent out this segment. This is used // later to drain data (DrainBuffers()) outlist_[p->layer()].add(MediaSegment(p->st(), p->et())); buffer_[layer] += p->datasize(); bw_[layer] += p->datasize(); drained_[layer] -= p->datasize(); //offset_[layer] += seg_size_; avgrate_ = rate_weight_*rate() + (1-rate_weight_)*avgrate_; // DEBUG check for (i = 0; i < layer-1; i++) if (data_[i].end() < data_[i+1].end()) { for (int j = 0; j < layer; j++) fprintf(stderr, "layer i: (%d %d)\n", data_[i].start(), data_[i].end()); panic("# ERROR Wrong layer sending order!!\n"); } return res;}void QA::check_layers(int layer, MediaSegment& tmp) { // XXX While we are moving pointer forward, make sure // that we are not violating layer boundary constraint for (int i = layer-1; i >= 0; i--) // We cannot go faster than a lower layer!! if (tmp.end() > data_[i].end()) tmp.set_end(data_[i].end());}//// This is optimal buffer distribution for scenario 1.// NOTE: rate is the current rate before the backoff// Jan 28, 99//// This routines performs buffer sharing by giveing max share