⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 coop-agent.old.cc

📁 这是P2P流媒体方案-NICE的实现源码
💻 CC
📖 第 1 页 / 共 5 页
字号:
  LayerInfo * this_layer = layers.arr[required_qlid];    if ( self_check(this_layer->root) == false) {    /* If I am not the cluster root, then forward this to cluster root */    AppPacket *fwd_p = new AppPacket(JOIN_FORWARD);    put_info_into_join_forward_packet (src_ag,qlid,original_dst,src_time,attach,fwd_p);/* DEBUG */    if ( (qlid < 0) || ( (qlid == 0) && (attach == false) ) ) {      printf ("[Errcheck] 1: qlid %d attach %d state %d\n", qlid, attach, state);      fflush(stdout);      assert(0);    }/* DEBUG */    send_pkt_wrapper(fwd_p,this_layer->root->ag.agent_id, this_layer->root->ag.agent_addr);#ifdef LOG_COOP_JUNK    printf ("[coop %d ] . . (c-jqf) : fwd to root < [ %d ] >\n", id, this_layer->root->ag.agent_id);#endif     return;  }    if (attach == false) {        void * pos = this_layer->ag_list.Locate(src_ag.agent_id);    if (pos != NULL) {#ifdef LOG_COOP_JUNK      printf ("[coop %d ] . . (c-jqf) : dup delete\n", id);#endif       LayerAgentInfo * this_la = this_layer->ag_list.GetAt(pos);      assert (this_la != NULL);      this_layer->ag_list.RemoveAt(pos);      delete this_la;      log_cluster_change_info(this_layer->lid);#ifdef LOG_COOP_CLUSTER_CHANGE      display_cluster_info(this_layer,this_layer->lid);#endif     }        /* Create response packet */    AppPacket *resp_p = new AppPacket(JOIN_RESPONSE);    put_layer_cluster_info_into_packet(this_layer,resp_p);    resp_p->u.joinresp_p.accept = true;    resp_p->u.joinresp_p.exp_src.agent_id = original_dst.agent_id;    //    resp_p->u.joinresp_p.exp_src.node_id = original_dst.node_id;    memcpy(& resp_p->u.joinresp_p.exp_src.agent_addr, & original_dst.agent_addr, sizeof(struct sockaddr_in));    send_pkt_wrapper(resp_p,src_ag.agent_id, src_ag.agent_addr);#ifdef LOG_COOP_JUNK    printf ("[coop %d ] . . (c-jqf) : sent response\n", id);#endif   }    else { /* Attach appropriately to the cluster */    LayerAgentInfo *la_ag = new LayerAgentInfo(src_ag.agent_id,src_ag.agent_addr);    if (this_layer->AddClusterMember(la_ag) == false) {      delete la_ag;      la_ag = this_layer->FindClusterMember(src_ag.agent_id);      la_ag->refresh = true;    }    // if (ap->st == JOIN_QUERY) { // and not JOIN_FORWARD    //      la_ag->dist = /* Scheduler::my_clock() */ my_clock() - src_time;    if ((la_ag->valid_dist_clock == false) || (my_clock() - la_ag->dist_clock >= DIST_CLOCK_THRESHOLD)) {      //  dist = MAX_RTT;      struct sockaddr_in their_addr;       memcpy(& their_addr, & la_ag->ag.agent_addr, sizeof(struct sockaddr_in));      their_addr.sin_port = htons(DIST_EST_PORT);      la_ag->dist = estimate_rtt_wrapper(their_addr);      la_ag->dist_clock = my_clock();      la_ag->valid_dist_clock = true;    }    #ifdef LOG_COOP_JUNK    printf ("[coop %d ] . . (c-jqf) : *attach*\n", id);#endif         // Immediate update of new member joining the cluster    this_layer->cr_msgt.CancelTimer();    send_all_cluster_refresh_packet(this_layer,false,true,NULL);    this_layer->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT);  }    return;}void coopAgent::split_cluster_using_two_partition (LayerInfo * l) {  LayerAgentInfo ** ag_arr = l->CreateLayerAgentInfoArray();  assert (ag_arr != NULL);  double * cost = create_cost_matrix(id,ag_arr,l->ag_list.GetSize());  int root1_index, root2_index;  int * index_set1 = NULL;  int * index_set2 = NULL;  int set1_size, set2_size;  // Partition the set of members  int lower_size = l->ag_list.GetSize() / 2;  two_partition(l->ag_list.GetSize(),cost,lower_size,index_set1,set1_size,root1_index,index_set2,set2_size,root2_index);  assert (index_set1 != NULL);  assert (index_set2 != NULL);  free(cost);  // After the split, cluster1 is my cluster, cluster2 is the other cluster  LayerAgentInfo ** cluster1 = (LayerAgentInfo **) safe_malloc (sizeof (LayerAgentInfo *) * set1_size);  bool need_to_swap = true;  for (int i = 0; i < set1_size; i++) {    cluster1[i] = ag_arr[index_set1[i]];    if (self_check(cluster1[i]) == true)      need_to_swap = false;  }  LayerAgentInfo ** cluster2 = (LayerAgentInfo **) safe_malloc (sizeof (LayerAgentInfo *) * set2_size);  for (int i = 0; i < set2_size; i++)    cluster2[i] = ag_arr[index_set2[i]];  LayerAgentInfo * c1_root = ag_arr[root1_index];  LayerAgentInfo * c2_root = ag_arr[root2_index];  if (need_to_swap == true) {    LayerAgentInfo ** tmp_cl = cluster1;    cluster1 = cluster2;    cluster2 = tmp_cl;    int tmp_cl_size = set1_size;    set1_size = set2_size;    set2_size = tmp_cl_size;    LayerAgentInfo * tmp_root = c1_root;    c1_root = c2_root;    c2_root = tmp_root;  }  free(ag_arr);  free(index_set1);  free(index_set2);  // Purge my own cluster of members that are in the other cluster  // Create a temporary layer to send out the root transfer message  LayerInfo * tmp_layer = new LayerInfo (l->lid,this);  for (int i = 0; i < set2_size; i++) {    tmp_layer->AddClusterMember(cluster2[i]);    l->DeleteClusterMember(cluster2[i]);  }  tmp_layer->root = c2_root;#ifdef LOG_COOP_JUNK  printf ("[coop %d ] At %8.4f split-cluster-self lid %d\n", id, /* Scheduler::my_clock() */ my_clock(), l->lid);#endif   // Send refresh or root transfer for my cluster  if (self_check(c1_root) == false) {    do_internal_root_transfer(l,c1_root);  }  else {    l->cr_msgt.CancelTimer();    send_all_cluster_refresh_packet(l,false,true,NULL);    l->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT);  }#ifdef LOG_COOP_JUNK  printf ("[coop %d ] At %8.4f split-cluster-other lid %d\n", id, /* Scheduler::my_clock() */ my_clock(), l->lid);#endif   // Transfer control of the other cluster  send_cluster_remove(tmp_layer,true);  // Cleanup ...  delete tmp_layer;  free(cluster1);  free(cluster2);  return;}/* This will split the cluster associated with lid. The new cluster * that contains this member will stay as is, the other cluster * will be spawned off. */void coopAgent::split_cluster_assuming_2k (LayerInfo * l) {  // Currently, I am implementing a naive scheme. The leader  // chooses its nearest neighbor to stay with it in its cluster.  // Other more intelligent schemes will be implemented later.  assert (l->ag_list.GetSize() == UPPER_2K);  LinkedList<LayerAgentInfo *, double> my_cluster_list;  /* In my_cluster, the agents are sorted in decreasing order of distance */  LinkedList<LayerAgentInfo *, double> other_cluster_list;  /* First, split the agents into my cluster and other cluster */  /* All agents that I know distances to, are my cluster to start with */  LayerAgentInfo *self = NULL;  for (void * pos = l->ag_list.GetHeadPosition();       pos != NULL;       l->ag_list.GetNext(pos) ) {    LayerAgentInfo *la_ag = l->ag_list.GetAt(pos);    if ( self_check(la_ag) == true) {      self = la_ag;      continue;    }    if (la_ag->dist >= 0.0) {      /* -1.0 is for sorting it in decreasing order of distance */      my_cluster_list.Add(la_ag,-1.0 * la_ag->dist);    }    else { /* Sorted by increasing distance order */      other_cluster_list.Add(la_ag,la_ag->dist);    }  }  /* Whichever set has the higher number of agents, need to transfer   * some agents to the other set */  void *pos2 = other_cluster_list.GetHeadPosition();  while ( my_cluster_list.GetSize() < (LOWER_K - 1) ) {        LayerAgentInfo *a = other_cluster_list.GetAt(pos2);    my_cluster_list.Add(a,-1.0 * a->dist);    void * old_pos2 = pos2;    other_cluster_list.GetNext(pos2);    other_cluster_list.RemoveAt(old_pos2);  }    void *pos3 = my_cluster_list.GetHeadPosition();  while (other_cluster_list.GetSize() < LOWER_K) {        LayerAgentInfo *a = my_cluster_list.GetAt(pos2);    other_cluster_list.Add(a,a->dist);    void * old_pos3 = pos3;    my_cluster_list.GetNext(pos3);    my_cluster_list.RemoveAt(old_pos3);  }  // Re-create my own cluster tree  l->RemoveAgents();  l->AddClusterRoot(self);  l->me_in_layer = true;  for (void *pos4 = my_cluster_list.GetHeadPosition();       pos4 != NULL;       my_cluster_list.GetNext(pos4) ) {    l->AddClusterMember(my_cluster_list.GetAt(pos4));  }  // Create the other cluster  // SimpleTree<LayerAgentInfo *> other_cluster_tree;  // void * pos5 = other_cluster_list.GetHeadPosition();  // assert (pos5 != NULL);  // LayerAgentInfo * oc_root_la = other_cluster.GetAt(pos5);  // void * oc_root_pos = other_cluster_tree.SetRoot(oc_root_la);  // for (pos5 = other_cluster_list.GetNext(pos5);  //      pos5 != NULL;  //      other_cluster_list.GetNext(pos5) ) {  //   other_cluster_tree.AddChild(other_cluster_list.GetAt(pos5),oc_root_pos);  // }  /* HAVE TO SEND MESSAGE TO THE OTHER CLUSTER MEMBERS */  my_cluster_list.RemoveAll();  other_cluster_list.RemoveAll();  return;}void coopAgent::handle_join_response (AppPacket * ap) {  /* Need to validate if this is a desired response packet */  if (valid_join_response_packet(ap) == false)    return;  int this_lid = ap->u.joinresp_p.layer_id;#ifdef LOG_COOP_RECV  printf ("\n[coop %d ] At %8.4f (c-jr) : join-resp : lid %d from < [ %d ] > exp-src < [ %d ] >\n", id, /* Scheduler::my_clock() */ my_clock(),this_lid,ap->src_agent, ap->u.joinresp_p.exp_src.agent_id);#endif   if ( (ap->src_agent == bse.agent_id)/* && (ap->src == bse.node_id)*/ ) {     assert (ap->u.joinresp_p.accept == true);    /*    if (ap->u.joinresp_p.your_id != UNDEFINED_AGENT_ID)       id = ap->u.joinresp_p.your_id; // recv new id    */    // BSE-JF : Have to delete all other members in curr_join_q_lid etc.    // or have to wait till all resps are recvd and decide whether to    // accept BSE or the other members    flush_layer(this_lid);    if (this_lid+1 < MAX_LAYERS) {      flush_layer(this_lid+1);    }    layers.arr[this_lid] = new LayerInfo(this_lid,this);    LayerInfo * this_layer = layers.arr[this_lid];    jqt.CancelTimer();    /* This is the top layer returned by BSE */    put_packet_cluster_info_into_layer(ap,this_layer);    if (this_lid == target_join_q_lid) { // This is the desired layer      void * pos = this_layer->ag_list.Locate(id);      assert (pos != NULL);      LayerAgentInfo * self = this_layer->ag_list.GetAt(pos);      self->dist = 0.0;      LayerAgentInfo * bse_la = this_layer->FindClusterMember(bse.agent_id);      assert (bse_la != NULL);      //      bse_la->dist = /* Scheduler::my_clock() */ my_clock() - ap->send_time;      if ((bse_la->valid_dist_clock == false) || (my_clock() - bse_la->dist_clock >= DIST_CLOCK_THRESHOLD)) {    //  dist = MAX_RTT;	struct sockaddr_in their_addr; 	memcpy(& their_addr, & bse_la->ag.agent_addr, sizeof(struct sockaddr_in));	their_addr.sin_port = htons(DIST_EST_PORT);	bse_la->dist = estimate_rtt_wrapper(their_addr);	bse_la->dist_clock = my_clock();	bse_la->valid_dist_clock = true;      }      this_layer->me_in_layer = true;      this_layer->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT);      this_layer->cr_chkt.SetTimer(CONST_CLUSTER_REFRESH_CHECK_TIMEOUT);      state = ATTACHED;      set_higher_layer_ping_timer();#ifdef LOG_COOP_JUNK      printf ("[coop %d ] . . (c-jr) : attach-to-bse\n\n", id);#endif       log_cluster_change_info(this_layer->lid);#ifdef LOG_COOP_CLUSTER_CHANGE      display_cluster_info(this_layer,this_layer->lid);#endif     }    else {      assert (this_lid > target_join_q_lid);      void * pos = this_layer->ag_list.Locate(id);      if (pos != NULL) { // I am considered to be in this cluster !	assert ( self_check(this_layer->root) == false); // I am not root	#ifdef LOG_COOP_JUNK        printf ("[coop %d ] . . (c-jr) : part of returned cluster! delete\n", id);#endif 	delete_self_from_layer(this_layer);      }#ifdef LOG_COOP_JUNK      printf ("[coop %d ] . . (c-jr) : continue-join-query\n\n", id);#endif       continue_join_query(this_lid);    }  }  else { /* This is a message from the higher layer cluster	  * We need to just save the information till the timeout	  */    LayerInfo * higher_layer = layers.arr[this_lid + 1];    assert (higher_layer != NULL);    void * mem_pos = higher_layer->ag_list.Locate(ap->u.joinresp_p.exp_src.agent_id);    assert (mem_pos != NULL);    LayerAgentInfo *hl_member = higher_layer->ag_list.GetAt(mem_pos);    if (ap->u.joinresp_p.accept == false) {#ifdef LOG_COOP_JUNK      printf ("[coop %d ] . . (c-jr) : join-reject\n\n", id);#endif       higher_layer->DeleteClusterMember(hl_member);      delete hl_member;    }    else {      /* If the response is coming from a different agent (I thought some       * one else was the cluster leader, then we need to update that here */      if ( (ap->u.joinresp_p.exp_src.agent_id != ap->src_agent) /*|| (ap->u.joinresp_p.exp_src.node_id != ap->src) */) {	hl_member->ag.agent_id = ap->src_agent;	//	hl_member->ag.node_id = ap->src;	memcpy(& hl_member->ag.agent_addr, & ap->src_agent_addr, sizeof(struct sockaddr_in));      }      assert (ap->u.joinresp_p.mbr_count > 0);      assert ( (hl_member->ag.agent_id == ap->u.joinresp_p.agent_arr[0].ag.agent_id) /* && (hl_member->ag.node_id == ap->u.joinresp_p.agent_arr[0].ag.node_id) */);      if (hl_member->lower_layer != NULL)	delete hl_member->lower_layer;      else	m_join_resp_received ++;            hl_member->lower_layer = new LayerInfo (this_lid,this);      put_packet_cluster_info_into_layer (ap,hl_member->lower_layer);      LayerAgentInfo * self = hl_member->lower_layer->FindClusterMember(id);      if (self != NULL)	self->dist = 0.0;      /* Using the time metric. Can use any other */      //      hl_member->dist = /* Scheduler::my_clock() */ my_clock() - ap->send_time;      if ((hl_member->valid_dist_clock == false) || (my_clock() - hl_member->dist_clock >= DIST_CLOCK_THRESHOLD)) {    //  dist = MAX_RTT;	struct sockaddr_in their_addr; 	memcpy(& their_addr, & hl_member->ag.agent_addr, sizeof(struct sockaddr_in));	their_addr.sin_port = htons(DIST_EST_PORT);	hl_member->dist = estimate_rtt_wrapper(their_addr);	hl_member->dist_clock = my_clock();	hl_member->valid_dist_clock = true;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -