⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 media-app.cc

📁 柯老师网站上找到的
💻 CC
📖 第 1 页 / 共 4 页
字号:
 	abort();}void MediaApp::stop(){	// Called when we want to stop the RAP agent	rap()->stop();}AppData* MediaApp::get_data(int& nbytes, AppData* req) {	AppData *res;	if (req == NULL) {		MediaRequest p(MEDIAREQ_GETSEG);		p.set_name(page_);		// We simply rotating the layers from which to send data		if (num_layer_ > 0) {			p.set_layer(last_layer_++);			last_layer_ = last_layer_ % num_layer_;		} else 			p.set_layer(0); 		p.set_st(data_[0].start());		p.set_datasize(seg_size_);		p.set_app(this);		res = target()->get_data(nbytes, &p);	} else 		res = target()->get_data(nbytes, req);	// Update the current data pointer	assert(res != NULL);	HttpMediaData *p = (HttpMediaData *)res;	// XXX For now, if the return size is 0, we assume that the 	// transmission stops. Otherwise there is no way to tell the 	// RAP agent that there's no more data to send	if (p->datasize() <= 0) {		// Should NOT advance sending data pointer because 		// if this is a cache which is downloading from a slow		// link, it is possible that the requested data will		// become available in the near future!!		delete p;		return NULL;	} else {		// Set current data pointer to the right ones		// If available data is more than seg_size_, only advance data		// pointer by seg_size_. If less data is available, only 		// advance data by the amount of available data.		//		// XXX Currently the cache above does NOT pack data from 		// discontinugous blocks into one packet. May need to do 		// that later. 		assert((p->datasize() > 0) && (p->datasize() <= seg_size_));		data_[p->layer()].set_start(p->et());		data_[p->layer()].set_datasize(seg_size_);	}	return res;}int MediaApp::command(int argc, const char*const* argv){	Tcl& tcl = Tcl::instance();	if (strcmp(argv[1], "log") == 0) {		int mode;		log_ = Tcl_GetChannel(tcl.interp(), 				      (char*)argv[2], &mode);		if (log_ == 0) {			tcl.resultf("%s: invalid log file handle %s\n",				    name(), argv[2]);			return TCL_ERROR;		}		return TCL_OK;	} else if (strcmp(argv[1], "evTrace") == 0) { 		char buf[1024], *p;		if (log_ != 0) {			sprintf(buf, "%.17g ", 				Scheduler::instance().clock());			p = &(buf[strlen(buf)]);			for (int i = 2; i < argc; i++) {				strcpy(p, argv[i]);				p += strlen(argv[i]);				*(p++) = ' ';			}				// Stick in a newline.			*(p++) = '\n', *p = 0;			Tcl_Write(log_, buf, p-buf);		}		return TCL_OK;	} else if (strcmp(argv[1], "set-layer") == 0) {		int n = atoi(argv[2]);		if (n >= MAX_LAYER) {			fprintf(stderr, 				"Too many layers than maximum allowed.\n");			return TCL_ERROR;		}		num_layer_ = n;		return TCL_OK;	}	return Application::command(argc, argv);}void MediaApp::log(const char* fmt, ...){	char buf[1024], *p;	char *src = Address::instance().print_nodeaddr(rap()->addr());	sprintf(buf, "%.17g i %s ", Scheduler::instance().clock(), src);	delete []src;	p = &(buf[strlen(buf)]);	va_list ap;	va_start(ap, fmt);	vsprintf(p, fmt, ap);	if (log_ != 0)		Tcl_Write(log_, buf, strlen(buf));}//----------------------------------------------------------------------// MediaApp enhanced with quality adaptation//----------------------------------------------------------------------void QATimer::expire(Event *){	a_->UpdateState();	resched(a_->UpdateInterval());}static class QAClass : public TclClass {public:	QAClass() : TclClass("Application/MediaApp/QA") {}	TclObject* create(int argc, const char*const* argv) {		if (argc > 4) 			return (new QA((const char *)(argv[4])));		return NULL;	}} class_qa_app;//#define CHECK 1//#define DBG 1QA::QA(const char *page) : MediaApp(page){	updTimer_ = new QATimer(this);	bind("LAYERBW_", &LAYERBW_);	bind("MAXACTIVELAYERS_", &MAXACTIVELAYERS_);	bind("SRTTWEIGHT_", &SRTTWEIGHT_);	bind("SMOOTHFACTOR_", &SMOOTHFACTOR_);	bind("MAXBKOFF_", &MAXBKOFF_);	bind("debug_output_", &debug_);	bind("pref_srtt_", &pref_srtt_);	for(int j = 0; j < MAX_LAYER; j++) {		buffer_[j] = 0.0;		sending_[j] = 0;		playing_[j] = 0;		drained_[j] = 0.0;		bw_[j] = 0.0;		pref_[j] = 0;	}	poffset_ = 0; 	playTime_ = 0;   // Should initialize it	startTime_ = -1; // Used to tell the first packet	// Moving average weight for transmission rate average	rate_weight_ = 0.01;	avgrate_ = 0.0;}QA::~QA(){	if (updTimer_) {		if (updTimer_->status() != TIMER_IDLE)			updTimer_->cancel();		delete updTimer_;	}}void QA::debug(const char* fmt, ...){	if (!debug_) 		return;	char buf[1024], *p;	char *src = Address::instance().print_nodeaddr(rap()->addr());	char *port = Address::instance().print_portaddr(rap()->addr());	sprintf(buf, "# t %.17g i %s.%s QA ", 		Scheduler::instance().clock(), src, port);	delete []port;	delete []src;	p = &(buf[strlen(buf)]);	va_list ap;	va_start(ap, fmt);	vsprintf(p, fmt, ap);	fprintf(stderr, "%s", buf);}void QA::panic(const char* fmt, ...) {	char buf[1024], *p;	char *src = Address::instance().print_nodeaddr(rap()->addr());	char *port = Address::instance().print_portaddr(rap()->addr());	sprintf(buf, "# t %.17g i %s.%s QA PANIC ", 		Scheduler::instance().clock(), src, port);	delete []port;	delete []src;	p = &(buf[strlen(buf)]);	va_list ap;	va_start(ap, fmt);	vsprintf(p, fmt, ap);	fprintf(stderr, "%s", buf);	// XXX This is specific to OUR test. Remove it in release!!	Tcl::instance().eval("[Test instance] flush-trace");	abort();}// Stop all timersvoid QA::stop(){	rap()->stop();	if (updTimer_->status() != TIMER_IDLE)		updTimer_->cancel();}// Empty for nowint QA::command(int argc, const char*const* argv){	return MediaApp::command(argc, argv);}// When called by RAP, req is NULL. We fill in the next data segment and // return its real size in 'size' and return the app data. AppData* QA::get_data(int& size, AppData*){	int layers, dropped, i, l, idx, bs1, bs2,scenario, done, cnt;	double slope, bufavail, bufneeded, totbufs1, totbufs2, 		optbufs1[MAX_LAYER], optbufs2[MAX_LAYER], bufToDrain;  	static double last_rate = 0.0, last_depart, nextAdjPoint = -1,		FinalDrainArray[MAX_LAYER],		tosend[MAX_LAYER], FinalBuffer[MAX_LAYER];		static int flag,  /* flag keeps the state of the last phase */		tosendPtr = 0;  	// Get RAP info	double rate = seg_size_ / rap()->ipg();	double srtt =  rap()->srtt();	Scheduler& s = Scheduler::instance();	double now = s.clock();	int anyAck = rap()->anyack();	assert((num_layer_ > 0) && (num_layer_ < MAX_LAYER));  	// this part is added for the startup	// to send data for the base layer until the first ACK arrives.	// This is because we don't have an estimate for SRTT and slope of inc	// Make sure that SRTT is updated properly when ACK arrives	if (anyAck == 0) {		sending_[0] = 1;		return output(size, 0);		debug("INIT Phase, send packet: layer 0 in send_pkt, \rate: %.3f, avgrate: %.3f, srtt:%.3f\n", rate, avgrate_, srtt);	}  	layers = 0;	// we can only calc slope when srttt has a right value	// i.e. RAP has received an ACK	slope = seg_size_/srtt;	bufavail = 0.0;	// XXX Is this a correct initial value????	bufneeded = 0.0;   	// calculate layers & bufavail	for (i = 0; i < MAX_LAYER; i++) {		layers += sending_[i];		if (sending_[i] == 1) 			bufavail += buffer_[i];		else			/* debug only */			if ((i < MAX_LAYER - 1) && (sending_[i+1] == 1))				panic("ERROR L%d is not sent but L%d is.\n",				      i, i+1);	}		// check for startup phase	if((layers == 1) && (playing_[0] != 1)){		// L0 still buffers data, we are in startup phase		// let's check		if (sending_[0] == 0) {			panic("ERROR sending[0]=0 !!!");		}		AppData *res = output(size, 0);		debug("STARTUP, send packet: layer 0\n");				// Start playout if we have enough data for L0		// The amount of buffered data for startup can be diff		bufneeded = max(4*BufNeed((LAYERBW_-rate/2.0), slope), 				2*MWM(srtt));		if (buffer_[0] >= bufneeded) {			playing_[0] = 1;			sending_[0] = 1;  			drained_[0] = 0;  /* srtt*LAYERBW; */			startTime_ = now; // start the playback at the client			playTime_ = now;  // playout time of the receiver. 			debug("... START Playing_ layer 0, buffer[0] = %f!\n",			      buffer_[0]);			// start emulating clients consumption			if (updTimer_->status() == TIMER_IDLE)				updTimer_->sched(srtt);		}		return(res);	}  	// Store enough buffer before playing a layer. 	// XXX, NOTE: it is hard to do this, when we add a new layer	// the server sets the playout time of the first segment	// to get to the client in time, It is hard to make sure	// that a layer has MRM worth if data before stasting its	// playback because it adds more delay	// the base layer starts when it has enough buffering	// the higher layers are played out when their data is available	// so this is not needed 	//for (i = 0; i < MAX_LAYER; i++) {	// if ((sending_[i] == 1) && (playing_[i] == 0) &&	//    (buffer_[i] > MWM(srtt))) {	//  debug("Resume PLAYING Layer %d, play: %d send: %d\n",	//	  i, playing_[i], sending_[i]);	//  playing_[i]=1;	//  drained_[i] = 0; /* XXX, not sure about this yet 	//		      * but if we set this to max it causes	//	      * a spike at the adding time	//		      */	//  /* drained_[i]=LAYERBW*SRTT; */	//}	//}  	// perform the primary drop if we are in drain phase 	if (rate < layers*LAYERBW_) {		bufneeded = (MWM(srtt)*layers) + 			BufNeed((layers*LAYERBW_-rate), slope);		//   debug("tot_bufavail: %7.1f bufneeded: %7.1f, layers: %d",		// 			bufavail, bufneeded, layers);		dropped = 0;		// XXX Never ever do primary drop layer 0!!!!		while ((bufneeded > TotalBuf(layers, buffer_)) && 		       (layers > 1)) {			debug("** Primary DROPPED L%d, TotBuf(avail:%.1f \needed:%.1f), buf[%d]: %.2f\n", 			      layers-1, TotalBuf(layers, buffer_), bufneeded,			      layers-1,buffer_[layers-1]);			layers--;			dropped++;			sending_[layers] = 0;			bufneeded = (MWM(srtt)*layers)+ 				BufNeed(((layers)*LAYERBW_-rate),slope); 		}	}     	// just for debugging	// here is the case when even the base layer can not be kept	if ((bufneeded > TotalBuf(layers, buffer_)) && (layers == 1)) {		// XXX We should still continue, shouldn't we????		debug("** Not enough buf to keep the base layer, \TotBuf(avail:%.1f, needed:%.1f), \n",		      TotalBuf(layers, buffer_), bufneeded);	}	if (layers == 0) {		//		panic("** layers =0 !!");		sending_[0] = 1;		playing_[0] = 0;		if (updTimer_->status() != TIMER_IDLE)			updTimer_->cancel();		debug("** RESTART Phase, set playing_[0] to 0 to rebuffer data\n");		return output(size, 0);	}	// now check to see which phase we are in	if (rate >= layers*LAYERBW_) {		/******************		 ** filling phase **		 *******************//*      debug("-->> FILLING, layers: %d now: %.2f, rate: %.3f, avgrate: %.3f, \ srtt:%.3f, slope: %.3f\n", 	  layers, now, rate, avgrate_, srtt, slope);*/      		last_rate = rate; /* this is used for the next drain phase */		flag = 1;		/* 		 * 1) send for any layer that its buffer is below MWM		 * MWM is the min amount of buffering required to absorbe 		 * jitter		 * each active layer must have atleast MWM data at all time		 * this also ensures proper bw share, we do NOT explicitly 		 * alloc BW share during filling		 * Note: since we update state of the buffers on a per-packet 		 * basis, we don't need to ensure that each layer gets a share 		 * of bandwidth equal to its consumption rate. 		 */		for (i=0;i<layers;i++) {			if (buffer_[i] < MWM(srtt)) {				if ((buffer_[i-1] <= buffer_[i]+seg_size_) &&				    (i > 0))					idx = i-1;				else 					idx = i;// 	  debug("A:sending layer %d, less than MWM, t: %.2f\n", // 		i,now);				return output(size, idx);			}		}		/* 		 * Main filling algorithm based on the pesudo code 		 * find the next optimal state to reach 		 */		/* init param */		bs1 = 0;		bs2 = 0;		totbufs1 = 0;		totbufs2 = 0;		for (l=0; l<MAX_LAYER; l++) {			optbufs1[l] = 0.0;			optbufs2[l] = 0.0;		}				// XXX Note: when per-layer BW is low, and srtt is very small 		// (e.g., in a LAN), the following code will result in that 		// one buffered 		// segment will produce a abort() of "maximum backoff reached".		/* next scenario 1 state */		while ((totbufs1 <= TotalBuf(layers, buffer_)) && 		       (bs1 <= MAXBKOFF_)) {			totbufs1 = 0.0;			bs1++;			for (l=0; l<layers;l++) {				optbufs1[l] = bufOptScen1(l,layers,rate,slope,							  bs1)+MWM(srtt);				totbufs1 += optbufs1[l];			}		}		// bs1 is the min no of back off that we can not handle for 		// s1 now		/* next secenario 2 state */		while ((totbufs2 <= TotalBuf(layers, buffer_)) && 		       (bs2 <= MAXBKOFF_)) {			totbufs2 = 0.0;			bs2++;			for (l=0; l<layers;l++) {				optbufs2[l] = bufOptScen2(l,layers,rate,slope,							  bs2)+MWM(srtt);				totbufs2 += optbufs2[l];			}		}				/* 		 * NOTE: at this point, totbufs1 could be less than total 		 * buffering		 * when it is enough for recovery from rate = 0;		 * so totbufs1 <= TotalBuf(layers, buffer) is OK		 * HOWEVER, in this case, we MUST shoot for scenario 2		 */		/* debug *//*       if ((totbufs2 <= TotalBuf(layers, buffer_)) && (bs2 <= MAXBKOFF_)) { 	panic("# ERROR: totbufs1: %.2f,tot bufs2: %.2f, \ totbuf: %.2f, bs1: %d, bs2: %d, totneededbuf1: %.2f, totneededbuf2: %2f\n", 	      totbufs1, totbufs2, TotalBuf(layers, buffer_), bs1, bs2, 	      TotalBuf(layers, optbufs1), TotalBuf(layers, optbufs2));       }*/

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -