📄 coop-agent.cc,v
字号:
#endif } } } AppPacket *resp_p = new AppPacket (PING_RESPONSE); resp_p->u.pingresp_p.accept = accept; resp_p->u.pingresp_p.lid = this_lid; resp_p->u.pingresp_p.src_time = ap->u.pingq_p.src_time; resp_p->u.pingresp_p.dist = dist; if (accept == true) { assert (this_lid > 0); LayerInfo * lower_layer = layers.arr[this_lid - 1]; //assert (lower_layer != NULL); //assert (lower_layer->root != NULL); //assert (self_check(lower_layer->root) == true); void * pos = lower_layer->ag_list.Locate(ap->src_agent); if (pos != NULL) {#ifdef LOG_COOP_JUNK printf ("[coop %d ] At %8.4f pinger < [ %d ] > dup\n", id, /* Scheduler::my_clock() */ my_clock(),ap->src_agent);#endif LayerAgentInfo * this_la = lower_layer->ag_list.GetAt(pos); lower_layer->ag_list.RemoveAt(pos); delete this_la; log_cluster_change_info(lower_layer->lid);#ifdef LOG_COOP_CLUSTER_CHANGE display_cluster_info(lower_layer,lower_layer->lid);#endif } put_layer_cluster_info_into_packet(lower_layer,resp_p); }#ifdef LOG_COOP_JUNK printf ("[coop %d ] At %8.4f ping-check-debug-send < [ %d ] > dist %f\n", id, /* Scheduler::my_clock() */ my_clock(),ap->src_agent, resp_p->u.pingresp_p.dist);#endif send_pkt_wrapper(resp_p,ap->src_agent,ap->src_agent_addr); return;}void coopAgent::handle_ping_response (AppPacket * ap) { assert (ap->st == PING_RESPONSE); if ( (m_ping_in_progress == false) || (ap->u.pingresp_p.lid != m_ping_lid) ) return; LayerInfo * l = layers.arr[m_ping_lid]; assert (l != NULL); void * pos = l->ag_list.Locate(ap->src_agent); if (pos == NULL) return; LayerAgentInfo * la = l->ag_list.GetAt(pos); // la->dist = (/* Scheduler::my_clock() */ my_clock() - ap->u.pingresp_p.src_time) / 2.0; //modify ping to just estimate rtt... if (ap->u.pingresp_p.dist >= 0.0) { la->dist = ap->u.pingresp_p.dist; la->dist_clock = my_clock(); la->valid_dist_clock = true; } /* if ((la->valid_dist_clock == false) || (my_clock() - la->dist_clock >= DIST_CLOCK_THRESHOLD)) { // dist = MAX_RTT; struct sockaddr_in their_addr; memcpy(& their_addr, & la->ag.agent_addr, sizeof(struct sockaddr_in)); their_addr.sin_port = htons(DIST_EST_PORT); la->dist = estimate_rtt_wrapper(their_addr); la->dist_clock = my_clock(); la->valid_dist_clock = true; } */ if (la->lower_layer != NULL) { delete la->lower_layer; la->lower_layer = NULL; }#ifdef LOG_COOP_JUNK_PING printf ("[coop %d ] At %8.4f recv-pingresp : lid %d : from < [ %d ] > dist %f %s\n", id, /* Scheduler::my_clock() */ my_clock(), m_ping_lid, ap->src_agent, la->dist, (ap->u.pingresp_p.accept == true) ? "" : "*reject*" );#endif if (ap->u.pingresp_p.accept == true) { la->lower_layer = new LayerInfo(m_ping_lid-1,this); put_packet_cluster_info_into_layer(ap,la->lower_layer); m_ping_resps_received ++; } else { bool success = l->DeleteClusterMember(la); delete la; assert (success == true); } if (check_all_ping_responses_received() == true) { int lower_lid = m_ping_lid - 1; if (process_ping_responses () == true) { //printf ("[ %d ] At %8.4f ping recv-ping-resp\n", id, /* Scheduler::my_clock() */ my_clock()); LayerInfo * new_layer = layers.arr[lower_lid]; assert (new_layer != NULL); log_cluster_change_info(new_layer->lid);#ifdef LOG_COOP_CLUSTER_CHANGE display_cluster_info(new_layer,new_layer->lid);#endif } } return;}bool coopAgent::check_all_ping_responses_received (void) { assert (m_ping_in_progress == true); LayerInfo *l = layers.arr[m_ping_lid]; assert (l != NULL); // If this higher layer has BSE, then cannot count it, since BSE is not sent // a ping. int ignore_bse; if (l->ag_list.Locate(bse.agent_id) != NULL) ignore_bse = 1; else ignore_bse = 0; // The -1 is due to not including the root of the lower layer in this if ( m_ping_resps_received == ( l->ag_list.GetSize() - 1 - ignore_bse) ) return true; return false;}/* Returns true if there is a change */bool coopAgent::process_ping_responses (void) { bool ret_val = false; assert (m_ping_lid > 0); LayerInfo * hl = layers.arr[m_ping_lid]; assert (hl != NULL); LayerInfo * my_highest_layer = layers.arr[m_ping_lid - 1]; assert (my_highest_layer != NULL); assert (my_highest_layer->root != NULL); if (my_highest_layer->root->dist < 0.0) assert (my_highest_layer->root->dist >= 0.0); assert (my_highest_layer->me_in_layer == true); LayerAgentInfo * min_la = find_closest_agent_in_layer(hl,&(my_highest_layer->root->ag),true); if (min_la != NULL) { long my_highest_layer_root_dist_long = USECS(my_highest_layer->root->dist); long min_la_dist_long = USECS(min_la->dist); if (my_highest_layer_root_dist_long > min_la_dist_long) { ret_val = true;#ifdef LOG_COOP_JUNK_PING_SWITCH printf ("[coop %d ] At %8.4f switch-root : lid %d : from < [ %d ] > to < [ %d ] > dist %8.4f \n", id, /* Scheduler::my_clock() */ my_clock(), m_ping_lid - 1, my_highest_layer->root->ag.agent_id, min_la->ag.agent_id, min_la->dist);#endif // Send the cluster remove in old cluster // Delete self void * pos = my_highest_layer->ag_list.Locate(id); assert (pos != NULL); LayerAgentInfo * self_ag = my_highest_layer->ag_list.GetAt(pos); my_highest_layer->ag_list.RemoveAt(pos); my_highest_layer->me_in_layer = false; if (my_highest_layer->ag_list.GetSize() > 0) send_cluster_remove(my_highest_layer,false); delete my_highest_layer; // Send the join to new cluster layers.arr[m_ping_lid-1] = min_la->lower_layer; min_la->lower_layer = NULL; LayerInfo * new_layer = layers.arr[m_ping_lid-1]; bool check = new_layer->AddClusterMember(self_ag); assert (check == true); new_layer->me_in_layer = true; assert (new_layer->root != NULL); assert ( (new_layer->root->ag.agent_id == min_la->ag.agent_id) /* && (new_layer->root->ag.node_id == min_la->ag.node_id) */); assert (min_la->dist >= 0.0); new_layer->root->dist = min_la->dist; AppPacket *pkt = new AppPacket(JOIN_QUERY); init_join_query_packet(pkt,m_ping_lid-1,/* Scheduler::my_clock() */ my_clock(),true, udp_recv_agent_addr); send_pkt_wrapper(pkt,new_layer->root->ag.agent_id, new_layer->root->ag.agent_addr); new_layer->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT); new_layer->cr_chkt.SetTimer(CONST_CLUSTER_REFRESH_CHECK_TIMEOUT); assert (state == ATTACHED); } } cancel_higher_layer_ping_timers(); set_higher_layer_ping_timer(); return ret_val;}void coopAgent::handle_data_ack_packet (AppPacket *ap) { assert (ap->st == PACKET_DATA_ACK);#ifdef LOG_DATA_PACKET_RECV printf ("[coop %d ] At %8.4f recv ack-pkt : < [ %d ] > seq %d : from < [ %d ] >\n", id, /* Scheduler::my_clock() */ my_clock(), ap->u.dataack_p.original_src.agent_id, ap->u.dataack_p.seq_no, ap->src_agent);#endif return;}int coopAgent::handle_data_packet (AppPacket *ap) { assert (ap->st == PACKET_DATA);#ifdef LOG_DATA_PACKET_RECV printf ("[coop %d ] At %8.4f recv data-pkt : < [ %d ] > seq %d : from < [ %d ] >\n", id, my_clock(), ap->u.data_p.original_src.agent_id, ap->u.data_p.seq_no, ap->src_agent);#endif /* Send an ack back */ AppPacket *new_ack_p = new AppPacket(PACKET_DATA_ACK); new_ack_p->u.dataack_p.seq_no = ap->u.data_p.seq_no; new_ack_p->u.dataack_p.original_src.agent_id = ap->u.data_p.original_src.agent_id; memcpy((void*)&(new_ack_p->u.dataack_p.original_src.agent_addr), (void*)&(ap->u.data_p.original_src.agent_addr), sizeof(struct sockaddr_in)); send_pkt_wrapper(new_ack_p, ap->src_agent, ap->src_agent_addr); if (m_use_packet_cache == true) { if (add_to_packet_cache(ap->u.data_p.original_src.agent_id, ap->u.data_p.seq_no) == false) return 1; } int this_lid = ap->u.data_p.lid; LayerInfo *l = layers.arr[this_lid]; /* The 3 if conditions below enforce strict checking of packet source */ if (l != NULL) { if (l->me_in_layer == true) { // Somewhat looser checking of data packet forwarding // if (l->ag_list.Locate(ap->src_agent) != NULL) { forward_data_packet(ap->u.data_p.payload, ap->u.data_p.data_len, ap->u.data_p.original_src.agent_id, ap->u.data_p.original_src.agent_addr, ap->u.data_p.seq_no, ap->u.data_p.base_lid, ap->src_agent, false); // } } } return 0;}//modifiedvoid coopAgent::forward_data_packet (char* payload, int data_len, int src_aid, struct sockaddr_in src_agent_addr, int src_seqno, int base_lid, int pred_aid, bool is_source) { int count_pkts = 0; for (int i = 0; i < MAX_LAYERS; i++) { LayerInfo * l = layers.arr[i]; if (l != NULL) { if (l->me_in_layer == true) { // Forward to this layer only if predecessor not in this layer if (l->ag_list.Locate(pred_aid) == NULL) { if (l->lid >= base_lid) count_pkts += send_data_pkt_to_layer(payload, data_len, l,src_aid,base_lid,src_agent_addr,src_seqno); } } } } if (state == JOIN) { if ( (curr_join_q_lid < MAX_LAYERS) && (curr_join_q_lid > 0) ) { LayerInfo * q_l = layers.arr[curr_join_q_lid]; assert (q_l != NULL); assert (q_l->me_in_layer == false); if (q_l->lid >= base_lid) count_pkts += send_data_pkt_to_layer(payload, data_len, q_l,src_aid,base_lid,src_agent_addr,src_seqno); } if (curr_join_q_lid < 0) { // Starting join query at BSE // For join to higher layers, we probably already have the higher // target layer from the previous leader which left the cluster or // did a root xfer to me. Can use that as a fall back option. // This should not be for layer 0. LayerInfo * target_layer = layers.arr[target_join_q_lid]; if (target_layer != NULL) { assert (target_layer->me_in_layer == false); if (target_layer->lid >= base_lid) count_pkts += send_data_pkt_to_layer(payload, data_len, target_layer,src_aid,base_lid,src_agent_addr,src_seqno); } } } if (is_source == true) { AppPacket *ap = new AppPacket(PACKET_DATA); ap->u.data_p.original_src.agent_id = src_aid; // ap->u.data_p.original_src.node_id = src_nid; memcpy(& ap->u.data_p.original_src.agent_addr, & src_agent_addr, sizeof(struct sockaddr_in)); ap->u.data_p.seq_no = src_seqno; ap->u.data_p.lid = -1; //ap->u.data_p.data_len = data_len; // Put the data in here?? do we need to? //memcpy(ap->u.data_p.payload#ifdef LOG_DATA_PACKET_SEND // KCR-CHANGE //printf ("[coop %d ] At %8.4f send data-pkt : < [ %d ] > seq %d : to < [ %d ] > bse\n", id, /* Scheduler::my_clock() */ my_clock(), src_aid, src_seqno, bse.agent_id);#endif //send_pkt_wrapper(ap,bse.agent_id, bse.agent_addr); count_pkts ++; }#ifdef LOG_DATA_PACKET_SEND_COUNT if (count_pkts > 0) printf ("[coop %d ] At %8.4f fwd-data count %d for < [ %d ]> seq-no %d\n",id, /* Scheduler::my_clock() */ my_clock(), count_pkts, src_aid, src_seqno);#endif return;}int coopAgent::send_data_pkt_to_layer (char* payload, int data_len, LayerInfo * l, int src_aid, int base_lid, struct sockaddr_in src_agent_addr, int src_seqno) { int count_pkts = 0; for (void * pos = l->ag_list.GetHeadPosition(); pos != NULL; l->ag_list.GetNext(pos) ) { LayerAgentInfo * this_la = l->ag_list.GetAt(pos); if (self_check(this_la) == false) { if ( ( (bse.agent_id != this_la->ag.agent_id) /* || (bse.node_id != this_la->ag.node_id) */) && ( ( (this_la->ag.agent_id != src_aid) /*|| (this_la->ag.node_id != src_nid) */) ) ) { AppPacket *ap = new AppPacket(PACKET_DATA); ap->u.data_p.original_src.agent_id = src_aid; // ap->u.data_p.original_src.node_id = src_nid; memcpy(& ap->u.data_p.original_src.agent_addr, & src_agent_addr, sizeof(struct sockaddr_in)); ap->u.data_p.seq_no = src_seqno; ap->u.data_p.lid = l->lid; ap->u.data_p.base_lid = base_lid; ap->u.data_p.data_len = data_len; ap->u.data_p.payload = (char *)malloc(data_len); memcpy(ap->u.data_p.payload, payload, data_len);#ifdef LOG_DATA_PACKET_SEND printf ("[coop %d ] At %8.4f send data-pkt : < [ %d ] > seq %d : to < [ %d ] > lid %d dist_est %f\n", id, /* Scheduler::my_clock() */ my_clock(), src_aid, src_seqno, this_la->ag.agent_id, l->lid, this_la->dist);#endif send_pkt_wrapper(ap,this_la->ag.agent_id, this_la->ag.agent_addr); count_pkts ++; } } } return count_pkts;}/* Have to be the cluster root to accept this merge *//* If we want to accept cluster merge at non-root agents, then * we have to change the mechanism of the merge, i.e. the merging * root should be able to first figure out which is the correct * root, and then advise the children appropriately of its * change to be the new correct root. */bool coopAgent::valid_cluster_merge_packet (AppPacket * ap) { LayerInfo *l = layers.arr[ap->u.clustermerge_p.layer_id]; if (l == NULL) return false; if (l->me_in_layer == false) return false; if (self_check(l->root) == false) return false; return true;}bool coopAgent::valid_cluster_refresh_packet (AppPacket * ap) { LayerInfo * l = layers.arr[ap->u.clusterrefresh_p.layer_id]; if (l == NULL) return false; if (l->me_in_layer == false) return false; void * pos = l->ag_list.Locate(ap->src_agent); if (pos == NULL) return false; return true;}bool coopAgent::match_cluster_to_root_info (LayerInfo *l) { bool has_change = false; for (void * pos = l->ag_list.GetHeadPosition(); pos != NULL; l->ag_list.GetNext(pos) ) { LayerAgentInfo * la_ag = l->ag_list.GetAt(pos); la_ag->valid_tmp = false; } for (int i = 0; i < l->root->agent_dist_arr_count; i++) { void * pos = l->ag_list.Locate(l->root->agent_dist_arr[i].ag.agent_id); LayerAgentInfo * this_la; if (pos == NULL) { this_la = new LayerAgentInfo (l->root->agent_dist_arr[i].ag.agent_id,l->root->agent_dist_arr[i].ag.agent_addr); assert (self_check(this_la) == false); // Cannot have self unknown
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -