⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 bse-agent.cc,v

📁 这是P2P流媒体方案-NICE的实现源码
💻 CC,V
📖 第 1 页 / 共 2 页
字号:
#ifdef LOG_BSE_RECV  printf ("[bse %d ] At %8.4f recvd-valid-refresh : < [ %d ] > lid %d ", id, /* Scheduler::Clock */ my_clock(), ap->src_agent, top_layer->lid);#endif   LayerAgentInfo *la_ag = top_layer->ag_list.GetAt(pos);  if (ap->u.clusterrefresh_p.cluster_remove == true) {#ifdef LOG_BSE_RECV    printf ("*remove*\n");#endif     bool check = top_layer->DeleteClusterMember(la_ag);    assert (check == true);    delete la_ag;    log_cluster_change_info ();#ifdef LOG_BSE_CLUSTER_CHANGE    display_top_layer_info();#endif   }  else {#ifdef LOG_BSE_RECV    printf ("\n");#endif     la_ag->refresh_agent(ap);  }fflush(0);  return;}void bseAgent::handle_data_ack_packet (AppPacket *ap) {  assert (ap->st == PACKET_DATA_ACK);#ifdef LOG_DATA_PACKET_RECV  printf ("[bse %d ] At %8.4f recv ack-pkt : < [ %d ] > seq %d from < [ %d ] >\n", id, /* Scheduler::Clock */ my_clock(), ap->u.dataack_p.original_src.agent_id, ap->u.dataack_p.seq_no, ap->src_agent);#endif   return;}int bseAgent::handle_data_packet (AppPacket *ap) {  assert (ap->st == PACKET_DATA);  assert (ap->u.data_p.original_src.agent_id == ap->src_agent);#ifdef LOG_DATA_PACKET_RECV  printf ("[bse %d ] At %8.4f recv data-pkt : < [ %d ] > seq %d from < [ %d ] >\n", id, /* Scheduler::Clock */ my_clock(), ap->u.data_p.original_src.agent_id, ap->u.data_p.seq_no, ap->src_agent);#endif   assert (top_layer != NULL);/* Send an ack back */  AppPacket *new_ack_p = new AppPacket(PACKET_DATA_ACK);  new_ack_p->u.dataack_p.seq_no = ap->u.data_p.seq_no;  new_ack_p->u.dataack_p.original_src.agent_id = ap->u.data_p.original_src.agent_id;  memcpy((void*)&(new_ack_p->u.dataack_p.original_src.agent_addr), (void*)&(ap->u.data_p.original_src.agent_addr), sizeof(struct sockaddr_in));  send_pkt_wrapper(new_ack_p, ap->src_agent, ap->src_agent_addr);  fflush(0);  if (top_layer->lid < ap->u.data_p.base_lid)    return 0;  for (void * pos = top_layer->ag_list.GetHeadPosition();       pos != NULL;       top_layer->ag_list.GetNext(pos) ) {    LayerAgentInfo * this_la = top_layer->ag_list.GetAt(pos);    if (this_la->ag.agent_id == ap->u.data_p.original_src.agent_id)       continue;    if (self_check(this_la) == false) {       AppPacket *new_p = new AppPacket(PACKET_DATA);       new_p->u.data_p.original_src.agent_id = ap->u.data_p.original_src.agent_id;       memcpy ( & new_p->u.data_p.original_src.agent_addr, & ap->u.data_p.original_src.agent_addr, sizeof(struct sockaddr_in));       new_p->u.data_p.seq_no = ap->u.data_p.seq_no;       new_p->u.data_p.lid = top_layer->lid;       new_p->u.data_p.base_lid = ap->u.data_p.base_lid;#ifdef LOG_DATA_PACKET_SEND       // KCR-CHANGE       //printf ("[bse %d ] At %8.4f send data-pkt : < [ %d ] > seq %d : to < [ %d ] > lid %d dist_est %f \n", id, /* Scheduler::Clock */ my_clock(), ap->u.data_p.original_src.agent_id, ap->u.data_p.seq_no, this_la->ag.agent_id, top_layer->lid, this_la->dist);#endif        //send_pkt_wrapper(new_p,this_la->ag.agent_id, this_la->ag.agent_addr);       delete new_p;    }  }  fflush(0);  return 0;} void bseAgent::handle_cluster_refresh_msg_timeout (void) {  assert (top_layer != NULL);#ifdef LOG_BSE_SEND  printf ("\n[bse %d ] At %8.4f : refresh-timeout: lid %d\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid);#endif   send_top_layer_cluster_refresh(NULL);  top_layer->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT);  fflush(0);  return;}void bseAgent::send_top_layer_cluster_refresh (LayerAgentInfo * ignore_agent) {  assert (top_layer != NULL);#ifdef LOG_BSE_SEND  printf ("\n[bse %d ] At %8.4f (b-scr) : send-refresh : lid %d\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid);#endif   for (void * pos = top_layer->ag_list.GetHeadPosition();       pos != NULL;       top_layer->ag_list.GetNext(pos) ) {    LayerAgentInfo * la = top_layer->ag_list.GetAt(pos);    if (ignore_agent != NULL) {      if (ignore_agent->ag.agent_id == la->ag.agent_id)         continue;    }    if (self_check(la) == false) {#ifdef LOG_BSE_SEND  printf ("[bse %d ] . . (b-scr) : < [ %d ] >\n", id, la->ag.agent_id);#endif       AppPacket *ap = new AppPacket (CLUSTER_REFRESH);      put_layer_cluster_info_into_packet(top_layer,ap);      ap->u.clusterrefresh_p.root_xfer = false;      ap->u.clusterrefresh_p.is_root = true;      send_pkt_wrapper(ap,la->ag.agent_id,la->ag.agent_addr);    }  }#ifdef LOG_BSE_SEND  printf ("\n");#endif   fflush(0);  return;}void bseAgent::handle_cluster_refresh_check_timeout (void) {  bool change = false;  assert (top_layer != NULL);#ifdef LOG_BSE_JUNK      printf ("[bse %d] At %8.4f (b-rc) : refresh-chk : lid %d\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid);#endif fflush(0);  for (void * pos = top_layer->ag_list.GetHeadPosition();       pos != NULL;       ) {    LayerAgentInfo * la_ag = top_layer->ag_list.GetAt(pos);    void * old_pos = pos;    top_layer->ag_list.GetNext(pos);    if (self_check(la_ag) == true)      continue;    if (la_ag->refresh == false) {      /* Member is lost */      top_layer->ag_list.RemoveAt(old_pos);#ifdef LOG_BSE_JUNK      printf ("[bse %d ] . . (b-rc) : < [ %d ] > *lost*\n", id, la_ag->ag.agent_id);#endif       delete la_ag;      change = true;    }    else {      la_ag->refresh = false; // Resetting the refresh flag#ifdef LOG_BSE_JUNK      printf ("[bse %d ] . . (b-rc) : < [ %d ] >\n", id, la_ag->ag.agent_id);#endif     }  }#ifdef LOG_BSE_JUNK  printf ("\n");#endif   assert (top_layer->ag_list.GetSize() >= 1);  if (top_layer->ag_list.GetSize() == 1) { // I am the only one    if (top_layer->lid > 0) {#ifdef LOG_BSE_CLUSTER_CHANGE  printf ("[bse %d ] At %8.4f cluster-info : lid %d : deleted\n", id,/* Scheduler::Clock */ my_clock(), top_layer->lid);#endif       top_layer->lid --;#ifdef LOG_BSE_JUNK      printf ("[bse %d ] At %8.4f refresh-chk layer-decrement to lid %d\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid);#endif       change = true;    }  }  if (top_layer->ag_list.GetSize() >= (UPPER_3K+1)) {    split_top_layer_using_two_partition();    change = true;  }  if (change == true) {    log_cluster_change_info ();#ifdef LOG_BSE_CLUSTER_CHANGE    display_top_layer_info();#endif   }  top_layer->cr_chkt.SetTimer(CONST_CLUSTER_REFRESH_CHECK_TIMEOUT);  fflush(0);  return;}void bseAgent::split_top_layer_using_two_partition (void) {#ifdef LOG_BSE_JUNK  printf ("[bse %d ] At %8.4f cluster-split : lid %d\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid);#endif   LayerAgentInfo ** ag_arr = top_layer->CreateLayerAgentInfoArray();  assert (ag_arr != NULL);  double * self_included_cost = create_cost_matrix(id,ag_arr,top_layer->ag_list.GetSize());  // This function already updates the agent_arr by deleting this agent  double * cost = delete_agent_from_cost_matrix(self_included_cost,ag_arr,top_layer->ag_list.GetSize(),id);  free(self_included_cost);  void * pos = top_layer->ag_list.Locate(id);  assert (pos != NULL);  LayerAgentInfo * self = top_layer->ag_list.GetAt(pos);  bool check = top_layer->DeleteClusterMember(self);  assert (check == true);  int root1_index, root2_index;  int * index_set1;  int * index_set2;  int set1_size, set2_size;  // Partition the set of members  int lower_size = top_layer->ag_list.GetSize() / 2;  two_partition(top_layer->ag_list.GetSize(),cost,lower_size,index_set1,set1_size,root1_index,index_set2,set2_size,root2_index);  free(cost);#ifdef LOG_BSE_PARTITION  printf ("[bse %d ] . . cluster-split : Partition 1 : ", id);  for (int i = 0; i < set1_size; i++)    printf ("%d ", ag_arr[index_set1[i]]->ag.agent_id);  printf("\n");  printf ("[bse %d ] . . cluster-split : Partition 2 : ", id);  for (int i = 0; i < set2_size; i++)    printf ("%d ", ag_arr[index_set2[i]]->ag.agent_id);  printf("\n");#endif     LayerInfo * tmp_layer1 = new LayerInfo(top_layer->lid,NULL);  LayerInfo * tmp_layer2 = new LayerInfo(top_layer->lid,NULL);  for (int i = 0; i < set1_size; i++) {    top_layer->DeleteClusterMember(ag_arr[index_set1[i]]);    bool success = tmp_layer1->AddClusterMember(ag_arr[index_set1[i]]);    assert (success == true);  }  tmp_layer1->root = ag_arr[root1_index];  for (int i = 0; i < set2_size; i++) {    top_layer->DeleteClusterMember(ag_arr[index_set2[i]]);    bool success = tmp_layer2->AddClusterMember(ag_arr[index_set2[i]]);    assert (success == true);  }  tmp_layer2->root = ag_arr[root2_index];  free(index_set1);  free(index_set2);#ifdef LOG_BSE_PARTITION  printf ("[bse %d ] At %8.4f cluster-split : root1 < [ %d ] > count %d     root2 < [ %d ] > count %d\n", id, /* Scheduler::Clock */ my_clock(), ag_arr[root1_index]->ag.agent_id, set1_size, ag_arr[root2_index]->ag.agent_id, set2_size);#endif   free(ag_arr);  assert (top_layer->ag_list.GetSize() == 0);  // Transfer control of the other cluster  send_cluster_remove(tmp_layer1);  send_cluster_remove(tmp_layer2);  LayerAgentInfo * new_root1 = new LayerAgentInfo(tmp_layer1->root->ag.agent_id,tmp_layer1->root->ag.agent_addr);  LayerAgentInfo * new_root2 = new LayerAgentInfo(tmp_layer2->root->ag.agent_id,tmp_layer2->root->ag.agent_addr);  // Cleanup ...  delete tmp_layer1;  delete tmp_layer2;  assert (top_layer->lid + 1 < MAX_LAYERS);#ifdef LOG_BSE_CLUSTER_CHANGE  printf ("[bse %d ] At %8.4f cluster-info : lid %d : deleted\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid);#endif #ifdef LOG_BSE_JUNK  printf ("[bse %d ] At %8.4f layer-increment : lid %d\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid+1);#endif   // Setup the next higher layer  top_layer->lid ++;  top_layer->AddClusterRoot(self);  top_layer->me_in_layer = true;  top_layer->AddClusterMember(new_root1);  top_layer->AddClusterMember(new_root2);  fflush(0);  return;}void bseAgent::send_cluster_remove (LayerInfo * l) {#ifdef LOG_BSE_SEND      printf ("[bse %d ] At %8.4f (b-scx) : self-cluster-remove : lid %d : new-root < [ %d ] >\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid, l->root->ag.agent_id);#endif   for (void * pos = l->ag_list.GetHeadPosition();       pos != NULL;       l->ag_list.GetNext(pos) ) {    LayerAgentInfo * la = l->ag_list.GetAt(pos);    assert (self_check(la) == false);    AppPacket *ap = new AppPacket (CLUSTER_REFRESH);    put_cluster_remove_info_into_packet(l,ap,true,NULL,id,udp_recv_agent_addr);#ifdef LOG_BSE_SEND    printf ("[bse %d ] . . (b-scx) : < [ %d ] >\n", id, la->ag.agent_id);#endif     send_pkt_wrapper(ap,la->ag.agent_id,la->ag.agent_addr);  }#ifdef LOG_BSE_SEND  printf ("\n");#endif   fflush(0);  return;}void bseAgent::log_cluster_change_info (void) {  m_last_change = /* Scheduler::Clock */ my_clock();  return;}void bseAgent::display_top_layer_info (void) {  assert (top_layer != NULL);  printf ("[bse %d ] At %8.4f cluster-info : lid %d : ldr %s %d : count %d : ", id, /* Scheduler::Clock */ my_clock(), top_layer->lid, (self_check(top_layer->root) == true) ? "self" : "other", top_layer->root->ag.agent_id, top_layer->ag_list.GetSize() );#ifdef LOG_CLUSTER_INFO_DETAILED  for (void * pos = top_layer->ag_list.GetHeadPosition();       pos != NULL;       top_layer->ag_list.GetNext(pos) ) {    printf (" %d", (top_layer->ag_list.GetAt(pos))->ag.agent_id);  }#endif   printf ("\n");  fflush(0);  return;}bool bseAgent::self_check (LayerAgentInfo * la) {  if (la->ag.agent_id == id)     return true;  else    return false;}@1.1log@Initial revision@text@d28 1a28 1#include "talker.h"d41 2a42 1#undef LOG_CLUSTER_INFO_DETAILEDd437 1@

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -