⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 coop-agent.cc

📁 模拟器提供了一个简单易用的平台
💻 CC
📖 第 1 页 / 共 5 页
字号:
/************************************************** *  File: coop-agent.cc  Author: Suman Banerjee <suman@cs.umd.edu>  Date: July 31, 2001  Terms: GPL  NICE implementation in myns  This program is distributed in the hope that it will be useful,  but WITHOUT ANY WARRANTY; without even the implied warranty of  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * **************************************************/#include <stdio.h>#include <stdlib.h>#include <assert.h>#undef LOG_COOP_SEND#undef LOG_COOP_RECV#undef LOG_COOP_JUNK#undef LOG_COOP_RECV_2#undef LOG_COOP_JUNK_PING#define LOG_COOP_JUNK_PING_SWITCH#define LOG_COOP_START#define LOG_COOP_STOP#define LOG_COOP_CLUSTER_CHANGE#undef  LOG_COOP_CLUSTER_PERIODIC#undef LOG_CLUSTER_INFO_DETAILED#define LOG_DATA_PACKET_INIT#define LOG_DATA_PACKET_RECV#define LOG_DATA_PACKET_SEND#undef LOG_DATA_PACKET_SEND_COUNT#define TESTING_JUNK#include "nicenode.h"#include <scheduler.h>#include "app-packet.h"#include "coop-agent.h"#include "setpartition.h"#include "common.h"bool global_use_packet_cache = true;coopAgent::coopAgent (void) : commonAgent () {  jqt.ca = this;  m_hlprt.ca = this;  m_hlpit.ca = this;}coopAgent::~coopAgent (void) {  if (started == true) {    init_layer_arr();    jqt.CancelTimer();    cancel_higher_layer_ping_timers();  }}void coopAgent::init (int Id, int Index, Node *N) {  Agent::init(Id,Index,N);  t = AGENT_APPLICATION_COOP;  for (int i = 0; i < MAX_LAYERS; i++) {    layers.arr[i] = NULL;  }  m_data_pkt_seq = 0;  for (int i = 0; i < MAX_LAYERS; i++)    m_last_change[i] = -1.0;  return;}void coopAgent::start (void) {  Agent::start();  if (bse.agent_id == -1) {    printf ("[Err] BSE is not known to agent %d\n", id);    exit (-1);  }  state = INIT;#ifdef LOG_COOP_START  printf ("[coop %d %d ] At %8.4f start\n", id, n->id, Scheduler::Clock());#endif // LOG_COOP_START  init_layer_arr();  init_join_query(0);  init_packet_cache();  m_use_packet_cache = global_use_packet_cache;  m_ping_in_progress = false;  return;}/* dst_join_lid < 0 => that attempt to join the same layer as before */void coopAgent::init_join_query (int dst_join_lid) {  if (dst_join_lid >= 0) {    if (state == JOIN) {      if (target_join_q_lid > dst_join_lid)	target_join_q_lid = dst_join_lid;      return;    }  }  else {    assert (state == JOIN);  }  assert (m_ping_in_progress == false);#ifdef LOG_COOP_JUNK  printf ("[coop %d %d ] At %8.4f init-join-query lid %d\n", id, n->id, Scheduler::Clock(), dst_join_lid);#endif // LOG_COOP_JUNK  state = JOIN;  curr_join_q_lid = -1;  if (dst_join_lid >= 0)    target_join_q_lid = dst_join_lid;  AppPacket *p = new AppPacket(JOIN_QUERY);  init_join_query_packet(p,target_join_q_lid,Scheduler::Clock(),false);  send_pkt_wrapper(p,bse.agent_id,bse.node_id);  /* Set the timeout to resend the request if needed */  jqt.SetTimer(CONST_JOIN_QUERY_TIMEOUT);  return;}void coopAgent::init_layer_arr (void) {  for (int i = 0; i < MAX_LAYERS; i++) {    if (layers.arr[i] != NULL) {      delete layers.arr[i];      layers.arr[i] = NULL;    }  }  return;}void coopAgent::stop (void) {  assert (started == true);#ifdef LOG_COOP_STOP  printf ("[coop %d %d ] At %8.4f stop\n", id, n->id, Scheduler::Clock());#endif // LOG_COOP_STOP  jqt.CancelTimer();  cancel_higher_layer_ping_timers();  init_layer_arr();  Agent::stop();  return;}void coopAgent::specific_send_data_pkt (void) {  assert (started == true);#ifdef LOG_DATA_PACKET_INIT  printf ("[coop %d %d ] At %8.4f source data-pkt : seq %d\n", id, n->id, Scheduler::Clock(), m_data_pkt_seq);#endif // LOG_DATA_PACKET_INIT  forward_data_packet(id, n->id, m_data_pkt_seq ++,-1,-1,true);  return;}void coopAgent::specific_rx_pkt_handler  (Packet *p) {  if (p->t != PACKET_APP) {     printf ("[Err] COOP %d received unknown packet type\n", id);     return;  }  AppPacket *ap = (AppPacket *)p;  rx_pkt_wrapper(ap);  switch (ap->st) {  case JOIN_QUERY :  case JOIN_FORWARD :    handle_join_query_forward(ap);    break;  case JOIN_RESPONSE :    handle_join_response(ap);    break;  case CLUSTER_REFRESH :    handle_cluster_refresh(ap);    break;  case CLUSTER_MERGE :    handle_cluster_merge(ap);    break;  case PING_QUERY :    handle_ping_query(ap);    break;  case PING_RESPONSE :    handle_ping_response(ap);    break;  case PACKET_DATA :    handle_data_packet(ap);    break;  default :    printf ("[Err] COOP %d received unknown packet subtype\n", id);  }  return;}void coopAgent::handle_join_query_forward (AppPacket * ap) {  assert ( (ap->st == JOIN_QUERY) || (ap->st == JOIN_FORWARD) );  int qlid;  double src_time;  bool attach;  AgentInfo src_ag;  AgentInfo original_dst;  if (ap->st == JOIN_QUERY) {    qlid = ap->u.joinq_p.q_lid;    attach = ap->u.joinq_p.attach;    src_ag.agent_id = ap->src_agent;    src_ag.node_id = ap->src;    original_dst.agent_id = id;    original_dst.node_id = n->id;    src_time = ap->u.joinq_p.send_time;  }  else {    qlid = ap->u.joinforward_p.q_lid;    attach = ap->u.joinforward_p.attach;    src_ag.agent_id = ap->u.joinforward_p.src_ag.agent_id;    src_ag.node_id = ap->u.joinforward_p.src_ag.node_id;    original_dst.agent_id = ap->u.joinforward_p.original_dst.agent_id;    original_dst.node_id = ap->u.joinforward_p.original_dst.node_id;    src_time = ap->u.joinforward_p.send_time;  }  int required_qlid;  if (attach == true)    required_qlid = qlid;  else    required_qlid = qlid - 1;  /* First check if this agent should send a response at all */  if (valid_join_query_forward_packet(qlid,attach) == false) {    AppPacket *resp_p = new AppPacket(JOIN_RESPONSE);    resp_p->u.joinresp_p.layer_id = required_qlid;    resp_p->u.joinresp_p.accept = false;    resp_p->u.joinresp_p.mbr_count = 0;    resp_p->u.joinresp_p.exp_src.agent_id = original_dst.agent_id;    resp_p->u.joinresp_p.exp_src.node_id = original_dst.node_id;    send_pkt_wrapper(resp_p,src_ag.agent_id, src_ag.node_id);    return;  }#ifdef LOG_COOP_JUNK  printf ("[coop %d %d ] At %8.4f (c-jqf) recvd-valid-join-qf : lid %d from < [ %d %d ] > exp-src < [ %d %d ] >\n", id, n->id, Scheduler::Clock(), qlid,ap->src_agent,ap->src, src_ag.agent_id, src_ag.node_id);#endif // LOG_COOP_JUNK  LayerInfo * this_layer = layers.arr[required_qlid];  if ( self_check(this_layer->root) == false) {    /* If I am not the cluster root, then forward this to cluster root */    AppPacket *fwd_p = new AppPacket(JOIN_FORWARD);    put_info_into_join_forward_packet (src_ag,qlid,original_dst,src_time,attach,fwd_p);    send_pkt_wrapper(fwd_p,this_layer->root->ag.agent_id, this_layer->root->ag.node_id);#ifdef LOG_COOP_JUNK    printf ("[coop %d %d ] . . (c-jqf) : fwd to root < [ %d %d ] >\n", id, n->id, this_layer->root->ag.agent_id, this_layer->root->ag.node_id);#endif // LOG_COOP_JUNK    return;  }  if (attach == false) {    void * pos = this_layer->ag_list.Locate(src_ag.agent_id);    if (pos != NULL) {#ifdef LOG_COOP_JUNK    printf ("[coop %d %d ] . . (c-jqf) : dup delete\n", id, n->id);#endif // LOG_COOP_JUNK      LayerAgentInfo * this_la = this_layer->ag_list.GetAt(pos);      assert (this_la != NULL);      this_layer->ag_list.RemoveAt(pos);      delete this_la;      log_cluster_change_info(this_layer->lid);#ifdef LOG_COOP_CLUSTER_CHANGE      display_cluster_info(this_layer,this_layer->lid);#endif // LOG_COOP_CLUSTER_CHANGE    }    /* Create response packet */    AppPacket *resp_p = new AppPacket(JOIN_RESPONSE);    put_layer_cluster_info_into_packet(this_layer,resp_p);    resp_p->u.joinresp_p.accept = true;    resp_p->u.joinresp_p.exp_src.agent_id = original_dst.agent_id;    resp_p->u.joinresp_p.exp_src.node_id = original_dst.node_id;    send_pkt_wrapper(resp_p,src_ag.agent_id, src_ag.node_id);#ifdef LOG_COOP_JUNK    printf ("[coop %d %d ] . . (c-jqf) : sent response\n", id, n->id);#endif // LOG_COOP_JUNK  }  else { /* Attach appropriately to the cluster */      LayerAgentInfo *la_ag = new LayerAgentInfo(src_ag.agent_id,src_ag.node_id);      if (this_layer->AddClusterMember(la_ag) == false) {	delete la_ag;	la_ag = this_layer->FindClusterMember(src_ag.agent_id);	la_ag->refresh = true;      }      // if (ap->st == JOIN_QUERY) { // and not JOIN_FORWARD      la_ag->dist = Scheduler::Clock() - src_time;      // }#ifdef LOG_COOP_JUNK      printf ("[coop %d %d ] . . (c-jqf) : *attach*\n", id, n->id);#endif // LOG_COOP_JUNK      // Immediate update of new member joining the cluster      this_layer->cr_msgt.CancelTimer();      send_all_cluster_refresh_packet(this_layer,false,true,NULL);      this_layer->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT);  }  return;}void coopAgent::split_cluster_using_two_partition (LayerInfo * l) {  LayerAgentInfo ** ag_arr = l->CreateLayerAgentInfoArray();  assert (ag_arr != NULL);  double * cost = create_cost_matrix(id,n->id,ag_arr,l->ag_list.GetSize());  int root1_index, root2_index;  int * index_set1 = NULL;  int * index_set2 = NULL;  int set1_size, set2_size;  // Partition the set of members  int lower_size = l->ag_list.GetSize() / 2;  two_partition(l->ag_list.GetSize(),cost,lower_size,index_set1,set1_size,root1_index,index_set2,set2_size,root2_index);  assert (index_set1 != NULL);  assert (index_set2 != NULL);  free(cost);  // After the split, cluster1 is my cluster, cluster2 is the other cluster  LayerAgentInfo ** cluster1 = (LayerAgentInfo **) safe_malloc (sizeof (LayerAgentInfo *) * set1_size);  bool need_to_swap = true;  for (int i = 0; i < set1_size; i++) {    cluster1[i] = ag_arr[index_set1[i]];    if (self_check(cluster1[i]) == true)      need_to_swap = false;  }  LayerAgentInfo ** cluster2 = (LayerAgentInfo **) safe_malloc (sizeof (LayerAgentInfo *) * set2_size);  for (int i = 0; i < set2_size; i++)    cluster2[i] = ag_arr[index_set2[i]];  LayerAgentInfo * c1_root = ag_arr[root1_index];  LayerAgentInfo * c2_root = ag_arr[root2_index];  if (need_to_swap == true) {    LayerAgentInfo ** tmp_cl = cluster1;    cluster1 = cluster2;    cluster2 = tmp_cl;    int tmp_cl_size = set1_size;    set1_size = set2_size;    set2_size = tmp_cl_size;    LayerAgentInfo * tmp_root = c1_root;    c1_root = c2_root;    c2_root = tmp_root;  }  free(ag_arr);  free(index_set1);  free(index_set2);  // Purge my own cluster of members that are in the other cluster  // Create a temporary layer to send out the root transfer message  LayerInfo * tmp_layer = new LayerInfo (l->lid,this);  for (int i = 0; i < set2_size; i++) {    tmp_layer->AddClusterMember(cluster2[i]);    l->DeleteClusterMember(cluster2[i]);  }  tmp_layer->root = c2_root;#ifdef LOG_COOP_JUNK  printf ("[coop %d %d ] At %8.4f split-cluster-self lid %d\n", id, n->id, Scheduler::Clock(), l->lid);#endif // LOG_COOP_JUNK

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -