⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 two_phase_pull.cc

📁 跑leach需要的
💻 CC
📖 第 1 页 / 共 3 页
字号:
    hash_entry = new HashEntry(msg->last_hop_);    putHash(hash_entry, key[0], key[1]);  }  // Rebroadcast the exploratory push data message  if (!forwarding_history->alreadyForwardedToNetwork(BROADCAST_ADDR)){    data_msg = CopyMessage(msg);    data_msg->next_hop_ = BROADCAST_ADDR;    data_timer = new TppMessageSendTimer(this, data_msg);    // Add data timer to the queue    ((DiffusionRouting *)dr_)->addTimer(PUSH_DATA_FORWARD_DELAY +					(int) ((PUSH_DATA_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (PUSH_DATA_FORWARD_JITTER / 2))),					data_timer);    // Add broadcast information to forwarding history    forwarding_history->forwardingToNetwork(BROADCAST_ADDR);  }}void GradientFilter::forwardExploratoryData(Message *msg,					    TppRoutingEntry *routing_entry,					    DataForwardingHistory *forwarding_history){#ifdef USE_BROADCAST_TO_MULTIPLE_RECIPIENTS  Message *data_msg;  TimerCallback *data_timer;#else  GradientList::iterator gradient_itr;  GradientEntry *gradient_entry;#endif // USE_BROADCAST_TO_MULTIPLE_RECIPIENTS  AgentList::iterator agent_itr;  AgentEntry *agent_entry;  Message *sink_message;  unsigned int key[2];  HashEntry *hash_entry;  sink_message = CopyMessage(msg);  // Step 1: Sink Processing  for (agent_itr = routing_entry->agents_.begin();       agent_itr != routing_entry->agents_.end(); ++agent_itr){    agent_entry = *agent_itr;    if (!forwarding_history->alreadyForwardedToLibrary(agent_entry->port_)){      // Forward the data message to local sinks      sink_message->next_hop_ = LOCALHOST_ADDR;      sink_message->next_port_ = agent_entry->port_;      // Add agent to the forwarding list      forwarding_history->forwardingToLibrary(agent_entry->port_);      ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_);    }  }  delete sink_message;  // Step 1A: Reinforcement Processing  if ((!forwarding_history->alreadyReinforced()) &&      (routing_entry->agents_.size() > 0) &&      (msg->last_hop_ != LOCALHOST_ADDR)){    // Send reinforcement to 'last_hop'    sendPositiveReinforcement(routing_entry->attrs_, msg->rdm_id_,			      msg->pkt_num_, msg->last_hop_);    // Record reinforcement in the forwarding history so we do it only    // once per received data message    forwarding_history->sendingReinforcement();  }  // Step 2: Intermediate Processing  // Set reinforcement flags  if (msg->last_hop_ != LOCALHOST_ADDR){    setReinforcementFlags(routing_entry, msg->last_hop_, NEW_MESSAGE);  }  // Add message information to the hash table  if (msg->last_hop_ != LOCALHOST_ADDR){    key[0] = msg->pkt_num_;    key[1] = msg->rdm_id_;    hash_entry = new HashEntry(msg->last_hop_);    putHash(hash_entry, key[0], key[1]);  }  // Forward the EXPLORATORY message#ifdef USE_BROADCAST_TO_MULTIPLE_RECIPIENTS  if (!forwarding_history->alreadyForwardedToNetwork(BROADCAST_ADDR)){    if (routing_entry->gradients_.size() > 0){      // Broadcast DATA message      data_msg = CopyMessage(msg);      data_msg->next_hop_ = BROADCAST_ADDR;      // Add to the forwarding history      forwarding_history->forwardingToNetwork(BROADCAST_ADDR);      data_timer = new TppMessageSendTimer(this, data_msg);      // Add timer for forwarding the data packet      ((DiffusionRouting *)dr_)->addTimer(DATA_FORWARD_DELAY +					  (int) ((DATA_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (DATA_FORWARD_JITTER / 2))),					  data_timer);    }  }#else  // Forward DATA to all output gradients  for (gradient_itr = routing_entry->gradients_.begin();       gradient_itr != routing_entry->gradients_.end(); ++gradient_itr){    gradient_entry = *gradient_itr;    // Check forwarding history    if (!forwarding_history->alreadyForwardedToNetwork(gradient_entry->node_addr_)){      msg->next_hop_ = gradient_entry->node_addr_;      ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);      // Add to the forwarding history      forwarding_history->forwardingToNetwork(gradient_entry->node_addr_);    }  }#endif // USE_BROADCAST_TO_MULTIPLE_RECIPIENTS}void GradientFilter::forwardData(Message *msg, TppRoutingEntry *routing_entry,				 DataForwardingHistory *forwarding_history){  GradientList::iterator gradient_itr;  AgentList::iterator agent_itr;  GradientEntry *gradient_entry;  AgentEntry *agent_entry;  Message *sink_message, *negative_reinforcement_msg;  bool has_sink = false;  sink_message = CopyMessage(msg);  // Step 1: Sink Processing  for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){    agent_entry = *agent_itr;    has_sink = true;    if (!forwarding_history->alreadyForwardedToLibrary(agent_entry->port_)){      // Forward DATA to local sinks      sink_message->next_hop_ = LOCALHOST_ADDR;      sink_message->next_port_ = agent_entry->port_;      // Add agent to the forwarding list      forwarding_history->forwardingToLibrary(agent_entry->port_);      ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_);    }  }  delete sink_message;  // Step 2: Intermediate Processing  // Set reinforcement flags  if (msg->last_hop_ != LOCALHOST_ADDR){    setReinforcementFlags(routing_entry, msg->last_hop_, NEW_MESSAGE);  }  // Forward DATA only to reinforced gradients  gradient_itr = routing_entry->gradients_.begin();  gradient_entry = findReinforcedGradients(&routing_entry->gradients_,					   gradient_itr, &gradient_itr);  if (gradient_entry){    while (gradient_entry){      // Found reinforced gradient, forward data message to this      // neighbor only if the messages comes from a different neighbor      if (gradient_entry->node_addr_ != msg->last_hop_){	msg->next_hop_ = gradient_entry->node_addr_;	// Check if we have forwarded the message to this neighbor already	if (!forwarding_history->alreadyForwardedToNetwork(msg->next_hop_)){	  DiffPrint(DEBUG_NO_DETAILS,		    "Node%d: Forwarding data using Reinforced Gradient to node %d !\n",		    ((DiffusionRouting *)dr_)->getNodeId(), gradient_entry->node_addr_);	  ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);	  // Add the node to the forwarding history	  forwarding_history->forwardingToNetwork(msg->next_hop_);	}      }      // Move to the next one      gradient_itr++;      gradient_entry = findReinforcedGradients(&routing_entry->gradients_,					       gradient_itr, &gradient_itr);    }  }  else{    // We could not find a reinforced path, so we send a negative    // reinforcement to last_hop    if ((!has_sink) && (msg->last_hop_ != LOCALHOST_ADDR)){      negative_reinforcement_msg = new Message(DIFFUSION_VERSION,					       NEGATIVE_REINFORCEMENT,					       0, 0,					       routing_entry->attrs_->size(),					       pkt_count_,					       random_id_,					       msg->last_hop_,					       LOCALHOST_ADDR);      negative_reinforcement_msg->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_);      DiffPrint(DEBUG_NO_DETAILS,		"Sending Negative Reinforcement to node %d !\n",		msg->last_hop_);      ((DiffusionRouting *)dr_)->sendMessage(negative_reinforcement_msg,					     filter_handle_);      pkt_count_++;      delete negative_reinforcement_msg;    }  }}void GradientFilter::sendPositiveReinforcement(NRAttrVec *reinf_attrs,					       int32_t data_rdm_id,					       int32_t data_pkt_num,					       int32_t destination){  ReinforcementBlob *reinforcement_blob;  NRAttribute *reinforcement_attr;  TimerCallback *reinforcement_timer;  Message *pos_reinf_message;  NRAttrVec *attrs;  reinforcement_blob = new ReinforcementBlob(data_rdm_id, data_pkt_num);  reinforcement_attr = ReinforcementAttr.make(NRAttribute::IS,					      (void *) reinforcement_blob,					      sizeof(ReinforcementBlob));  attrs = CopyAttrs(reinf_attrs);  attrs->push_back(reinforcement_attr);  pos_reinf_message = new Message(DIFFUSION_VERSION, POSITIVE_REINFORCEMENT,				  0, 0, attrs->size(), pkt_count_,				  random_id_, destination, LOCALHOST_ADDR);  pos_reinf_message->msg_attr_vec_ = CopyAttrs(attrs);  DiffPrint(DEBUG_NO_DETAILS, "Sending Positive Reinforcement to node %d !\n",	    destination);  // Create timer for sending this message  reinforcement_timer = new TppMessageSendTimer(this, pos_reinf_message);  // Add timer to the event queue  ((DiffusionRouting *)dr_)->addTimer(POS_REINFORCEMENT_SEND_DELAY +				      (int) ((POS_REINFORCEMENT_JITTER * (GetRand() * 1.0 / RAND_MAX) - (POS_REINFORCEMENT_JITTER / 2))),				      reinforcement_timer);  pkt_count_++;  ClearAttrs(attrs);  delete attrs;  delete reinforcement_blob;}GradientEntry * GradientFilter::findReinforcedGradients(GradientList *gradients,							GradientList::iterator start,							GradientList::iterator *place){  GradientList::iterator gradient_itr;  GradientEntry *gradient_entry;  for (gradient_itr = start; gradient_itr != gradients->end(); ++gradient_itr){    gradient_entry = *gradient_itr;    if (gradient_entry->reinforced_){      *place = gradient_itr;      return gradient_entry;    }  }  return NULL;}GradientEntry * GradientFilter::findReinforcedGradient(int32_t node_addr,						       TppRoutingEntry *routing_entry){  GradientList::iterator gradient_itr;  GradientEntry *gradient_entry;  gradient_itr = routing_entry->gradients_.begin();  gradient_entry = findReinforcedGradients(&routing_entry->gradients_,					   gradient_itr, &gradient_itr);  if (gradient_entry){    while(gradient_entry){      if (gradient_entry->node_addr_ == node_addr)	return gradient_entry;      // This is not the gradient we are looking for, keep looking      gradient_itr++;      gradient_entry = findReinforcedGradients(&routing_entry->gradients_,					       gradient_itr, &gradient_itr);    }  }  return NULL;}void GradientFilter::deleteGradient(TppRoutingEntry *routing_entry,				    GradientEntry *gradient_entry){  GradientList::iterator gradient_itr;  GradientEntry *current_entry;  for (gradient_itr = routing_entry->gradients_.begin();       gradient_itr != routing_entry->gradients_.end(); ++gradient_itr){    current_entry = *gradient_itr;    if (current_entry == gradient_entry){      gradient_itr = routing_entry->gradients_.erase(gradient_itr);      delete gradient_entry;      return;    }  }  DiffPrint(DEBUG_ALWAYS,	    "Error: deleteGradient could not find gradient to delete !\n");}void GradientFilter::setReinforcementFlags(TppRoutingEntry *routing_entry,					   int32_t last_hop, int new_message){  DataNeighborList::iterator data_neighbor_itr;  DataNeighborEntry *data_neighbor_entry;  for (data_neighbor_itr = routing_entry->data_neighbors_.begin();       data_neighbor_itr != routing_entry->data_neighbors_.end();       ++data_neighbor_itr){    data_neighbor_entry = *data_neighbor_itr;    if (data_neighbor_entry->neighbor_id_ == last_hop){      if (data_neighbor_entry->data_flag_ > 0)	return;      data_neighbor_entry->data_flag_ = new_message;      return;    }  }  // We need to add a new data neighbor  data_neighbor_entry = new DataNeighborEntry(last_hop, new_message);  routing_entry->data_neighbors_.push_back(data_neighbor_entry);}void GradientFilter::sendInterest(NRAttrVec *attrs, TppRoutingEntry *routing_entry){  AgentList::iterator agent_itr;  AgentEntry *agent_entry;  Message *msg = new Message(DIFFUSION_VERSION, INTEREST, 0, 0,			     attrs->size(), 0, 0, LOCALHOST_ADDR,			     LOCALHOST_ADDR);  msg->msg_attr_vec_ = CopyAttrs(attrs);  for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){    agent_entry = *agent_itr;    msg->next_port_ = agent_entry->port_;    ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);  }  delete msg;}void GradientFilter::sendDisinterest(NRAttrVec *attrs,				     TppRoutingEntry *routing_entry){  NRAttrVec *new_attrs;  NRSimpleAttribute<int> *nrclass = NULL;  new_attrs = CopyAttrs(attrs);  nrclass = NRClassAttr.find(new_attrs);  if (!nrclass){    DiffPrint(DEBUG_ALWAYS,	      "Error: sendDisinterest couldn't find the class attribute !\n");    ClearAttrs(new_attrs);    delete new_attrs;    return;  }  // Change the class_key value  nrclass->setVal(NRAttribute::DISINTEREST_CLASS);  sendInterest(new_attrs, routing_entry);     ClearAttrs(new_attrs);  delete new_attrs;}void GradientFilter::recv(Message *msg, handle h){  if (h != filter_handle_){    DiffPrint(DEBUG_ALWAYS,	      "Error: received msg for handle %d, subscribed to handle %d !\n",	      h, filter_handle_);    return;  }  if (msg->new_message_ == 1)    processNewMessage(msg);  else    processOldMessage(msg);}void GradientFilter::processOldMessage(Message *msg){  TppRoutingEntry *routing_entry;  RoutingTable::iterator routing_itr;  switch (msg->msg_type_){  case INTEREST:    DiffPrint(DEBUG_NO_DETAILS, "Node%d: Received Old Interest !\n", ((DiffusionRouting *)dr_)->getNodeId());    if (msg->last_hop_ == LOCALHOST_ADDR){      // Old interest should not come from local agent      DiffPrint(DEBUG_ALWAYS, "Warning: Old Interest from local agent !\n");      break;    }    // Get the routing entry for these attrs          routing_entry = findRoutingEntry(msg->msg_attr_vec_);    if (routing_entry)      updateGradient(routing_entry, msg->last_hop_, false);    break;  case DATA:     DiffPrint(DEBUG_NO_DETAILS, "Node%d: Received an old Data message !\n", ((DiffusionRouting *)dr_)->getNodeId());    // Find the correct routing entry    routing_itr = routing_list_.begin();    routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,				      &routing_itr);    while (routing_entry){      DiffPrint(DEBUG_NO_DETAILS,		"Set flags to %d to OLD_MESSAGE !\n", msg->last_hop_);      // Set reinforcement flags      if (msg->last_hop_ != LOCALHOST_ADDR){	setReinforcementFlags(routing_entry, msg->last_hop_, OLD_MESSAGE);      }      // Continue going through other data types      routing_itr++;      routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,					&routing_itr);    }    break;  case PUSH_EXPLORATORY_DATA:    // Just drop it    DiffPrint(DEBUG_NO_DETAILS,	      "Received an old Push Exploratory Data. Loop detected !\n");

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -