⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 two_phase_pull.cc

📁 跑leach需要的
💻 CC
📖 第 1 页 / 共 3 页
字号:
//// two_phase_pull.cc  : Two-Phase Pull/One-Phase Push Filter// author             : Fabio Silva and Chalermek Intanagonwiwat//// Copyright (C) 2000-2003 by the University of Southern California// $Id: two_phase_pull.cc,v 1.5 2004/01/08 23:05:53 haldar Exp $//// This program is free software; you can redistribute it and/or// modify it under the terms of the GNU General Public License,// version 2, as published by the Free Software Foundation.//// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the// GNU General Public License for more details.//// You should have received a copy of the GNU General Public License along// with this program; if not, write to the Free Software Foundation, Inc.,// 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.////#include "two_phase_pull.hh"#ifdef NS_DIFFUSIONstatic class GradientFilterClass : public TclClass {public:  GradientFilterClass() : TclClass("Application/DiffApp/GradientFilter") {}  TclObject* create(int argc, const char*const* argv) {    if (argc == 5)      return(new GradientFilter(argv[4]));    else      fprintf(stderr, "Insufficient number of args for creating GradientFilter");    return (NULL);  }} class_gradient_filter;int GradientFilter::command(int argc, const char*const* argv) {  if (argc == 3) {    if (strcasecmp(argv[1], "debug") == 0) {      global_debug_level = atoi(argv[2]);      if (global_debug_level < 1 || global_debug_level > 10) {	global_debug_level = DEBUG_DEFAULT;	printf("Error: Debug level outside range(1-10) or missing !\n");      }    }  }  return DiffApp::command(argc, argv);}#endif // NS_DIFFUSIONvoid GradientFilterReceive::recv(Message *msg, handle h){  app_->recv(msg, h);}int TppMessageSendTimer::expire(){  // Call timeout function  agent_->messageTimeout(msg_);  // Do not reschedule this timer  delete this;  return -1;}int TppInterestForwardTimer::expire(){  // Call timeout function  agent_->interestTimeout(msg_);  // Do not reschedule this timer  delete this;  return -1;}int TppSubscriptionExpirationTimer::expire(){  int retval;  retval = agent_->subscriptionTimeout(attrs_);  // Delete timer if we are not rescheduling it  if (retval == -1)    delete this;  return retval;}int TppGradientExpirationCheckTimer::expire(){  // Call the callback function  agent_->gradientTimeout();  // Reschedule this timer  return 0;}int TppReinforcementCheckTimer::expire(){  // Call the callback function  agent_->reinforcementTimeout();  // Reschedule this timer  return 0;}void GradientFilter::interestTimeout(Message *msg){  DiffPrint(DEBUG_MORE_DETAILS, "Interest Timeout !\n");  msg->last_hop_ = LOCALHOST_ADDR;  msg->next_hop_ = BROADCAST_ADDR;   ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);}void GradientFilter::messageTimeout(Message *msg){  DiffPrint(DEBUG_MORE_DETAILS, "Message Timeout !\n");  ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);}void GradientFilter::gradientTimeout(){  RoutingTable::iterator routing_itr;  GradientList::iterator grad_itr;  AgentList::iterator agent_itr;  TppRoutingEntry *routing_entry;  GradientEntry *gradient_entry;  AgentEntry *agent_entry;  struct timeval tmv;  DiffPrint(DEBUG_MORE_DETAILS, "Gradient Timeout !\n");  GetTime(&tmv);  routing_itr = routing_list_.begin();  while (routing_itr != routing_list_.end()){    routing_entry = *routing_itr;    // Step 1: Delete expired gradients    grad_itr = routing_entry->gradients_.begin();    while (grad_itr != routing_entry->gradients_.end()){      gradient_entry = *grad_itr;      if (tmv.tv_sec > (gradient_entry->tv_.tv_sec + GRADIENT_TIMEOUT)){	DiffPrint(DEBUG_NO_DETAILS, "Deleting Gradient to node %d !\n",		  gradient_entry->node_addr_);	grad_itr = routing_entry->gradients_.erase(grad_itr);	delete gradient_entry;      }      else{	grad_itr++;      }    }    // Step 2: Remove non-active agents    agent_itr = routing_entry->agents_.begin();    while (agent_itr != routing_entry->agents_.end()){      agent_entry = *agent_itr;      if (tmv.tv_sec > (agent_entry->tv_.tv_sec + GRADIENT_TIMEOUT)){	DiffPrint(DEBUG_NO_DETAILS,		  "Deleting Gradient to agent %d !\n", agent_entry->port_);	agent_itr = routing_entry->agents_.erase(agent_itr);	delete agent_entry;      }      else{	agent_itr++;      }    }    // Remove the Routing Entry if no gradients and no agents    if ((routing_entry->gradients_.size() == 0) &&	(routing_entry->agents_.size() == 0)){      // Deleting Routing Entry      DiffPrint(DEBUG_DETAILS,		"Nothing left for this data type, cleaning up !\n");      routing_itr = routing_list_.erase(routing_itr);      delete routing_entry;    }    else{      routing_itr++;    }  }}void GradientFilter::reinforcementTimeout(){  DataNeighborList::iterator data_neighbor_itr;  DataNeighborEntry *data_neighbor_entry;  RoutingTable::iterator routing_itr;  TppRoutingEntry *routing_entry;  Message *my_message;  DiffPrint(DEBUG_MORE_DETAILS, "Reinforcement Timeout !\n");  routing_itr = routing_list_.begin();  while (routing_itr != routing_list_.end()){    routing_entry = *routing_itr;    // Step 1: Delete expired gradients    data_neighbor_itr = routing_entry->data_neighbors_.begin();    while (data_neighbor_itr != routing_entry->data_neighbors_.end()){      data_neighbor_entry = *data_neighbor_itr;      if (data_neighbor_entry->data_flag_ == OLD_MESSAGE){	my_message = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT,				 0, 0, routing_entry->attrs_->size(), pkt_count_,				 random_id_, data_neighbor_entry->neighbor_id_,				 LOCALHOST_ADDR);	my_message->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_);	DiffPrint(DEBUG_NO_DETAILS,		  "Sending Negative Reinforcement to node %d !\n",		  data_neighbor_entry->neighbor_id_);	((DiffusionRouting *)dr_)->sendMessage(my_message, filter_handle_);	pkt_count_++;	delete my_message;	// Done. Delete entry	data_neighbor_itr = routing_entry->data_neighbors_.erase(data_neighbor_itr);	delete data_neighbor_entry;      }      else{	data_neighbor_itr++;      }    }    // Step 2: Delete data neighbors with no activity, zero flags    data_neighbor_itr = routing_entry->data_neighbors_.begin();    while (data_neighbor_itr != routing_entry->data_neighbors_.end()){      data_neighbor_entry = *data_neighbor_itr;      if (data_neighbor_entry->data_flag_ == NEW_MESSAGE){	data_neighbor_entry->data_flag_ = 0;	data_neighbor_itr++;      }      else{	// Delete entry	data_neighbor_itr = routing_entry->data_neighbors_.erase(data_neighbor_itr);	delete data_neighbor_entry;      }    }    // Advance to the next routing entry    routing_itr++;  }}int GradientFilter::subscriptionTimeout(NRAttrVec *attrs){  AttributeList::iterator attribute_itr;  AttributeEntry *attribute_entry;  TppRoutingEntry *routing_entry;  struct timeval tmv;  DiffPrint(DEBUG_MORE_DETAILS, "Subscription Timeout !\n");  GetTime(&tmv);  // Find the correct Routing Entry  routing_entry = findRoutingEntry(attrs);  if (routing_entry){    // Step 1: Check Timeouts    attribute_itr = routing_entry->attr_list_.begin();    while (attribute_itr != routing_entry->attr_list_.end()){      attribute_entry = *attribute_itr;      if (tmv.tv_sec > (attribute_entry->tv_.tv_sec + SUBSCRIPTION_TIMEOUT)){	sendDisinterest(attribute_entry->attrs_, routing_entry);	attribute_itr = routing_entry->attr_list_.erase(attribute_itr);	delete attribute_entry;      }      else{	attribute_itr++;      }    }  }  else{    DiffPrint(DEBUG_DETAILS, "Warning: SubscriptionTimeout could't find RE - maybe deleted by GradientTimeout ?\n");    // Cancel Timer    return -1;  }  // Keep Timer  return 0;}void GradientFilter::deleteRoutingEntry(TppRoutingEntry *routing_entry){  RoutingTable::iterator routing_itr;  TppRoutingEntry *current_entry;  for (routing_itr = routing_list_.begin(); routing_itr != routing_list_.end(); ++routing_itr){    current_entry = *routing_itr;    if (current_entry == routing_entry){      routing_itr = routing_list_.erase(routing_itr);      delete routing_entry;      return;    }  }  DiffPrint(DEBUG_ALWAYS, "Error: deleteRoutingEntry could not find entry to delete !\n");}TppRoutingEntry * GradientFilter::matchRoutingEntry(NRAttrVec *attrs, RoutingTable::iterator start, RoutingTable::iterator *place){  RoutingTable::iterator routing_itr;  TppRoutingEntry *routing_entry;  for (routing_itr = start; routing_itr != routing_list_.end(); ++routing_itr){    routing_entry = *routing_itr;    if (MatchAttrs(routing_entry->attrs_, attrs)){      *place = routing_itr;      return routing_entry;    }  }  return NULL;}TppRoutingEntry * GradientFilter::findRoutingEntry(NRAttrVec *attrs){  RoutingTable::iterator routing_itr;  TppRoutingEntry *routing_entry;  for (routing_itr = routing_list_.begin(); routing_itr != routing_list_.end(); ++routing_itr){    routing_entry = *routing_itr;    if (PerfectMatch(routing_entry->attrs_, attrs))      return routing_entry;  }  return NULL;}AttributeEntry * GradientFilter::findMatchingSubscription(TppRoutingEntry *routing_entry,							  NRAttrVec *attrs){  AttributeList::iterator attribute_itr;  AttributeEntry *attribute_entry;  for (attribute_itr = routing_entry->attr_list_.begin(); attribute_itr != routing_entry->attr_list_.end(); ++attribute_itr){    attribute_entry = *attribute_itr;    if (PerfectMatch(attribute_entry->attrs_, attrs))      return attribute_entry;  }  return NULL;}void GradientFilter::updateGradient(TppRoutingEntry *routing_entry,				    int32_t last_hop, bool reinforced){  GradientList::iterator gradient_itr;  GradientEntry *gradient_entry;  for (gradient_itr = routing_entry->gradients_.begin();       gradient_itr != routing_entry->gradients_.end(); ++gradient_itr){    gradient_entry = *gradient_itr;    if (gradient_entry->node_addr_ == last_hop){      GetTime(&(gradient_entry->tv_));      if (reinforced)	gradient_entry->reinforced_ = true;      return;    }  }  // We need to add a new gradient  gradient_entry = new GradientEntry(last_hop);  if (reinforced)    gradient_entry->reinforced_ = true;  routing_entry->gradients_.push_back(gradient_entry);}void GradientFilter::updateAgent(TppRoutingEntry *routing_entry,				 u_int16_t source_port){  AgentList::iterator agent_itr;  AgentEntry *agent_entry;  for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){    agent_entry = *agent_itr;    if (agent_entry->port_ == source_port){      // We already have this guy      GetTime(&(agent_entry->tv_));      return;    }  }  // This is a new agent, so we create a new entry and add it to the  // list of known agents  agent_entry = new AgentEntry(source_port);  routing_entry->agents_.push_back(agent_entry);}void GradientFilter::forwardPushExploratoryData(Message *msg,						DataForwardingHistory *forwarding_history){  RoutingTable::iterator routing_itr;  TppRoutingEntry *routing_entry;  AgentList::iterator agent_itr;  AgentEntry *agent_entry;  Message *data_msg, *sink_message;  TimerCallback *data_timer;  unsigned int key[2];  HashEntry *hash_entry;  // Sink processing  routing_itr = routing_list_.begin();  routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,				    &routing_itr);  sink_message = CopyMessage(msg);  while (routing_entry){    // Forward message to all local sinks    for (agent_itr = routing_entry->agents_.begin();	 agent_itr != routing_entry->agents_.end(); ++agent_itr){      agent_entry = *agent_itr;      if (!forwarding_history->alreadyForwardedToLibrary(agent_entry->port_)){	// Send DATA message to local sinks	sink_message->next_hop_ = LOCALHOST_ADDR;	sink_message->next_port_ = agent_entry->port_;	((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_);	// Add agent to the forwarding history	forwarding_history->forwardingToLibrary(agent_entry->port_);      }    }    if ((!forwarding_history->alreadyReinforced()) &&	(routing_entry->agents_.size() > 0) &&	(msg->last_hop_ != LOCALHOST_ADDR)){      // Send a positive reinforcement if we have sinks      sendPositiveReinforcement(routing_entry->attrs_, msg->rdm_id_,				msg->pkt_num_, msg->last_hop_);      // Record reinforcement in the forwarding history so we do it      // only once per received data message      forwarding_history->sendingReinforcement();    }    // Look for other matching data types    routing_itr++;    routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,				      &routing_itr);  }  // Delete sink_message after sink processing  delete sink_message;  // Intermediate node processing  // Add message information to the hash table  if (msg->last_hop_ != LOCALHOST_ADDR){    key[0] = msg->pkt_num_;    key[1] = msg->rdm_id_;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -