⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 coop-agent.cc

📁 这是P2P流媒体方案-NICE的实现源码
💻 CC
📖 第 1 页 / 共 5 页
字号:
  delete tmp_layer;  free(cluster1);  free(cluster2);  return;}/* This will split the cluster associated with lid. The new cluster * that contains this member will stay as is, the other cluster * will be spawned off. */void coopAgent::split_cluster_assuming_2k (LayerInfo * l) {  // Currently, I am implementing a naive scheme. The leader  // chooses its nearest neighbor to stay with it in its cluster.  // Other more intelligent schemes will be implemented later.  assert (l->ag_list.GetSize() == UPPER_2K);  LinkedList<LayerAgentInfo *, double> my_cluster_list;  /* In my_cluster, the agents are sorted in decreasing order of distance */  LinkedList<LayerAgentInfo *, double> other_cluster_list;  /* First, split the agents into my cluster and other cluster */  /* All agents that I know distances to, are my cluster to start with */  LayerAgentInfo *self = NULL;  for (void * pos = l->ag_list.GetHeadPosition();       pos != NULL;       l->ag_list.GetNext(pos) ) {    LayerAgentInfo *la_ag = l->ag_list.GetAt(pos);    if ( self_check(la_ag) == true) {      self = la_ag;      continue;    }    if (la_ag->dist >= 0.0) {      /* -1.0 is for sorting it in decreasing order of distance */      my_cluster_list.Add(la_ag,-1.0 * la_ag->dist);    }    else { /* Sorted by increasing distance order */      other_cluster_list.Add(la_ag,la_ag->dist);    }  }  /* Whichever set has the higher number of agents, need to transfer   * some agents to the other set */  void *pos2 = other_cluster_list.GetHeadPosition();  while ( my_cluster_list.GetSize() < (LOWER_K - 1) ) {        LayerAgentInfo *a = other_cluster_list.GetAt(pos2);    my_cluster_list.Add(a,-1.0 * a->dist);    void * old_pos2 = pos2;    other_cluster_list.GetNext(pos2);    other_cluster_list.RemoveAt(old_pos2);  }    void *pos3 = my_cluster_list.GetHeadPosition();  while (other_cluster_list.GetSize() < LOWER_K) {        LayerAgentInfo *a = my_cluster_list.GetAt(pos2);    other_cluster_list.Add(a,a->dist);    void * old_pos3 = pos3;    my_cluster_list.GetNext(pos3);    my_cluster_list.RemoveAt(old_pos3);  }  // Re-create my own cluster tree  l->RemoveAgents();  l->AddClusterRoot(self);  l->me_in_layer = true;  for (void *pos4 = my_cluster_list.GetHeadPosition();       pos4 != NULL;       my_cluster_list.GetNext(pos4) ) {    l->AddClusterMember(my_cluster_list.GetAt(pos4));  }  // Create the other cluster  // SimpleTree<LayerAgentInfo *> other_cluster_tree;  // void * pos5 = other_cluster_list.GetHeadPosition();  // assert (pos5 != NULL);  // LayerAgentInfo * oc_root_la = other_cluster.GetAt(pos5);  // void * oc_root_pos = other_cluster_tree.SetRoot(oc_root_la);  // for (pos5 = other_cluster_list.GetNext(pos5);  //      pos5 != NULL;  //      other_cluster_list.GetNext(pos5) ) {  //   other_cluster_tree.AddChild(other_cluster_list.GetAt(pos5),oc_root_pos);  // }  /* HAVE TO SEND MESSAGE TO THE OTHER CLUSTER MEMBERS */  my_cluster_list.RemoveAll();  other_cluster_list.RemoveAll();  return;}void coopAgent::handle_join_response (AppPacket * ap) {  /* Need to validate if this is a desired response packet */  if (valid_join_response_packet(ap) == false)    return;  int this_lid = ap->u.joinresp_p.layer_id;//#ifdef LOG_COOP_RECV  printf ("\n[coop %d ] At %8.4f (c-jr) : join-resp : lid %d from < [ %d ] > exp-src < [ %d ] >\n", id, /* Scheduler::my_clock() */ my_clock(),this_lid,ap->src_agent, ap->u.joinresp_p.exp_src.agent_id);//#endif   if ( (ap->src_agent == bse.agent_id)/* && (ap->src == bse.node_id)*/ ) {     assert (ap->u.joinresp_p.accept == true);    /*    if (ap->u.joinresp_p.your_id != UNDEFINED_AGENT_ID)       id = ap->u.joinresp_p.your_id; // recv new id    */    // BSE-JF : Have to delete all other members in curr_join_q_lid etc.    // or have to wait till all resps are recvd and decide whether to    // accept BSE or the other members    flush_layer(this_lid);    if (this_lid+1 < MAX_LAYERS) {      flush_layer(this_lid+1);    }    layers.arr[this_lid] = new LayerInfo(this_lid,this);    LayerInfo * this_layer = layers.arr[this_lid];    jqt.CancelTimer();    /* This is the top layer returned by BSE */    put_packet_cluster_info_into_layer(ap,this_layer);    if (this_lid == target_join_q_lid) { // This is the desired layer      void * pos = this_layer->ag_list.Locate(id);      assert (pos != NULL);      LayerAgentInfo * self = this_layer->ag_list.GetAt(pos);      self->dist = 0.0;      LayerAgentInfo * bse_la = this_layer->FindClusterMember(bse.agent_id);      assert (bse_la != NULL);      //      bse_la->dist = /* Scheduler::my_clock() */ my_clock() - ap->send_time;            /* The following if then else statement are not in myrns*/      if ((bse_la->valid_dist_clock == false) || (my_clock() - bse_la->dist_clock >= DIST_CLOCK_THRESHOLD)) {    //  dist = MAX_RTT;	struct sockaddr_in their_addr; 	memcpy(& their_addr, & bse_la->ag.agent_addr, sizeof(struct sockaddr_in));	their_addr.sin_port = htons(DIST_EST_PORT);	bse_la->dist = estimate_rtt_wrapper(their_addr);	bse_la->dist_clock = my_clock();	bse_la->valid_dist_clock = true;      }      this_layer->me_in_layer = true;      this_layer->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT);      this_layer->cr_chkt.SetTimer(CONST_CLUSTER_REFRESH_CHECK_TIMEOUT);      state = ATTACHED;      set_higher_layer_ping_timer();//#ifdef LOG_COOP_JUNK      printf ("[coop %d ] . . (c-jr) : attach-to-bse\n\n", id);//#endif       log_cluster_change_info(this_layer->lid);//#ifdef LOG_COOP_CLUSTER_CHANGE      display_cluster_info(this_layer,this_layer->lid);//#endif     }    else {      assert (this_lid > target_join_q_lid);      void * pos = this_layer->ag_list.Locate(id);      if (pos != NULL) { // I am considered to be in this cluster !	assert ( self_check(this_layer->root) == false); // I am not root	//#ifdef LOG_COOP_JUNK        printf ("[coop %d ] . . (c-jr) : part of returned cluster! delete\n", id);//#endif 	delete_self_from_layer(this_layer);      }//#ifdef LOG_COOP_JUNK      printf ("[coop %d ] . . (c-jr) : continue-join-query\n\n", id);//#endif       continue_join_query(this_lid);    }  }  else { /* This is a message from the higher layer cluster	  * We need to just save the information till the timeout	  */    LayerInfo * higher_layer = layers.arr[this_lid + 1];    assert (higher_layer != NULL);    void * mem_pos = higher_layer->ag_list.Locate(ap->u.joinresp_p.exp_src.agent_id);    assert (mem_pos != NULL);    LayerAgentInfo *hl_member = higher_layer->ag_list.GetAt(mem_pos);    if (ap->u.joinresp_p.accept == false) {//#ifdef LOG_COOP_JUNK      printf ("[coop %d ] . . (c-jr) : join-reject\n\n", id);//#endif       higher_layer->DeleteClusterMember(hl_member);      delete hl_member;    }    else {      /* If the response is coming from a different agent (I thought some       * one else was the cluster leader, then we need to update that here */      if ( (ap->u.joinresp_p.exp_src.agent_id != ap->src_agent) /*|| (ap->u.joinresp_p.exp_src.node_id != ap->src) */) {	hl_member->ag.agent_id = ap->src_agent;	//	hl_member->ag.node_id = ap->src;	memcpy(& hl_member->ag.agent_addr, & ap->src_agent_addr, sizeof(struct sockaddr_in));      }      assert (ap->u.joinresp_p.mbr_count > 0);      assert ( (hl_member->ag.agent_id == ap->u.joinresp_p.agent_arr[0].ag.agent_id) /* && (hl_member->ag.node_id == ap->u.joinresp_p.agent_arr[0].ag.node_id) */);      if (hl_member->lower_layer != NULL)	delete hl_member->lower_layer;      else	m_join_resp_received ++;            hl_member->lower_layer = new LayerInfo (this_lid,this);      put_packet_cluster_info_into_layer (ap,hl_member->lower_layer);      LayerAgentInfo * self = hl_member->lower_layer->FindClusterMember(id);      if (self != NULL)	self->dist = 0.0;      /* Using the time metric. Can use any other */      //      hl_member->dist = /* Scheduler::my_clock() */ my_clock() - ap->send_time;      if ((hl_member->valid_dist_clock == false) || (my_clock() - hl_member->dist_clock >= DIST_CLOCK_THRESHOLD)) {    //  dist = MAX_RTT;	struct sockaddr_in their_addr; 	memcpy(& their_addr, & hl_member->ag.agent_addr, sizeof(struct sockaddr_in));	their_addr.sin_port = htons(DIST_EST_PORT);	hl_member->dist = estimate_rtt_wrapper(their_addr);	hl_member->dist_clock = my_clock();	hl_member->valid_dist_clock = true;      }//#ifdef LOG_COOP_JUNK        printf ("[coop %d ] . . (c-jr) : dist-estimate %f\n", id, hl_member->dist);//#endif     }    int match_count = higher_layer->ag_list.GetSize();    if (higher_layer->ag_list.Locate(bse.agent_id) != NULL)      match_count --;    if (match_count == m_join_resp_received) {            jqt.CancelTimer();//#ifdef LOG_COOP_JUNK      printf ("[coop %d ] . . (c-jr) : all join resps found\n\n", id);//#endif       process_received_join_responses();    }  }  return;}void coopAgent::handle_cluster_refresh (AppPacket *ap) {  assert (ap->st == CLUSTER_REFRESH);  if (valid_cluster_refresh_packet(ap) == false)    return;  LayerInfo * l = layers.arr[ap->u.clusterrefresh_p.layer_id];  void * this_agent_pos = l->ag_list.Locate(ap->src_agent);   assert (this_agent_pos != NULL);  LayerAgentInfo *la_ag = l->ag_list.GetAt(this_agent_pos);;  bool test_for_all_pings_received = false;  bool change = false;  bool match_to_root_done = false;#ifdef TESTING_JUNK  void * pos = l->ag_list.Locate(id);  if (pos != NULL) {    LayerAgentInfo * self = l->ag_list.GetAt(pos);    assert (self->dist >= 0.0);  }#endif #ifdef LOG_COOP_RECV  printf ("\n[coop %d ] At %8.4f (c-rcr) : recvd-valid-refresh : lid %d from < [ %d ] > %s %s %s\n", id, /* Scheduler::my_clock() */ my_clock(), l->lid,ap->src_agent, (ap->u.clusterrefresh_p.is_root == true) ? "root" : ".", (ap->u.clusterrefresh_p.root_xfer == true) ? "xfer" : ".", (ap->u.clusterrefresh_p.cluster_remove == true) ? "remove" : ".");#endif #ifdef LOG_COOP_RECV_2  printf ("[coop %d ] . . (c-rcr) : dist-info in refresh :", id);  for (int i = 0; i < ap->u.clusterrefresh_p.mbr_count; i++)    printf ("( %d %5.3f )", ap->u.clusterrefresh_p.agent_arr[i].ag.agent_id,ap->u.clusterrefresh_p.agent_arr[i].dist);  printf ("\n");#endif   /* Now this is a valid packet */  la_ag->refresh_agent(ap);  assert (l->root != NULL);  if (self_check(l->root) == true) { // I am the root    if (ap->u.clusterrefresh_p.is_root == true) {    // Pkt src,s, thinks that it is the root      // First check if it is doing a root transfer to me      if ( (ap->u.clusterrefresh_p.root_xfer == true) &&	   ( (ap->u.clusterrefresh_p.agent_arr[0].ag.agent_id == id) /* && 									(ap->u.clusterrefresh_p.agent_arr[0].ag.node_id == n->id) */) ) {        // In which case gladly add all unknown members        bool members_added = false;	if (l->AddExtraMembersFromAltRootPacket (ap) > 0) {	  change = true;          members_added = true;        }	// Do cluster remove	if (ap->u.clusterrefresh_p.cluster_remove == true) { // Delete#ifdef LOG_COOP_JUNK          printf ("[coop %d ] . . (c-rcr) : src self-remove\n", id);#endif 	  bool check = l->DeleteClusterMember(la_ag);	  assert (check == true);	  delete la_ag;	  change = true;	}#ifdef LOG_COOP_JUNK        printf ("[coop %d ] . . (c-rcr) : src and me roots, root xfer to me\n\n", id);#endif 	if (change == true) {          log_cluster_change_info(l->lid);#ifdef LOG_COOP_CLUSTER_CHANGE	  display_cluster_info(l,l->lid);#endif         }	// Immediate update of new member additions        if (members_added == true) {          l->cr_msgt.CancelTimer();          send_all_cluster_refresh_packet(l,false,true,NULL);          l->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT);	}	return;      }      bool stay_as_root = false;      // s_max: source dist to its farthest member      // KCR-CHANGE (2 new lines)      //double s_max = get_farthest_dist_of_member_to_agent(l,la_ag,false,false);      double s_max = get_avg_dist_of_member_to_agent(l,la_ag,false,false);      LayerAgentInfo * my_farthest_ag = find_farthest_agent_to_me_in_layer(l);      double my_max = my_farthest_ag->dist;      my_max = find_avg_agent_dist_to_me_in_layer(l);      long s_max_long = USECS(s_max);      long my_max_long = USECS(my_max);      int s_know_count = count_dist_known_members_for_agent(la_ag,false);      int me_know_count = count_dist_known_members_for_me(l);      if (are_agent_members_contained_in_mine(l,la_ag) == true) {	// All members in la_ag are also known to me	if ( (s_know_count < me_know_count) || // I am the superset	     (my_max_long < s_max_long) || // We are equal in size, but I am better	     ( (my_max_long == s_max_long) && (id < la_ag->ag.agent_id) ) ) { // -do-	  stay_as_root = true;	} // else {stay_as_root = false;}      }      else if (are_my_members_contained_in_agent(l,la_ag) == false) { 	// no one is a superset of the other	if (id < la_ag->ag.agent_id) {	  stay_as_root = true;	} // else {stay_as_root = false;}      } // else my_members_contained_in_agent() == true:         // i.e. s is a superset, so stay_as_root = false      LayerAgentInfo * other_root;      bool member_added = false;      if (ap->u.clusterrefresh_p.root_xfer == true) {	other_root = l->FindClusterMember(ap->u.clusterrefresh_p.agent_arr[0].ag.agent_id);	if (other_root == NULL) {	  LayerAgentInfo *new_guy = new LayerAgentInfo(ap->u.clusterrefresh_p.agent_arr[0].ag.agent_id,ap->u.clusterrefresh_p.agent_arr[0].ag.agent_addr);	  l->AddClusterMember(new_guy);	  other_root = new_guy;	  change = true;	  member_added = true;	}#ifdef LOG_COOP_JUNK        printf ("[coop %d ] . . (c-rcr) : src and me roots, src root xfer to < [ %d ] >\n", id, other_root->ag.agent_id);#endif       }      else {	assert (ap->u.clusterrefresh_p.cluster_remove == false);	other_root = la_ag;      }      if (stay_as_root == true) {	// Add all the extra members and send root challenge	if (l->AddExtraMembersFromAltRootPacket(ap) > 0) {	  change = true;          member_added = true;        }	if (ap->u.clusterrefresh_p.cluster_remove == true) { // Delete#ifdef LOG_COOP_JUNK          printf ("[coop %d ] . . (c-rcr) : src self-remove\n", id);#endif 	  bool check = l->DeleteClusterMember(la_ag);	  assert (check == true);	  delete la_ag;	  change = true;	}#ifdef LOG_COOP_JUNK        printf ("[coop %d ] . . (c-rcr) : me stay as root (root-challenge)\n\n", id);#endif 	send_root_challenge(l,other_root);	if (change == true) {          log_cluster_change_info(l->lid);#ifdef LOG_COOP_CLUSTER_CHANGE	  display_cluster_info(l,l->lid);#endif         }	// Immediate inform other members of new member	if (member_added == true) {          l->cr_msgt.CancelTimer();	  // Do not send this to the other root          send_all_cluster_refresh_packet(l,false,true,other_root);          l->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT);        }	return;      }      else {	change = true;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -