📄 coop-agent.cc
字号:
if (ap->u.clusterrefresh_p.cluster_remove == true) { // Delete#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-rcr) : src self-remove\n", id);#endif bool check = l->DeleteClusterMember(la_ag); assert (check == true); delete la_ag; }#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-rcr) : src stay as root (root-xfer to src)\n\n", id);#endif do_internal_root_transfer(l,other_root); log_cluster_change_info(l->lid);#ifdef LOG_COOP_CLUSTER_CHANGE display_cluster_info(l,l->lid);#endif return; } } // else(s does not think it is the root): nothing to do } else { // I am not the root AgentInfo old_root(l->root->ag.agent_id,l->root->ag.agent_addr); if ( (l->root->ag.agent_id == ap->src_agent) /*&& (l->root->ag.node_id == ap->src)*/ ) { // I think s is the root if (match_cluster_to_root_info(l) == true) { change = true; match_to_root_done = true; } if ( (l->lid + 1) < MAX_LAYERS) { if (layers.arr[l->lid+1] == NULL) layers.arr[l->lid+1] = new LayerInfo (l->lid+1, this); put_cluster_refresh_hl_info_into_layer_agent(ap, layers.arr[l->lid+1],id,udp_recv_agent_addr); if (m_ping_in_progress == true) { // m_ping_resps_received = get_ping_response_count_in_layer(layers.arr[l->lid+1]); test_for_all_pings_received = true; } } if ( ( (ap->u.clusterrefresh_p.is_root == true) && // s thinks it is root (ap->u.clusterrefresh_p.root_xfer == true) ) || (ap->u.clusterrefresh_p.is_root == false) ) { // s doesnt think it is root void *pos = l->ag_list.Locate(ap->u.clusterrefresh_p.agent_arr[0].ag.agent_id); if (pos == NULL) assert (pos != NULL); LayerAgentInfo *new_root = l->ag_list.GetAt(pos); assert ( (old_root.agent_id != new_root->ag.agent_id) /*|| (old_root.node_id != new_root->ag.node_id) */); // THE IF STMT (NEXT LINE) IS NOT NEEDED if ( (old_root.agent_id != new_root->ag.agent_id) /*|| (old_root.node_id != new_root->ag.node_id) */) {#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-rcr) : have a new cluster-root < [ %d ] >\n", id, new_root->ag.agent_id);#endif l->root = new_root; change = true; cancel_higher_layer_ping_timers(); } // Join next higher layer, if it was a root transfer to me if (self_check(l->root) == true) {#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-rcr) : me not root, root xfer to me\n", id);#endif join_to_next_higher_layer(l->lid); l->delay_root_xfer = true; } else { // The higher layer ping timer is set only when the next higher // layer info is known. If I am joining as the new leader, then // this info is not known. set_higher_layer_ping_timer(); } } } // else I dont think s is the root: do nothing } if (match_to_root_done == false) { // Only in this case, is this check // required to see if we need to delete and free this member. In all // other cases, we have already done the delete and returned from the // function. // So if match_to_root not done, and control is here, then do this // check and delete if appropriate. if (ap->u.clusterrefresh_p.cluster_remove == true) { // Delete bool check = l->DeleteClusterMember(la_ag); assert (check == true); // This is true after the match_to_root stuff. if (check == true) {#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-rcr) : src self-remove\n", id);#endif delete la_ag; change = true; } } } if ( (m_ping_in_progress == true) && (test_for_all_pings_received == true) ) { m_ping_resps_received = get_ping_response_count_in_layer(layers.arr[l->lid+1]); if ( (l->lid + 1) != m_ping_lid ) assert ( (l->lid+1) == m_ping_lid); if (check_all_ping_responses_received() == true) { int lower_lid = m_ping_lid - 1; //printf ("[%d %d] At %8.4f ping before-process handle-cluster-refresh\n", id, n->id, /* Scheduler::my_clock() */ my_clock()); if (process_ping_responses () == true) { //printf ("[ %d ] At %8.4f ping handle-cluster-refresh\n", id, /* Scheduler::my_clock() */ my_clock()); change = true; l = layers.arr[lower_lid]; } } } if (change == true) { assert (l != NULL); log_cluster_change_info(l->lid);#ifdef LOG_COOP_CLUSTER_CHANGE display_cluster_info(l,l->lid);#endif } return;}void coopAgent::handle_cluster_merge (AppPacket * ap) { if (valid_cluster_merge_packet(ap) == false) return;#ifdef LOG_COOP_RECV printf ("[coop %d ] At %8.4f (c-merge) : recvd-valid-merge : lid %d from < [ %d ]>\n", id, /* Scheduler::my_clock() */ my_clock(),ap->u.clustermerge_p.layer_id,ap->src_agent); for (int i = 0; i < ap->u.clustermerge_p.mbr_count; i++) printf ("[coop %d ] . . (c-merge) < [ %d ] >\n", id, ap->u.clustermerge_p.agent_arr[i].ag.agent_id); printf ("\n");#endif LayerInfo *l = layers.arr[ap->u.clustermerge_p.layer_id]; assert (l != NULL); int count = l->AddExtraMembersFromAltRootPacket(ap); if (count > 0) { log_cluster_change_info(l->lid);#ifdef LOG_COOP_CLUSTER_CHANGE display_cluster_info(l,l->lid);#endif } l->cr_msgt.CancelTimer(); send_all_cluster_refresh_packet(l,false,true,NULL); l->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT); return;}void coopAgent::handle_ping_query (AppPacket *ap) { assert (ap->st == PING_QUERY); int this_lid = ap->u.pingq_p.lid; bool accept = false; double dist = -1.0;#ifdef LOG_COOP_JUNK_PING printf ("[coop %d ] At %8.4f recv-pingq : lid %d : from < [ %d ] >\n", id, /* Scheduler::my_clock() */ my_clock(), ap->u.pingq_p.lid, ap->src_agent);#endif if (layers.arr[this_lid] != NULL) { if (layers.arr[this_lid]->me_in_layer == true) { /* This is new stuff: reject a ping request if accepting this * member increases my cluster radius */ LayerInfo * lower_layer = layers.arr[this_lid - 1]; assert (lower_layer != NULL); assert (lower_layer->root != NULL); assert (self_check(lower_layer->root) == true); LayerAgentInfo * my_farthest = find_farthest_agent_to_me_in_layer(lower_layer); assert (my_farthest != NULL); long my_farthest_dist_long = USECS(my_farthest->dist); { struct sockaddr_in their_addr; memcpy(& their_addr, & ap->src_agent_addr, sizeof(struct sockaddr_in)); their_addr.sin_port = htons(DIST_EST_PORT); dist = estimate_rtt_wrapper(their_addr); } long this_dist_long = USECS(dist); if (my_farthest_dist_long > this_dist_long) accept = true; else {#ifdef LOG_COOP_JUNK_PING printf ("[coop %d ] . . recv-pingq : reject (dist-violation)\n", id);#endif } } } AppPacket *resp_p = new AppPacket (PING_RESPONSE);//sunny change3 (6lines) resp_p->src_agent=this->id; memcpy(& resp_p->src_agent_addr, & this->udp_recv_agent_addr, sizeof(struct sockaddr_in));/* no dst_agent resp_p->dst_agent=ap->src_ag.agent_id; memcpy(& resp_p->dst_agent_addr, & ap->src_ag.agent_addr, sizeof(struct sockaddr_in));*///sunny resp_p->u.pingresp_p.accept = accept; resp_p->u.pingresp_p.lid = this_lid; resp_p->u.pingresp_p.src_time = ap->u.pingq_p.src_time; resp_p->u.pingresp_p.dist = dist; if (accept == true) { assert (this_lid > 0); LayerInfo * lower_layer = layers.arr[this_lid - 1]; //assert (lower_layer != NULL); //assert (lower_layer->root != NULL); //assert (self_check(lower_layer->root) == true); void * pos = lower_layer->ag_list.Locate(ap->src_agent); if (pos != NULL) {#ifdef LOG_COOP_JUNK printf ("[coop %d ] At %8.4f pinger < [ %d ] > dup\n", id, /* Scheduler::my_clock() */ my_clock(),ap->src_agent);#endif LayerAgentInfo * this_la = lower_layer->ag_list.GetAt(pos); lower_layer->ag_list.RemoveAt(pos); delete this_la; log_cluster_change_info(lower_layer->lid);#ifdef LOG_COOP_CLUSTER_CHANGE display_cluster_info(lower_layer,lower_layer->lid);#endif } put_layer_cluster_info_into_packet(lower_layer,resp_p); }#ifdef LOG_COOP_JUNK printf ("[coop %d ] At %8.4f ping-check-debug-send < [ %d ] > dist %f\n", id, /* Scheduler::my_clock() */ my_clock(),ap->src_agent, resp_p->u.pingresp_p.dist);#endif send_pkt_wrapper(resp_p,ap->src_agent,ap->src_agent_addr); return;}void coopAgent::handle_ping_response (AppPacket * ap) { assert (ap->st == PING_RESPONSE); if ( (m_ping_in_progress == false) || (ap->u.pingresp_p.lid != m_ping_lid) ) return; LayerInfo * l = layers.arr[m_ping_lid]; assert (l != NULL); void * pos = l->ag_list.Locate(ap->src_agent); if (pos == NULL) return; LayerAgentInfo * la = l->ag_list.GetAt(pos); // la->dist = (/* Scheduler::my_clock() */ my_clock() - ap->u.pingresp_p.src_time) / 2.0; //modify ping to just estimate rtt... if (ap->u.pingresp_p.dist >= 0.0) { la->dist = ap->u.pingresp_p.dist; la->dist_clock = my_clock(); la->valid_dist_clock = true; } /* if ((la->valid_dist_clock == false) || (my_clock() - la->dist_clock >= DIST_CLOCK_THRESHOLD)) { // dist = MAX_RTT; struct sockaddr_in their_addr; memcpy(& their_addr, & la->ag.agent_addr, sizeof(struct sockaddr_in)); their_addr.sin_port = htons(DIST_EST_PORT); la->dist = estimate_rtt_wrapper(their_addr); la->dist_clock = my_clock(); la->valid_dist_clock = true; } */ if (la->lower_layer != NULL) { delete la->lower_layer; la->lower_layer = NULL; }#ifdef LOG_COOP_JUNK_PING printf ("[coop %d ] At %8.4f recv-pingresp : lid %d : from < [ %d ] > dist %f %s\n", id, /* Scheduler::my_clock() */ my_clock(), m_ping_lid, ap->src_agent, la->dist, (ap->u.pingresp_p.accept == true) ? "" : "*reject*" );#endif if (ap->u.pingresp_p.accept == true) { la->lower_layer = new LayerInfo(m_ping_lid-1,this); put_packet_cluster_info_into_layer(ap,la->lower_layer); m_ping_resps_received ++; } else { bool success = l->DeleteClusterMember(la); delete la; assert (success == true); } if (check_all_ping_responses_received() == true) { int lower_lid = m_ping_lid - 1; if (process_ping_responses () == true) { //printf ("[ %d ] At %8.4f ping recv-ping-resp\n", id, /* Scheduler::my_clock() */ my_clock()); LayerInfo * new_layer = layers.arr[lower_lid]; assert (new_layer != NULL); log_cluster_change_info(new_layer->lid);#ifdef LOG_COOP_CLUSTER_CHANGE display_cluster_info(new_layer,new_layer->lid);#endif } } return;}bool coopAgent::check_all_ping_responses_received (void) { assert (m_ping_in_progress == true); LayerInfo *l = layers.arr[m_ping_lid]; assert (l != NULL); // If this higher layer has BSE, then cannot count it, since BSE is not sent // a ping. int ignore_bse; if (l->ag_list.Locate(bse.agent_id) != NULL) ignore_bse = 1; else ignore_bse = 0; // The -1 is due to not including the root of the lower layer in this if ( m_ping_resps_received == ( l->ag_list.GetSize() - 1 - ignore_bse) ) return true; return false;}/* Returns true if there is a change */bool coopAgent::process_ping_responses (void) { bool ret_val = false; assert (m_ping_lid > 0); LayerInfo * hl = layers.arr[m_ping_lid]; assert (hl != NULL); LayerInfo * my_highest_layer = layers.arr[m_ping_lid - 1]; assert (my_highest_layer != NULL); assert (my_highest_layer->root != NULL); if (my_highest_layer->root->dist < 0.0) assert (my_highest_layer->root->dist >= 0.0); assert (my_highest_layer->me_in_layer == true); LayerAgentInfo * min_la = find_closest_agent_in_layer(hl,&(my_highest_layer->root->ag),true); if (min_la != NULL) { long my_highest_layer_root_dist_long = USECS(my_highest_layer->root->dist); long min_la_dist_long = USECS(min_la->dist); if (my_highest_layer_root_dist_long > min_la_dist_long) { ret_val = true;#ifdef LOG_COOP_JUNK_PING_SWITCH printf ("[coop %d ] At %8.4f switch-root : lid %d : from < [ %d ] > to < [ %d ] > dist %8.4f \n", id, /* Scheduler::my_clock() */ my_clock(), m_ping_lid - 1, my_highest_layer->root->ag.agent_id, min_la->ag.agent_id, min_la->dist);#endif // Send the cluster remove in old cluster // Delete self void * pos = my_highest_layer->ag_list.Locate(id); assert (pos != NULL); LayerAgentInfo * self_ag = my_highest_layer->ag_list.GetAt(pos); my_highest_layer->ag_list.RemoveAt(pos); my_highest_layer->me_in_layer = false; if (my_highest_layer->ag_list.GetSize() > 0) send_cluster_remove(my_highest_layer,false); delete my_highest_layer; // Send the join to new cluster layers.arr[m_ping_lid-1] = min_la->lower_layer; min_la->lower_layer = NULL; LayerInfo * new_layer = layers.arr[m_ping_lid-1]; bool check = new_layer->AddClusterMember(self_ag); assert (check == true); new_layer->me_in_layer = true; assert (new_layer->root != NULL); assert ( (new_layer->root->ag.agent_id == min_la->ag.agent_id) /* && (new_layer->root->ag.node_id == min_la->ag.node_id) */); assert (min_la->dist >= 0.0); new_layer->root->dist = min_la->dist; AppPacket *pkt = new AppPacket(JOIN_QUERY); init_join_query_packet(pkt,m_ping_lid-1,/* Scheduler::my_clock() */ my_clock(),true, udp_recv_agent_addr);//sunny change3 (6lines) pkt->src_agent=this->id; memcpy(& pkt->src_agent_addr, & this->udp_recv_agent_addr, sizeof(struct sockaddr_in)); pkt->dst_agent=new_layer->root->ag.agent_id; memcpy(& pkt->dst_agent_addr, & new_layer->root->ag.agent_addr, sizeof(struct sockaddr_in));//sunny send_pkt_wrapper(pkt,new_layer->root->ag.agent_id, new_layer->root->ag.agent_addr); new_layer->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT); new_layer->cr_chkt.SetTimer(CONST_CLUSTER_REFRESH_CHECK_TIMEOUT); assert (state == ATTACHED); } } cancel_higher_layer_ping_timers(); set_higher_layer_ping_timer(); return ret_val;}void coopAgent::handle_data_ack_packet (AppPacket *ap) { assert (ap->st == PACKET_DATA_ACK);//#ifdef LOG_DATA_PACKET_RECV printf ("[coop %d ] At %8.4f recv ack-pkt : < [ %d ] > seq %d : from < [ %d ] >\n", id, /* Scheduler::my_clock() */ my_clock(), ap->u.dataack_p.original_src.agent_id, ap->u.dataack_p.seq_no, ap->src_agent);//#endif return;}int coopAgent::handle_data_packet (AppPacket *ap) { assert (ap->st == PACKET_DATA);//#ifdef LOG_DATA_PACKET_RECV printf ("[coop %d ] At %8.4f recv data-pkt : < [ %d ] > seq %d : from < [ %d ] >\n", id, my_clock(), ap->u.data_p.original_src.agent_id, ap->u.data_p.seq_no, ap->src_agent);printf("sunny: handle_data_packet orig_time: %f\n", ap->u.data_p.orig_time);//#endif /* Send an ack back */ AppPacket *new_ack_p = new AppPacket(PACKET_DATA_ACK);//sunny change3 (6lines) new_ack_p->src_agent=this->id; memcpy(& new_ack_p->src_agent_addr, & this->udp_recv_agent_addr, sizeof(struct sockaddr_in)); new_ack_p->dst_agent=ap->src_agent; memcpy(& new_ack_p->dst_agent_addr, & ap->src_agent_addr, sizeof(struct sockaddr_in));//sunny new_ack_p->u.dataack_p.seq_no = ap->u.data_p.seq_no; new_ack_p->u.dataack_p.original_src.agent_id = ap->u.data_p.original_src.agent_id; memcpy((void*)&(new_ack_p->u.dataack_p.original_src.agent_addr), (void*)&(ap->u.data_p.original_src.agent_addr), sizeof(struct sockaddr_in)); send_pkt_wrapper(new_ack_p, ap->src_agent, ap->src_agent_addr);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -