📄 coop-agent.cc
字号:
/************************************************** * File: coop-agent.cc Author: Suman Banerjee <suman@cs.umd.edu> Date: July 31, 2001 Terms: GPL NICE implementation in myns This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * **************************************************/#include <stdio.h>#include <stdlib.h>#include <assert.h>#undef LOG_COOP_SEND#undef LOG_COOP_RECV#undef LOG_COOP_JUNK#undef LOG_COOP_RECV_2#undef LOG_COOP_JUNK_PING#define LOG_COOP_JUNK_PING_SWITCH#define LOG_COOP_START#define LOG_COOP_STOP#define LOG_COOP_CLUSTER_CHANGE#undef LOG_COOP_CLUSTER_PERIODIC#undef LOG_CLUSTER_INFO_DETAILED#define LOG_DATA_PACKET_INIT#define LOG_DATA_PACKET_RECV#define LOG_DATA_PACKET_SEND#undef LOG_DATA_PACKET_SEND_COUNT#define TESTING_JUNK#include "nicenode.h"#include <scheduler.h>#include "app-packet.h"#include "coop-agent.h"#include "setpartition.h"#include "common.h"bool global_use_packet_cache = true;coopAgent::coopAgent (void) : commonAgent () { jqt.ca = this; m_hlprt.ca = this; m_hlpit.ca = this;}coopAgent::~coopAgent (void) { if (started == true) { init_layer_arr(); jqt.CancelTimer(); cancel_higher_layer_ping_timers(); }}void coopAgent::init (int Id, int Index, Node *N) { Agent::init(Id,Index,N); t = AGENT_APPLICATION_COOP; for (int i = 0; i < MAX_LAYERS; i++) { layers.arr[i] = NULL; } m_data_pkt_seq = 0; for (int i = 0; i < MAX_LAYERS; i++) m_last_change[i] = -1.0; return;}void coopAgent::start (void) { Agent::start(); if (bse.agent_id == -1) { printf ("[Err] BSE is not known to agent %d\n", id); exit (-1); } state = INIT;#ifdef LOG_COOP_START printf ("[coop %d %d ] At %8.4f start\n", id, n->id, Scheduler::Clock());#endif // LOG_COOP_START init_layer_arr(); init_join_query(0); init_packet_cache(); m_use_packet_cache = global_use_packet_cache; m_ping_in_progress = false; return;}/* dst_join_lid < 0 => that attempt to join the same layer as before */void coopAgent::init_join_query (int dst_join_lid) { if (dst_join_lid >= 0) { if (state == JOIN) { if (target_join_q_lid > dst_join_lid) target_join_q_lid = dst_join_lid; return; } } else { assert (state == JOIN); } assert (m_ping_in_progress == false);#ifdef LOG_COOP_JUNK printf ("[coop %d %d ] At %8.4f init-join-query lid %d\n", id, n->id, Scheduler::Clock(), dst_join_lid);#endif // LOG_COOP_JUNK state = JOIN; curr_join_q_lid = -1; if (dst_join_lid >= 0) target_join_q_lid = dst_join_lid; AppPacket *p = new AppPacket(JOIN_QUERY); init_join_query_packet(p,target_join_q_lid,Scheduler::Clock(),false); send_pkt_wrapper(p,bse.agent_id,bse.node_id); /* Set the timeout to resend the request if needed */ jqt.SetTimer(CONST_JOIN_QUERY_TIMEOUT); return;}void coopAgent::init_layer_arr (void) { for (int i = 0; i < MAX_LAYERS; i++) { if (layers.arr[i] != NULL) { delete layers.arr[i]; layers.arr[i] = NULL; } } return;}void coopAgent::stop (void) { assert (started == true);#ifdef LOG_COOP_STOP printf ("[coop %d %d ] At %8.4f stop\n", id, n->id, Scheduler::Clock());#endif // LOG_COOP_STOP jqt.CancelTimer(); cancel_higher_layer_ping_timers(); init_layer_arr(); Agent::stop(); return;}void coopAgent::specific_send_data_pkt (void) { assert (started == true);#ifdef LOG_DATA_PACKET_INIT printf ("[coop %d %d ] At %8.4f source data-pkt : seq %d\n", id, n->id, Scheduler::Clock(), m_data_pkt_seq);#endif // LOG_DATA_PACKET_INIT forward_data_packet(id, n->id, m_data_pkt_seq ++,-1,-1,true); return;}void coopAgent::specific_rx_pkt_handler (Packet *p) { if (p->t != PACKET_APP) { printf ("[Err] COOP %d received unknown packet type\n", id); return; } AppPacket *ap = (AppPacket *)p; rx_pkt_wrapper(ap); switch (ap->st) { case JOIN_QUERY : case JOIN_FORWARD : handle_join_query_forward(ap); break; case JOIN_RESPONSE : handle_join_response(ap); break; case CLUSTER_REFRESH : handle_cluster_refresh(ap); break; case CLUSTER_MERGE : handle_cluster_merge(ap); break; case PING_QUERY : handle_ping_query(ap); break; case PING_RESPONSE : handle_ping_response(ap); break; case PACKET_DATA : handle_data_packet(ap); break; default : printf ("[Err] COOP %d received unknown packet subtype\n", id); } return;}void coopAgent::handle_join_query_forward (AppPacket * ap) { assert ( (ap->st == JOIN_QUERY) || (ap->st == JOIN_FORWARD) ); int qlid; double src_time; bool attach; AgentInfo src_ag; AgentInfo original_dst; if (ap->st == JOIN_QUERY) { qlid = ap->u.joinq_p.q_lid; attach = ap->u.joinq_p.attach; src_ag.agent_id = ap->src_agent; src_ag.node_id = ap->src; original_dst.agent_id = id; original_dst.node_id = n->id; src_time = ap->u.joinq_p.send_time; } else { qlid = ap->u.joinforward_p.q_lid; attach = ap->u.joinforward_p.attach; src_ag.agent_id = ap->u.joinforward_p.src_ag.agent_id; src_ag.node_id = ap->u.joinforward_p.src_ag.node_id; original_dst.agent_id = ap->u.joinforward_p.original_dst.agent_id; original_dst.node_id = ap->u.joinforward_p.original_dst.node_id; src_time = ap->u.joinforward_p.send_time; } int required_qlid; if (attach == true) required_qlid = qlid; else required_qlid = qlid - 1; /* First check if this agent should send a response at all */ if (valid_join_query_forward_packet(qlid,attach) == false) { AppPacket *resp_p = new AppPacket(JOIN_RESPONSE); resp_p->u.joinresp_p.layer_id = required_qlid; resp_p->u.joinresp_p.accept = false; resp_p->u.joinresp_p.mbr_count = 0; resp_p->u.joinresp_p.exp_src.agent_id = original_dst.agent_id; resp_p->u.joinresp_p.exp_src.node_id = original_dst.node_id; send_pkt_wrapper(resp_p,src_ag.agent_id, src_ag.node_id); return; }#ifdef LOG_COOP_JUNK printf ("[coop %d %d ] At %8.4f (c-jqf) recvd-valid-join-qf : lid %d from < [ %d %d ] > exp-src < [ %d %d ] >\n", id, n->id, Scheduler::Clock(), qlid,ap->src_agent,ap->src, src_ag.agent_id, src_ag.node_id);#endif // LOG_COOP_JUNK LayerInfo * this_layer = layers.arr[required_qlid]; if ( self_check(this_layer->root) == false) { /* If I am not the cluster root, then forward this to cluster root */ AppPacket *fwd_p = new AppPacket(JOIN_FORWARD); put_info_into_join_forward_packet (src_ag,qlid,original_dst,src_time,attach,fwd_p); send_pkt_wrapper(fwd_p,this_layer->root->ag.agent_id, this_layer->root->ag.node_id);#ifdef LOG_COOP_JUNK printf ("[coop %d %d ] . . (c-jqf) : fwd to root < [ %d %d ] >\n", id, n->id, this_layer->root->ag.agent_id, this_layer->root->ag.node_id);#endif // LOG_COOP_JUNK return; } if (attach == false) { void * pos = this_layer->ag_list.Locate(src_ag.agent_id); if (pos != NULL) {#ifdef LOG_COOP_JUNK printf ("[coop %d %d ] . . (c-jqf) : dup delete\n", id, n->id);#endif // LOG_COOP_JUNK LayerAgentInfo * this_la = this_layer->ag_list.GetAt(pos); assert (this_la != NULL); this_layer->ag_list.RemoveAt(pos); delete this_la; log_cluster_change_info(this_layer->lid);#ifdef LOG_COOP_CLUSTER_CHANGE display_cluster_info(this_layer,this_layer->lid);#endif // LOG_COOP_CLUSTER_CHANGE } /* Create response packet */ AppPacket *resp_p = new AppPacket(JOIN_RESPONSE); put_layer_cluster_info_into_packet(this_layer,resp_p); resp_p->u.joinresp_p.accept = true; resp_p->u.joinresp_p.exp_src.agent_id = original_dst.agent_id; resp_p->u.joinresp_p.exp_src.node_id = original_dst.node_id; send_pkt_wrapper(resp_p,src_ag.agent_id, src_ag.node_id);#ifdef LOG_COOP_JUNK printf ("[coop %d %d ] . . (c-jqf) : sent response\n", id, n->id);#endif // LOG_COOP_JUNK } else { /* Attach appropriately to the cluster */ LayerAgentInfo *la_ag = new LayerAgentInfo(src_ag.agent_id,src_ag.node_id); if (this_layer->AddClusterMember(la_ag) == false) { delete la_ag; la_ag = this_layer->FindClusterMember(src_ag.agent_id); la_ag->refresh = true; } // if (ap->st == JOIN_QUERY) { // and not JOIN_FORWARD la_ag->dist = Scheduler::Clock() - src_time; // }#ifdef LOG_COOP_JUNK printf ("[coop %d %d ] . . (c-jqf) : *attach*\n", id, n->id);#endif // LOG_COOP_JUNK // Immediate update of new member joining the cluster this_layer->cr_msgt.CancelTimer(); send_all_cluster_refresh_packet(this_layer,false,true,NULL); this_layer->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT); } return;}void coopAgent::split_cluster_using_two_partition (LayerInfo * l) { LayerAgentInfo ** ag_arr = l->CreateLayerAgentInfoArray(); assert (ag_arr != NULL); double * cost = create_cost_matrix(id,n->id,ag_arr,l->ag_list.GetSize()); int root1_index, root2_index; int * index_set1 = NULL; int * index_set2 = NULL; int set1_size, set2_size; // Partition the set of members int lower_size = l->ag_list.GetSize() / 2; two_partition(l->ag_list.GetSize(),cost,lower_size,index_set1,set1_size,root1_index,index_set2,set2_size,root2_index); assert (index_set1 != NULL); assert (index_set2 != NULL); free(cost); // After the split, cluster1 is my cluster, cluster2 is the other cluster LayerAgentInfo ** cluster1 = (LayerAgentInfo **) safe_malloc (sizeof (LayerAgentInfo *) * set1_size); bool need_to_swap = true; for (int i = 0; i < set1_size; i++) { cluster1[i] = ag_arr[index_set1[i]]; if (self_check(cluster1[i]) == true) need_to_swap = false; } LayerAgentInfo ** cluster2 = (LayerAgentInfo **) safe_malloc (sizeof (LayerAgentInfo *) * set2_size); for (int i = 0; i < set2_size; i++) cluster2[i] = ag_arr[index_set2[i]]; LayerAgentInfo * c1_root = ag_arr[root1_index]; LayerAgentInfo * c2_root = ag_arr[root2_index]; if (need_to_swap == true) { LayerAgentInfo ** tmp_cl = cluster1; cluster1 = cluster2; cluster2 = tmp_cl; int tmp_cl_size = set1_size; set1_size = set2_size; set2_size = tmp_cl_size; LayerAgentInfo * tmp_root = c1_root; c1_root = c2_root; c2_root = tmp_root; } free(ag_arr); free(index_set1); free(index_set2); // Purge my own cluster of members that are in the other cluster // Create a temporary layer to send out the root transfer message LayerInfo * tmp_layer = new LayerInfo (l->lid,this); for (int i = 0; i < set2_size; i++) { tmp_layer->AddClusterMember(cluster2[i]); l->DeleteClusterMember(cluster2[i]); } tmp_layer->root = c2_root;#ifdef LOG_COOP_JUNK printf ("[coop %d %d ] At %8.4f split-cluster-self lid %d\n", id, n->id, Scheduler::Clock(), l->lid);#endif // LOG_COOP_JUNK
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -