⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 coop-agent.cc

📁 这是P2P流媒体方案-NICE的实现源码
💻 CC
📖 第 1 页 / 共 5 页
字号:
	if (ap->u.clusterrefresh_p.cluster_remove == true) { // Delete#ifdef LOG_COOP_JUNK          printf ("[coop %d ] . . (c-rcr) : src self-remove\n", id);#endif 	  bool check = l->DeleteClusterMember(la_ag);	  assert (check == true);	  delete la_ag;	}#ifdef LOG_COOP_JUNK        printf ("[coop %d ] . . (c-rcr) : src stay as root (root-xfer to src)\n\n", id);#endif 	do_internal_root_transfer(l,other_root);        log_cluster_change_info(l->lid);#ifdef LOG_COOP_CLUSTER_CHANGE	display_cluster_info(l,l->lid);#endif 		return;      }    }    // else(s does not think it is the root): nothing to do  }  else { // I am not the root    AgentInfo old_root(l->root->ag.agent_id,l->root->ag.agent_addr);    if ( (l->root->ag.agent_id == ap->src_agent) /*&&          (l->root->ag.node_id == ap->src)*/ ) { // I think s is the root      if (match_cluster_to_root_info(l) == true) {	change = true;	match_to_root_done = true;      }      if ( (l->lid + 1) < MAX_LAYERS) {        if (layers.arr[l->lid+1] == NULL)          layers.arr[l->lid+1] = new LayerInfo (l->lid+1, this);	put_cluster_refresh_hl_info_into_layer_agent(ap, layers.arr[l->lid+1],id,udp_recv_agent_addr);	if (m_ping_in_progress == true) {	//  m_ping_resps_received = get_ping_response_count_in_layer(layers.arr[l->lid+1]);	  test_for_all_pings_received = true;        }      }      if ( ( (ap->u.clusterrefresh_p.is_root == true) && // s thinks it is root	     (ap->u.clusterrefresh_p.root_xfer == true) ) ||	   (ap->u.clusterrefresh_p.is_root == false) ) { // s doesnt think it is root	void *pos = l->ag_list.Locate(ap->u.clusterrefresh_p.agent_arr[0].ag.agent_id);	if (pos == NULL)	  assert (pos != NULL);	LayerAgentInfo *new_root = l->ag_list.GetAt(pos);	assert ( (old_root.agent_id != new_root->ag.agent_id) /*|| (old_root.node_id != new_root->ag.node_id) */); // THE IF STMT (NEXT LINE) IS NOT NEEDED	if ( (old_root.agent_id != new_root->ag.agent_id) /*|| (old_root.node_id != new_root->ag.node_id) */) {#ifdef LOG_COOP_JUNK          printf ("[coop %d ] . . (c-rcr) : have a new cluster-root < [ %d ] >\n", id, new_root->ag.agent_id);#endif 	  l->root = new_root;	  change = true;	  cancel_higher_layer_ping_timers();	}	// Join next higher layer, if it was a root transfer to me	if (self_check(l->root) == true) {#ifdef LOG_COOP_JUNK          printf ("[coop %d ] . . (c-rcr) : me not root, root xfer to me\n", id);#endif 	  join_to_next_higher_layer(l->lid);	  l->delay_root_xfer = true;        }	else {	  // The higher layer ping timer is set only when the next higher	  // layer info is known. If I am joining as the new leader, then	  // this info is not known.	  set_higher_layer_ping_timer();	}      }    } // else I dont think s is the root: do nothing  }  if (match_to_root_done == false) { // Only in this case, is this check  // required to see if we need to delete and free this member. In all  // other cases, we have already done the delete and returned from the  // function.  // So if match_to_root not done, and control is here, then do this  // check and delete if appropriate.    if (ap->u.clusterrefresh_p.cluster_remove == true) { // Delete      bool check = l->DeleteClusterMember(la_ag);      assert (check == true); // This is true after the match_to_root stuff.      if (check == true) {#ifdef LOG_COOP_JUNK        printf ("[coop %d ] . . (c-rcr) : src self-remove\n", id);#endif         delete la_ag;        change = true;      }    }  }  if ( (m_ping_in_progress == true) && (test_for_all_pings_received == true) ) {    m_ping_resps_received = get_ping_response_count_in_layer(layers.arr[l->lid+1]);    if ( (l->lid + 1) != m_ping_lid )      assert ( (l->lid+1) == m_ping_lid);    if (check_all_ping_responses_received() == true) {      int lower_lid = m_ping_lid - 1;      //printf ("[%d %d] At %8.4f ping before-process handle-cluster-refresh\n", id, n->id, /* Scheduler::my_clock() */ my_clock());      if (process_ping_responses () == true) {	//printf ("[ %d ] At %8.4f ping handle-cluster-refresh\n", id, /* Scheduler::my_clock() */ my_clock());      	change = true;        l = layers.arr[lower_lid];      }    }  }  if (change == true) {    assert (l != NULL);    log_cluster_change_info(l->lid);#ifdef LOG_COOP_CLUSTER_CHANGE    display_cluster_info(l,l->lid);#endif   }  return;}void coopAgent::handle_cluster_merge (AppPacket * ap) {  if (valid_cluster_merge_packet(ap) == false)    return;#ifdef LOG_COOP_RECV  printf ("[coop %d ] At %8.4f (c-merge) : recvd-valid-merge : lid %d from < [ %d ]>\n", id, /* Scheduler::my_clock() */ my_clock(),ap->u.clustermerge_p.layer_id,ap->src_agent);  for (int i = 0; i < ap->u.clustermerge_p.mbr_count; i++)    printf ("[coop %d ] . . (c-merge) < [ %d ] >\n", id, ap->u.clustermerge_p.agent_arr[i].ag.agent_id);  printf ("\n");#endif   LayerInfo *l = layers.arr[ap->u.clustermerge_p.layer_id];  assert (l != NULL);  int count = l->AddExtraMembersFromAltRootPacket(ap);  if (count > 0) {    log_cluster_change_info(l->lid);#ifdef LOG_COOP_CLUSTER_CHANGE    display_cluster_info(l,l->lid);#endif   }  l->cr_msgt.CancelTimer();  send_all_cluster_refresh_packet(l,false,true,NULL);  l->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT);  return;}void coopAgent::handle_ping_query (AppPacket *ap) {    assert (ap->st == PING_QUERY);  int this_lid = ap->u.pingq_p.lid;  bool accept = false;  double dist = -1.0;#ifdef LOG_COOP_JUNK_PING  printf ("[coop %d ] At %8.4f recv-pingq : lid %d : from < [ %d ] >\n", id, /* Scheduler::my_clock() */ my_clock(), ap->u.pingq_p.lid, ap->src_agent);#endif   if (layers.arr[this_lid] != NULL) {    if (layers.arr[this_lid]->me_in_layer == true) {      /* This is new stuff: reject a ping request if accepting this       * member increases my cluster radius */      LayerInfo * lower_layer = layers.arr[this_lid - 1];      assert (lower_layer != NULL);      assert (lower_layer->root != NULL);      assert (self_check(lower_layer->root) == true);      LayerAgentInfo * my_farthest = find_farthest_agent_to_me_in_layer(lower_layer);      assert (my_farthest != NULL);      long my_farthest_dist_long = USECS(my_farthest->dist);      {	struct sockaddr_in their_addr; 	memcpy(& their_addr, & ap->src_agent_addr, sizeof(struct sockaddr_in));	their_addr.sin_port = htons(DIST_EST_PORT);        dist = estimate_rtt_wrapper(their_addr);      }      long this_dist_long = USECS(dist);      if (my_farthest_dist_long > this_dist_long)        accept = true;      else {#ifdef LOG_COOP_JUNK_PING  	printf ("[coop %d ] . . recv-pingq : reject (dist-violation)\n", id);#endif       }    }  }  AppPacket *resp_p = new AppPacket (PING_RESPONSE);//sunny change3 (6lines)	resp_p->src_agent=this->id;    	memcpy(& resp_p->src_agent_addr, & this->udp_recv_agent_addr, sizeof(struct sockaddr_in));/* no dst_agent	resp_p->dst_agent=ap->src_ag.agent_id;    	memcpy(& resp_p->dst_agent_addr, & ap->src_ag.agent_addr, sizeof(struct sockaddr_in));*///sunny   resp_p->u.pingresp_p.accept = accept;  resp_p->u.pingresp_p.lid = this_lid;  resp_p->u.pingresp_p.src_time = ap->u.pingq_p.src_time;  resp_p->u.pingresp_p.dist = dist;  if (accept == true) {    assert (this_lid > 0);    LayerInfo * lower_layer = layers.arr[this_lid - 1];    //assert (lower_layer != NULL);    //assert (lower_layer->root != NULL);    //assert (self_check(lower_layer->root) == true);    void * pos = lower_layer->ag_list.Locate(ap->src_agent);    if (pos != NULL) {#ifdef LOG_COOP_JUNK      printf ("[coop %d ] At %8.4f pinger < [ %d ] > dup\n", id, /* Scheduler::my_clock() */ my_clock(),ap->src_agent);#endif       LayerAgentInfo * this_la = lower_layer->ag_list.GetAt(pos);      lower_layer->ag_list.RemoveAt(pos);      delete this_la;      log_cluster_change_info(lower_layer->lid);#ifdef LOG_COOP_CLUSTER_CHANGE    display_cluster_info(lower_layer,lower_layer->lid);#endif     }    put_layer_cluster_info_into_packet(lower_layer,resp_p);  }#ifdef LOG_COOP_JUNK    printf ("[coop %d ] At %8.4f ping-check-debug-send < [ %d ] > dist %f\n", id, /* Scheduler::my_clock() */ my_clock(),ap->src_agent, resp_p->u.pingresp_p.dist);#endif  send_pkt_wrapper(resp_p,ap->src_agent,ap->src_agent_addr);  return;}void coopAgent::handle_ping_response (AppPacket * ap) {  assert (ap->st == PING_RESPONSE);  if ( (m_ping_in_progress == false) || (ap->u.pingresp_p.lid != m_ping_lid) )    return;  LayerInfo * l = layers.arr[m_ping_lid];  assert (l != NULL);  void * pos = l->ag_list.Locate(ap->src_agent);  if (pos == NULL)    return;  LayerAgentInfo * la = l->ag_list.GetAt(pos);  //  la->dist = (/* Scheduler::my_clock() */ my_clock() - ap->u.pingresp_p.src_time) / 2.0;  //modify ping to just estimate rtt...  if (ap->u.pingresp_p.dist >= 0.0) {    la->dist = ap->u.pingresp_p.dist;    la->dist_clock = my_clock();    la->valid_dist_clock = true;  }  /*  if ((la->valid_dist_clock == false) || (my_clock() - la->dist_clock >= DIST_CLOCK_THRESHOLD)) {    //  dist = MAX_RTT;    struct sockaddr_in their_addr;     memcpy(& their_addr, & la->ag.agent_addr, sizeof(struct sockaddr_in));    their_addr.sin_port = htons(DIST_EST_PORT);    la->dist = estimate_rtt_wrapper(their_addr);    la->dist_clock = my_clock();    la->valid_dist_clock = true;  }  */    if (la->lower_layer != NULL) {    delete la->lower_layer;    la->lower_layer = NULL;  }#ifdef LOG_COOP_JUNK_PING  printf ("[coop %d ] At %8.4f recv-pingresp : lid %d : from < [ %d ] > dist %f %s\n", id, /* Scheduler::my_clock() */ my_clock(), m_ping_lid, ap->src_agent, la->dist, (ap->u.pingresp_p.accept == true) ? "" : "*reject*" );#endif   if (ap->u.pingresp_p.accept == true) {    la->lower_layer = new LayerInfo(m_ping_lid-1,this);    put_packet_cluster_info_into_layer(ap,la->lower_layer);    m_ping_resps_received ++;  }  else {    bool success = l->DeleteClusterMember(la);    delete la;    assert (success == true);  }  if (check_all_ping_responses_received() == true) {    int lower_lid = m_ping_lid - 1;    if (process_ping_responses () == true) {      //printf ("[ %d ] At %8.4f ping recv-ping-resp\n", id, /* Scheduler::my_clock() */ my_clock());      LayerInfo * new_layer = layers.arr[lower_lid];      assert (new_layer != NULL);      log_cluster_change_info(new_layer->lid);#ifdef LOG_COOP_CLUSTER_CHANGE      display_cluster_info(new_layer,new_layer->lid);#endif     }  }  return;}bool coopAgent::check_all_ping_responses_received (void) {  assert (m_ping_in_progress == true);  LayerInfo *l = layers.arr[m_ping_lid];  assert (l != NULL);  // If this higher layer has BSE, then cannot count it, since BSE is not sent  // a ping.  int ignore_bse;  if (l->ag_list.Locate(bse.agent_id) != NULL)    ignore_bse = 1;  else    ignore_bse = 0;  // The -1 is due to not including the root of the lower layer in this  if ( m_ping_resps_received == ( l->ag_list.GetSize() - 1 - ignore_bse) )    return true;  return false;}/* Returns true if there is a change */bool coopAgent::process_ping_responses (void) {  bool ret_val = false;  assert (m_ping_lid > 0);  LayerInfo * hl = layers.arr[m_ping_lid];  assert (hl != NULL);  LayerInfo * my_highest_layer = layers.arr[m_ping_lid - 1];  assert (my_highest_layer != NULL);  assert (my_highest_layer->root != NULL);  if (my_highest_layer->root->dist < 0.0)    assert (my_highest_layer->root->dist >= 0.0);  assert (my_highest_layer->me_in_layer == true);  LayerAgentInfo * min_la = find_closest_agent_in_layer(hl,&(my_highest_layer->root->ag),true);  if (min_la != NULL) {    long my_highest_layer_root_dist_long = USECS(my_highest_layer->root->dist);    long min_la_dist_long = USECS(min_la->dist);    if (my_highest_layer_root_dist_long > min_la_dist_long) {      ret_val = true;#ifdef LOG_COOP_JUNK_PING_SWITCH      printf ("[coop %d ] At %8.4f switch-root : lid %d : from < [ %d ] > to < [ %d ] > dist %8.4f \n", id, /* Scheduler::my_clock() */ my_clock(), m_ping_lid - 1, my_highest_layer->root->ag.agent_id, min_la->ag.agent_id, min_la->dist);#endif       // Send the cluster remove in old cluster      // Delete self      void * pos = my_highest_layer->ag_list.Locate(id);      assert (pos != NULL);      LayerAgentInfo * self_ag = my_highest_layer->ag_list.GetAt(pos);      my_highest_layer->ag_list.RemoveAt(pos);      my_highest_layer->me_in_layer = false;        if (my_highest_layer->ag_list.GetSize() > 0)	send_cluster_remove(my_highest_layer,false);      delete my_highest_layer;      // Send the join to new cluster      layers.arr[m_ping_lid-1] = min_la->lower_layer;      min_la->lower_layer = NULL;      LayerInfo * new_layer = layers.arr[m_ping_lid-1];      bool check = new_layer->AddClusterMember(self_ag);      assert (check == true);      new_layer->me_in_layer = true;          assert (new_layer->root != NULL);      assert ( (new_layer->root->ag.agent_id == min_la->ag.agent_id) /* && (new_layer->root->ag.node_id == min_la->ag.node_id) */);      assert (min_la->dist >= 0.0);      new_layer->root->dist = min_la->dist;      AppPacket *pkt = new AppPacket(JOIN_QUERY);      init_join_query_packet(pkt,m_ping_lid-1,/* Scheduler::my_clock() */ my_clock(),true, udp_recv_agent_addr);//sunny change3 (6lines)	pkt->src_agent=this->id;    	memcpy(& pkt->src_agent_addr, & this->udp_recv_agent_addr, sizeof(struct sockaddr_in));	pkt->dst_agent=new_layer->root->ag.agent_id;    	memcpy(& pkt->dst_agent_addr, & new_layer->root->ag.agent_addr, sizeof(struct sockaddr_in));//sunny       send_pkt_wrapper(pkt,new_layer->root->ag.agent_id, new_layer->root->ag.agent_addr);      new_layer->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT);      new_layer->cr_chkt.SetTimer(CONST_CLUSTER_REFRESH_CHECK_TIMEOUT);      assert (state == ATTACHED);    }  }  cancel_higher_layer_ping_timers();  set_higher_layer_ping_timer();  return ret_val;}void coopAgent::handle_data_ack_packet (AppPacket *ap) {  assert (ap->st == PACKET_DATA_ACK);//#ifdef LOG_DATA_PACKET_RECV  printf ("[coop %d ] At %8.4f recv ack-pkt : < [ %d ] > seq %d : from < [ %d ] >\n", id, /* Scheduler::my_clock() */ my_clock(), ap->u.dataack_p.original_src.agent_id, ap->u.dataack_p.seq_no, ap->src_agent);//#endif   return;}int coopAgent::handle_data_packet (AppPacket *ap) {  assert (ap->st == PACKET_DATA);//#ifdef LOG_DATA_PACKET_RECV  printf ("[coop %d ] At %8.4f recv data-pkt : < [ %d ] > seq %d : from < [ %d ] >\n", id, my_clock(), ap->u.data_p.original_src.agent_id, ap->u.data_p.seq_no, ap->src_agent);printf("sunny: handle_data_packet orig_time: %f\n", ap->u.data_p.orig_time);//#endif /* Send an ack back */  AppPacket *new_ack_p = new AppPacket(PACKET_DATA_ACK);//sunny change3 (6lines)	new_ack_p->src_agent=this->id;    	memcpy(& new_ack_p->src_agent_addr, & this->udp_recv_agent_addr, sizeof(struct sockaddr_in));	new_ack_p->dst_agent=ap->src_agent;    	memcpy(& new_ack_p->dst_agent_addr, & ap->src_agent_addr, sizeof(struct sockaddr_in));//sunny   new_ack_p->u.dataack_p.seq_no = ap->u.data_p.seq_no;  new_ack_p->u.dataack_p.original_src.agent_id = ap->u.data_p.original_src.agent_id;  memcpy((void*)&(new_ack_p->u.dataack_p.original_src.agent_addr), (void*)&(ap->u.data_p.original_src.agent_addr), sizeof(struct sockaddr_in));  send_pkt_wrapper(new_ack_p, ap->src_agent, ap->src_agent_addr);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -