⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 coop-agent.cc,v

📁 这是P2P流媒体方案-NICE的实现源码
💻 CC,V
📖 第 1 页 / 共 5 页
字号:
      hl_member->lower_layer = new LayerInfo (this_lid,this);      put_packet_cluster_info_into_layer (ap,hl_member->lower_layer);      LayerAgentInfo * self = hl_member->lower_layer->FindClusterMember(id);      if (self != NULL)	self->dist = 0.0;      /* Using the time metric. Can use any other */      //      hl_member->dist = /* Scheduler::my_clock() */ my_clock() - ap->send_time;      if ((hl_member->valid_dist_clock == false) || (my_clock() - hl_member->dist_clock >= DIST_CLOCK_THRESHOLD)) {    //  dist = MAX_RTT;	struct sockaddr_in their_addr; 	memcpy(& their_addr, & hl_member->ag.agent_addr, sizeof(struct sockaddr_in));	their_addr.sin_port = htons(DIST_EST_PORT);	hl_member->dist = estimate_rtt_wrapper(their_addr);	hl_member->dist_clock = my_clock();	hl_member->valid_dist_clock = true;      }#ifdef LOG_COOP_JUNK        printf ("[coop %d ] . . (c-jr) : dist-estimate %f\n", id, hl_member->dist);#endif     }    int match_count = higher_layer->ag_list.GetSize();    if (higher_layer->ag_list.Locate(bse.agent_id) != NULL)      match_count --;    if (match_count == m_join_resp_received) {            jqt.CancelTimer();#ifdef LOG_COOP_JUNK      printf ("[coop %d ] . . (c-jr) : all join resps found\n\n", id);#endif       process_received_join_responses();    }  }  return;}void coopAgent::handle_cluster_refresh (AppPacket *ap) {  assert (ap->st == CLUSTER_REFRESH);  if (valid_cluster_refresh_packet(ap) == false)    return;  LayerInfo * l = layers.arr[ap->u.clusterrefresh_p.layer_id];  void * this_agent_pos = l->ag_list.Locate(ap->src_agent);   assert (this_agent_pos != NULL);  LayerAgentInfo *la_ag = l->ag_list.GetAt(this_agent_pos);;  bool test_for_all_pings_received = false;  bool change = false;  bool match_to_root_done = false;#ifdef TESTING_JUNK  void * pos = l->ag_list.Locate(id);  if (pos != NULL) {    LayerAgentInfo * self = l->ag_list.GetAt(pos);    assert (self->dist >= 0.0);  }#endif #ifdef LOG_COOP_RECV  printf ("\n[coop %d ] At %8.4f (c-rcr) : recvd-valid-refresh : lid %d from < [ %d ] > %s %s %s\n", id, /* Scheduler::my_clock() */ my_clock(), l->lid,ap->src_agent, (ap->u.clusterrefresh_p.is_root == true) ? "root" : ".", (ap->u.clusterrefresh_p.root_xfer == true) ? "xfer" : ".", (ap->u.clusterrefresh_p.cluster_remove == true) ? "remove" : ".");#endif #ifdef LOG_COOP_RECV_2  printf ("[coop %d ] . . (c-rcr) : dist-info in refresh :", id);  for (int i = 0; i < ap->u.clusterrefresh_p.mbr_count; i++)    printf ("( %d %5.3f )", ap->u.clusterrefresh_p.agent_arr[i].ag.agent_id,ap->u.clusterrefresh_p.agent_arr[i].dist);  printf ("\n");#endif   /* Now this is a valid packet */  la_ag->refresh_agent(ap);  assert (l->root != NULL);  if (self_check(l->root) == true) { // I am the root    if (ap->u.clusterrefresh_p.is_root == true) {    // Pkt src,s, thinks that it is the root      // First check if it is doing a root transfer to me      if ( (ap->u.clusterrefresh_p.root_xfer == true) &&	   ( (ap->u.clusterrefresh_p.agent_arr[0].ag.agent_id == id) /* && 									(ap->u.clusterrefresh_p.agent_arr[0].ag.node_id == n->id) */) ) {        // In which case gladly add all unknown members        bool members_added = false;	if (l->AddExtraMembersFromAltRootPacket (ap) > 0) {	  change = true;          members_added = true;        }	// Do cluster remove	if (ap->u.clusterrefresh_p.cluster_remove == true) { // Delete#ifdef LOG_COOP_JUNK          printf ("[coop %d ] . . (c-rcr) : src self-remove\n", id);#endif 	  bool check = l->DeleteClusterMember(la_ag);	  assert (check == true);	  delete la_ag;	  change = true;	}#ifdef LOG_COOP_JUNK        printf ("[coop %d ] . . (c-rcr) : src and me roots, root xfer to me\n\n", id);#endif 	if (change == true) {          log_cluster_change_info(l->lid);#ifdef LOG_COOP_CLUSTER_CHANGE	  display_cluster_info(l,l->lid);#endif         }	// Immediate update of new member additions        if (members_added == true) {          l->cr_msgt.CancelTimer();          send_all_cluster_refresh_packet(l,false,true,NULL);          l->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT);	}	return;      }      bool stay_as_root = false;      // s_max: source dist to its farthest member      // KCR-CHANGE (2 new lines)      //double s_max = get_farthest_dist_of_member_to_agent(l,la_ag,false,false);      double s_max = get_avg_dist_of_member_to_agent(l,la_ag,false,false);      LayerAgentInfo * my_farthest_ag = find_farthest_agent_to_me_in_layer(l);      double my_max = my_farthest_ag->dist;      my_max = find_avg_agent_dist_to_me_in_layer(l);      long s_max_long = USECS(s_max);      long my_max_long = USECS(my_max);      int s_know_count = count_dist_known_members_for_agent(la_ag,false);      int me_know_count = count_dist_known_members_for_me(l);      if (are_agent_members_contained_in_mine(l,la_ag) == true) {	// All members in la_ag are also known to me	if ( (s_know_count < me_know_count) || // I am the superset	     (my_max_long < s_max_long) || // We are equal in size, but I am better	     ( (my_max_long == s_max_long) && (id < la_ag->ag.agent_id) ) ) { // -do-	  stay_as_root = true;	} // else {stay_as_root = false;}      }      else if (are_my_members_contained_in_agent(l,la_ag) == false) { 	// no one is a superset of the other	if (id < la_ag->ag.agent_id) {	  stay_as_root = true;	} // else {stay_as_root = false;}      } // else my_members_contained_in_agent() == true:         // i.e. s is a superset, so stay_as_root = false      LayerAgentInfo * other_root;      bool member_added = false;      if (ap->u.clusterrefresh_p.root_xfer == true) {	other_root = l->FindClusterMember(ap->u.clusterrefresh_p.agent_arr[0].ag.agent_id);	if (other_root == NULL) {	  LayerAgentInfo *new_guy = new LayerAgentInfo(ap->u.clusterrefresh_p.agent_arr[0].ag.agent_id,ap->u.clusterrefresh_p.agent_arr[0].ag.agent_addr);	  l->AddClusterMember(new_guy);	  other_root = new_guy;	  change = true;	  member_added = true;	}#ifdef LOG_COOP_JUNK        printf ("[coop %d ] . . (c-rcr) : src and me roots, src root xfer to < [ %d ] >\n", id, other_root->ag.agent_id);#endif       }      else {	assert (ap->u.clusterrefresh_p.cluster_remove == false);	other_root = la_ag;      }      if (stay_as_root == true) {	// Add all the extra members and send root challenge	if (l->AddExtraMembersFromAltRootPacket(ap) > 0) {	  change = true;          member_added = true;        }	if (ap->u.clusterrefresh_p.cluster_remove == true) { // Delete#ifdef LOG_COOP_JUNK          printf ("[coop %d ] . . (c-rcr) : src self-remove\n", id);#endif 	  bool check = l->DeleteClusterMember(la_ag);	  assert (check == true);	  delete la_ag;	  change = true;	}#ifdef LOG_COOP_JUNK        printf ("[coop %d ] . . (c-rcr) : me stay as root (root-challenge)\n\n", id);#endif 	send_root_challenge(l,other_root);	if (change == true) {          log_cluster_change_info(l->lid);#ifdef LOG_COOP_CLUSTER_CHANGE	  display_cluster_info(l,l->lid);#endif         }	// Immediate inform other members of new member	if (member_added == true) {          l->cr_msgt.CancelTimer();	  // Do not send this to the other root          send_all_cluster_refresh_packet(l,false,true,other_root);          l->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT);        }	return;      }      else {	change = true;	if (ap->u.clusterrefresh_p.cluster_remove == true) { // Delete#ifdef LOG_COOP_JUNK          printf ("[coop %d ] . . (c-rcr) : src self-remove\n", id);#endif 	  bool check = l->DeleteClusterMember(la_ag);	  assert (check == true);	  delete la_ag;	}#ifdef LOG_COOP_JUNK        printf ("[coop %d ] . . (c-rcr) : src stay as root (root-xfer to src)\n\n", id);#endif 	do_internal_root_transfer(l,other_root);        log_cluster_change_info(l->lid);#ifdef LOG_COOP_CLUSTER_CHANGE	display_cluster_info(l,l->lid);#endif 		return;      }    }    // else(s does not think it is the root): nothing to do  }  else { // I am not the root    AgentInfo old_root(l->root->ag.agent_id,l->root->ag.agent_addr);    if ( (l->root->ag.agent_id == ap->src_agent) /*&&          (l->root->ag.node_id == ap->src)*/ ) { // I think s is the root      if (match_cluster_to_root_info(l) == true) {	change = true;	match_to_root_done = true;      }      if ( (l->lid + 1) < MAX_LAYERS) {        if (layers.arr[l->lid+1] == NULL)          layers.arr[l->lid+1] = new LayerInfo (l->lid+1, this);	put_cluster_refresh_hl_info_into_layer_agent(ap, layers.arr[l->lid+1],id,udp_recv_agent_addr);	if (m_ping_in_progress == true) {	//  m_ping_resps_received = get_ping_response_count_in_layer(layers.arr[l->lid+1]);	  test_for_all_pings_received = true;        }      }      if ( ( (ap->u.clusterrefresh_p.is_root == true) && // s thinks it is root	     (ap->u.clusterrefresh_p.root_xfer == true) ) ||	   (ap->u.clusterrefresh_p.is_root == false) ) { // s doesnt think it is root	void *pos = l->ag_list.Locate(ap->u.clusterrefresh_p.agent_arr[0].ag.agent_id);	if (pos == NULL)	  assert (pos != NULL);	LayerAgentInfo *new_root = l->ag_list.GetAt(pos);	assert ( (old_root.agent_id != new_root->ag.agent_id) /*|| (old_root.node_id != new_root->ag.node_id) */); // THE IF STMT (NEXT LINE) IS NOT NEEDED	if ( (old_root.agent_id != new_root->ag.agent_id) /*|| (old_root.node_id != new_root->ag.node_id) */) {#ifdef LOG_COOP_JUNK          printf ("[coop %d ] . . (c-rcr) : have a new cluster-root < [ %d ] >\n", id, new_root->ag.agent_id);#endif 	  l->root = new_root;	  change = true;	  cancel_higher_layer_ping_timers();	}	// Join next higher layer, if it was a root transfer to me	if (self_check(l->root) == true) {#ifdef LOG_COOP_JUNK          printf ("[coop %d ] . . (c-rcr) : me not root, root xfer to me\n", id);#endif 	  join_to_next_higher_layer(l->lid);	  l->delay_root_xfer = true;        }	else {	  // The higher layer ping timer is set only when the next higher	  // layer info is known. If I am joining as the new leader, then	  // this info is not known.	  set_higher_layer_ping_timer();	}      }    } // else I dont think s is the root: do nothing  }  if (match_to_root_done == false) { // Only in this case, is this check  // required to see if we need to delete and free this member. In all  // other cases, we have already done the delete and returned from the  // function.  // So if match_to_root not done, and control is here, then do this  // check and delete if appropriate.    if (ap->u.clusterrefresh_p.cluster_remove == true) { // Delete      bool check = l->DeleteClusterMember(la_ag);      assert (check == true); // This is true after the match_to_root stuff.      if (check == true) {#ifdef LOG_COOP_JUNK        printf ("[coop %d ] . . (c-rcr) : src self-remove\n", id);#endif         delete la_ag;        change = true;      }    }  }  if ( (m_ping_in_progress == true) && (test_for_all_pings_received == true) ) {    m_ping_resps_received = get_ping_response_count_in_layer(layers.arr[l->lid+1]);    if ( (l->lid + 1) != m_ping_lid )      assert ( (l->lid+1) == m_ping_lid);    if (check_all_ping_responses_received() == true) {      int lower_lid = m_ping_lid - 1;      //printf ("[%d %d] At %8.4f ping before-process handle-cluster-refresh\n", id, n->id, /* Scheduler::my_clock() */ my_clock());      if (process_ping_responses () == true) {	//printf ("[ %d ] At %8.4f ping handle-cluster-refresh\n", id, /* Scheduler::my_clock() */ my_clock());      	change = true;        l = layers.arr[lower_lid];      }    }  }  if (change == true) {    assert (l != NULL);    log_cluster_change_info(l->lid);#ifdef LOG_COOP_CLUSTER_CHANGE    display_cluster_info(l,l->lid);#endif   }  return;}void coopAgent::handle_cluster_merge (AppPacket * ap) {  if (valid_cluster_merge_packet(ap) == false)    return;#ifdef LOG_COOP_RECV  printf ("[coop %d ] At %8.4f (c-merge) : recvd-valid-merge : lid %d from < [ %d ]>\n", id, /* Scheduler::my_clock() */ my_clock(),ap->u.clustermerge_p.layer_id,ap->src_agent);  for (int i = 0; i < ap->u.clustermerge_p.mbr_count; i++)    printf ("[coop %d ] . . (c-merge) < [ %d ] >\n", id, ap->u.clustermerge_p.agent_arr[i].ag.agent_id);  printf ("\n");#endif   LayerInfo *l = layers.arr[ap->u.clustermerge_p.layer_id];  assert (l != NULL);  int count = l->AddExtraMembersFromAltRootPacket(ap);  if (count > 0) {    log_cluster_change_info(l->lid);#ifdef LOG_COOP_CLUSTER_CHANGE    display_cluster_info(l,l->lid);#endif   }  l->cr_msgt.CancelTimer();  send_all_cluster_refresh_packet(l,false,true,NULL);  l->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT);  return;}void coopAgent::handle_ping_query (AppPacket *ap) {    assert (ap->st == PING_QUERY);  int this_lid = ap->u.pingq_p.lid;  bool accept = false;  double dist = -1.0;#ifdef LOG_COOP_JUNK_PING  printf ("[coop %d ] At %8.4f recv-pingq : lid %d : from < [ %d ] >\n", id, /* Scheduler::my_clock() */ my_clock(), ap->u.pingq_p.lid, ap->src_agent);#endif   if (layers.arr[this_lid] != NULL) {    if (layers.arr[this_lid]->me_in_layer == true) {      /* This is new stuff: reject a ping request if accepting this       * member increases my cluster radius */      LayerInfo * lower_layer = layers.arr[this_lid - 1];      assert (lower_layer != NULL);      assert (lower_layer->root != NULL);      assert (self_check(lower_layer->root) == true);      LayerAgentInfo * my_farthest = find_farthest_agent_to_me_in_layer(lower_layer);      assert (my_farthest != NULL);      long my_farthest_dist_long = USECS(my_farthest->dist);      {	struct sockaddr_in their_addr; 	memcpy(& their_addr, & ap->src_agent_addr, sizeof(struct sockaddr_in));	their_addr.sin_port = htons(DIST_EST_PORT);        dist = estimate_rtt_wrapper(their_addr);      }      long this_dist_long = USECS(dist);      if (my_farthest_dist_long > this_dist_long)        accept = true;      else {#ifdef LOG_COOP_JUNK_PING  	printf ("[coop %d ] . . recv-pingq : reject (dist-violation)\n", id);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -