⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 bse-agent.cc

📁 这是P2P流媒体方案-NICE的实现源码
💻 CC
📖 第 1 页 / 共 2 页
字号:
    return 0;  for (void * pos = top_layer->ag_list.GetHeadPosition();       pos != NULL;       top_layer->ag_list.GetNext(pos) ) {    LayerAgentInfo * this_la = top_layer->ag_list.GetAt(pos);    if (this_la->ag.agent_id == ap->u.data_p.original_src.agent_id)       continue;    if (self_check(this_la) == false) {       AppPacket *new_p = new AppPacket(PACKET_DATA);       new_p->u.data_p.original_src.agent_id = ap->u.data_p.original_src.agent_id;       memcpy ( & new_p->u.data_p.original_src.agent_addr, & ap->u.data_p.original_src.agent_addr, sizeof(struct sockaddr_in));       new_p->u.data_p.seq_no = ap->u.data_p.seq_no;       new_p->u.data_p.lid = top_layer->lid;       new_p->u.data_p.base_lid = ap->u.data_p.base_lid;#ifdef LOG_DATA_PACKET_SEND       // KCR-CHANGE       //printf ("[bse %d ] At %8.4f send data-pkt : < [ %d ] > seq %d : to < [ %d ] > lid %d dist_est %f \n", id, /* Scheduler::Clock */ my_clock(), ap->u.data_p.original_src.agent_id, ap->u.data_p.seq_no, this_la->ag.agent_id, top_layer->lid, this_la->dist);#endif        //send_pkt_wrapper(new_p,this_la->ag.agent_id, this_la->ag.agent_addr);       delete new_p;    }  }  fflush(0);  return 0;} void bseAgent::handle_cluster_refresh_msg_timeout (void) {  assert (top_layer != NULL);#ifdef LOG_BSE_SEND  printf ("\n[bse %d ] At %8.4f : refresh-timeout: lid %d\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid);#endif   send_top_layer_cluster_refresh(NULL);  top_layer->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT);  fflush(0);  return;}void bseAgent::send_top_layer_cluster_refresh (LayerAgentInfo * ignore_agent) {  assert (top_layer != NULL);#ifdef LOG_BSE_SEND  printf ("\n[bse %d ] At %8.4f (b-scr) : send-refresh : lid %d\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid);#endif   for (void * pos = top_layer->ag_list.GetHeadPosition();       pos != NULL;       top_layer->ag_list.GetNext(pos) ) {    LayerAgentInfo * la = top_layer->ag_list.GetAt(pos);    if (ignore_agent != NULL) {      if (ignore_agent->ag.agent_id == la->ag.agent_id)         continue;    }    if (self_check(la) == false) {#ifdef LOG_BSE_SEND  printf ("[bse %d ] . . (b-scr) : < [ %d ] >\n", id, la->ag.agent_id);#endif       AppPacket *ap = new AppPacket (CLUSTER_REFRESH);      put_layer_cluster_info_into_packet(top_layer,ap);      ap->u.clusterrefresh_p.root_xfer = false;      ap->u.clusterrefresh_p.is_root = true;      send_pkt_wrapper(ap,la->ag.agent_id,la->ag.agent_addr);    }  }#ifdef LOG_BSE_SEND  printf ("\n");#endif   fflush(0);  return;}void bseAgent::handle_cluster_refresh_check_timeout (void) {  bool change = false;  assert (top_layer != NULL);#ifdef LOG_BSE_JUNK      printf ("[bse %d] At %8.4f (b-rc) : refresh-chk : lid %d\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid);#endif fflush(0);  for (void * pos = top_layer->ag_list.GetHeadPosition();       pos != NULL;       ) {    LayerAgentInfo * la_ag = top_layer->ag_list.GetAt(pos);    void * old_pos = pos;    top_layer->ag_list.GetNext(pos);    if (self_check(la_ag) == true)      continue;    if (la_ag->refresh == false) {      /* Member is lost */      top_layer->ag_list.RemoveAt(old_pos);#ifdef LOG_BSE_JUNK      printf ("[bse %d ] . . (b-rc) : < [ %d ] > *lost*\n", id, la_ag->ag.agent_id);#endif       delete la_ag;      change = true;    }    else {      la_ag->refresh = false; // Resetting the refresh flag#ifdef LOG_BSE_JUNK      printf ("[bse %d ] . . (b-rc) : < [ %d ] >\n", id, la_ag->ag.agent_id);#endif     }  }#ifdef LOG_BSE_JUNK  printf ("\n");#endif   assert (top_layer->ag_list.GetSize() >= 1);  if (top_layer->ag_list.GetSize() == 1) { // I am the only one    if (top_layer->lid > 0) {#ifdef LOG_BSE_CLUSTER_CHANGE  printf ("[bse %d ] At %8.4f cluster-info : lid %d : deleted\n", id,/* Scheduler::Clock */ my_clock(), top_layer->lid);#endif       top_layer->lid --;#ifdef LOG_BSE_JUNK      printf ("[bse %d ] At %8.4f refresh-chk layer-decrement to lid %d\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid);#endif       change = true;    }  }  if (top_layer->ag_list.GetSize() >= (UPPER_3K+1)) {    split_top_layer_using_two_partition();    change = true;  }  if (change == true) {    log_cluster_change_info ();#ifdef LOG_BSE_CLUSTER_CHANGE    display_top_layer_info();#endif   }  top_layer->cr_chkt.SetTimer(CONST_CLUSTER_REFRESH_CHECK_TIMEOUT);  fflush(0);  return;}void bseAgent::split_top_layer_using_two_partition (void) {#ifdef LOG_BSE_JUNK  printf ("[bse %d ] At %8.4f cluster-split : lid %d\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid);#endif   LayerAgentInfo ** ag_arr = top_layer->CreateLayerAgentInfoArray();  assert (ag_arr != NULL);  double * self_included_cost = create_cost_matrix(id,ag_arr,top_layer->ag_list.GetSize());  // This function already updates the agent_arr by deleting this agent  double * cost = delete_agent_from_cost_matrix(self_included_cost,ag_arr,top_layer->ag_list.GetSize(),id);  free(self_included_cost);  void * pos = top_layer->ag_list.Locate(id);  assert (pos != NULL);  LayerAgentInfo * self = top_layer->ag_list.GetAt(pos);  bool check = top_layer->DeleteClusterMember(self);  assert (check == true);  int root1_index, root2_index;  int * index_set1;  int * index_set2;  int set1_size, set2_size;  // Partition the set of members  int lower_size = top_layer->ag_list.GetSize() / 2;  two_partition(top_layer->ag_list.GetSize(),cost,lower_size,index_set1,set1_size,root1_index,index_set2,set2_size,root2_index);  free(cost);#ifdef LOG_BSE_PARTITION  printf ("[bse %d ] . . cluster-split : Partition 1 : ", id);  for (int i = 0; i < set1_size; i++)    printf ("%d ", ag_arr[index_set1[i]]->ag.agent_id);  printf("\n");  printf ("[bse %d ] . . cluster-split : Partition 2 : ", id);  for (int i = 0; i < set2_size; i++)    printf ("%d ", ag_arr[index_set2[i]]->ag.agent_id);  printf("\n");#endif     LayerInfo * tmp_layer1 = new LayerInfo(top_layer->lid,NULL);  LayerInfo * tmp_layer2 = new LayerInfo(top_layer->lid,NULL);  for (int i = 0; i < set1_size; i++) {    top_layer->DeleteClusterMember(ag_arr[index_set1[i]]);    bool success = tmp_layer1->AddClusterMember(ag_arr[index_set1[i]]);    assert (success == true);  }  tmp_layer1->root = ag_arr[root1_index];  for (int i = 0; i < set2_size; i++) {    top_layer->DeleteClusterMember(ag_arr[index_set2[i]]);    bool success = tmp_layer2->AddClusterMember(ag_arr[index_set2[i]]);    assert (success == true);  }  tmp_layer2->root = ag_arr[root2_index];  free(index_set1);  free(index_set2);#ifdef LOG_BSE_PARTITION  printf ("[bse %d ] At %8.4f cluster-split : root1 < [ %d ] > count %d     root2 < [ %d ] > count %d\n", id, /* Scheduler::Clock */ my_clock(), ag_arr[root1_index]->ag.agent_id, set1_size, ag_arr[root2_index]->ag.agent_id, set2_size);#endif   free(ag_arr);  assert (top_layer->ag_list.GetSize() == 0);  // Transfer control of the other cluster  send_cluster_remove(tmp_layer1);  send_cluster_remove(tmp_layer2);  LayerAgentInfo * new_root1 = new LayerAgentInfo(tmp_layer1->root->ag.agent_id,tmp_layer1->root->ag.agent_addr);  LayerAgentInfo * new_root2 = new LayerAgentInfo(tmp_layer2->root->ag.agent_id,tmp_layer2->root->ag.agent_addr);  // Cleanup ...  delete tmp_layer1;  delete tmp_layer2;  assert (top_layer->lid + 1 < MAX_LAYERS);#ifdef LOG_BSE_CLUSTER_CHANGE  printf ("[bse %d ] At %8.4f cluster-info : lid %d : deleted\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid);#endif #ifdef LOG_BSE_JUNK  printf ("[bse %d ] At %8.4f layer-increment : lid %d\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid+1);#endif   // Setup the next higher layer  top_layer->lid ++;  top_layer->AddClusterRoot(self);  top_layer->me_in_layer = true;  top_layer->AddClusterMember(new_root1);  top_layer->AddClusterMember(new_root2);  fflush(0);  return;}void bseAgent::send_cluster_remove (LayerInfo * l) {#ifdef LOG_BSE_SEND      printf ("[bse %d ] At %8.4f (b-scx) : self-cluster-remove : lid %d : new-root < [ %d ] >\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid, l->root->ag.agent_id);#endif   for (void * pos = l->ag_list.GetHeadPosition();       pos != NULL;       l->ag_list.GetNext(pos) ) {    LayerAgentInfo * la = l->ag_list.GetAt(pos);    assert (self_check(la) == false);    AppPacket *ap = new AppPacket (CLUSTER_REFRESH);    put_cluster_remove_info_into_packet(l,ap,true,NULL,id,udp_recv_agent_addr);#ifdef LOG_BSE_SEND    printf ("[bse %d ] . . (b-scx) : < [ %d ] >\n", id, la->ag.agent_id);#endif     send_pkt_wrapper(ap,la->ag.agent_id,la->ag.agent_addr);  }#ifdef LOG_BSE_SEND  printf ("\n");#endif   fflush(0);  return;}void bseAgent::log_cluster_change_info (void) {  m_last_change = /* Scheduler::Clock */ my_clock();  return;}void bseAgent::display_top_layer_info (void) {  assert (top_layer != NULL);  printf ("[bse %d ] At %8.4f cluster-info : lid %d : ldr %s %d : count %d : ", id, /* Scheduler::Clock */ my_clock(), top_layer->lid, (self_check(top_layer->root) == true) ? "self" : "other", top_layer->root->ag.agent_id, top_layer->ag_list.GetSize() );#ifdef LOG_CLUSTER_INFO_DETAILED  struct_pkt_reset();  struct_pkt_set_size(top_layer->ag_list.GetSize());  struct_pkt_set_lid(top_layer->lid);  struct_pkt_set_ldr(top_layer->root->ag.agent_id);  pkt_info.my_id = 0;  for (void * pos = top_layer->ag_list.GetHeadPosition();       pos != NULL;       top_layer->ag_list.GetNext(pos) ) {    int tid = (top_layer->ag_list.GetAt(pos))->ag.agent_id;    printf (" %d", tid);    struct_pkt_add_member(tid);  }  send_struct_pkt();#endif   printf ("\n");  fflush(0);  return;}bool bseAgent::self_check (LayerAgentInfo * la) {  if (la->ag.agent_id == id)     return true;  else    return false;}//PRMint bseAgent::send_data_pkt_to_layer (LayerInfo * l, int src_seqno) {  int count_pkts = 0;  double now = /*Scheduler::Clock();*/my_clock();	m_clock_cache[src_seqno % CLOCK_CACHE_SIZE] = now;	//fprintf(stderr, "%d\n", src_seqno);	//do some bitmap stuff here	int i, base;	char bitmap[MAX_BITMAP];	for ( i = 0, base = src_seqno - BITMAP_SIZE; i < BITMAP_SIZE; i++ ) {		if ( base+i < 0 )			continue;		bitmap[i] = 1;	//i am the source	}  for (void * pos = l->ag_list.GetHeadPosition();       pos != NULL;       l->ag_list.GetNext(pos) ) {    LayerAgentInfo * this_la = l->ag_list.GetAt(pos);    if ( this_la->ag.agent_id != this->id ) {       AppPacket *ap = new AppPacket(PACKET_DATA);       ap->u.data_p.original_src.agent_id = this->id;       //ap->u.data_p.original_src.node_id = this->n->id;       memcpy(&ap->u.data_p.original_src.agent_addr, &this->udp_recv_agent_addr, sizeof(struct sockaddr_in));         ap->u.data_p.seq_no = src_seqno;       ap->u.data_p.lid = l->lid;		ap->u.data_p.type = FRESH_DATA;		ap->u.data_p.orig_time = now;		memcpy( ap->u.data_p.bitmap, bitmap, BITMAP_SIZE * sizeof (char) );#ifdef LOG_DATA_PACKET_SEND		/*     printf ("[coop %d %d ] At %8.4f send data-pkt : < [ %d %d ] > seq %d : to < [ %d %d ] > lid %d\n", id, n->id, Scheduler::Clock(), src_aid, src_nid, src_seqno, this_la->ag.agent_id, this_la->ag.node_id, l->lid);*/#endif LOG_DATA_PACKET_SEND		//send_pkt_wrapper(ap,this_la->ag.agent_id, this_la->ag.node_id);		send_pkt_wrapper(ap,this_la->ag.agent_id,this_la->ag.agent_addr);       count_pkts ++;   } }  return count_pkts;}//PRMvoid bseAgent::specific_send_data_pkt() {  assert (started == true);/*#ifdef LOG_DATA_PACKET_INIT  printf ("[coop %d %d ] At %8.4f source data-pkt : seq %d\n", id, n->id, Scheduler::Clock(), m_data_pkt_seq);#endif LOG_DATA_PACKET_INIT*/  /* THe following should be used as stats only (NOT SURE)	if ( m_data_pkt_seq < CUT_OFF )		up_per_pkt[m_data_pkt_seq] = UpCount;  */	if ( top_layer != NULL )		send_data_pkt_to_layer ( top_layer, m_data_pkt_seq++);	/*	if ( top_layer && m_data_pkt_seq%1000 == 1 )		fprintf(stderr, "bse: first packet top_layer = %d\n", top_layer->lid );	*/  return;}void bseAgent::handle_retransmit_request( AppPacket *ap ) {	int seq = ap->u.retransmit_p.seq_no;	AppPacket *nap;	PacketAgentInfo *pai = &ap->u.retransmit_p.src;	char *bitmap = ap->u.retransmit_p.bitmap;	int cseq;	if ( pai->agent_id != id )		return;	for ( int i = 0 ; i < BITMAP_SIZE; i++ ) {		if ( (cseq = seq+i-BITMAP_SIZE ) < 0 )			continue;		if ( bitmap[i] && cseq < m_data_pkt_seq && cseq >= m_data_pkt_seq - CLOCK_CACHE_SIZE) {       		nap = new AppPacket(PACKET_DATA);		// some changes			nap->src_agent = this->id;		nap->src_agent_addr = this->udp_recv_agent_addr;			//       		nap->u.data_p.original_src = *pai;       		nap->u.data_p.seq_no = cseq;       		nap->u.data_p.lid = PROACTIVE_SEND_LID;			nap->u.data_p.type = PROACTIVE_DATA;	   		nap->u.data_p.orig_time = m_clock_cache[cseq % CLOCK_CACHE_SIZE];			//send_pkt_wrapper(nap, ap->src_agent, ap->src );			send_pkt_wrapper(nap, ap->src_agent, ap->src_agent_addr );     	}	}}//PRM-added

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -