📄 bse-agent.cc,v
字号:
#ifdef LOG_BSE_RECV printf ("[bse %d ] At %8.4f recvd-valid-refresh : < [ %d ] > lid %d ", id, /* Scheduler::Clock */ my_clock(), ap->src_agent, top_layer->lid);#endif LayerAgentInfo *la_ag = top_layer->ag_list.GetAt(pos); if (ap->u.clusterrefresh_p.cluster_remove == true) {#ifdef LOG_BSE_RECV printf ("*remove*\n");#endif bool check = top_layer->DeleteClusterMember(la_ag); assert (check == true); delete la_ag; log_cluster_change_info ();#ifdef LOG_BSE_CLUSTER_CHANGE display_top_layer_info();#endif } else {#ifdef LOG_BSE_RECV printf ("\n");#endif la_ag->refresh_agent(ap); }fflush(0); return;}void bseAgent::handle_data_ack_packet (AppPacket *ap) { assert (ap->st == PACKET_DATA_ACK);#ifdef LOG_DATA_PACKET_RECV printf ("[bse %d ] At %8.4f recv ack-pkt : < [ %d ] > seq %d from < [ %d ] >\n", id, /* Scheduler::Clock */ my_clock(), ap->u.dataack_p.original_src.agent_id, ap->u.dataack_p.seq_no, ap->src_agent);#endif return;}int bseAgent::handle_data_packet (AppPacket *ap) { assert (ap->st == PACKET_DATA); assert (ap->u.data_p.original_src.agent_id == ap->src_agent);#ifdef LOG_DATA_PACKET_RECV printf ("[bse %d ] At %8.4f recv data-pkt : < [ %d ] > seq %d from < [ %d ] >\n", id, /* Scheduler::Clock */ my_clock(), ap->u.data_p.original_src.agent_id, ap->u.data_p.seq_no, ap->src_agent);#endif assert (top_layer != NULL);/* Send an ack back */ AppPacket *new_ack_p = new AppPacket(PACKET_DATA_ACK); new_ack_p->u.dataack_p.seq_no = ap->u.data_p.seq_no; new_ack_p->u.dataack_p.original_src.agent_id = ap->u.data_p.original_src.agent_id; memcpy((void*)&(new_ack_p->u.dataack_p.original_src.agent_addr), (void*)&(ap->u.data_p.original_src.agent_addr), sizeof(struct sockaddr_in)); send_pkt_wrapper(new_ack_p, ap->src_agent, ap->src_agent_addr); fflush(0); if (top_layer->lid < ap->u.data_p.base_lid) return 0; for (void * pos = top_layer->ag_list.GetHeadPosition(); pos != NULL; top_layer->ag_list.GetNext(pos) ) { LayerAgentInfo * this_la = top_layer->ag_list.GetAt(pos); if (this_la->ag.agent_id == ap->u.data_p.original_src.agent_id) continue; if (self_check(this_la) == false) { AppPacket *new_p = new AppPacket(PACKET_DATA); new_p->u.data_p.original_src.agent_id = ap->u.data_p.original_src.agent_id; memcpy ( & new_p->u.data_p.original_src.agent_addr, & ap->u.data_p.original_src.agent_addr, sizeof(struct sockaddr_in)); new_p->u.data_p.seq_no = ap->u.data_p.seq_no; new_p->u.data_p.lid = top_layer->lid; new_p->u.data_p.base_lid = ap->u.data_p.base_lid;#ifdef LOG_DATA_PACKET_SEND // KCR-CHANGE //printf ("[bse %d ] At %8.4f send data-pkt : < [ %d ] > seq %d : to < [ %d ] > lid %d dist_est %f \n", id, /* Scheduler::Clock */ my_clock(), ap->u.data_p.original_src.agent_id, ap->u.data_p.seq_no, this_la->ag.agent_id, top_layer->lid, this_la->dist);#endif //send_pkt_wrapper(new_p,this_la->ag.agent_id, this_la->ag.agent_addr); delete new_p; } } fflush(0); return 0;} void bseAgent::handle_cluster_refresh_msg_timeout (void) { assert (top_layer != NULL);#ifdef LOG_BSE_SEND printf ("\n[bse %d ] At %8.4f : refresh-timeout: lid %d\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid);#endif send_top_layer_cluster_refresh(NULL); top_layer->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT); fflush(0); return;}void bseAgent::send_top_layer_cluster_refresh (LayerAgentInfo * ignore_agent) { assert (top_layer != NULL);#ifdef LOG_BSE_SEND printf ("\n[bse %d ] At %8.4f (b-scr) : send-refresh : lid %d\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid);#endif for (void * pos = top_layer->ag_list.GetHeadPosition(); pos != NULL; top_layer->ag_list.GetNext(pos) ) { LayerAgentInfo * la = top_layer->ag_list.GetAt(pos); if (ignore_agent != NULL) { if (ignore_agent->ag.agent_id == la->ag.agent_id) continue; } if (self_check(la) == false) {#ifdef LOG_BSE_SEND printf ("[bse %d ] . . (b-scr) : < [ %d ] >\n", id, la->ag.agent_id);#endif AppPacket *ap = new AppPacket (CLUSTER_REFRESH); put_layer_cluster_info_into_packet(top_layer,ap); ap->u.clusterrefresh_p.root_xfer = false; ap->u.clusterrefresh_p.is_root = true; send_pkt_wrapper(ap,la->ag.agent_id,la->ag.agent_addr); } }#ifdef LOG_BSE_SEND printf ("\n");#endif fflush(0); return;}void bseAgent::handle_cluster_refresh_check_timeout (void) { bool change = false; assert (top_layer != NULL);#ifdef LOG_BSE_JUNK printf ("[bse %d] At %8.4f (b-rc) : refresh-chk : lid %d\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid);#endif fflush(0); for (void * pos = top_layer->ag_list.GetHeadPosition(); pos != NULL; ) { LayerAgentInfo * la_ag = top_layer->ag_list.GetAt(pos); void * old_pos = pos; top_layer->ag_list.GetNext(pos); if (self_check(la_ag) == true) continue; if (la_ag->refresh == false) { /* Member is lost */ top_layer->ag_list.RemoveAt(old_pos);#ifdef LOG_BSE_JUNK printf ("[bse %d ] . . (b-rc) : < [ %d ] > *lost*\n", id, la_ag->ag.agent_id);#endif delete la_ag; change = true; } else { la_ag->refresh = false; // Resetting the refresh flag#ifdef LOG_BSE_JUNK printf ("[bse %d ] . . (b-rc) : < [ %d ] >\n", id, la_ag->ag.agent_id);#endif } }#ifdef LOG_BSE_JUNK printf ("\n");#endif assert (top_layer->ag_list.GetSize() >= 1); if (top_layer->ag_list.GetSize() == 1) { // I am the only one if (top_layer->lid > 0) {#ifdef LOG_BSE_CLUSTER_CHANGE printf ("[bse %d ] At %8.4f cluster-info : lid %d : deleted\n", id,/* Scheduler::Clock */ my_clock(), top_layer->lid);#endif top_layer->lid --;#ifdef LOG_BSE_JUNK printf ("[bse %d ] At %8.4f refresh-chk layer-decrement to lid %d\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid);#endif change = true; } } if (top_layer->ag_list.GetSize() >= (UPPER_3K+1)) { split_top_layer_using_two_partition(); change = true; } if (change == true) { log_cluster_change_info ();#ifdef LOG_BSE_CLUSTER_CHANGE display_top_layer_info();#endif } top_layer->cr_chkt.SetTimer(CONST_CLUSTER_REFRESH_CHECK_TIMEOUT); fflush(0); return;}void bseAgent::split_top_layer_using_two_partition (void) {#ifdef LOG_BSE_JUNK printf ("[bse %d ] At %8.4f cluster-split : lid %d\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid);#endif LayerAgentInfo ** ag_arr = top_layer->CreateLayerAgentInfoArray(); assert (ag_arr != NULL); double * self_included_cost = create_cost_matrix(id,ag_arr,top_layer->ag_list.GetSize()); // This function already updates the agent_arr by deleting this agent double * cost = delete_agent_from_cost_matrix(self_included_cost,ag_arr,top_layer->ag_list.GetSize(),id); free(self_included_cost); void * pos = top_layer->ag_list.Locate(id); assert (pos != NULL); LayerAgentInfo * self = top_layer->ag_list.GetAt(pos); bool check = top_layer->DeleteClusterMember(self); assert (check == true); int root1_index, root2_index; int * index_set1; int * index_set2; int set1_size, set2_size; // Partition the set of members int lower_size = top_layer->ag_list.GetSize() / 2; two_partition(top_layer->ag_list.GetSize(),cost,lower_size,index_set1,set1_size,root1_index,index_set2,set2_size,root2_index); free(cost);#ifdef LOG_BSE_PARTITION printf ("[bse %d ] . . cluster-split : Partition 1 : ", id); for (int i = 0; i < set1_size; i++) printf ("%d ", ag_arr[index_set1[i]]->ag.agent_id); printf("\n"); printf ("[bse %d ] . . cluster-split : Partition 2 : ", id); for (int i = 0; i < set2_size; i++) printf ("%d ", ag_arr[index_set2[i]]->ag.agent_id); printf("\n");#endif LayerInfo * tmp_layer1 = new LayerInfo(top_layer->lid,NULL); LayerInfo * tmp_layer2 = new LayerInfo(top_layer->lid,NULL); for (int i = 0; i < set1_size; i++) { top_layer->DeleteClusterMember(ag_arr[index_set1[i]]); bool success = tmp_layer1->AddClusterMember(ag_arr[index_set1[i]]); assert (success == true); } tmp_layer1->root = ag_arr[root1_index]; for (int i = 0; i < set2_size; i++) { top_layer->DeleteClusterMember(ag_arr[index_set2[i]]); bool success = tmp_layer2->AddClusterMember(ag_arr[index_set2[i]]); assert (success == true); } tmp_layer2->root = ag_arr[root2_index]; free(index_set1); free(index_set2);#ifdef LOG_BSE_PARTITION printf ("[bse %d ] At %8.4f cluster-split : root1 < [ %d ] > count %d root2 < [ %d ] > count %d\n", id, /* Scheduler::Clock */ my_clock(), ag_arr[root1_index]->ag.agent_id, set1_size, ag_arr[root2_index]->ag.agent_id, set2_size);#endif free(ag_arr); assert (top_layer->ag_list.GetSize() == 0); // Transfer control of the other cluster send_cluster_remove(tmp_layer1); send_cluster_remove(tmp_layer2); LayerAgentInfo * new_root1 = new LayerAgentInfo(tmp_layer1->root->ag.agent_id,tmp_layer1->root->ag.agent_addr); LayerAgentInfo * new_root2 = new LayerAgentInfo(tmp_layer2->root->ag.agent_id,tmp_layer2->root->ag.agent_addr); // Cleanup ... delete tmp_layer1; delete tmp_layer2; assert (top_layer->lid + 1 < MAX_LAYERS);#ifdef LOG_BSE_CLUSTER_CHANGE printf ("[bse %d ] At %8.4f cluster-info : lid %d : deleted\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid);#endif #ifdef LOG_BSE_JUNK printf ("[bse %d ] At %8.4f layer-increment : lid %d\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid+1);#endif // Setup the next higher layer top_layer->lid ++; top_layer->AddClusterRoot(self); top_layer->me_in_layer = true; top_layer->AddClusterMember(new_root1); top_layer->AddClusterMember(new_root2); fflush(0); return;}void bseAgent::send_cluster_remove (LayerInfo * l) {#ifdef LOG_BSE_SEND printf ("[bse %d ] At %8.4f (b-scx) : self-cluster-remove : lid %d : new-root < [ %d ] >\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid, l->root->ag.agent_id);#endif for (void * pos = l->ag_list.GetHeadPosition(); pos != NULL; l->ag_list.GetNext(pos) ) { LayerAgentInfo * la = l->ag_list.GetAt(pos); assert (self_check(la) == false); AppPacket *ap = new AppPacket (CLUSTER_REFRESH); put_cluster_remove_info_into_packet(l,ap,true,NULL,id,udp_recv_agent_addr);#ifdef LOG_BSE_SEND printf ("[bse %d ] . . (b-scx) : < [ %d ] >\n", id, la->ag.agent_id);#endif send_pkt_wrapper(ap,la->ag.agent_id,la->ag.agent_addr); }#ifdef LOG_BSE_SEND printf ("\n");#endif fflush(0); return;}void bseAgent::log_cluster_change_info (void) { m_last_change = /* Scheduler::Clock */ my_clock(); return;}void bseAgent::display_top_layer_info (void) { assert (top_layer != NULL); printf ("[bse %d ] At %8.4f cluster-info : lid %d : ldr %s %d : count %d : ", id, /* Scheduler::Clock */ my_clock(), top_layer->lid, (self_check(top_layer->root) == true) ? "self" : "other", top_layer->root->ag.agent_id, top_layer->ag_list.GetSize() );#ifdef LOG_CLUSTER_INFO_DETAILED for (void * pos = top_layer->ag_list.GetHeadPosition(); pos != NULL; top_layer->ag_list.GetNext(pos) ) { printf (" %d", (top_layer->ag_list.GetAt(pos))->ag.agent_id); }#endif printf ("\n"); fflush(0); return;}bool bseAgent::self_check (LayerAgentInfo * la) { if (la->ag.agent_id == id) return true; else return false;}@1.1log@Initial revision@text@d28 1a28 1#include "talker.h"d41 2a42 1#undef LOG_CLUSTER_INFO_DETAILEDd437 1@
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -