📄 media-app.cc
字号:
abort();}void MediaApp::stop(){ // Called when we want to stop the RAP agent rap()->stop();}AppData* MediaApp::get_data(int& nbytes, AppData* req) { AppData *res; if (req == NULL) { MediaRequest p(MEDIAREQ_GETSEG); p.set_name(page_); // We simply rotating the layers from which to send data if (num_layer_ > 0) { p.set_layer(last_layer_++); last_layer_ = last_layer_ % num_layer_; } else p.set_layer(0); p.set_st(data_[0].start()); p.set_datasize(seg_size_); p.set_app(this); res = target()->get_data(nbytes, &p); } else res = target()->get_data(nbytes, req); // Update the current data pointer assert(res != NULL); HttpMediaData *p = (HttpMediaData *)res; // XXX For now, if the return size is 0, we assume that the // transmission stops. Otherwise there is no way to tell the // RAP agent that there's no more data to send if (p->datasize() <= 0) { // Should NOT advance sending data pointer because // if this is a cache which is downloading from a slow // link, it is possible that the requested data will // become available in the near future!! delete p; return NULL; } else { // Set current data pointer to the right ones // If available data is more than seg_size_, only advance data // pointer by seg_size_. If less data is available, only // advance data by the amount of available data. // // XXX Currently the cache above does NOT pack data from // discontinugous blocks into one packet. May need to do // that later. assert((p->datasize() > 0) && (p->datasize() <= seg_size_)); data_[p->layer()].set_start(p->et()); data_[p->layer()].set_datasize(seg_size_); } return res;}int MediaApp::command(int argc, const char*const* argv){ Tcl& tcl = Tcl::instance(); if (strcmp(argv[1], "log") == 0) { int mode; log_ = Tcl_GetChannel(tcl.interp(), (char*)argv[2], &mode); if (log_ == 0) { tcl.resultf("%s: invalid log file handle %s\n", name(), argv[2]); return TCL_ERROR; } return TCL_OK; } else if (strcmp(argv[1], "evTrace") == 0) { char buf[1024], *p; if (log_ != 0) { sprintf(buf, "%.17g ", Scheduler::instance().clock()); p = &(buf[strlen(buf)]); for (int i = 2; i < argc; i++) { strcpy(p, argv[i]); p += strlen(argv[i]); *(p++) = ' '; } // Stick in a newline. *(p++) = '\n', *p = 0; Tcl_Write(log_, buf, p-buf); } return TCL_OK; } else if (strcmp(argv[1], "set-layer") == 0) { int n = atoi(argv[2]); if (n >= MAX_LAYER) { fprintf(stderr, "Too many layers than maximum allowed.\n"); return TCL_ERROR; } num_layer_ = n; return TCL_OK; } return Application::command(argc, argv);}void MediaApp::log(const char* fmt, ...){ char buf[1024], *p; char *src = Address::instance().print_nodeaddr(rap()->addr()); sprintf(buf, "%.17g i %s ", Scheduler::instance().clock(), src); delete []src; p = &(buf[strlen(buf)]); va_list ap; va_start(ap, fmt); vsprintf(p, fmt, ap); if (log_ != 0) Tcl_Write(log_, buf, strlen(buf));}//----------------------------------------------------------------------// MediaApp enhanced with quality adaptation//----------------------------------------------------------------------void QATimer::expire(Event *){ a_->UpdateState(); resched(a_->UpdateInterval());}static class QAClass : public TclClass {public: QAClass() : TclClass("Application/MediaApp/QA") {} TclObject* create(int argc, const char*const* argv) { if (argc > 4) return (new QA((const char *)(argv[4]))); return NULL; }} class_qa_app;//#define CHECK 1//#define DBG 1QA::QA(const char *page) : MediaApp(page){ updTimer_ = new QATimer(this); bind("LAYERBW_", &LAYERBW_); bind("MAXACTIVELAYERS_", &MAXACTIVELAYERS_); bind("SRTTWEIGHT_", &SRTTWEIGHT_); bind("SMOOTHFACTOR_", &SMOOTHFACTOR_); bind("MAXBKOFF_", &MAXBKOFF_); bind("debug_output_", &debug_); bind("pref_srtt_", &pref_srtt_); for(int j = 0; j < MAX_LAYER; j++) { buffer_[j] = 0.0; sending_[j] = 0; playing_[j] = 0; drained_[j] = 0.0; bw_[j] = 0.0; pref_[j] = 0; } poffset_ = 0; playTime_ = 0; // Should initialize it startTime_ = -1; // Used to tell the first packet // Moving average weight for transmission rate average rate_weight_ = 0.01; avgrate_ = 0.0;}QA::~QA(){ if (updTimer_) { if (updTimer_->status() != TIMER_IDLE) updTimer_->cancel(); delete updTimer_; }}void QA::debug(const char* fmt, ...){ if (!debug_) return; char buf[1024], *p; char *src = Address::instance().print_nodeaddr(rap()->addr()); char *port = Address::instance().print_portaddr(rap()->addr()); sprintf(buf, "# t %.17g i %s.%s QA ", Scheduler::instance().clock(), src, port); delete []port; delete []src; p = &(buf[strlen(buf)]); va_list ap; va_start(ap, fmt); vsprintf(p, fmt, ap); fprintf(stderr, "%s", buf);}void QA::panic(const char* fmt, ...) { char buf[1024], *p; char *src = Address::instance().print_nodeaddr(rap()->addr()); char *port = Address::instance().print_portaddr(rap()->addr()); sprintf(buf, "# t %.17g i %s.%s QA PANIC ", Scheduler::instance().clock(), src, port); delete []port; delete []src; p = &(buf[strlen(buf)]); va_list ap; va_start(ap, fmt); vsprintf(p, fmt, ap); fprintf(stderr, "%s", buf); // XXX This is specific to OUR test. Remove it in release!! Tcl::instance().eval("[Test instance] flush-trace"); abort();}// Stop all timersvoid QA::stop(){ rap()->stop(); if (updTimer_->status() != TIMER_IDLE) updTimer_->cancel();}// Empty for nowint QA::command(int argc, const char*const* argv){ return MediaApp::command(argc, argv);}// When called by RAP, req is NULL. We fill in the next data segment and // return its real size in 'size' and return the app data. AppData* QA::get_data(int& size, AppData*){ int layers, dropped, i, l, idx, bs1, bs2,scenario, done, cnt; double slope, bufavail, bufneeded, totbufs1, totbufs2, optbufs1[MAX_LAYER], optbufs2[MAX_LAYER], bufToDrain; static double last_rate = 0.0, last_depart, nextAdjPoint = -1, FinalDrainArray[MAX_LAYER], tosend[MAX_LAYER], FinalBuffer[MAX_LAYER]; static int flag, /* flag keeps the state of the last phase */ tosendPtr = 0; // Get RAP info double rate = seg_size_ / rap()->ipg(); double srtt = rap()->srtt(); Scheduler& s = Scheduler::instance(); double now = s.clock(); int anyAck = rap()->anyack(); assert((num_layer_ > 0) && (num_layer_ < MAX_LAYER)); // this part is added for the startup // to send data for the base layer until the first ACK arrives. // This is because we don't have an estimate for SRTT and slope of inc // Make sure that SRTT is updated properly when ACK arrives if (anyAck == 0) { sending_[0] = 1; return output(size, 0); debug("INIT Phase, send packet: layer 0 in send_pkt, \rate: %.3f, avgrate: %.3f, srtt:%.3f\n", rate, avgrate_, srtt); } layers = 0; // we can only calc slope when srttt has a right value // i.e. RAP has received an ACK slope = seg_size_/srtt; bufavail = 0.0; // XXX Is this a correct initial value???? bufneeded = 0.0; // calculate layers & bufavail for (i = 0; i < MAX_LAYER; i++) { layers += sending_[i]; if (sending_[i] == 1) bufavail += buffer_[i]; else /* debug only */ if ((i < MAX_LAYER - 1) && (sending_[i+1] == 1)) panic("ERROR L%d is not sent but L%d is.\n", i, i+1); } // check for startup phase if((layers == 1) && (playing_[0] != 1)){ // L0 still buffers data, we are in startup phase // let's check if (sending_[0] == 0) { panic("ERROR sending[0]=0 !!!"); } AppData *res = output(size, 0); debug("STARTUP, send packet: layer 0\n"); // Start playout if we have enough data for L0 // The amount of buffered data for startup can be diff bufneeded = max(4*BufNeed((LAYERBW_-rate/2.0), slope), 2*MWM(srtt)); if (buffer_[0] >= bufneeded) { playing_[0] = 1; sending_[0] = 1; drained_[0] = 0; /* srtt*LAYERBW; */ startTime_ = now; // start the playback at the client playTime_ = now; // playout time of the receiver. debug("... START Playing_ layer 0, buffer[0] = %f!\n", buffer_[0]); // start emulating clients consumption if (updTimer_->status() == TIMER_IDLE) updTimer_->sched(srtt); } return(res); } // Store enough buffer before playing a layer. // XXX, NOTE: it is hard to do this, when we add a new layer // the server sets the playout time of the first segment // to get to the client in time, It is hard to make sure // that a layer has MRM worth if data before stasting its // playback because it adds more delay // the base layer starts when it has enough buffering // the higher layers are played out when their data is available // so this is not needed //for (i = 0; i < MAX_LAYER; i++) { // if ((sending_[i] == 1) && (playing_[i] == 0) && // (buffer_[i] > MWM(srtt))) { // debug("Resume PLAYING Layer %d, play: %d send: %d\n", // i, playing_[i], sending_[i]); // playing_[i]=1; // drained_[i] = 0; /* XXX, not sure about this yet // * but if we set this to max it causes // * a spike at the adding time // */ // /* drained_[i]=LAYERBW*SRTT; */ //} //} // perform the primary drop if we are in drain phase if (rate < layers*LAYERBW_) { bufneeded = (MWM(srtt)*layers) + BufNeed((layers*LAYERBW_-rate), slope); // debug("tot_bufavail: %7.1f bufneeded: %7.1f, layers: %d", // bufavail, bufneeded, layers); dropped = 0; // XXX Never ever do primary drop layer 0!!!! while ((bufneeded > TotalBuf(layers, buffer_)) && (layers > 1)) { debug("** Primary DROPPED L%d, TotBuf(avail:%.1f \needed:%.1f), buf[%d]: %.2f\n", layers-1, TotalBuf(layers, buffer_), bufneeded, layers-1,buffer_[layers-1]); layers--; dropped++; sending_[layers] = 0; bufneeded = (MWM(srtt)*layers)+ BufNeed(((layers)*LAYERBW_-rate),slope); } } // just for debugging // here is the case when even the base layer can not be kept if ((bufneeded > TotalBuf(layers, buffer_)) && (layers == 1)) { // XXX We should still continue, shouldn't we???? debug("** Not enough buf to keep the base layer, \TotBuf(avail:%.1f, needed:%.1f), \n", TotalBuf(layers, buffer_), bufneeded); } if (layers == 0) { // panic("** layers =0 !!"); sending_[0] = 1; playing_[0] = 0; if (updTimer_->status() != TIMER_IDLE) updTimer_->cancel(); debug("** RESTART Phase, set playing_[0] to 0 to rebuffer data\n"); return output(size, 0); } // now check to see which phase we are in if (rate >= layers*LAYERBW_) { /****************** ** filling phase ** *******************//* debug("-->> FILLING, layers: %d now: %.2f, rate: %.3f, avgrate: %.3f, \ srtt:%.3f, slope: %.3f\n", layers, now, rate, avgrate_, srtt, slope);*/ last_rate = rate; /* this is used for the next drain phase */ flag = 1; /* * 1) send for any layer that its buffer is below MWM * MWM is the min amount of buffering required to absorbe * jitter * each active layer must have atleast MWM data at all time * this also ensures proper bw share, we do NOT explicitly * alloc BW share during filling * Note: since we update state of the buffers on a per-packet * basis, we don't need to ensure that each layer gets a share * of bandwidth equal to its consumption rate. */ for (i=0;i<layers;i++) { if (buffer_[i] < MWM(srtt)) { if ((buffer_[i-1] <= buffer_[i]+seg_size_) && (i > 0)) idx = i-1; else idx = i;// debug("A:sending layer %d, less than MWM, t: %.2f\n", // i,now); return output(size, idx); } } /* * Main filling algorithm based on the pesudo code * find the next optimal state to reach */ /* init param */ bs1 = 0; bs2 = 0; totbufs1 = 0; totbufs2 = 0; for (l=0; l<MAX_LAYER; l++) { optbufs1[l] = 0.0; optbufs2[l] = 0.0; } // XXX Note: when per-layer BW is low, and srtt is very small // (e.g., in a LAN), the following code will result in that // one buffered // segment will produce a abort() of "maximum backoff reached". /* next scenario 1 state */ while ((totbufs1 <= TotalBuf(layers, buffer_)) && (bs1 <= MAXBKOFF_)) { totbufs1 = 0.0; bs1++; for (l=0; l<layers;l++) { optbufs1[l] = bufOptScen1(l,layers,rate,slope, bs1)+MWM(srtt); totbufs1 += optbufs1[l]; } } // bs1 is the min no of back off that we can not handle for // s1 now /* next secenario 2 state */ while ((totbufs2 <= TotalBuf(layers, buffer_)) && (bs2 <= MAXBKOFF_)) { totbufs2 = 0.0; bs2++; for (l=0; l<layers;l++) { optbufs2[l] = bufOptScen2(l,layers,rate,slope, bs2)+MWM(srtt); totbufs2 += optbufs2[l]; } } /* * NOTE: at this point, totbufs1 could be less than total * buffering * when it is enough for recovery from rate = 0; * so totbufs1 <= TotalBuf(layers, buffer) is OK * HOWEVER, in this case, we MUST shoot for scenario 2 */ /* debug *//* if ((totbufs2 <= TotalBuf(layers, buffer_)) && (bs2 <= MAXBKOFF_)) { panic("# ERROR: totbufs1: %.2f,tot bufs2: %.2f, \ totbuf: %.2f, bs1: %d, bs2: %d, totneededbuf1: %.2f, totneededbuf2: %2f\n", totbufs1, totbufs2, TotalBuf(layers, buffer_), bs1, bs2, TotalBuf(layers, optbufs1), TotalBuf(layers, optbufs2)); }*/
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -