📄 coop-agent.cc,v
字号:
l->AddClusterMember(this_la); has_change = true; } else { this_la = l->ag_list.GetAt(pos); } this_la->valid_tmp = true; } // Purge the members that are not in the cluster any more for (void * pos = l->ag_list.GetHeadPosition(); pos != NULL; ) { LayerAgentInfo * la_ag = l->ag_list.GetAt(pos); void * old_pos = pos; l->ag_list.GetNext(pos); if (la_ag->valid_tmp == false) { l->ag_list.RemoveAt(old_pos); assert (la_ag != NULL); delete la_ag; has_change = true; } } return has_change;}/* Send query to the next lower layer if needed (i.e. to * one below this_lid). * If this is the target layer, then need to figure out point of * attachment */void coopAgent::continue_join_query (int this_lid) {#ifdef LOG_COOP_JUNK printf ("[coop %d ] At %8.4f (c-cjq) : continue-join-query : lid %d\n", id, /* Scheduler::my_clock() */ my_clock(), this_lid);#endif if (this_lid > target_join_q_lid) { /* Probe all the cluster members */ LayerInfo *l = layers.arr[this_lid]; void * pos = l->ag_list.Locate(id); if (pos != NULL) { // I am considered to be part of this cluster ! assert (self_check(l->root) == false); // I cannot be the root delete_self_from_layer(l); }#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-cjq) : join-continue\n", id);#endif send_join_query_to_cluster(this_lid); m_join_resp_received = 0; curr_join_q_lid = this_lid; jqt.SetTimer(CONST_JOIN_QUERY_TIMEOUT); return; } else { /* Find point of attachment */ assert (this_lid == target_join_q_lid); LayerInfo * lbase = layers.arr[this_lid]; LayerAgentInfo *self = new LayerAgentInfo(id,udp_recv_agent_addr); self->dist = 0.0; if (lbase->AddClusterMember(self) == false) { delete self; self = lbase->FindClusterMember(id); assert(self->dist >= 0.0); } lbase->me_in_layer = true; assert (lbase->root != NULL); if (lbase->root->dist < 0.0) assert (lbase->root->dist >= 0.0); AppPacket *pkt = new AppPacket(JOIN_QUERY); init_join_query_packet(pkt,this_lid,/* Scheduler::my_clock() */ my_clock(),true, udp_recv_agent_addr); send_pkt_wrapper(pkt,lbase->root->ag.agent_id, lbase->root->ag.agent_addr); lbase->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT); lbase->cr_chkt.SetTimer(CONST_CLUSTER_REFRESH_CHECK_TIMEOUT); state = ATTACHED; set_higher_layer_ping_timer();#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-cjq) : attach to my-root < [ %d ] >\n", id, lbase->root->ag.agent_id);#endif log_cluster_change_info(lbase->lid);#ifdef LOG_COOP_CLUSTER_CHANGE display_cluster_info(lbase,lbase->lid);#endif } return;}bool coopAgent::valid_join_query_forward_packet (int qlid, bool attach) { if ( (qlid < 0) || ( (qlid == 0) && (attach == false) ) ) { /* Wrong packet */ printf ("[Err] Found packet with qlid %d\n",qlid); assert (0); return false; } int lower_lid; if (attach == true) /* This is an attach request */ lower_lid = qlid; else lower_lid = qlid - 1; if (layers.arr[lower_lid] == NULL) return false; /* I must be part of this cluster. */ if (layers.arr[lower_lid]->me_in_layer == false) return false; return true;}/* Returns true if this is a valid join response packet that * we are expecting */bool coopAgent::valid_join_response_packet (AppPacket *ap) { if (state != JOIN) return false; int this_lid = ap->u.joinresp_p.layer_id; if ( (ap->src_agent == bse.agent_id) /* && (ap->src == bse.node_id) */) { if (curr_join_q_lid != -1) // BSE-JF && (curr_join_q_lid != (this_lid+1) ) return false; if (target_join_q_lid > this_lid) { assert (0); return false; } } else { /* Check for other layer queries */ if (curr_join_q_lid != (this_lid+1)) return false; LayerInfo * last_layer = layers.arr[curr_join_q_lid]; if (last_layer == NULL) return false; if (last_layer->ag_list.Locate(ap->u.joinresp_p.exp_src.agent_id) == NULL) return false; } return true;}/* Sends a join query to all members specified in the given * layer. The expected response is all members in the immediate * lower layer. */void coopAgent::send_join_query_to_cluster (int layer_lid) { LayerInfo * this_layer = layers.arr[layer_lid];#ifdef LOG_COOP_SEND printf ("[coop %d ] At %8.4f (c-jqtc) : join-query-to-cluster : lid %d\n", id, /* Scheduler::my_clock() */ my_clock(), layer_lid);#endif for (void * pos = this_layer->ag_list.GetHeadPosition(); pos != NULL; this_layer->ag_list.GetNext(pos) ) { LayerAgentInfo * coop_ag = this_layer->ag_list.GetAt(pos); if ( (coop_ag->ag.agent_id == bse.agent_id) /* && (coop_ag->ag.node_id == bse.node_id) */ ) // Dont send this join query to the BSE continue; AppPacket *p = new AppPacket(JOIN_QUERY); init_join_query_packet (p,layer_lid,/* Scheduler::my_clock() */ my_clock(),false,udp_recv_agent_addr);/* DEBUG */ if ( (layer_lid < 0) || (layer_lid == 0) ) { printf ("[Errcheck] 2: qlid %d state %d\n", layer_lid, state); fflush(stdout); assert(0); }/* DEBUG */ send_pkt_wrapper(p,coop_ag->ag.agent_id,coop_ag->ag.agent_addr);#ifdef LOG_COOP_SEND printf ("[coop %d ] . . (c-jqtc) < [ %d ] >\n", id, coop_ag->ag.agent_id);#endif }#ifdef LOG_COOP_SEND printf ("\n");#endif return;}void coopAgent::cancel_higher_layer_ping_timers (void) { if (m_ping_in_progress == true) { m_ping_in_progress = false; m_hlprt.CancelTimer(); flush_lower_layers(layers.arr[m_ping_lid]); } m_hlpit.CancelTimer(); return;}void coopAgent::set_higher_layer_ping_timer (void) { m_hlpit.SetTimer(CONST_HIGHER_LAYER_PING_INTERVAL_TIMEOUT); return;}void coopAgent::handle_join_query_timeout (void) { process_received_join_responses(); return;}void coopAgent::process_received_join_responses (void) { assert (state == JOIN); if (curr_join_q_lid == -1) { /* BSE defaulted ! */ printf ("[coop %d ] At %8.4f BSE did not respond to join-query\n", id, /* Scheduler::my_clock() */ my_clock()); init_join_query(-1);#ifdef LOG_COOP_JUNK printf ("[coop %d ] At %8.4f process-join-resps : bse defaults target lid %d\n", id, /* Scheduler::my_clock() */ my_clock(), target_join_q_lid);#endif return; } /* Find the closest agent to this member and re-start * the query business for the lower layer */ LayerInfo * this_layer = layers.arr[curr_join_q_lid]; void * pos = this_layer->ag_list.Locate(id); assert (pos == NULL); LayerAgentInfo * min_dist_la_ag = find_closest_agent_in_layer(this_layer,NULL,true); if (min_dist_la_ag != NULL) { /* Send next set of queries if needed. */ // assert (layers.arr[curr_join_q_lid] == NULL); THIS ONE WAS WRONG if (min_dist_la_ag->dist < 0.0) printf ("[Err] At %f [ %d ] found closest agent [ %d ] with neg dist\n", /* Scheduler::my_clock() */ my_clock(), id, min_dist_la_ag->ag.agent_id); int lower_lid = curr_join_q_lid - 1; flush_layer(lower_lid); layers.arr[lower_lid] = min_dist_la_ag->lower_layer; min_dist_la_ag->lower_layer = NULL; LayerAgentInfo * mdla_in_lower = layers.arr[lower_lid]->FindClusterMember(min_dist_la_ag->ag.agent_id); assert (mdla_in_lower != NULL); mdla_in_lower->dist = min_dist_la_ag->dist;#ifdef LOG_COOP_JUNK printf ("[coop %d ] At %8.4f process-join-resps : lid %d closest < [ %d ] > dist %f\n", id, /* Scheduler::my_clock() */ my_clock(), curr_join_q_lid, min_dist_la_ag->ag.agent_id, min_dist_la_ag->dist);#endif continue_join_query(lower_lid); } else { /* Have to re-query the higher layer if no one in * curr_join_q_lid responded. */#ifdef LOG_COOP_JUNK printf ("[coop %d ] At %8.4f process-join-resps layer defaults lid %d\n", id, /* Scheduler::my_clock() */ my_clock(), this_layer->lid);#endif delete this_layer; layers.arr[curr_join_q_lid] = NULL; int higher_lid = curr_join_q_lid + 1; if (higher_lid < MAX_LAYERS) { if (layers.arr[higher_lid] != NULL) { /* Retry from the higher layer */ continue_join_query(higher_lid); } else { /* Have to re-start with the BSE */ init_join_query(-1); } } } return;}/* Find a new leader in the cluster. I am leaving this cluster, * so I am not included in this. */LayerAgentInfo * coopAgent::find_new_cluster_leader (LayerInfo *l, bool omit_self) { if (l->root != NULL) assert (self_check(l->root) == true);#undef LOG_NEW_LEADER_CHOOSE#ifdef LOG_NEW_LEADER_CHOOSE printf ("[coop %d ] : At %8.4f (c-fnl) : find-new-ldr : lid %d %s\n", id, /* Scheduler::my_clock() */ my_clock(), l->lid, (omit_self == true) ? "omit-self" : "include-self");#endif double max_dist = -1.0; int max_count = -1; LayerAgentInfo * new_leader = NULL; for (void * pos = l->ag_list.GetHeadPosition(); pos != NULL; l->ag_list.GetNext(pos) ) { LayerAgentInfo * this_la = l->ag_list.GetAt(pos); if (self_check(this_la) == true) { continue; } // HAVE TO LOOK AT HOW TO ACCOMODATE THIS COUNT THING int this_count = count_dist_known_members_for_agent(this_la,omit_self);#ifdef LOG_NEW_LEADER_CHOOSE printf ("[coop %d ] : . . (c-fnl) : mbr %d : count %d : ", id, this_la->ag.agent_id, this_la->agent_dist_arr_count); for (int i = 0; i < this_la->agent_dist_arr_count; i++) printf ("( %d %f )", this_la->agent_dist_arr[i].ag.agent_id, this_la->agent_dist_arr[i].dist); printf ("\n[coop %d ] : . . (c-fnl) : this-count %d\n", id, this_count);#endif if (this_count == 0) { if (max_count < 0) { max_count = 0; new_leader = this_la; } continue; } assert (this_count > 0); if (this_count < max_count) continue; // KCR-CHANGE // double this_dist = get_farthest_dist_of_member_to_agent(l,this_la,omit_self,true); double this_dist = get_avg_dist_of_member_to_agent(l,this_la,omit_self,true);#ifdef LOG_NEW_LEADER_CHOOSE printf ("[coop %d ] : . . (c-fnl) : farthest-dist %f\n", id, this_dist);#endif assert (this_dist >= 0.0); if (this_count > max_count) { max_count = this_count; max_dist = this_dist; new_leader = this_la; } else { //(this_count == max_count) assert (max_dist >= 0.0); long this_dist_long = USECS(this_dist); long max_dist_long = USECS(max_dist); if (this_dist_long < max_dist_long) { max_dist = this_dist; new_leader = this_la; } } } if (omit_self == false) { // Need to compare self too void * pos = l->ag_list.Locate(id); if (pos != NULL) { LayerAgentInfo * self_ag = l->ag_list.GetAt(pos); LayerAgentInfo * my_farthest = find_farthest_agent_to_me_in_layer(l); // KCR-CHANGE double my_farthest_dist = find_avg_agent_dist_to_me_in_layer(l); int my_count = count_dist_known_members_for_me(l);#ifdef LOG_NEW_LEADER_CHOOSE printf ("[coop %d ] : . . (c-fnl) : mbr self : count %d : ", id, l->ag_list.GetSize() ); for (void * pos2 = l->ag_list.GetHeadPosition(); pos2 != NULL; l->ag_list.GetNext(pos2) ) { LayerAgentInfo * la2 = l->ag_list.GetAt(pos2); printf ("( %d %f )", la2->ag.agent_id, la2->dist); } // KCR-CHANGE //printf ("\n[coop %d ] : . . (c-fnl) : my-count %d : farthest ( %d %f )\n", id, my_count, my_farthest->ag.agent_id, my_farthest->dist); printf ("\n[coop %d ] : . . (c-fnl) : my-count %d : farthest ( %d %f )\n", id, my_count, my_farthest->ag.agent_id, my_farthest_dist);#endif assert (my_farthest != NULL); long max_dist_long = USECS(max_dist); // KCR-CHANGE // long my_farthest_dist_long = USECS(my_farthest->dist); long my_farthest_dist_long = USECS(my_farthest_dist); if ( (my_count > max_count) || ( (my_count == max_count) && ( (max_dist < 0.0) || (my_farthest_dist_long <= max_dist_long) ) ) ) { // The my_farthest->dist <= max_dist ensures that the current incumbent // (i.e. if I am the ldr) continues as ldr if possible new_leader = self_ag; max_count = my_count; // KCR-CHANGE //max_dist = my_farthest->dist; max_dist = my_farthest_dist; } } }#ifdef LOG_NEW_LEADER_CHOOSE if (new_leader != NULL) printf ("\n[coop %d ] : . . (c-fnl) : new-leader %d\n", id, new_leader->ag.agent_id); else printf ("\n[coop %d ] : . . (c-fnl) : new-leader NULL\n", id);#endif return new_leader;}/* Check all the cluster members, if any
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -