⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 coop-agent.cc,v

📁 这是P2P流媒体方案-NICE的实现源码
💻 CC,V
📖 第 1 页 / 共 5 页
字号:
#endif       }    }  }  AppPacket *resp_p = new AppPacket (PING_RESPONSE);  resp_p->u.pingresp_p.accept = accept;  resp_p->u.pingresp_p.lid = this_lid;  resp_p->u.pingresp_p.src_time = ap->u.pingq_p.src_time;  resp_p->u.pingresp_p.dist = dist;  if (accept == true) {    assert (this_lid > 0);    LayerInfo * lower_layer = layers.arr[this_lid - 1];    //assert (lower_layer != NULL);    //assert (lower_layer->root != NULL);    //assert (self_check(lower_layer->root) == true);    void * pos = lower_layer->ag_list.Locate(ap->src_agent);    if (pos != NULL) {#ifdef LOG_COOP_JUNK      printf ("[coop %d ] At %8.4f pinger < [ %d ] > dup\n", id, /* Scheduler::my_clock() */ my_clock(),ap->src_agent);#endif       LayerAgentInfo * this_la = lower_layer->ag_list.GetAt(pos);      lower_layer->ag_list.RemoveAt(pos);      delete this_la;      log_cluster_change_info(lower_layer->lid);#ifdef LOG_COOP_CLUSTER_CHANGE    display_cluster_info(lower_layer,lower_layer->lid);#endif     }    put_layer_cluster_info_into_packet(lower_layer,resp_p);  }#ifdef LOG_COOP_JUNK    printf ("[coop %d ] At %8.4f ping-check-debug-send < [ %d ] > dist %f\n", id, /* Scheduler::my_clock() */ my_clock(),ap->src_agent, resp_p->u.pingresp_p.dist);#endif  send_pkt_wrapper(resp_p,ap->src_agent,ap->src_agent_addr);  return;}void coopAgent::handle_ping_response (AppPacket * ap) {  assert (ap->st == PING_RESPONSE);  if ( (m_ping_in_progress == false) || (ap->u.pingresp_p.lid != m_ping_lid) )    return;  LayerInfo * l = layers.arr[m_ping_lid];  assert (l != NULL);  void * pos = l->ag_list.Locate(ap->src_agent);  if (pos == NULL)    return;  LayerAgentInfo * la = l->ag_list.GetAt(pos);  //  la->dist = (/* Scheduler::my_clock() */ my_clock() - ap->u.pingresp_p.src_time) / 2.0;  //modify ping to just estimate rtt...  if (ap->u.pingresp_p.dist >= 0.0) {    la->dist = ap->u.pingresp_p.dist;    la->dist_clock = my_clock();    la->valid_dist_clock = true;  }  /*  if ((la->valid_dist_clock == false) || (my_clock() - la->dist_clock >= DIST_CLOCK_THRESHOLD)) {    //  dist = MAX_RTT;    struct sockaddr_in their_addr;     memcpy(& their_addr, & la->ag.agent_addr, sizeof(struct sockaddr_in));    their_addr.sin_port = htons(DIST_EST_PORT);    la->dist = estimate_rtt_wrapper(their_addr);    la->dist_clock = my_clock();    la->valid_dist_clock = true;  }  */    if (la->lower_layer != NULL) {    delete la->lower_layer;    la->lower_layer = NULL;  }#ifdef LOG_COOP_JUNK_PING  printf ("[coop %d ] At %8.4f recv-pingresp : lid %d : from < [ %d ] > dist %f %s\n", id, /* Scheduler::my_clock() */ my_clock(), m_ping_lid, ap->src_agent, la->dist, (ap->u.pingresp_p.accept == true) ? "" : "*reject*" );#endif   if (ap->u.pingresp_p.accept == true) {    la->lower_layer = new LayerInfo(m_ping_lid-1,this);    put_packet_cluster_info_into_layer(ap,la->lower_layer);    m_ping_resps_received ++;  }  else {    bool success = l->DeleteClusterMember(la);    delete la;    assert (success == true);  }  if (check_all_ping_responses_received() == true) {    int lower_lid = m_ping_lid - 1;    if (process_ping_responses () == true) {      //printf ("[ %d ] At %8.4f ping recv-ping-resp\n", id, /* Scheduler::my_clock() */ my_clock());      LayerInfo * new_layer = layers.arr[lower_lid];      assert (new_layer != NULL);      log_cluster_change_info(new_layer->lid);#ifdef LOG_COOP_CLUSTER_CHANGE      display_cluster_info(new_layer,new_layer->lid);#endif     }  }  return;}bool coopAgent::check_all_ping_responses_received (void) {  assert (m_ping_in_progress == true);  LayerInfo *l = layers.arr[m_ping_lid];  assert (l != NULL);  // If this higher layer has BSE, then cannot count it, since BSE is not sent  // a ping.  int ignore_bse;  if (l->ag_list.Locate(bse.agent_id) != NULL)    ignore_bse = 1;  else    ignore_bse = 0;  // The -1 is due to not including the root of the lower layer in this  if ( m_ping_resps_received == ( l->ag_list.GetSize() - 1 - ignore_bse) )    return true;  return false;}/* Returns true if there is a change */bool coopAgent::process_ping_responses (void) {  bool ret_val = false;  assert (m_ping_lid > 0);  LayerInfo * hl = layers.arr[m_ping_lid];  assert (hl != NULL);  LayerInfo * my_highest_layer = layers.arr[m_ping_lid - 1];  assert (my_highest_layer != NULL);  assert (my_highest_layer->root != NULL);  if (my_highest_layer->root->dist < 0.0)    assert (my_highest_layer->root->dist >= 0.0);  assert (my_highest_layer->me_in_layer == true);  LayerAgentInfo * min_la = find_closest_agent_in_layer(hl,&(my_highest_layer->root->ag),true);  if (min_la != NULL) {    long my_highest_layer_root_dist_long = USECS(my_highest_layer->root->dist);    long min_la_dist_long = USECS(min_la->dist);    if (my_highest_layer_root_dist_long > min_la_dist_long) {      ret_val = true;#ifdef LOG_COOP_JUNK_PING_SWITCH      printf ("[coop %d ] At %8.4f switch-root : lid %d : from < [ %d ] > to < [ %d ] > dist %8.4f \n", id, /* Scheduler::my_clock() */ my_clock(), m_ping_lid - 1, my_highest_layer->root->ag.agent_id, min_la->ag.agent_id, min_la->dist);#endif       // Send the cluster remove in old cluster      // Delete self      void * pos = my_highest_layer->ag_list.Locate(id);      assert (pos != NULL);      LayerAgentInfo * self_ag = my_highest_layer->ag_list.GetAt(pos);      my_highest_layer->ag_list.RemoveAt(pos);      my_highest_layer->me_in_layer = false;        if (my_highest_layer->ag_list.GetSize() > 0)	send_cluster_remove(my_highest_layer,false);      delete my_highest_layer;      // Send the join to new cluster      layers.arr[m_ping_lid-1] = min_la->lower_layer;      min_la->lower_layer = NULL;      LayerInfo * new_layer = layers.arr[m_ping_lid-1];      bool check = new_layer->AddClusterMember(self_ag);      assert (check == true);      new_layer->me_in_layer = true;          assert (new_layer->root != NULL);      assert ( (new_layer->root->ag.agent_id == min_la->ag.agent_id) /* && (new_layer->root->ag.node_id == min_la->ag.node_id) */);      assert (min_la->dist >= 0.0);      new_layer->root->dist = min_la->dist;      AppPacket *pkt = new AppPacket(JOIN_QUERY);      init_join_query_packet(pkt,m_ping_lid-1,/* Scheduler::my_clock() */ my_clock(),true, udp_recv_agent_addr);      send_pkt_wrapper(pkt,new_layer->root->ag.agent_id, new_layer->root->ag.agent_addr);      new_layer->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT);      new_layer->cr_chkt.SetTimer(CONST_CLUSTER_REFRESH_CHECK_TIMEOUT);      assert (state == ATTACHED);    }  }  cancel_higher_layer_ping_timers();  set_higher_layer_ping_timer();  return ret_val;}void coopAgent::handle_data_ack_packet (AppPacket *ap) {  assert (ap->st == PACKET_DATA_ACK);#ifdef LOG_DATA_PACKET_RECV  printf ("[coop %d ] At %8.4f recv ack-pkt : < [ %d ] > seq %d : from < [ %d ] >\n", id, /* Scheduler::my_clock() */ my_clock(), ap->u.dataack_p.original_src.agent_id, ap->u.dataack_p.seq_no, ap->src_agent);#endif   return;}int coopAgent::handle_data_packet (AppPacket *ap) {  assert (ap->st == PACKET_DATA);#ifdef LOG_DATA_PACKET_RECV  printf ("[coop %d ] At %8.4f recv data-pkt : < [ %d ] > seq %d : from < [ %d ] >\n", id, my_clock(), ap->u.data_p.original_src.agent_id, ap->u.data_p.seq_no, ap->src_agent);#endif /* Send an ack back */  AppPacket *new_ack_p = new AppPacket(PACKET_DATA_ACK);  new_ack_p->u.dataack_p.seq_no = ap->u.data_p.seq_no;  new_ack_p->u.dataack_p.original_src.agent_id = ap->u.data_p.original_src.agent_id;  memcpy((void*)&(new_ack_p->u.dataack_p.original_src.agent_addr), (void*)&(ap->u.data_p.original_src.agent_addr), sizeof(struct sockaddr_in));  send_pkt_wrapper(new_ack_p, ap->src_agent, ap->src_agent_addr);  if (m_use_packet_cache == true) {    if (add_to_packet_cache(ap->u.data_p.original_src.agent_id, ap->u.data_p.seq_no) == false)      return 1;  }  int this_lid = ap->u.data_p.lid;  LayerInfo *l = layers.arr[this_lid];  /* The 3 if conditions below enforce strict checking of packet source */  if (l != NULL) {    if (l->me_in_layer == true) {      // Somewhat looser checking of data packet forwarding      // if (l->ag_list.Locate(ap->src_agent) != NULL) {        forward_data_packet(ap->u.data_p.payload, ap->u.data_p.data_len, ap->u.data_p.original_src.agent_id, ap->u.data_p.original_src.agent_addr, ap->u.data_p.seq_no, ap->u.data_p.base_lid, ap->src_agent, false);      // }    }  }  return 0;}//modifiedvoid coopAgent::forward_data_packet (char* payload, int data_len, int src_aid, struct sockaddr_in src_agent_addr, int src_seqno, int base_lid, int pred_aid, bool is_source) {  int count_pkts = 0;  for (int i = 0; i < MAX_LAYERS; i++) {    LayerInfo * l = layers.arr[i];    if (l != NULL) {      if (l->me_in_layer == true) {      // Forward to this layer only if predecessor not in this layer        if (l->ag_list.Locate(pred_aid) == NULL) { 	  if (l->lid >= base_lid)	    count_pkts += send_data_pkt_to_layer(payload, data_len, l,src_aid,base_lid,src_agent_addr,src_seqno);        }      }    }  }  if (state == JOIN) {    if ( (curr_join_q_lid < MAX_LAYERS) && (curr_join_q_lid > 0) ) {      LayerInfo * q_l = layers.arr[curr_join_q_lid];      assert (q_l != NULL);      assert (q_l->me_in_layer == false);      if (q_l->lid >= base_lid)	count_pkts += send_data_pkt_to_layer(payload, data_len, q_l,src_aid,base_lid,src_agent_addr,src_seqno);    }    if (curr_join_q_lid < 0) { // Starting join query at BSE      // For join to higher layers, we probably already have the higher      // target layer from the previous leader which left the cluster or      // did a root xfer to me. Can use that as a fall back option.      // This should not be for layer 0.      LayerInfo * target_layer = layers.arr[target_join_q_lid];      if (target_layer != NULL) {	assert (target_layer->me_in_layer == false);	if (target_layer->lid >= base_lid)	  count_pkts += send_data_pkt_to_layer(payload, data_len, target_layer,src_aid,base_lid,src_agent_addr,src_seqno);      }    }  }  if (is_source == true) {       AppPacket *ap = new AppPacket(PACKET_DATA);       ap->u.data_p.original_src.agent_id = src_aid;       //       ap->u.data_p.original_src.node_id = src_nid;       memcpy(& ap->u.data_p.original_src.agent_addr, & src_agent_addr, sizeof(struct sockaddr_in));       ap->u.data_p.seq_no = src_seqno;       ap->u.data_p.lid = -1;       //ap->u.data_p.data_len = data_len;       // Put the data in here?? do we need to?       //memcpy(ap->u.data_p.payload#ifdef LOG_DATA_PACKET_SEND       // KCR-CHANGE       //printf ("[coop %d ] At %8.4f send data-pkt : < [ %d ] > seq %d : to < [ %d ] > bse\n", id, /* Scheduler::my_clock() */ my_clock(), src_aid, src_seqno, bse.agent_id);#endif        //send_pkt_wrapper(ap,bse.agent_id, bse.agent_addr);       count_pkts ++;  }#ifdef LOG_DATA_PACKET_SEND_COUNT  if (count_pkts > 0)    printf ("[coop %d ] At %8.4f fwd-data count %d for < [ %d ]> seq-no %d\n",id, /* Scheduler::my_clock() */ my_clock(), count_pkts, src_aid, src_seqno);#endif   return;}int coopAgent::send_data_pkt_to_layer (char* payload, int data_len, LayerInfo * l, int src_aid, int base_lid, struct sockaddr_in src_agent_addr, int src_seqno) {  int count_pkts = 0;  for (void * pos = l->ag_list.GetHeadPosition();       pos != NULL;       l->ag_list.GetNext(pos) ) {    LayerAgentInfo * this_la = l->ag_list.GetAt(pos);    if (self_check(this_la) == false) {      if ( ( (bse.agent_id != this_la->ag.agent_id) /* || (bse.node_id != this_la->ag.node_id) */) &&	   ( ( (this_la->ag.agent_id != src_aid) /*|| (this_la->ag.node_id != src_nid) */) ) ) {       AppPacket *ap = new AppPacket(PACKET_DATA);       ap->u.data_p.original_src.agent_id = src_aid;       //       ap->u.data_p.original_src.node_id = src_nid;       memcpy(& ap->u.data_p.original_src.agent_addr, & src_agent_addr, sizeof(struct sockaddr_in));       ap->u.data_p.seq_no = src_seqno;       ap->u.data_p.lid = l->lid;       ap->u.data_p.base_lid = base_lid;       ap->u.data_p.data_len = data_len;       ap->u.data_p.payload = (char *)malloc(data_len);       memcpy(ap->u.data_p.payload, payload, data_len);#ifdef LOG_DATA_PACKET_SEND       printf ("[coop %d ] At %8.4f send data-pkt : < [ %d ] > seq %d : to < [ %d ] > lid %d dist_est %f\n", id, /* Scheduler::my_clock() */ my_clock(), src_aid, src_seqno, this_la->ag.agent_id, l->lid, this_la->dist);#endif        send_pkt_wrapper(ap,this_la->ag.agent_id, this_la->ag.agent_addr);       count_pkts ++;     }   } }  return count_pkts;}/* Have to be the cluster root to accept this merge *//* If we want to accept cluster merge at non-root agents, then * we have to change the mechanism of the merge, i.e. the merging * root should be able to first figure out which is the correct * root, and then advise the children appropriately of its * change to be the new correct root. */bool coopAgent::valid_cluster_merge_packet (AppPacket * ap) {  LayerInfo *l = layers.arr[ap->u.clustermerge_p.layer_id];  if (l == NULL)    return false;  if (l->me_in_layer == false)    return false;  if (self_check(l->root) == false)    return false;  return true;}bool coopAgent::valid_cluster_refresh_packet (AppPacket * ap) {  LayerInfo * l = layers.arr[ap->u.clusterrefresh_p.layer_id];  if (l == NULL)    return false;  if (l->me_in_layer == false)    return false;  void * pos = l->ag_list.Locate(ap->src_agent);  if (pos == NULL)    return false;  return true;}bool coopAgent::match_cluster_to_root_info (LayerInfo *l) {  bool has_change = false;  for (void * pos = l->ag_list.GetHeadPosition();       pos != NULL;       l->ag_list.GetNext(pos) ) {    LayerAgentInfo * la_ag = l->ag_list.GetAt(pos);    la_ag->valid_tmp = false;  }  for (int i = 0; i < l->root->agent_dist_arr_count; i++) {    void * pos = l->ag_list.Locate(l->root->agent_dist_arr[i].ag.agent_id);    LayerAgentInfo * this_la;    if (pos == NULL) {      this_la = new LayerAgentInfo (l->root->agent_dist_arr[i].ag.agent_id,l->root->agent_dist_arr[i].ag.agent_addr);      assert (self_check(this_la) == false); // Cannot have self unknown

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -