📄 coop-agent.cc
字号:
delete tmp_layer; free(cluster1); free(cluster2); return;}/* This will split the cluster associated with lid. The new cluster * that contains this member will stay as is, the other cluster * will be spawned off. */void coopAgent::split_cluster_assuming_2k (LayerInfo * l) { // Currently, I am implementing a naive scheme. The leader // chooses its nearest neighbor to stay with it in its cluster. // Other more intelligent schemes will be implemented later. assert (l->ag_list.GetSize() == UPPER_2K); LinkedList<LayerAgentInfo *, double> my_cluster_list; /* In my_cluster, the agents are sorted in decreasing order of distance */ LinkedList<LayerAgentInfo *, double> other_cluster_list; /* First, split the agents into my cluster and other cluster */ /* All agents that I know distances to, are my cluster to start with */ LayerAgentInfo *self = NULL; for (void * pos = l->ag_list.GetHeadPosition(); pos != NULL; l->ag_list.GetNext(pos) ) { LayerAgentInfo *la_ag = l->ag_list.GetAt(pos); if ( self_check(la_ag) == true) { self = la_ag; continue; } if (la_ag->dist >= 0.0) { /* -1.0 is for sorting it in decreasing order of distance */ my_cluster_list.Add(la_ag,-1.0 * la_ag->dist); } else { /* Sorted by increasing distance order */ other_cluster_list.Add(la_ag,la_ag->dist); } } /* Whichever set has the higher number of agents, need to transfer * some agents to the other set */ void *pos2 = other_cluster_list.GetHeadPosition(); while ( my_cluster_list.GetSize() < (LOWER_K - 1) ) { LayerAgentInfo *a = other_cluster_list.GetAt(pos2); my_cluster_list.Add(a,-1.0 * a->dist); void * old_pos2 = pos2; other_cluster_list.GetNext(pos2); other_cluster_list.RemoveAt(old_pos2); } void *pos3 = my_cluster_list.GetHeadPosition(); while (other_cluster_list.GetSize() < LOWER_K) { LayerAgentInfo *a = my_cluster_list.GetAt(pos2); other_cluster_list.Add(a,a->dist); void * old_pos3 = pos3; my_cluster_list.GetNext(pos3); my_cluster_list.RemoveAt(old_pos3); } // Re-create my own cluster tree l->RemoveAgents(); l->AddClusterRoot(self); l->me_in_layer = true; for (void *pos4 = my_cluster_list.GetHeadPosition(); pos4 != NULL; my_cluster_list.GetNext(pos4) ) { l->AddClusterMember(my_cluster_list.GetAt(pos4)); } // Create the other cluster // SimpleTree<LayerAgentInfo *> other_cluster_tree; // void * pos5 = other_cluster_list.GetHeadPosition(); // assert (pos5 != NULL); // LayerAgentInfo * oc_root_la = other_cluster.GetAt(pos5); // void * oc_root_pos = other_cluster_tree.SetRoot(oc_root_la); // for (pos5 = other_cluster_list.GetNext(pos5); // pos5 != NULL; // other_cluster_list.GetNext(pos5) ) { // other_cluster_tree.AddChild(other_cluster_list.GetAt(pos5),oc_root_pos); // } /* HAVE TO SEND MESSAGE TO THE OTHER CLUSTER MEMBERS */ my_cluster_list.RemoveAll(); other_cluster_list.RemoveAll(); return;}void coopAgent::handle_join_response (AppPacket * ap) { /* Need to validate if this is a desired response packet */ if (valid_join_response_packet(ap) == false) return; int this_lid = ap->u.joinresp_p.layer_id;//#ifdef LOG_COOP_RECV printf ("\n[coop %d ] At %8.4f (c-jr) : join-resp : lid %d from < [ %d ] > exp-src < [ %d ] >\n", id, /* Scheduler::my_clock() */ my_clock(),this_lid,ap->src_agent, ap->u.joinresp_p.exp_src.agent_id);//#endif if ( (ap->src_agent == bse.agent_id)/* && (ap->src == bse.node_id)*/ ) { assert (ap->u.joinresp_p.accept == true); /* if (ap->u.joinresp_p.your_id != UNDEFINED_AGENT_ID) id = ap->u.joinresp_p.your_id; // recv new id */ // BSE-JF : Have to delete all other members in curr_join_q_lid etc. // or have to wait till all resps are recvd and decide whether to // accept BSE or the other members flush_layer(this_lid); if (this_lid+1 < MAX_LAYERS) { flush_layer(this_lid+1); } layers.arr[this_lid] = new LayerInfo(this_lid,this); LayerInfo * this_layer = layers.arr[this_lid]; jqt.CancelTimer(); /* This is the top layer returned by BSE */ put_packet_cluster_info_into_layer(ap,this_layer); if (this_lid == target_join_q_lid) { // This is the desired layer void * pos = this_layer->ag_list.Locate(id); assert (pos != NULL); LayerAgentInfo * self = this_layer->ag_list.GetAt(pos); self->dist = 0.0; LayerAgentInfo * bse_la = this_layer->FindClusterMember(bse.agent_id); assert (bse_la != NULL); // bse_la->dist = /* Scheduler::my_clock() */ my_clock() - ap->send_time; /* The following if then else statement are not in myrns*/ if ((bse_la->valid_dist_clock == false) || (my_clock() - bse_la->dist_clock >= DIST_CLOCK_THRESHOLD)) { // dist = MAX_RTT; struct sockaddr_in their_addr; memcpy(& their_addr, & bse_la->ag.agent_addr, sizeof(struct sockaddr_in)); their_addr.sin_port = htons(DIST_EST_PORT); bse_la->dist = estimate_rtt_wrapper(their_addr); bse_la->dist_clock = my_clock(); bse_la->valid_dist_clock = true; } this_layer->me_in_layer = true; this_layer->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT); this_layer->cr_chkt.SetTimer(CONST_CLUSTER_REFRESH_CHECK_TIMEOUT); state = ATTACHED; set_higher_layer_ping_timer();//#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-jr) : attach-to-bse\n\n", id);//#endif log_cluster_change_info(this_layer->lid);//#ifdef LOG_COOP_CLUSTER_CHANGE display_cluster_info(this_layer,this_layer->lid);//#endif } else { assert (this_lid > target_join_q_lid); void * pos = this_layer->ag_list.Locate(id); if (pos != NULL) { // I am considered to be in this cluster ! assert ( self_check(this_layer->root) == false); // I am not root //#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-jr) : part of returned cluster! delete\n", id);//#endif delete_self_from_layer(this_layer); }//#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-jr) : continue-join-query\n\n", id);//#endif continue_join_query(this_lid); } } else { /* This is a message from the higher layer cluster * We need to just save the information till the timeout */ LayerInfo * higher_layer = layers.arr[this_lid + 1]; assert (higher_layer != NULL); void * mem_pos = higher_layer->ag_list.Locate(ap->u.joinresp_p.exp_src.agent_id); assert (mem_pos != NULL); LayerAgentInfo *hl_member = higher_layer->ag_list.GetAt(mem_pos); if (ap->u.joinresp_p.accept == false) {//#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-jr) : join-reject\n\n", id);//#endif higher_layer->DeleteClusterMember(hl_member); delete hl_member; } else { /* If the response is coming from a different agent (I thought some * one else was the cluster leader, then we need to update that here */ if ( (ap->u.joinresp_p.exp_src.agent_id != ap->src_agent) /*|| (ap->u.joinresp_p.exp_src.node_id != ap->src) */) { hl_member->ag.agent_id = ap->src_agent; // hl_member->ag.node_id = ap->src; memcpy(& hl_member->ag.agent_addr, & ap->src_agent_addr, sizeof(struct sockaddr_in)); } assert (ap->u.joinresp_p.mbr_count > 0); assert ( (hl_member->ag.agent_id == ap->u.joinresp_p.agent_arr[0].ag.agent_id) /* && (hl_member->ag.node_id == ap->u.joinresp_p.agent_arr[0].ag.node_id) */); if (hl_member->lower_layer != NULL) delete hl_member->lower_layer; else m_join_resp_received ++; hl_member->lower_layer = new LayerInfo (this_lid,this); put_packet_cluster_info_into_layer (ap,hl_member->lower_layer); LayerAgentInfo * self = hl_member->lower_layer->FindClusterMember(id); if (self != NULL) self->dist = 0.0; /* Using the time metric. Can use any other */ // hl_member->dist = /* Scheduler::my_clock() */ my_clock() - ap->send_time; if ((hl_member->valid_dist_clock == false) || (my_clock() - hl_member->dist_clock >= DIST_CLOCK_THRESHOLD)) { // dist = MAX_RTT; struct sockaddr_in their_addr; memcpy(& their_addr, & hl_member->ag.agent_addr, sizeof(struct sockaddr_in)); their_addr.sin_port = htons(DIST_EST_PORT); hl_member->dist = estimate_rtt_wrapper(their_addr); hl_member->dist_clock = my_clock(); hl_member->valid_dist_clock = true; }//#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-jr) : dist-estimate %f\n", id, hl_member->dist);//#endif } int match_count = higher_layer->ag_list.GetSize(); if (higher_layer->ag_list.Locate(bse.agent_id) != NULL) match_count --; if (match_count == m_join_resp_received) { jqt.CancelTimer();//#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-jr) : all join resps found\n\n", id);//#endif process_received_join_responses(); } } return;}void coopAgent::handle_cluster_refresh (AppPacket *ap) { assert (ap->st == CLUSTER_REFRESH); if (valid_cluster_refresh_packet(ap) == false) return; LayerInfo * l = layers.arr[ap->u.clusterrefresh_p.layer_id]; void * this_agent_pos = l->ag_list.Locate(ap->src_agent); assert (this_agent_pos != NULL); LayerAgentInfo *la_ag = l->ag_list.GetAt(this_agent_pos);; bool test_for_all_pings_received = false; bool change = false; bool match_to_root_done = false;#ifdef TESTING_JUNK void * pos = l->ag_list.Locate(id); if (pos != NULL) { LayerAgentInfo * self = l->ag_list.GetAt(pos); assert (self->dist >= 0.0); }#endif #ifdef LOG_COOP_RECV printf ("\n[coop %d ] At %8.4f (c-rcr) : recvd-valid-refresh : lid %d from < [ %d ] > %s %s %s\n", id, /* Scheduler::my_clock() */ my_clock(), l->lid,ap->src_agent, (ap->u.clusterrefresh_p.is_root == true) ? "root" : ".", (ap->u.clusterrefresh_p.root_xfer == true) ? "xfer" : ".", (ap->u.clusterrefresh_p.cluster_remove == true) ? "remove" : ".");#endif #ifdef LOG_COOP_RECV_2 printf ("[coop %d ] . . (c-rcr) : dist-info in refresh :", id); for (int i = 0; i < ap->u.clusterrefresh_p.mbr_count; i++) printf ("( %d %5.3f )", ap->u.clusterrefresh_p.agent_arr[i].ag.agent_id,ap->u.clusterrefresh_p.agent_arr[i].dist); printf ("\n");#endif /* Now this is a valid packet */ la_ag->refresh_agent(ap); assert (l->root != NULL); if (self_check(l->root) == true) { // I am the root if (ap->u.clusterrefresh_p.is_root == true) { // Pkt src,s, thinks that it is the root // First check if it is doing a root transfer to me if ( (ap->u.clusterrefresh_p.root_xfer == true) && ( (ap->u.clusterrefresh_p.agent_arr[0].ag.agent_id == id) /* && (ap->u.clusterrefresh_p.agent_arr[0].ag.node_id == n->id) */) ) { // In which case gladly add all unknown members bool members_added = false; if (l->AddExtraMembersFromAltRootPacket (ap) > 0) { change = true; members_added = true; } // Do cluster remove if (ap->u.clusterrefresh_p.cluster_remove == true) { // Delete#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-rcr) : src self-remove\n", id);#endif bool check = l->DeleteClusterMember(la_ag); assert (check == true); delete la_ag; change = true; }#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-rcr) : src and me roots, root xfer to me\n\n", id);#endif if (change == true) { log_cluster_change_info(l->lid);#ifdef LOG_COOP_CLUSTER_CHANGE display_cluster_info(l,l->lid);#endif } // Immediate update of new member additions if (members_added == true) { l->cr_msgt.CancelTimer(); send_all_cluster_refresh_packet(l,false,true,NULL); l->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT); } return; } bool stay_as_root = false; // s_max: source dist to its farthest member // KCR-CHANGE (2 new lines) //double s_max = get_farthest_dist_of_member_to_agent(l,la_ag,false,false); double s_max = get_avg_dist_of_member_to_agent(l,la_ag,false,false); LayerAgentInfo * my_farthest_ag = find_farthest_agent_to_me_in_layer(l); double my_max = my_farthest_ag->dist; my_max = find_avg_agent_dist_to_me_in_layer(l); long s_max_long = USECS(s_max); long my_max_long = USECS(my_max); int s_know_count = count_dist_known_members_for_agent(la_ag,false); int me_know_count = count_dist_known_members_for_me(l); if (are_agent_members_contained_in_mine(l,la_ag) == true) { // All members in la_ag are also known to me if ( (s_know_count < me_know_count) || // I am the superset (my_max_long < s_max_long) || // We are equal in size, but I am better ( (my_max_long == s_max_long) && (id < la_ag->ag.agent_id) ) ) { // -do- stay_as_root = true; } // else {stay_as_root = false;} } else if (are_my_members_contained_in_agent(l,la_ag) == false) { // no one is a superset of the other if (id < la_ag->ag.agent_id) { stay_as_root = true; } // else {stay_as_root = false;} } // else my_members_contained_in_agent() == true: // i.e. s is a superset, so stay_as_root = false LayerAgentInfo * other_root; bool member_added = false; if (ap->u.clusterrefresh_p.root_xfer == true) { other_root = l->FindClusterMember(ap->u.clusterrefresh_p.agent_arr[0].ag.agent_id); if (other_root == NULL) { LayerAgentInfo *new_guy = new LayerAgentInfo(ap->u.clusterrefresh_p.agent_arr[0].ag.agent_id,ap->u.clusterrefresh_p.agent_arr[0].ag.agent_addr); l->AddClusterMember(new_guy); other_root = new_guy; change = true; member_added = true; }#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-rcr) : src and me roots, src root xfer to < [ %d ] >\n", id, other_root->ag.agent_id);#endif } else { assert (ap->u.clusterrefresh_p.cluster_remove == false); other_root = la_ag; } if (stay_as_root == true) { // Add all the extra members and send root challenge if (l->AddExtraMembersFromAltRootPacket(ap) > 0) { change = true; member_added = true; } if (ap->u.clusterrefresh_p.cluster_remove == true) { // Delete#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-rcr) : src self-remove\n", id);#endif bool check = l->DeleteClusterMember(la_ag); assert (check == true); delete la_ag; change = true; }#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-rcr) : me stay as root (root-challenge)\n\n", id);#endif send_root_challenge(l,other_root); if (change == true) { log_cluster_change_info(l->lid);#ifdef LOG_COOP_CLUSTER_CHANGE display_cluster_info(l,l->lid);#endif } // Immediate inform other members of new member if (member_added == true) { l->cr_msgt.CancelTimer(); // Do not send this to the other root send_all_cluster_refresh_packet(l,false,true,other_root); l->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT); } return; } else { change = true;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -