⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 filter_core.cc

📁 跑leach需要的
💻 CC
📖 第 1 页 / 共 3 页
字号:
// // filter_core.cc  : Main Diffusion program// authors         : Chalermek Intanagonwiwat and Fabio Silva//// Copyright (C) 2000-2003 by the University of Southern California// $Id: filter_core.cc,v 1.2 2004/01/08 23:05:53 haldar Exp $//// This program is free software; you can redistribute it and/or// modify it under the terms of the GNU General Public License,// version 2, as published by the Free Software Foundation.//// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the// GNU General Public License for more details.//// You should have received a copy of the GNU General Public License along// with this program; if not, write to the Free Software Foundation, Inc.,// 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.////#include "filter_core.hh"#ifndef NS_DIFFUSIONDiffusionCoreAgent *agent;#endif // !NS_DIFFUSIONclass HashEntry {public:  bool dummy;  HashEntry() {     dummy  = false;  }};class NeighborEntry {public:  int32_t id;  struct timeval tmv;  NeighborEntry(int _id) : id(_id)  {    GetTime(&tmv);  }};int NeighborsTimeoutTimer::expire(){  agent_->neighborsTimeout();  return 0;}int FilterTimeoutTimer::expire(){  agent_->filterTimeout();  return 0;}int DiffusionStopTimer::expire(){  agent_->timeToStop();#ifndef NS_DIFFUSION  exit(0);#endif // !NS_DIFFUSION  // Never gets here !  return 0;}void DiffusionCoreAgent::timeToStop(){#ifdef STATS  char out_filename[100];  FILE *outfile = NULL;  if (stats_){    sprintf(out_filename, "/tmp/diffusion-%d.out", my_id_);    outfile = fopen(out_filename, "w");    if (outfile == NULL){      DiffPrint(DEBUG_ALWAYS,		"Diffusion Error: Cannot create %s\n", out_filename);      return;    }    stats_->printStats(stdout);    if (outfile){      stats_->printStats(outfile);      fclose(outfile);    }  }#endif // STATS}#ifndef NS_DIFFUSIONvoid signal_handler(int p){  agent->timeToStop();  exit(0);}void DiffusionCoreAgent::usage(char *s){  DiffPrint(DEBUG_ALWAYS, "Usage: %s [-d debug] [-f filename] [-t stoptime] [-v] [-h] [-p port]", s);#ifdef IO_LOG  DiffPrint(DEBUG_ALWAYS, " [-l]");#endif // IO_LOG#ifdef STATS  DiffPrint(DEBUG_ALWAYS, " [-s] [-i warm_up_time]");#endif // STATS  DiffPrint(DEBUG_ALWAYS, "\n\n");  DiffPrint(DEBUG_ALWAYS, "\t-d - Sets debug level (0-10)\n");  DiffPrint(DEBUG_ALWAYS, "\t-t - Stops after stoptime seconds\n");  DiffPrint(DEBUG_ALWAYS, "\t-f - Uses filename as the config file\n");  DiffPrint(DEBUG_ALWAYS, "\t-v - Prints diffusion version\n");  DiffPrint(DEBUG_ALWAYS, "\t-h - Prints this information\n");  DiffPrint(DEBUG_ALWAYS, "\t-p - Sets diffusion port to port\n");#ifdef IO_LOG  DiffPrint(DEBUG_ALWAYS, "\t-l - Turns on i/o logging\n");#endif // IO_LOG#ifdef STATS  DiffPrint(DEBUG_ALWAYS, "\t-s - Disables statistics\n");  DiffPrint(DEBUG_ALWAYS, "\t-i - Ignores traffic from the first warm_up_time seconds for stats\n");#endif // STATS  DiffPrint(DEBUG_ALWAYS, "\n");  exit(0);}void DiffusionCoreAgent::run(){  DeviceList::iterator device_itr;  DiffPacket in_pkt;  fd_set fds;  bool flag;  int status, max_sock, fd;  struct timeval tv;  // Main Select Loop  while (1){    // Wait for incoming packets    FD_ZERO(&fds);    max_sock = 0;    // Figure out how much time to wait    timers_manager_->nextTimerTime(&tv);    if (tv.tv_sec == 0 && tv.tv_usec == 0){      // Timer has expired !      timers_manager_->executeAllExpiredTimers();      continue;    }    for (device_itr = in_devices_.begin();	 device_itr != in_devices_.end(); ++device_itr){      (*device_itr)->addInFDS(&fds, &max_sock);    }    status = select(max_sock+1, &fds, NULL, NULL, &tv);    if (status == 0){      // We process all expired timers      timers_manager_->executeAllExpiredTimers();    }    // Check for new packets    if (status > 0){      do{	flag = false;	for (device_itr = in_devices_.begin();	     device_itr != in_devices_.end(); ++device_itr){	  fd = (*device_itr)->checkInFDS(&fds);	  if (fd != -1){	    // Message waiting	    in_pkt = (*device_itr)->recvPacket(fd);	    if (in_pkt)	      recvPacket(in_pkt);	    // Clear this fd	    FD_CLR(fd, &fds);	    status--;	    flag = true;	  }	}      } while ((status > 0) && (flag == true));    }    // This should not happen    if (status < 0){      DiffPrint(DEBUG_IMPORTANT, "Select returned %d\n", status);    }  }}#endif // !NS_DIFFUSIONvoid DiffusionCoreAgent::neighborsTimeout(){  struct timeval tmv;  NeighborEntry *neighbor_entry;  NeighborList::iterator neighbor_itr;  DiffPrint(DEBUG_MORE_DETAILS, "Neighbors Timeout !\n");  GetTime(&tmv);  neighbor_itr = neighbor_list_.begin();  while(neighbor_itr != neighbor_list_.end()){    neighbor_entry = *neighbor_itr;    if (tmv.tv_sec > neighbor_entry->tmv.tv_sec + NEIGHBORS_TIMEOUT){      // This neighbor expired      neighbor_itr = neighbor_list_.erase(neighbor_itr);      delete neighbor_entry;    }    else{      neighbor_itr++;    }  }}void DiffusionCoreAgent::filterTimeout(){  struct timeval tmv;  FilterEntry *filter_entry;  FilterList::iterator filter_itr;  DiffPrint(DEBUG_MORE_DETAILS, "Filter Timeout !\n");  GetTime(&tmv);  filter_itr = filter_list_.begin();  while(filter_itr != filter_list_.end()){    filter_entry = *filter_itr;    if (tmv.tv_sec > filter_entry->tmv_.tv_sec + FILTER_TIMEOUT){      // This filter expired      DiffPrint(DEBUG_NO_DETAILS, "Filter %d, %d, %d timed out !\n",		filter_entry->agent_, filter_entry->handle_,		filter_entry->priority_);      filter_itr = filter_list_.erase(filter_itr);      delete filter_entry;    }    else{      filter_itr++;    }  }}void DiffusionCoreAgent::sendMessage(Message *msg){  Tcl_HashEntry *tcl_hash_entry;  unsigned int key[2];  Message *send_message;    send_message = new Message(DIFFUSION_VERSION, msg->msg_type_, diffusion_port_,			     0, 0, msg->pkt_num_, msg->rdm_id_,			     msg->next_hop_, 0);  send_message->msg_attr_vec_ = CopyAttrs(msg->msg_attr_vec_);  send_message->num_attr_ = send_message->msg_attr_vec_->size();  send_message->data_len_ = CalculateSize(send_message->msg_attr_vec_);  // Adjust message size for logging and check hash  key[0] = msg->pkt_num_;  key[1] = msg->rdm_id_;  tcl_hash_entry = Tcl_FindHashEntry(&htable_, (char *) key);  if (tcl_hash_entry)    msg->new_message_ = 0;  else    msg->new_message_ = 1;  send_message->new_message_ = msg->new_message_;  // Check if message goes to an agent or the network  if (msg->next_port_){    // Message goes to an agent    send_message->last_hop_ = LOCALHOST_ADDR;    // If it's a local message, it has to go to a local agent    if (send_message->next_hop_ != LOCALHOST_ADDR){      DiffPrint(DEBUG_ALWAYS, "Error: Message destination is a local agent but next_hop != LOCALHOST_ADDR !\n");      delete send_message;      return;    }    // Send the message to the agent specified    sendMessageToLibrary(send_message, msg->next_port_);  }  else{    // Message goes to the network    send_message->last_hop_ = my_id_;#ifdef STATS    if (stats_)      stats_->logOutgoingMessage(send_message);#endif // STATS    // Add message to the hash table          if (tcl_hash_entry == NULL)      putHash(key[0], key[1]);    else      DiffPrint(DEBUG_DETAILS, "Node%d: Message being sent is an old message !\n", my_id_);    // Send Message    sendMessageToNetwork(send_message);  }  delete send_message;}void DiffusionCoreAgent::forwardMessage(Message *msg, FilterEntry *filter_entry){  RedirectMessage *original_hdr;  NRAttribute *original_header_attr;  Message *send_message;  // Create an attribute with the original header  original_hdr = new RedirectMessage(msg->new_message_, msg->msg_type_,				     msg->source_port_, msg->data_len_,				     msg->num_attr_, msg->rdm_id_,				     msg->pkt_num_, msg->next_hop_,				     msg->last_hop_, filter_entry->handle_,				     msg->next_port_);  original_header_attr = OriginalHdrAttr.make(NRAttribute::IS,					      (void *)original_hdr,					      sizeof(RedirectMessage));  send_message = new Message(DIFFUSION_VERSION, REDIRECT, diffusion_port_, 0,			     0, pkt_count_, random_id_, LOCALHOST_ADDR, my_id_);  // Increment pkt_counter  pkt_count_++;  // Duplicate the message's attributes  send_message->msg_attr_vec_ = CopyAttrs(msg->msg_attr_vec_);    // Add the extra attribute  send_message->msg_attr_vec_->push_back(original_header_attr);  send_message->num_attr_ = send_message->msg_attr_vec_->size();  send_message->data_len_ = CalculateSize(send_message->msg_attr_vec_);  sendMessageToLibrary(send_message, filter_entry->agent_);  delete send_message;  delete original_hdr;}#ifndef NS_DIFFUSIONvoid DiffusionCoreAgent::sendMessageToLibrary(Message *msg, u_int16_t agent_id){  DiffPacket out_pkt = NULL;  struct hdr_diff *dfh;  int len;  char *pos;  out_pkt = AllocateBuffer(msg->msg_attr_vec_);  dfh = HDR_DIFF(out_pkt);  pos = (char *) out_pkt;  pos = pos + sizeof(struct hdr_diff);  len = PackAttrs(msg->msg_attr_vec_, pos);  LAST_HOP(dfh) = htonl(msg->last_hop_);  NEXT_HOP(dfh) = htonl(msg->next_hop_);  DIFF_VER(dfh) = msg->version_;  MSG_TYPE(dfh) = msg->msg_type_;  DATA_LEN(dfh) = htons(len);  PKT_NUM(dfh) = htonl(msg->pkt_num_);  RDM_ID(dfh) = htonl(msg->rdm_id_);  NUM_ATTR(dfh) = htons(msg->num_attr_);  SRC_PORT(dfh) = htons(msg->source_port_);  sendPacketToLibrary(out_pkt, sizeof(struct hdr_diff) + len, agent_id);  delete [] out_pkt;}#elsevoid DiffusionCoreAgent::sendMessageToLibrary(Message *msg, u_int16_t agent_id){  Message *send_message;  DeviceList::iterator device_itr;  int len;  send_message = CopyMessage(msg);  len = CalculateSize(send_message->msg_attr_vec_);  len = len + sizeof(struct hdr_diff);  for (device_itr = local_out_devices_.begin();       device_itr != local_out_devices_.end(); ++device_itr){    (*device_itr)->sendPacket((DiffPacket) send_message, len, agent_id);  }}#endif // !NS_DIFFUSION#ifndef NS_DIFFUSIONvoid DiffusionCoreAgent::sendMessageToNetwork(Message *msg){  DiffPacket out_pkt = NULL;  struct hdr_diff *dfh;  int len;  char *pos;  out_pkt = AllocateBuffer(msg->msg_attr_vec_);  dfh = HDR_DIFF(out_pkt);  pos = (char *) out_pkt;  pos = pos + sizeof(struct hdr_diff);  len = PackAttrs(msg->msg_attr_vec_, pos);  LAST_HOP(dfh) = htonl(msg->last_hop_);  NEXT_HOP(dfh) = htonl(msg->next_hop_);  DIFF_VER(dfh) = msg->version_;  MSG_TYPE(dfh) = msg->msg_type_;  DATA_LEN(dfh) = htons(len);  PKT_NUM(dfh) = htonl(msg->pkt_num_);  RDM_ID(dfh) = htonl(msg->rdm_id_);  NUM_ATTR(dfh) = htons(msg->num_attr_);  SRC_PORT(dfh) = htons(msg->source_port_);  sendPacketToNetwork(out_pkt, sizeof(struct hdr_diff) + len, msg->next_hop_);  delete [] out_pkt;}#elsevoid DiffusionCoreAgent::sendMessageToNetwork(Message *msg){  Message *send_message;  int len;  int32_t dst;  DeviceList::iterator device_itr;  send_message = CopyMessage(msg);  len = CalculateSize(send_message->msg_attr_vec_);  len = len + sizeof(struct hdr_diff);  dst = send_message->next_hop_;  for (device_itr = out_devices_.begin();       device_itr != out_devices_.end(); ++device_itr){    (*device_itr)->sendPacket((DiffPacket) send_message, len, dst);  }}#endif // !NS_DIFFUSIONvoid DiffusionCoreAgent::sendPacketToLibrary(DiffPacket pkt, int len,					     u_int16_t dst){  DeviceList::iterator device_itr;  for (device_itr = local_out_devices_.begin();       device_itr != local_out_devices_.end(); ++device_itr){    (*device_itr)->sendPacket(pkt, len, dst);  }}void DiffusionCoreAgent::sendPacketToNetwork(DiffPacket pkt, int len, int dst){

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -