⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 coop-agent.cc,v

📁 这是P2P流媒体方案-NICE的实现源码
💻 CC,V
📖 第 1 页 / 共 5 页
字号:
      l->AddClusterMember(this_la);      has_change = true;    }    else {      this_la = l->ag_list.GetAt(pos);    }    this_la->valid_tmp = true;  }  // Purge the members that are not in the cluster any more  for (void * pos = l->ag_list.GetHeadPosition();       pos != NULL;       ) {        LayerAgentInfo * la_ag = l->ag_list.GetAt(pos);    void * old_pos = pos;    l->ag_list.GetNext(pos);    if (la_ag->valid_tmp == false) {      l->ag_list.RemoveAt(old_pos);      assert (la_ag != NULL);      delete la_ag;      has_change = true;    }  }  return has_change;}/* Send query to the next lower layer if needed (i.e. to * one below this_lid). * If this is the target layer, then need to figure out point of * attachment */void coopAgent::continue_join_query (int this_lid) {#ifdef LOG_COOP_JUNK  printf ("[coop %d ] At %8.4f (c-cjq) : continue-join-query : lid %d\n", id, /* Scheduler::my_clock() */ my_clock(), this_lid);#endif   if (this_lid > target_join_q_lid) { /* Probe all the cluster members */    LayerInfo *l = layers.arr[this_lid];    void * pos = l->ag_list.Locate(id);    if (pos != NULL) { // I am considered to be part of this cluster !      assert (self_check(l->root) == false); // I cannot be the root      delete_self_from_layer(l);      }#ifdef LOG_COOP_JUNK    printf ("[coop %d ] . . (c-cjq) : join-continue\n", id);#endif     send_join_query_to_cluster(this_lid);    m_join_resp_received = 0;    curr_join_q_lid = this_lid;    jqt.SetTimer(CONST_JOIN_QUERY_TIMEOUT);    return;  }  else { /* Find point of attachment */    assert (this_lid == target_join_q_lid);    LayerInfo * lbase = layers.arr[this_lid];    LayerAgentInfo *self = new LayerAgentInfo(id,udp_recv_agent_addr);    self->dist = 0.0;    if (lbase->AddClusterMember(self) == false) {      delete self;      self = lbase->FindClusterMember(id);      assert(self->dist >= 0.0);    }    lbase->me_in_layer = true;        assert (lbase->root != NULL);    if (lbase->root->dist < 0.0)      assert (lbase->root->dist >= 0.0);    AppPacket *pkt = new AppPacket(JOIN_QUERY);    init_join_query_packet(pkt,this_lid,/* Scheduler::my_clock() */ my_clock(),true, udp_recv_agent_addr);    send_pkt_wrapper(pkt,lbase->root->ag.agent_id, lbase->root->ag.agent_addr);    lbase->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT);    lbase->cr_chkt.SetTimer(CONST_CLUSTER_REFRESH_CHECK_TIMEOUT);    state = ATTACHED;    set_higher_layer_ping_timer();#ifdef LOG_COOP_JUNK    printf ("[coop %d ] . . (c-cjq) : attach to my-root < [ %d ] >\n", id, lbase->root->ag.agent_id);#endif     log_cluster_change_info(lbase->lid);#ifdef LOG_COOP_CLUSTER_CHANGE    display_cluster_info(lbase,lbase->lid);#endif   }  return;}bool coopAgent::valid_join_query_forward_packet (int qlid, bool attach) {    if ( (qlid < 0) || ( (qlid == 0) && (attach == false) ) ) {     /* Wrong packet */    printf ("[Err] Found packet with qlid %d\n",qlid);    assert (0);    return false;  }    int lower_lid;  if (attach == true) /* This is an attach request */    lower_lid = qlid;  else    lower_lid = qlid - 1;  if (layers.arr[lower_lid] == NULL)    return false;  /* I must be part of this cluster. */  if (layers.arr[lower_lid]->me_in_layer == false)    return false;  return true;}/* Returns true if this is a valid join response packet that * we are expecting */bool coopAgent::valid_join_response_packet (AppPacket *ap) {  if (state != JOIN)    return false;  int this_lid = ap->u.joinresp_p.layer_id;  if ( (ap->src_agent == bse.agent_id) /* && (ap->src == bse.node_id) */) {    if (curr_join_q_lid != -1) // BSE-JF && (curr_join_q_lid != (this_lid+1) )      return false;    if (target_join_q_lid > this_lid) {      assert (0);      return false;    }  }  else { /* Check for other layer queries */    if (curr_join_q_lid != (this_lid+1))      return false;    LayerInfo * last_layer = layers.arr[curr_join_q_lid];    if (last_layer == NULL)      return false;    if (last_layer->ag_list.Locate(ap->u.joinresp_p.exp_src.agent_id) == NULL)      return false;  }  return true;}/* Sends a join query to all members specified in the given * layer. The expected response is all members in the immediate * lower layer. */void coopAgent::send_join_query_to_cluster (int layer_lid) {  LayerInfo * this_layer = layers.arr[layer_lid];#ifdef LOG_COOP_SEND    printf ("[coop %d ] At %8.4f (c-jqtc) : join-query-to-cluster : lid %d\n", id, /* Scheduler::my_clock() */ my_clock(), layer_lid);#endif   for (void * pos = this_layer->ag_list.GetHeadPosition();       pos != NULL;       this_layer->ag_list.GetNext(pos) ) {    LayerAgentInfo * coop_ag = this_layer->ag_list.GetAt(pos);    if ( (coop_ag->ag.agent_id == bse.agent_id) /* && (coop_ag->ag.node_id == bse.node_id) */ ) // Dont send this join query to the BSE      continue;    AppPacket *p = new AppPacket(JOIN_QUERY);    init_join_query_packet (p,layer_lid,/* Scheduler::my_clock() */ my_clock(),false,udp_recv_agent_addr);/* DEBUG */    if ( (layer_lid < 0) || (layer_lid == 0) ) {      printf ("[Errcheck] 2: qlid %d state %d\n", layer_lid, state);      fflush(stdout);      assert(0);    }/* DEBUG */    send_pkt_wrapper(p,coop_ag->ag.agent_id,coop_ag->ag.agent_addr);#ifdef LOG_COOP_SEND    printf ("[coop %d ] . . (c-jqtc) < [ %d ] >\n", id, coop_ag->ag.agent_id);#endif   }#ifdef LOG_COOP_SEND  printf ("\n");#endif   return;}void coopAgent::cancel_higher_layer_ping_timers (void) {  if (m_ping_in_progress == true) {    m_ping_in_progress = false;    m_hlprt.CancelTimer();    flush_lower_layers(layers.arr[m_ping_lid]);  }  m_hlpit.CancelTimer();  return;}void coopAgent::set_higher_layer_ping_timer (void) {  m_hlpit.SetTimer(CONST_HIGHER_LAYER_PING_INTERVAL_TIMEOUT);  return;}void coopAgent::handle_join_query_timeout (void) {  process_received_join_responses();  return;}void coopAgent::process_received_join_responses (void) {  assert (state == JOIN);  if (curr_join_q_lid  == -1) { /* BSE defaulted ! */    printf ("[coop %d ] At %8.4f BSE did not respond to join-query\n", id, /* Scheduler::my_clock() */ my_clock());    init_join_query(-1);#ifdef LOG_COOP_JUNK    printf ("[coop %d ] At %8.4f process-join-resps : bse defaults target lid %d\n", id, /* Scheduler::my_clock() */ my_clock(), target_join_q_lid);#endif     return;  }  /* Find the closest agent to this member and re-start   * the query business for the lower layer */  LayerInfo * this_layer = layers.arr[curr_join_q_lid];  void * pos = this_layer->ag_list.Locate(id);  assert (pos == NULL);  LayerAgentInfo * min_dist_la_ag = find_closest_agent_in_layer(this_layer,NULL,true);  if (min_dist_la_ag != NULL) {    /* Send next set of queries if needed. */    // assert (layers.arr[curr_join_q_lid] == NULL); THIS ONE WAS WRONG    if (min_dist_la_ag->dist < 0.0)      printf ("[Err] At %f [ %d ] found closest agent [ %d ] with neg dist\n", /* Scheduler::my_clock() */ my_clock(), id, min_dist_la_ag->ag.agent_id);    int lower_lid = curr_join_q_lid - 1;    flush_layer(lower_lid);    layers.arr[lower_lid] = min_dist_la_ag->lower_layer;    min_dist_la_ag->lower_layer = NULL;        LayerAgentInfo * mdla_in_lower = layers.arr[lower_lid]->FindClusterMember(min_dist_la_ag->ag.agent_id);    assert (mdla_in_lower != NULL);    mdla_in_lower->dist = min_dist_la_ag->dist;#ifdef LOG_COOP_JUNK    printf ("[coop %d ] At %8.4f process-join-resps : lid %d closest < [ %d ] > dist %f\n", id, /* Scheduler::my_clock() */ my_clock(), curr_join_q_lid, min_dist_la_ag->ag.agent_id, min_dist_la_ag->dist);#endif     continue_join_query(lower_lid);  }  else { /* Have to re-query the higher layer if no one in	  * curr_join_q_lid responded. */#ifdef LOG_COOP_JUNK    printf ("[coop %d ] At %8.4f process-join-resps  layer defaults lid %d\n", id, /* Scheduler::my_clock() */ my_clock(), this_layer->lid);#endif     delete this_layer;    layers.arr[curr_join_q_lid] = NULL;    int higher_lid = curr_join_q_lid + 1;    if (higher_lid < MAX_LAYERS) {      if (layers.arr[higher_lid] != NULL) { /* Retry from the higher layer */	continue_join_query(higher_lid);      }      else { /* Have to re-start with the BSE */	init_join_query(-1);      }    }  }  return;}/* Find a new leader in the cluster. I am leaving this cluster, * so I am not included in this. */LayerAgentInfo * coopAgent::find_new_cluster_leader (LayerInfo *l, bool omit_self) {  if (l->root != NULL)    assert (self_check(l->root) == true);#undef LOG_NEW_LEADER_CHOOSE#ifdef LOG_NEW_LEADER_CHOOSE  printf ("[coop %d ] : At %8.4f (c-fnl) : find-new-ldr : lid %d  %s\n", id, /* Scheduler::my_clock() */ my_clock(), l->lid, (omit_self == true) ? "omit-self" : "include-self");#endif   double max_dist = -1.0;  int max_count = -1;  LayerAgentInfo * new_leader = NULL;  for (void * pos = l->ag_list.GetHeadPosition();       pos != NULL;       l->ag_list.GetNext(pos) ) {    LayerAgentInfo * this_la = l->ag_list.GetAt(pos);    if (self_check(this_la) == true) {      continue;    }    // HAVE TO LOOK AT HOW TO ACCOMODATE THIS COUNT THING    int this_count = count_dist_known_members_for_agent(this_la,omit_self);#ifdef LOG_NEW_LEADER_CHOOSE    printf ("[coop %d ] : . . (c-fnl) : mbr %d : count %d : ", id,  this_la->ag.agent_id, this_la->agent_dist_arr_count);    for (int i = 0; i < this_la->agent_dist_arr_count; i++)      printf ("( %d %f )", this_la->agent_dist_arr[i].ag.agent_id, this_la->agent_dist_arr[i].dist);    printf ("\n[coop %d ] : . . (c-fnl) : this-count %d\n", id,  this_count);#endif     if (this_count == 0) {      if (max_count < 0) {        max_count = 0;        new_leader = this_la;      }      continue;    }    assert (this_count > 0);    if (this_count < max_count)      continue;    // KCR-CHANGE    // double this_dist = get_farthest_dist_of_member_to_agent(l,this_la,omit_self,true);    double this_dist = get_avg_dist_of_member_to_agent(l,this_la,omit_self,true);#ifdef LOG_NEW_LEADER_CHOOSE    printf ("[coop %d ] : . . (c-fnl) : farthest-dist %f\n", id, this_dist);#endif     assert (this_dist >= 0.0);    if (this_count > max_count) {      max_count = this_count;      max_dist = this_dist;      new_leader = this_la;    }    else { //(this_count == max_count)      assert (max_dist >= 0.0);      long this_dist_long = USECS(this_dist);      long max_dist_long = USECS(max_dist);      if (this_dist_long < max_dist_long) {	max_dist = this_dist;	new_leader = this_la;      }    }  }  if (omit_self == false) { // Need to compare self too    void * pos = l->ag_list.Locate(id);    if (pos != NULL) {      LayerAgentInfo * self_ag = l->ag_list.GetAt(pos);      LayerAgentInfo * my_farthest = find_farthest_agent_to_me_in_layer(l);      // KCR-CHANGE      double my_farthest_dist = find_avg_agent_dist_to_me_in_layer(l);      int my_count = count_dist_known_members_for_me(l);#ifdef LOG_NEW_LEADER_CHOOSE      printf ("[coop %d ] : . . (c-fnl) : mbr self : count %d : ", id, l->ag_list.GetSize() );      for (void * pos2 = l->ag_list.GetHeadPosition();	   pos2 != NULL;	   l->ag_list.GetNext(pos2) ) {	LayerAgentInfo * la2 = l->ag_list.GetAt(pos2);	printf ("( %d %f )", la2->ag.agent_id, la2->dist);      }      // KCR-CHANGE      //printf ("\n[coop %d ] : . . (c-fnl) : my-count %d : farthest ( %d %f )\n", id, my_count, my_farthest->ag.agent_id, my_farthest->dist);      printf ("\n[coop %d ] : . . (c-fnl) : my-count %d : farthest ( %d %f )\n", id, my_count, my_farthest->ag.agent_id, my_farthest_dist);#endif             assert (my_farthest != NULL);      long max_dist_long = USECS(max_dist);      // KCR-CHANGE      // long my_farthest_dist_long = USECS(my_farthest->dist);      long my_farthest_dist_long = USECS(my_farthest_dist);      if ( (my_count > max_count) ||	   ( (my_count == max_count) &&	     ( (max_dist < 0.0) || (my_farthest_dist_long <= max_dist_long) ) ) ) {	// The my_farthest->dist <= max_dist ensures that the current incumbent	// (i.e. if I am the ldr) continues as ldr if possible	new_leader = self_ag;	max_count = my_count;        // KCR-CHANGE	//max_dist = my_farthest->dist;	max_dist = my_farthest_dist;      }    }  }#ifdef LOG_NEW_LEADER_CHOOSE  if (new_leader != NULL)      printf ("\n[coop %d ] : . . (c-fnl) : new-leader %d\n", id, new_leader->ag.agent_id);  else    printf ("\n[coop %d ] : . . (c-fnl) : new-leader NULL\n", id);#endif   return new_leader;}/* Check all the cluster members, if any

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -