📄 bse-agent.cc
字号:
/************************************************** * File: bse-agent.cc Author: Suman Banerjee <suman@cs.umd.edu> Date: July 31, 2001 Terms: GPL NICE implementation in myns This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * **************************************************/#include <stdio.h>#include <stdlib.h>#include <assert.h>#include "nicenode.h"#include <scheduler.h>#include "app-packet.h"#include "bse-agent.h"#include "setpartition.h"#include "common.h"#define LOG_BSE_START#define LOG_BSE_STOP#undef LOG_BSE_SEND#undef LOG_BSE_RECV#undef LOG_BSE_JUNK#undef LOG_BSE_PARTITION#define LOG_BSE_CLUSTER_CHANGE#undef LOG_CLUSTER_INFO_DETAILED#define LOG_DATA_PACKET_SEND#define LOG_DATA_PACKET_RECVvoid bseAgent::init (int Id, int Index, Node *N) { Agent::init(Id,Index,N); t = AGENT_APPLICATION_BSE; top_layer = NULL; return;}void bseAgent::start (void) { Agent::start();#ifdef LOG_BSE_START printf ("[bse %d %d ] At %8.4f start\n", id, n->id, Scheduler::Clock());#endif // LOG_BSE_START reset_layer_info(0); log_cluster_change_info ();#ifdef LOG_BSE_CLUSTER_CHANGE display_top_layer_info();#endif // LOG_BSE_CLUSTER_CHANGE return;}void bseAgent::stop (void) {#ifdef LOG_BSE_STOP printf ("[bse %d %d ] At %8.4f stop\n", id, n->id, Scheduler::Clock());#endif // LOG_BSE_STOP assert (started == true); if (top_layer != NULL) delete top_layer; Agent::stop(); return;}void bseAgent::specific_rx_pkt_handler (Packet *p) { if (p->t != PACKET_APP) { printf ("[Err] BSE received unknown packet type\n"); return; } AppPacket *ap = (AppPacket *)p; rx_pkt_wrapper(ap); switch (ap->st) { case JOIN_QUERY : handle_join_query(ap); break; case CLUSTER_REFRESH : handle_cluster_refresh(ap); break; case CLUSTER_MERGE : handle_cluster_merge(ap); break; case JOIN_FORWARD : printf ("[Err] BSE received Join-forward, ignored\n"); break; case PING_QUERY : case PING_RESPONSE : printf ("[Err] BSE received ping, ignored\n"); break; case PACKET_DATA : handle_data_packet(ap); break; default : printf ("[Err] BSE received unknown packet subtype\n"); } return;}void bseAgent::reset_layer_info (int top_lid) { if (top_layer != NULL) { top_layer->MakeEmpty(); top_layer->lid = top_lid; // delete top_layer;#ifdef LOG_BSE_CLUSTER_CHANGE printf ("[bse %d %d ] At %8.4f cluster-info : lid %d : deleted\n", id, n->id, Scheduler::Clock(), top_layer->lid);#endif // LOG_BSE_CLUSTER_CHANGE } else top_layer = new LayerInfo(top_lid,this);#ifdef LOG_BSE_JUNK printf ("[bse %d %d ] At %8.4f reset-layer : lid %d\n", id, n->id, Scheduler::Clock(), top_lid);#endif // LOG_BSE_JUNK LayerAgentInfo * self = new LayerAgentInfo (id,n->id); self->dist = 0.0; top_layer->AddClusterRoot(self); top_layer->me_in_layer = true; top_layer->cr_chkt.CancelTimer(); top_layer->cr_msgt.CancelTimer(); top_layer->cr_chkt.SetTimer(CONST_CLUSTER_REFRESH_CHECK_TIMEOUT); top_layer->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT); return;}void bseAgent::handle_join_query (AppPacket *p) { assert (p->st == JOIN_QUERY); // || (p->st == JOIN_FORWARD) ); /* Join forward not accepted right now. To do so, changes needed: See BSE-JF change in coop-agent.cc */ int qlid; AgentInfo src_ag; AgentInfo original_dst; double src_time; if (p->st == JOIN_QUERY) { qlid = p->u.joinq_p.q_lid; src_ag.agent_id = p->src_agent; src_ag.node_id = p->src; original_dst.agent_id = id; original_dst.node_id = n->id; src_time = p->u.joinq_p.send_time; } else { qlid = p->u.joinforward_p.q_lid; src_ag.agent_id = p->u.joinforward_p.src_ag.agent_id; src_ag.node_id = p->u.joinforward_p.src_ag.node_id; original_dst.agent_id = p->u.joinforward_p.original_dst.agent_id; original_dst.node_id = p->u.joinforward_p.original_dst.node_id; src_time = p->u.joinforward_p.send_time; } #ifdef LOG_BSE_RECV printf ("[bse %d %d ] At %8.4f recvd-join-query : < [ %d %d ] > lid %d ( top lid %d )\n", id, n->id, Scheduler::Clock(), src_ag.agent_id, src_ag.node_id, qlid, top_layer->lid);#endif // LOG_BSE_RECV if (top_layer->lid < qlid) { // Someone claims to need to join to a higher layer than I know of reset_layer_info(qlid); } bool do_attach = false; LayerAgentInfo * new_member = NULL; if (top_layer->lid == qlid) { do_attach = true; new_member = new LayerAgentInfo(src_ag.agent_id, src_ag.node_id); bool is_new_member = top_layer->AddClusterMember(new_member); if (is_new_member == false) { delete new_member; new_member = top_layer->FindClusterMember(src_ag.agent_id); new_member->refresh = true; } else { log_cluster_change_info ();#ifdef LOG_BSE_CLUSTER_CHANGE display_top_layer_info();#endif // LOG_BSE_CLUSTER_CHANGE } new_member->dist = Scheduler::Clock() - src_time; //if (is_new_member == false) // return; } /* Create response packet */ AppPacket *resp_p = new AppPacket(JOIN_RESPONSE); put_layer_cluster_info_into_packet(top_layer,resp_p); resp_p->u.joinresp_p.accept = true; resp_p->u.joinresp_p.exp_src.agent_id = original_dst.agent_id; resp_p->u.joinresp_p.exp_src.node_id = original_dst.node_id; send_pkt_wrapper(resp_p,src_ag.agent_id, src_ag.node_id); if (do_attach == true) {#ifdef LOG_BSE_SEND printf ("\n[bse %d %d ] At %8.4f : new-member < [ %d %d ] >\n", id, n->id, Scheduler::Clock(), new_member->ag.agent_id, new_member->ag.node_id);#endif // LOG_BSE_SEND top_layer->cr_msgt.CancelTimer(); send_top_layer_cluster_refresh(new_member); top_layer->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT); } return;}// Accept the merge only if this is the only member in the top layer// or if there is no membervoid bseAgent::handle_cluster_merge (AppPacket * ap) { if ( top_layer->lid != ap->u.clustermerge_p.layer_id ) return; if ( (top_layer->ag_list.GetSize() == 1) || ( (top_layer->ag_list.GetSize() == 2) && (top_layer->ag_list.Locate(ap->src_agent) != NULL) ) ) {#ifdef LOG_BSE_RECV printf ("[bse %d %d ] At %8.4f recvd-valid-merge : < [ %d %d ] > lid %d\n", id, n->id, Scheduler::Clock(), ap->src_agent, ap->src, ap->u.clustermerge_p.layer_id);#endif // LOG_BSE_RECV reset_layer_info(ap->u.clustermerge_p.layer_id); top_layer->AddExtraMembersFromAltRootPacket(ap); log_cluster_change_info ();#ifdef LOG_BSE_CLUSTER_CHANGE display_top_layer_info();#endif // LOG_BSE_CLUSTER_CHANGE } return;}void bseAgent::handle_cluster_refresh (AppPacket *ap) { if (top_layer->lid != ap->u.clusterrefresh_p.layer_id) return; void * pos = top_layer->ag_list.Locate(ap->src_agent); if (pos == NULL) return;#ifdef LOG_BSE_RECV printf ("[bse %d %d ] At %8.4f recvd-valid-refresh : < [ %d %d ] > lid %d ", id, n->id, Scheduler::Clock(), ap->src_agent, ap->src, top_layer->lid);#endif // LOG_BSE_RECV LayerAgentInfo *la_ag = top_layer->ag_list.GetAt(pos); if (ap->u.clusterrefresh_p.cluster_remove == true) {#ifdef LOG_BSE_RECV printf ("*remove*\n");#endif // LOG_BSE_RECV bool check = top_layer->DeleteClusterMember(la_ag); assert (check == true); delete la_ag; log_cluster_change_info ();#ifdef LOG_BSE_CLUSTER_CHANGE display_top_layer_info();#endif // LOG_BSE_CLUSTER_CHANGE } else {#ifdef LOG_BSE_RECV printf ("\n");#endif // LOG_BSE_RECV la_ag->refresh_agent(ap); } return;}void bseAgent::handle_data_packet (AppPacket *ap) { assert (ap->st == PACKET_DATA); assert ( (ap->u.data_p.original_src.agent_id == ap->src_agent) && (ap->u.data_p.original_src.node_id == ap->src) );#ifdef LOG_DATA_PACKET_RECV printf ("[bse %d %d ] At %8.4f fwd data-pkt : < [ %d %d ] > seq %d\n", id, n->id, Scheduler::Clock(), ap->u.data_p.original_src.agent_id, ap->u.data_p.original_src.node_id, ap->u.data_p.seq_no);#endif // LOG_DATA_PACKET_RECV assert (top_layer != NULL);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -