⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 coop-agent.cc

📁 这是P2P流媒体方案-NICE的实现源码
💻 CC
📖 第 1 页 / 共 5 页
字号:
  //PRM change  int this_lid = ap->u.data_p.lid;	// additional changes/*	if ( ap->u.data_p.seq_no < CUT_OFF) {*/		num_all_recv++;		m_all_recv++;		if (this_lid==-1) {			num_rand_recv++;		}//sunny change4fd=fopen(filename, "w");fprintf(fd, "id:%d, num_all_recv:%d, pk_duplicate:%d", this->id, num_all_recv, pk_duplicate);fclose(fd);/*	}	else		m_mature=1;*/	// end of add  //PRM	data_pkt *dptr = &ap->u.data_p;  	//double now = Scheduler::Clock();	double now = my_clock();	if (max_delay < now - dptr->orig_time )		max_delay = now - dptr->orig_time;	if ( dptr->seq_no > m_max_seq_recv ) {		m_max_seq_recv = dptr->seq_no;		if ( dptr->type == FRESH_DATA ) {			m_dist_from_src = dptr->overlay_hop + 1;			m_parent = ap->src_agent;		}	}//sunny  if (m_use_packet_cache == true) {printf("sunny: before add_to _packet_cache in handle_data_pkt \n");  //sunny change// addtional change place changed   if (add_to_packet_cache(ap->u.data_p.original_src.agent_id, /* ap->u.data_p.original_src.node_id, */ ap->u.data_p.seq_no, ap->u.data_p.type, ap->u.data_p.orig_time, ap->u.data_p.data_len, ap->u.data_p.payload) == false)  {	pk_duplicate++;//sunny change4fd=fopen(filename, "w");fprintf(fd, "id:%d, num_all_recv:%d, pk_duplicate:%d", this->id, num_all_recv, pk_duplicate);fclose(fd);    return 1;  }  else if ( now- dptr->orig_time > VERY_LATE + m_dist_from_src * ONE_HOP_DELAY )	return 1;}printf("sunny: after add_to_packet_cache in handle_data_packet\n");  if ( SHOW_ALL ) {	if ( dptr->seq_no < CUT_OFF && dptr->orig_time > m_recent_up_time ) 	  printf("pkt_deliv ag %d s %d at %8.4f lat %8.4f %d %d\n", id, dptr->seq_no, now, now-dptr->orig_time, dptr->type, m_parent);	  /*  printf ("[coop %d %d ] At %8.4f recv data-pkt : < [ %d %d ] > seq %d : ffrom < [ %d %d ] > type: %d\n", id, n->id, Scheduler::Clock(), ap->u.data_p.original_src.agent_id, ap->u.data_p.original_src.node_id, ap->u.data_p.seq_no, ap->src_agent, ap->src, ap->u.data_p.type);  */  }  //PRM//RPM  ---- handle_data_pkt///*	pkt_recv_count++;	int unit = BLOCK_SIZE + FEC_SIZE;		int start = dptr->seq_no - dptr->seq_no % unit;		if ( dptr->seq_no < CUT_OFF && dptr->orig_time > m_recent_up_time ) {		if ( m_src_aid < 0 ) {			m_src_aid = dptr->original_src.agent_id;			//m_src_nid should not be need(->fillbitmap->lookup_packet_cache)			//m_src_nid = dptr->original_src.node_id;		}		num_pkt_deliv++;		m_pkt_deliv++;		num_pkt_deliv_type[dptr->type]++;		m_num_pkt_deliv_type[dptr->type]++;		m_latency += now - dptr->orig_time;		deliv_per_pkt[dptr->seq_no]++;		latency_per_pkt[dptr->seq_no] += now - dptr->orig_time;	}	g_one_hop_latency_sum += now - ap->send_time;	g_one_hop_cnt++;	if ( isInternal() ) {		g_nonleaf_latency_sum += now - ap->send_time;		g_nonleaf_cnt++;*/		/*		if ( dptr->seq_no % 1000 == 1 && ap->src_agent == 0 && dptr->type == FRESH_DATA )			fprintf(stderr, "one-hop latency from source = %f\n",					now - ap->send_time );		*//*	}	if ( !isInternal() && dptr->type == FRESH_DATA ) {		g_leaf_latency_sum += now - dptr->orig_time;		g_leaf_cnt++;	}*/	//PRM	//LayerInfo *l = layers.arr[this_lid];  //PRM	LayerInfo *l;	if ( this_lid >= 0 )  		l = layers.arr[this_lid];	else 		l = NULL;//PRM  /* The 3 if conditions below enforce strict checking of packet source */  if (l != NULL) {    if (l->me_in_layer == true) {      // Somewhat looser checking of data packet forwarding      // if (l->ag_list.Locate(ap->src_agent) != NULL) {printf(" forward data packet in handle_Data pkt\n");      forward_data_packet(ap->u.data_p.payload, ap->u.data_p.data_len, ap->u.data_p.original_src.agent_id, ap->u.data_p.original_src.agent_addr, 			  ap->u.data_p.seq_no, ap->u.data_p.base_lid, ap->src_agent, false,dptr->type, dptr->orig_time, dptr->overlay_hop);      // }    }  }    else if (this_lid < 0) {printf("sunny: retransmitted pkt: this_lis is less than 0 in handle_data_pkt\n");          forward_data_packet(ap->u.data_p.payload, ap->u.data_p.data_len, ap->u.data_p.original_src.agent_id, ap->u.data_p.original_src.agent_addr, 			  ap->u.data_p.seq_no, ap->u.data_p.base_lid, ap->src_agent, false,dptr->type, dptr->orig_time, dptr->overlay_hop);  }  //proactive retransmit request  requestProactive(ap);  //PRM  return 0;}//modifiedvoid coopAgent::forward_data_packet (char* payload, int data_len, int src_aid, struct sockaddr_in src_agent_addr, int src_seqno, int base_lid, int pred_aid, bool is_source, int type, double ot, int overlay_hop) {  int count_pkts = 0;  for (int i = 0; i < MAX_LAYERS; i++) {    LayerInfo * l = layers.arr[i];    if (l != NULL) {      if (l->me_in_layer == true) {      // Forward to this layer only if predecessor not in this layer        if (l->ag_list.Locate(pred_aid) == NULL) { 	  if (l->lid >= base_lid)	    count_pkts += send_data_pkt_to_layer(payload, data_len, l,src_aid,base_lid,src_agent_addr,src_seqno, type, ot, overlay_hop);        }      }    }  }  if (state == JOIN) {    if ( (curr_join_q_lid < MAX_LAYERS) && (curr_join_q_lid > 0) ) {      LayerInfo * q_l = layers.arr[curr_join_q_lid];      assert (q_l != NULL);      assert (q_l->me_in_layer == false);      if (q_l->lid >= base_lid)	count_pkts += send_data_pkt_to_layer(payload, data_len, q_l,src_aid,base_lid,src_agent_addr,src_seqno,type,ot,overlay_hop);    }    if (curr_join_q_lid < 0) { // Starting join query at BSE      // For join to higher layers, we probably already have the higher      // target layer from the previous leader which left the cluster or      // did a root xfer to me. Can use that as a fall back option.      // This should not be for layer 0.      LayerInfo * target_layer = layers.arr[target_join_q_lid];      if (target_layer != NULL) {	assert (target_layer->me_in_layer == false);	if (target_layer->lid >= base_lid)	  count_pkts += send_data_pkt_to_layer(payload, data_len, target_layer,src_aid,base_lid,src_agent_addr,src_seqno,type,ot,overlay_hop);      }    }  }  //PRM//random send	//call random send if rand() > r	if ( type != PROACTIVE_DATA && rand() < RAND_MAX * PROB_RAND_SEND ) 	  {	    /*	    int ra_id;	    struct sockaddr_in random_target_addr;	    	    int lid = RANDOM_SEND_LID;	    int repeat;	    int num_sent = 0;	    int arr[RAND_NUM_AT_A_TIME];	    */	    /*char* payload, int data_len, int src_aid, struct sockaddr_in src_agent_addr, int src_seqno, int base_lid, int pred_aid, bool is_source, int type, double ot, int overlay_hop	    */	    count_pkts += randomSend(payload, data_len,src_aid, src_agent_addr, src_seqno, ot, -1, RANDOM_SEND_LID);	    /*	    repeat = idCache.getSize();	    printf("*******repeat : cache size %d\n", repeat);	    if ( repeat > RAND_NUM_AT_A_TIME )	      repeat = RAND_NUM_AT_A_TIME;			    */	  }	//#endif  return;}//PRM - random Send//int coopAgent::randomSend( int src_aid, int src_nid, int src_seq, double ot, int aid, int nid, int type ) {int coopAgent::randomSend(char * payload, int data_len, int src_aid, struct sockaddr_in src_agent_addr, int src_seq , double ot, int aid, int type) {  //koo    printf("^^^^^^^^^^^^^^^^^^^^^^^\n");  printf("Random Send is called\n");	int ra_id;	struct sockaddr_in random_target_addr;	int lid = RANDOM_SEND_LID;	int repeat;	int num_sent = 0;	int arr[RAND_NUM_AT_A_TIME];	//if ( type == ADOPT_LID ) {	//	lid = ADOPT_LID;	//	repeat = 1;	//}	//else {		repeat = idCache.getSize();	printf("*******repeat : cache size %d\n", repeat);	if ( repeat > RAND_NUM_AT_A_TIME )	  repeat = RAND_NUM_AT_A_TIME;		//}	//do some bitmap stuff here	int i, base;	char bitmap[MAX_BITMAP];	for ( i = 0, base = m_max_seq_recv - BITMAP_SIZE; i < BITMAP_SIZE; i++ ) {		if ( base+i < 0 )			continue;		//lookup packet cache stores a list of packets that have been received		if ( lookup_packet_cache( src_aid, /*src_nid,*/ base+i ) )			bitmap[i] = 1;		else			bitmap[i] = 0;	}	//repeat = beta	for ( i = 0 ; i < repeat; i++ ) {		int itemp = 20;		int flag = 1;		while (--itemp && flag) {			if ( aid < 0 ) {				ra_id = getRandomAgent(MISS_BASED_CORR);				//rn_id = getRandomNode(ra_id);				//getRandomNode(ra_id, &random_target_addr);				printf("@@@@@@@@@@@@!!!!!!RA_ID get %d\n",ra_id);							}			else {				ra_id = aid;			}			int j;			for ( j = 0 ; j < num_sent; j++ ) {				if ( ra_id == arr[j] )					break;			}			if ( j == num_sent )				flag = 0;		}		if ( flag == 0 )			arr[num_sent] = ra_id;		else 			ra_id = -1;				printf("@@@@@@@@@@@@!!!!!!RA_ID get %d, and ****** my id %d\n",ra_id, id);		if ( ra_id >= 0 && ra_id != id ) {   			AppPacket *ap = new AppPacket(PACKET_DATA);   			ap->u.data_p.original_src.agent_id = src_aid;			   			//ap->u.data_p.original_src.node_id = src_nid;   			memcpy(&ap->u.data_p.original_src.agent_addr, &src_agent_addr, sizeof(struct sockaddr_in));			//char* payload, int data_len, int src_aid, struct sockaddr_in src_agent_addr, int src_seqno, int base_lid, int pred_aid,			//bool is_source, int type, double ot, int overlay_hop			ap->u.data_p.seq_no = src_seq;   			ap->u.data_p.lid = lid;			ap->u.data_p.type = RAND_SEND_DATA;			ap->u.data_p.orig_time = ot;			ap->u.data_p.data_len = data_len;						printf("----------------------------\n");			printf("PPPPPPPPP rinting the contents of payload before : %s\n", payload);			ap->u.data_p.payload = (char *) malloc (sizeof(char) * data_len);			memcpy(ap->u.data_p.payload, payload, data_len);									printf("SSSSSSSSS after memory copy %s\n",ap->u.data_p.payload);			memcpy( ap->u.data_p.bitmap, bitmap, BITMAP_SIZE *sizeof (char) );									ap->u.data_p.type = type;			getRandomNode(ra_id, &random_target_addr);//sunny change3 (6lines)			ap->src_agent=this->id;    			memcpy(& ap->src_agent_addr, & this->udp_recv_agent_addr, sizeof(struct sockaddr_in));			ap->dst_agent=ra_id;    			memcpy(& ap->dst_agent_addr, & random_target_addr, sizeof(struct sockaddr_in));//sunny 			send_pkt_wrapper(ap, ra_id, random_target_addr);			if ( ap->u.data_p.seq_no < CUT_OFF )				num_rand_send++;			num_sent++;printf("ra_id : %d\n", ra_id);		}	}printf("randomsend\n");	return num_sent;}//PRM -random send//PRM-changedint coopAgent::send_data_pkt_to_layer (char* payload, int data_len, LayerInfo * l, int src_aid, int base_lid, struct sockaddr_in src_agent_addr, int src_seqno ,int type, double ot, int overlay_hop) {  int count_pkts = 0;    //PRM	//do some bitmap stuff here	int i, base;	char bitmap[MAX_BITMAP];	//for ( i = 0, base = src_seqno - BITMAP_SIZE; i < BITMAP_SIZE; i++ ) {	for ( i = 0, base = m_max_seq_recv - BITMAP_SIZE;  type != PROACTIVE_DATA && i < BITMAP_SIZE; i++ ) {		if ( base+i < 0 )			continue;		//we are not keeping fixed size of data packet 		if ( base+i < m_max_seq_recv - BITMAP_SIZE )			bitmap[i] = 0;		else if ( lookup_packet_cache( src_aid, /*src_nid,*/ base+i ) )			bitmap[i] = 1;		else			bitmap[i] = 0;	}printf("sunny :: bitmap made in send_data_pkt_to_layer\n");  //PRM  AppPacket *ap = new AppPacket(PACKET_DATA);    ap->u.data_p.original_src.agent_id = src_aid;  memcpy(& ap->u.data_p.original_src.agent_addr, & src_agent_addr, sizeof(struct sockaddr_in));  ap->u.data_p.seq_no = src_seqno;  ap->u.data_p.lid = l->lid;  ap->u.data_p.base_lid = base_lid;  ap->u.data_p.data_len = data_len;  //ap->u.data_p.payload = (char *)malloc(data_len);  ap->u.data_p.payload = payload;  //memcpy(ap->u.data_p.payload, payload, data_len);//PRM		ap->u.data_p.type = type;		ap->u.data_p.orig_time = ot;		//fprintf(stderr, "\t\t %d\n", ap->u.data_p.hop_count );		memcpy( ap->u.data_p.bitmap, bitmap, BITMAP_SIZE * sizeof (char) );		ap->u.data_p.overlay_hop = overlay_hop +1;		//fprintf(stderr, "\t\t %d\n", ap->u.data_p.hop_count );printf("sunny: appPacket in send_data_pkt_to_layer\n");//PRM  for (void * pos = l->ag_list.GetHeadPosition();       pos != NULL;       l->ag_list.GetNext(pos) ) {        LayerAgentInfo * this_la = l->ag_list.GetAt(pos);    if (self_check(this_la) == false) {      if ( ( (bse.agent_id != this_la->ag.agent_id) /* || (bse.node_id != this_la->ag.node_id) */) &&	   ( ( (this_la->ag.agent_id != src_aid) /*|| (this_la->ag.node_id != src_nid) */) ) ) {#ifdef LOG_DATA_PACKET_SEND	printf ("[coop %d ] At %8.4f send data-pkt : < [ %d ] > seq %d : to < [ %d ] > lid %d dist_est %f len %d\n", id,

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -