📄 bse-agent.cc
字号:
return 0; for (void * pos = top_layer->ag_list.GetHeadPosition(); pos != NULL; top_layer->ag_list.GetNext(pos) ) { LayerAgentInfo * this_la = top_layer->ag_list.GetAt(pos); if (this_la->ag.agent_id == ap->u.data_p.original_src.agent_id) continue; if (self_check(this_la) == false) { AppPacket *new_p = new AppPacket(PACKET_DATA); new_p->u.data_p.original_src.agent_id = ap->u.data_p.original_src.agent_id; memcpy ( & new_p->u.data_p.original_src.agent_addr, & ap->u.data_p.original_src.agent_addr, sizeof(struct sockaddr_in)); new_p->u.data_p.seq_no = ap->u.data_p.seq_no; new_p->u.data_p.lid = top_layer->lid; new_p->u.data_p.base_lid = ap->u.data_p.base_lid;#ifdef LOG_DATA_PACKET_SEND // KCR-CHANGE //printf ("[bse %d ] At %8.4f send data-pkt : < [ %d ] > seq %d : to < [ %d ] > lid %d dist_est %f \n", id, /* Scheduler::Clock */ my_clock(), ap->u.data_p.original_src.agent_id, ap->u.data_p.seq_no, this_la->ag.agent_id, top_layer->lid, this_la->dist);#endif //send_pkt_wrapper(new_p,this_la->ag.agent_id, this_la->ag.agent_addr); delete new_p; } } fflush(0); return 0;} void bseAgent::handle_cluster_refresh_msg_timeout (void) { assert (top_layer != NULL);#ifdef LOG_BSE_SEND printf ("\n[bse %d ] At %8.4f : refresh-timeout: lid %d\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid);#endif send_top_layer_cluster_refresh(NULL); top_layer->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT); fflush(0); return;}void bseAgent::send_top_layer_cluster_refresh (LayerAgentInfo * ignore_agent) { assert (top_layer != NULL);#ifdef LOG_BSE_SEND printf ("\n[bse %d ] At %8.4f (b-scr) : send-refresh : lid %d\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid);#endif for (void * pos = top_layer->ag_list.GetHeadPosition(); pos != NULL; top_layer->ag_list.GetNext(pos) ) { LayerAgentInfo * la = top_layer->ag_list.GetAt(pos); if (ignore_agent != NULL) { if (ignore_agent->ag.agent_id == la->ag.agent_id) continue; } if (self_check(la) == false) {#ifdef LOG_BSE_SEND printf ("[bse %d ] . . (b-scr) : < [ %d ] >\n", id, la->ag.agent_id);#endif AppPacket *ap = new AppPacket (CLUSTER_REFRESH); put_layer_cluster_info_into_packet(top_layer,ap); ap->u.clusterrefresh_p.root_xfer = false; ap->u.clusterrefresh_p.is_root = true; send_pkt_wrapper(ap,la->ag.agent_id,la->ag.agent_addr); } }#ifdef LOG_BSE_SEND printf ("\n");#endif fflush(0); return;}void bseAgent::handle_cluster_refresh_check_timeout (void) { bool change = false; assert (top_layer != NULL);#ifdef LOG_BSE_JUNK printf ("[bse %d] At %8.4f (b-rc) : refresh-chk : lid %d\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid);#endif fflush(0); for (void * pos = top_layer->ag_list.GetHeadPosition(); pos != NULL; ) { LayerAgentInfo * la_ag = top_layer->ag_list.GetAt(pos); void * old_pos = pos; top_layer->ag_list.GetNext(pos); if (self_check(la_ag) == true) continue; if (la_ag->refresh == false) { /* Member is lost */ top_layer->ag_list.RemoveAt(old_pos);#ifdef LOG_BSE_JUNK printf ("[bse %d ] . . (b-rc) : < [ %d ] > *lost*\n", id, la_ag->ag.agent_id);#endif delete la_ag; change = true; } else { la_ag->refresh = false; // Resetting the refresh flag#ifdef LOG_BSE_JUNK printf ("[bse %d ] . . (b-rc) : < [ %d ] >\n", id, la_ag->ag.agent_id);#endif } }#ifdef LOG_BSE_JUNK printf ("\n");#endif assert (top_layer->ag_list.GetSize() >= 1); if (top_layer->ag_list.GetSize() == 1) { // I am the only one if (top_layer->lid > 0) {#ifdef LOG_BSE_CLUSTER_CHANGE printf ("[bse %d ] At %8.4f cluster-info : lid %d : deleted\n", id,/* Scheduler::Clock */ my_clock(), top_layer->lid);#endif top_layer->lid --;#ifdef LOG_BSE_JUNK printf ("[bse %d ] At %8.4f refresh-chk layer-decrement to lid %d\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid);#endif change = true; } } if (top_layer->ag_list.GetSize() >= (UPPER_3K+1)) { split_top_layer_using_two_partition(); change = true; } if (change == true) { log_cluster_change_info ();#ifdef LOG_BSE_CLUSTER_CHANGE display_top_layer_info();#endif } top_layer->cr_chkt.SetTimer(CONST_CLUSTER_REFRESH_CHECK_TIMEOUT); fflush(0); return;}void bseAgent::split_top_layer_using_two_partition (void) {#ifdef LOG_BSE_JUNK printf ("[bse %d ] At %8.4f cluster-split : lid %d\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid);#endif LayerAgentInfo ** ag_arr = top_layer->CreateLayerAgentInfoArray(); assert (ag_arr != NULL); double * self_included_cost = create_cost_matrix(id,ag_arr,top_layer->ag_list.GetSize()); // This function already updates the agent_arr by deleting this agent double * cost = delete_agent_from_cost_matrix(self_included_cost,ag_arr,top_layer->ag_list.GetSize(),id); free(self_included_cost); void * pos = top_layer->ag_list.Locate(id); assert (pos != NULL); LayerAgentInfo * self = top_layer->ag_list.GetAt(pos); bool check = top_layer->DeleteClusterMember(self); assert (check == true); int root1_index, root2_index; int * index_set1; int * index_set2; int set1_size, set2_size; // Partition the set of members int lower_size = top_layer->ag_list.GetSize() / 2; two_partition(top_layer->ag_list.GetSize(),cost,lower_size,index_set1,set1_size,root1_index,index_set2,set2_size,root2_index); free(cost);#ifdef LOG_BSE_PARTITION printf ("[bse %d ] . . cluster-split : Partition 1 : ", id); for (int i = 0; i < set1_size; i++) printf ("%d ", ag_arr[index_set1[i]]->ag.agent_id); printf("\n"); printf ("[bse %d ] . . cluster-split : Partition 2 : ", id); for (int i = 0; i < set2_size; i++) printf ("%d ", ag_arr[index_set2[i]]->ag.agent_id); printf("\n");#endif LayerInfo * tmp_layer1 = new LayerInfo(top_layer->lid,NULL); LayerInfo * tmp_layer2 = new LayerInfo(top_layer->lid,NULL); for (int i = 0; i < set1_size; i++) { top_layer->DeleteClusterMember(ag_arr[index_set1[i]]); bool success = tmp_layer1->AddClusterMember(ag_arr[index_set1[i]]); assert (success == true); } tmp_layer1->root = ag_arr[root1_index]; for (int i = 0; i < set2_size; i++) { top_layer->DeleteClusterMember(ag_arr[index_set2[i]]); bool success = tmp_layer2->AddClusterMember(ag_arr[index_set2[i]]); assert (success == true); } tmp_layer2->root = ag_arr[root2_index]; free(index_set1); free(index_set2);#ifdef LOG_BSE_PARTITION printf ("[bse %d ] At %8.4f cluster-split : root1 < [ %d ] > count %d root2 < [ %d ] > count %d\n", id, /* Scheduler::Clock */ my_clock(), ag_arr[root1_index]->ag.agent_id, set1_size, ag_arr[root2_index]->ag.agent_id, set2_size);#endif free(ag_arr); assert (top_layer->ag_list.GetSize() == 0); // Transfer control of the other cluster send_cluster_remove(tmp_layer1); send_cluster_remove(tmp_layer2); LayerAgentInfo * new_root1 = new LayerAgentInfo(tmp_layer1->root->ag.agent_id,tmp_layer1->root->ag.agent_addr); LayerAgentInfo * new_root2 = new LayerAgentInfo(tmp_layer2->root->ag.agent_id,tmp_layer2->root->ag.agent_addr); // Cleanup ... delete tmp_layer1; delete tmp_layer2; assert (top_layer->lid + 1 < MAX_LAYERS);#ifdef LOG_BSE_CLUSTER_CHANGE printf ("[bse %d ] At %8.4f cluster-info : lid %d : deleted\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid);#endif #ifdef LOG_BSE_JUNK printf ("[bse %d ] At %8.4f layer-increment : lid %d\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid+1);#endif // Setup the next higher layer top_layer->lid ++; top_layer->AddClusterRoot(self); top_layer->me_in_layer = true; top_layer->AddClusterMember(new_root1); top_layer->AddClusterMember(new_root2); fflush(0); return;}void bseAgent::send_cluster_remove (LayerInfo * l) {#ifdef LOG_BSE_SEND printf ("[bse %d ] At %8.4f (b-scx) : self-cluster-remove : lid %d : new-root < [ %d ] >\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid, l->root->ag.agent_id);#endif for (void * pos = l->ag_list.GetHeadPosition(); pos != NULL; l->ag_list.GetNext(pos) ) { LayerAgentInfo * la = l->ag_list.GetAt(pos); assert (self_check(la) == false); AppPacket *ap = new AppPacket (CLUSTER_REFRESH); put_cluster_remove_info_into_packet(l,ap,true,NULL,id,udp_recv_agent_addr);#ifdef LOG_BSE_SEND printf ("[bse %d ] . . (b-scx) : < [ %d ] >\n", id, la->ag.agent_id);#endif send_pkt_wrapper(ap,la->ag.agent_id,la->ag.agent_addr); }#ifdef LOG_BSE_SEND printf ("\n");#endif fflush(0); return;}void bseAgent::log_cluster_change_info (void) { m_last_change = /* Scheduler::Clock */ my_clock(); return;}void bseAgent::display_top_layer_info (void) { assert (top_layer != NULL); printf ("[bse %d ] At %8.4f cluster-info : lid %d : ldr %s %d : count %d : ", id, /* Scheduler::Clock */ my_clock(), top_layer->lid, (self_check(top_layer->root) == true) ? "self" : "other", top_layer->root->ag.agent_id, top_layer->ag_list.GetSize() );#ifdef LOG_CLUSTER_INFO_DETAILED struct_pkt_reset(); struct_pkt_set_size(top_layer->ag_list.GetSize()); struct_pkt_set_lid(top_layer->lid); struct_pkt_set_ldr(top_layer->root->ag.agent_id); pkt_info.my_id = 0; for (void * pos = top_layer->ag_list.GetHeadPosition(); pos != NULL; top_layer->ag_list.GetNext(pos) ) { int tid = (top_layer->ag_list.GetAt(pos))->ag.agent_id; printf (" %d", tid); struct_pkt_add_member(tid); } send_struct_pkt();#endif printf ("\n"); fflush(0); return;}bool bseAgent::self_check (LayerAgentInfo * la) { if (la->ag.agent_id == id) return true; else return false;}//PRMint bseAgent::send_data_pkt_to_layer (LayerInfo * l, int src_seqno) { int count_pkts = 0; double now = /*Scheduler::Clock();*/my_clock(); m_clock_cache[src_seqno % CLOCK_CACHE_SIZE] = now; //fprintf(stderr, "%d\n", src_seqno); //do some bitmap stuff here int i, base; char bitmap[MAX_BITMAP]; for ( i = 0, base = src_seqno - BITMAP_SIZE; i < BITMAP_SIZE; i++ ) { if ( base+i < 0 ) continue; bitmap[i] = 1; //i am the source } for (void * pos = l->ag_list.GetHeadPosition(); pos != NULL; l->ag_list.GetNext(pos) ) { LayerAgentInfo * this_la = l->ag_list.GetAt(pos); if ( this_la->ag.agent_id != this->id ) { AppPacket *ap = new AppPacket(PACKET_DATA); ap->u.data_p.original_src.agent_id = this->id; //ap->u.data_p.original_src.node_id = this->n->id; memcpy(&ap->u.data_p.original_src.agent_addr, &this->udp_recv_agent_addr, sizeof(struct sockaddr_in)); ap->u.data_p.seq_no = src_seqno; ap->u.data_p.lid = l->lid; ap->u.data_p.type = FRESH_DATA; ap->u.data_p.orig_time = now; memcpy( ap->u.data_p.bitmap, bitmap, BITMAP_SIZE * sizeof (char) );#ifdef LOG_DATA_PACKET_SEND /* printf ("[coop %d %d ] At %8.4f send data-pkt : < [ %d %d ] > seq %d : to < [ %d %d ] > lid %d\n", id, n->id, Scheduler::Clock(), src_aid, src_nid, src_seqno, this_la->ag.agent_id, this_la->ag.node_id, l->lid);*/#endif LOG_DATA_PACKET_SEND //send_pkt_wrapper(ap,this_la->ag.agent_id, this_la->ag.node_id); send_pkt_wrapper(ap,this_la->ag.agent_id,this_la->ag.agent_addr); count_pkts ++; } } return count_pkts;}//PRMvoid bseAgent::specific_send_data_pkt() { assert (started == true);/*#ifdef LOG_DATA_PACKET_INIT printf ("[coop %d %d ] At %8.4f source data-pkt : seq %d\n", id, n->id, Scheduler::Clock(), m_data_pkt_seq);#endif LOG_DATA_PACKET_INIT*/ /* THe following should be used as stats only (NOT SURE) if ( m_data_pkt_seq < CUT_OFF ) up_per_pkt[m_data_pkt_seq] = UpCount; */ if ( top_layer != NULL ) send_data_pkt_to_layer ( top_layer, m_data_pkt_seq++); /* if ( top_layer && m_data_pkt_seq%1000 == 1 ) fprintf(stderr, "bse: first packet top_layer = %d\n", top_layer->lid ); */ return;}void bseAgent::handle_retransmit_request( AppPacket *ap ) { int seq = ap->u.retransmit_p.seq_no; AppPacket *nap; PacketAgentInfo *pai = &ap->u.retransmit_p.src; char *bitmap = ap->u.retransmit_p.bitmap; int cseq; if ( pai->agent_id != id ) return; for ( int i = 0 ; i < BITMAP_SIZE; i++ ) { if ( (cseq = seq+i-BITMAP_SIZE ) < 0 ) continue; if ( bitmap[i] && cseq < m_data_pkt_seq && cseq >= m_data_pkt_seq - CLOCK_CACHE_SIZE) { nap = new AppPacket(PACKET_DATA); // some changes nap->src_agent = this->id; nap->src_agent_addr = this->udp_recv_agent_addr; // nap->u.data_p.original_src = *pai; nap->u.data_p.seq_no = cseq; nap->u.data_p.lid = PROACTIVE_SEND_LID; nap->u.data_p.type = PROACTIVE_DATA; nap->u.data_p.orig_time = m_clock_cache[cseq % CLOCK_CACHE_SIZE]; //send_pkt_wrapper(nap, ap->src_agent, ap->src ); send_pkt_wrapper(nap, ap->src_agent, ap->src_agent_addr ); } }}//PRM-added
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -