📄 coop-agent.cc,v
字号:
hl_member->lower_layer = new LayerInfo (this_lid,this); put_packet_cluster_info_into_layer (ap,hl_member->lower_layer); LayerAgentInfo * self = hl_member->lower_layer->FindClusterMember(id); if (self != NULL) self->dist = 0.0; /* Using the time metric. Can use any other */ // hl_member->dist = /* Scheduler::my_clock() */ my_clock() - ap->send_time; if ((hl_member->valid_dist_clock == false) || (my_clock() - hl_member->dist_clock >= DIST_CLOCK_THRESHOLD)) { // dist = MAX_RTT; struct sockaddr_in their_addr; memcpy(& their_addr, & hl_member->ag.agent_addr, sizeof(struct sockaddr_in)); their_addr.sin_port = htons(DIST_EST_PORT); hl_member->dist = estimate_rtt_wrapper(their_addr); hl_member->dist_clock = my_clock(); hl_member->valid_dist_clock = true; }#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-jr) : dist-estimate %f\n", id, hl_member->dist);#endif } int match_count = higher_layer->ag_list.GetSize(); if (higher_layer->ag_list.Locate(bse.agent_id) != NULL) match_count --; if (match_count == m_join_resp_received) { jqt.CancelTimer();#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-jr) : all join resps found\n\n", id);#endif process_received_join_responses(); } } return;}void coopAgent::handle_cluster_refresh (AppPacket *ap) { assert (ap->st == CLUSTER_REFRESH); if (valid_cluster_refresh_packet(ap) == false) return; LayerInfo * l = layers.arr[ap->u.clusterrefresh_p.layer_id]; void * this_agent_pos = l->ag_list.Locate(ap->src_agent); assert (this_agent_pos != NULL); LayerAgentInfo *la_ag = l->ag_list.GetAt(this_agent_pos);; bool test_for_all_pings_received = false; bool change = false; bool match_to_root_done = false;#ifdef TESTING_JUNK void * pos = l->ag_list.Locate(id); if (pos != NULL) { LayerAgentInfo * self = l->ag_list.GetAt(pos); assert (self->dist >= 0.0); }#endif #ifdef LOG_COOP_RECV printf ("\n[coop %d ] At %8.4f (c-rcr) : recvd-valid-refresh : lid %d from < [ %d ] > %s %s %s\n", id, /* Scheduler::my_clock() */ my_clock(), l->lid,ap->src_agent, (ap->u.clusterrefresh_p.is_root == true) ? "root" : ".", (ap->u.clusterrefresh_p.root_xfer == true) ? "xfer" : ".", (ap->u.clusterrefresh_p.cluster_remove == true) ? "remove" : ".");#endif #ifdef LOG_COOP_RECV_2 printf ("[coop %d ] . . (c-rcr) : dist-info in refresh :", id); for (int i = 0; i < ap->u.clusterrefresh_p.mbr_count; i++) printf ("( %d %5.3f )", ap->u.clusterrefresh_p.agent_arr[i].ag.agent_id,ap->u.clusterrefresh_p.agent_arr[i].dist); printf ("\n");#endif /* Now this is a valid packet */ la_ag->refresh_agent(ap); assert (l->root != NULL); if (self_check(l->root) == true) { // I am the root if (ap->u.clusterrefresh_p.is_root == true) { // Pkt src,s, thinks that it is the root // First check if it is doing a root transfer to me if ( (ap->u.clusterrefresh_p.root_xfer == true) && ( (ap->u.clusterrefresh_p.agent_arr[0].ag.agent_id == id) /* && (ap->u.clusterrefresh_p.agent_arr[0].ag.node_id == n->id) */) ) { // In which case gladly add all unknown members bool members_added = false; if (l->AddExtraMembersFromAltRootPacket (ap) > 0) { change = true; members_added = true; } // Do cluster remove if (ap->u.clusterrefresh_p.cluster_remove == true) { // Delete#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-rcr) : src self-remove\n", id);#endif bool check = l->DeleteClusterMember(la_ag); assert (check == true); delete la_ag; change = true; }#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-rcr) : src and me roots, root xfer to me\n\n", id);#endif if (change == true) { log_cluster_change_info(l->lid);#ifdef LOG_COOP_CLUSTER_CHANGE display_cluster_info(l,l->lid);#endif } // Immediate update of new member additions if (members_added == true) { l->cr_msgt.CancelTimer(); send_all_cluster_refresh_packet(l,false,true,NULL); l->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT); } return; } bool stay_as_root = false; // s_max: source dist to its farthest member // KCR-CHANGE (2 new lines) //double s_max = get_farthest_dist_of_member_to_agent(l,la_ag,false,false); double s_max = get_avg_dist_of_member_to_agent(l,la_ag,false,false); LayerAgentInfo * my_farthest_ag = find_farthest_agent_to_me_in_layer(l); double my_max = my_farthest_ag->dist; my_max = find_avg_agent_dist_to_me_in_layer(l); long s_max_long = USECS(s_max); long my_max_long = USECS(my_max); int s_know_count = count_dist_known_members_for_agent(la_ag,false); int me_know_count = count_dist_known_members_for_me(l); if (are_agent_members_contained_in_mine(l,la_ag) == true) { // All members in la_ag are also known to me if ( (s_know_count < me_know_count) || // I am the superset (my_max_long < s_max_long) || // We are equal in size, but I am better ( (my_max_long == s_max_long) && (id < la_ag->ag.agent_id) ) ) { // -do- stay_as_root = true; } // else {stay_as_root = false;} } else if (are_my_members_contained_in_agent(l,la_ag) == false) { // no one is a superset of the other if (id < la_ag->ag.agent_id) { stay_as_root = true; } // else {stay_as_root = false;} } // else my_members_contained_in_agent() == true: // i.e. s is a superset, so stay_as_root = false LayerAgentInfo * other_root; bool member_added = false; if (ap->u.clusterrefresh_p.root_xfer == true) { other_root = l->FindClusterMember(ap->u.clusterrefresh_p.agent_arr[0].ag.agent_id); if (other_root == NULL) { LayerAgentInfo *new_guy = new LayerAgentInfo(ap->u.clusterrefresh_p.agent_arr[0].ag.agent_id,ap->u.clusterrefresh_p.agent_arr[0].ag.agent_addr); l->AddClusterMember(new_guy); other_root = new_guy; change = true; member_added = true; }#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-rcr) : src and me roots, src root xfer to < [ %d ] >\n", id, other_root->ag.agent_id);#endif } else { assert (ap->u.clusterrefresh_p.cluster_remove == false); other_root = la_ag; } if (stay_as_root == true) { // Add all the extra members and send root challenge if (l->AddExtraMembersFromAltRootPacket(ap) > 0) { change = true; member_added = true; } if (ap->u.clusterrefresh_p.cluster_remove == true) { // Delete#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-rcr) : src self-remove\n", id);#endif bool check = l->DeleteClusterMember(la_ag); assert (check == true); delete la_ag; change = true; }#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-rcr) : me stay as root (root-challenge)\n\n", id);#endif send_root_challenge(l,other_root); if (change == true) { log_cluster_change_info(l->lid);#ifdef LOG_COOP_CLUSTER_CHANGE display_cluster_info(l,l->lid);#endif } // Immediate inform other members of new member if (member_added == true) { l->cr_msgt.CancelTimer(); // Do not send this to the other root send_all_cluster_refresh_packet(l,false,true,other_root); l->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT); } return; } else { change = true; if (ap->u.clusterrefresh_p.cluster_remove == true) { // Delete#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-rcr) : src self-remove\n", id);#endif bool check = l->DeleteClusterMember(la_ag); assert (check == true); delete la_ag; }#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-rcr) : src stay as root (root-xfer to src)\n\n", id);#endif do_internal_root_transfer(l,other_root); log_cluster_change_info(l->lid);#ifdef LOG_COOP_CLUSTER_CHANGE display_cluster_info(l,l->lid);#endif return; } } // else(s does not think it is the root): nothing to do } else { // I am not the root AgentInfo old_root(l->root->ag.agent_id,l->root->ag.agent_addr); if ( (l->root->ag.agent_id == ap->src_agent) /*&& (l->root->ag.node_id == ap->src)*/ ) { // I think s is the root if (match_cluster_to_root_info(l) == true) { change = true; match_to_root_done = true; } if ( (l->lid + 1) < MAX_LAYERS) { if (layers.arr[l->lid+1] == NULL) layers.arr[l->lid+1] = new LayerInfo (l->lid+1, this); put_cluster_refresh_hl_info_into_layer_agent(ap, layers.arr[l->lid+1],id,udp_recv_agent_addr); if (m_ping_in_progress == true) { // m_ping_resps_received = get_ping_response_count_in_layer(layers.arr[l->lid+1]); test_for_all_pings_received = true; } } if ( ( (ap->u.clusterrefresh_p.is_root == true) && // s thinks it is root (ap->u.clusterrefresh_p.root_xfer == true) ) || (ap->u.clusterrefresh_p.is_root == false) ) { // s doesnt think it is root void *pos = l->ag_list.Locate(ap->u.clusterrefresh_p.agent_arr[0].ag.agent_id); if (pos == NULL) assert (pos != NULL); LayerAgentInfo *new_root = l->ag_list.GetAt(pos); assert ( (old_root.agent_id != new_root->ag.agent_id) /*|| (old_root.node_id != new_root->ag.node_id) */); // THE IF STMT (NEXT LINE) IS NOT NEEDED if ( (old_root.agent_id != new_root->ag.agent_id) /*|| (old_root.node_id != new_root->ag.node_id) */) {#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-rcr) : have a new cluster-root < [ %d ] >\n", id, new_root->ag.agent_id);#endif l->root = new_root; change = true; cancel_higher_layer_ping_timers(); } // Join next higher layer, if it was a root transfer to me if (self_check(l->root) == true) {#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-rcr) : me not root, root xfer to me\n", id);#endif join_to_next_higher_layer(l->lid); l->delay_root_xfer = true; } else { // The higher layer ping timer is set only when the next higher // layer info is known. If I am joining as the new leader, then // this info is not known. set_higher_layer_ping_timer(); } } } // else I dont think s is the root: do nothing } if (match_to_root_done == false) { // Only in this case, is this check // required to see if we need to delete and free this member. In all // other cases, we have already done the delete and returned from the // function. // So if match_to_root not done, and control is here, then do this // check and delete if appropriate. if (ap->u.clusterrefresh_p.cluster_remove == true) { // Delete bool check = l->DeleteClusterMember(la_ag); assert (check == true); // This is true after the match_to_root stuff. if (check == true) {#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-rcr) : src self-remove\n", id);#endif delete la_ag; change = true; } } } if ( (m_ping_in_progress == true) && (test_for_all_pings_received == true) ) { m_ping_resps_received = get_ping_response_count_in_layer(layers.arr[l->lid+1]); if ( (l->lid + 1) != m_ping_lid ) assert ( (l->lid+1) == m_ping_lid); if (check_all_ping_responses_received() == true) { int lower_lid = m_ping_lid - 1; //printf ("[%d %d] At %8.4f ping before-process handle-cluster-refresh\n", id, n->id, /* Scheduler::my_clock() */ my_clock()); if (process_ping_responses () == true) { //printf ("[ %d ] At %8.4f ping handle-cluster-refresh\n", id, /* Scheduler::my_clock() */ my_clock()); change = true; l = layers.arr[lower_lid]; } } } if (change == true) { assert (l != NULL); log_cluster_change_info(l->lid);#ifdef LOG_COOP_CLUSTER_CHANGE display_cluster_info(l,l->lid);#endif } return;}void coopAgent::handle_cluster_merge (AppPacket * ap) { if (valid_cluster_merge_packet(ap) == false) return;#ifdef LOG_COOP_RECV printf ("[coop %d ] At %8.4f (c-merge) : recvd-valid-merge : lid %d from < [ %d ]>\n", id, /* Scheduler::my_clock() */ my_clock(),ap->u.clustermerge_p.layer_id,ap->src_agent); for (int i = 0; i < ap->u.clustermerge_p.mbr_count; i++) printf ("[coop %d ] . . (c-merge) < [ %d ] >\n", id, ap->u.clustermerge_p.agent_arr[i].ag.agent_id); printf ("\n");#endif LayerInfo *l = layers.arr[ap->u.clustermerge_p.layer_id]; assert (l != NULL); int count = l->AddExtraMembersFromAltRootPacket(ap); if (count > 0) { log_cluster_change_info(l->lid);#ifdef LOG_COOP_CLUSTER_CHANGE display_cluster_info(l,l->lid);#endif } l->cr_msgt.CancelTimer(); send_all_cluster_refresh_packet(l,false,true,NULL); l->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT); return;}void coopAgent::handle_ping_query (AppPacket *ap) { assert (ap->st == PING_QUERY); int this_lid = ap->u.pingq_p.lid; bool accept = false; double dist = -1.0;#ifdef LOG_COOP_JUNK_PING printf ("[coop %d ] At %8.4f recv-pingq : lid %d : from < [ %d ] >\n", id, /* Scheduler::my_clock() */ my_clock(), ap->u.pingq_p.lid, ap->src_agent);#endif if (layers.arr[this_lid] != NULL) { if (layers.arr[this_lid]->me_in_layer == true) { /* This is new stuff: reject a ping request if accepting this * member increases my cluster radius */ LayerInfo * lower_layer = layers.arr[this_lid - 1]; assert (lower_layer != NULL); assert (lower_layer->root != NULL); assert (self_check(lower_layer->root) == true); LayerAgentInfo * my_farthest = find_farthest_agent_to_me_in_layer(lower_layer); assert (my_farthest != NULL); long my_farthest_dist_long = USECS(my_farthest->dist); { struct sockaddr_in their_addr; memcpy(& their_addr, & ap->src_agent_addr, sizeof(struct sockaddr_in)); their_addr.sin_port = htons(DIST_EST_PORT); dist = estimate_rtt_wrapper(their_addr); } long this_dist_long = USECS(dist); if (my_farthest_dist_long > this_dist_long) accept = true; else {#ifdef LOG_COOP_JUNK_PING printf ("[coop %d ] . . recv-pingq : reject (dist-violation)\n", id);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -