📄 two_phase_pull.cc
字号:
hash_entry = new HashEntry(msg->last_hop_); putHash(hash_entry, key[0], key[1]); } // Rebroadcast the exploratory push data message if (!forwarding_history->alreadyForwardedToNetwork(BROADCAST_ADDR)){ data_msg = CopyMessage(msg); data_msg->next_hop_ = BROADCAST_ADDR; data_timer = new TppMessageSendTimer(this, data_msg); // Add data timer to the queue ((DiffusionRouting *)dr_)->addTimer(PUSH_DATA_FORWARD_DELAY + (int) ((PUSH_DATA_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (PUSH_DATA_FORWARD_JITTER / 2))), data_timer); // Add broadcast information to forwarding history forwarding_history->forwardingToNetwork(BROADCAST_ADDR); }}void GradientFilter::forwardExploratoryData(Message *msg, TppRoutingEntry *routing_entry, DataForwardingHistory *forwarding_history){#ifdef USE_BROADCAST_TO_MULTIPLE_RECIPIENTS Message *data_msg; TimerCallback *data_timer;#else GradientList::iterator gradient_itr; GradientEntry *gradient_entry;#endif // USE_BROADCAST_TO_MULTIPLE_RECIPIENTS AgentList::iterator agent_itr; AgentEntry *agent_entry; Message *sink_message; unsigned int key[2]; HashEntry *hash_entry; sink_message = CopyMessage(msg); // Step 1: Sink Processing for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){ agent_entry = *agent_itr; if (!forwarding_history->alreadyForwardedToLibrary(agent_entry->port_)){ // Forward the data message to local sinks sink_message->next_hop_ = LOCALHOST_ADDR; sink_message->next_port_ = agent_entry->port_; // Add agent to the forwarding list forwarding_history->forwardingToLibrary(agent_entry->port_); ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_); } } delete sink_message; // Step 1A: Reinforcement Processing if ((!forwarding_history->alreadyReinforced()) && (routing_entry->agents_.size() > 0) && (msg->last_hop_ != LOCALHOST_ADDR)){ // Send reinforcement to 'last_hop' sendPositiveReinforcement(routing_entry->attrs_, msg->rdm_id_, msg->pkt_num_, msg->last_hop_); // Record reinforcement in the forwarding history so we do it only // once per received data message forwarding_history->sendingReinforcement(); } // Step 2: Intermediate Processing // Set reinforcement flags if (msg->last_hop_ != LOCALHOST_ADDR){ setReinforcementFlags(routing_entry, msg->last_hop_, NEW_MESSAGE); } // Add message information to the hash table if (msg->last_hop_ != LOCALHOST_ADDR){ key[0] = msg->pkt_num_; key[1] = msg->rdm_id_; hash_entry = new HashEntry(msg->last_hop_); putHash(hash_entry, key[0], key[1]); } // Forward the EXPLORATORY message#ifdef USE_BROADCAST_TO_MULTIPLE_RECIPIENTS if (!forwarding_history->alreadyForwardedToNetwork(BROADCAST_ADDR)){ if (routing_entry->gradients_.size() > 0){ // Broadcast DATA message data_msg = CopyMessage(msg); data_msg->next_hop_ = BROADCAST_ADDR; // Add to the forwarding history forwarding_history->forwardingToNetwork(BROADCAST_ADDR); data_timer = new TppMessageSendTimer(this, data_msg); // Add timer for forwarding the data packet ((DiffusionRouting *)dr_)->addTimer(DATA_FORWARD_DELAY + (int) ((DATA_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (DATA_FORWARD_JITTER / 2))), data_timer); } }#else // Forward DATA to all output gradients for (gradient_itr = routing_entry->gradients_.begin(); gradient_itr != routing_entry->gradients_.end(); ++gradient_itr){ gradient_entry = *gradient_itr; // Check forwarding history if (!forwarding_history->alreadyForwardedToNetwork(gradient_entry->node_addr_)){ msg->next_hop_ = gradient_entry->node_addr_; ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_); // Add to the forwarding history forwarding_history->forwardingToNetwork(gradient_entry->node_addr_); } }#endif // USE_BROADCAST_TO_MULTIPLE_RECIPIENTS}void GradientFilter::forwardData(Message *msg, TppRoutingEntry *routing_entry, DataForwardingHistory *forwarding_history){ GradientList::iterator gradient_itr; AgentList::iterator agent_itr; GradientEntry *gradient_entry; AgentEntry *agent_entry; Message *sink_message, *negative_reinforcement_msg; bool has_sink = false; sink_message = CopyMessage(msg); // Step 1: Sink Processing for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){ agent_entry = *agent_itr; has_sink = true; if (!forwarding_history->alreadyForwardedToLibrary(agent_entry->port_)){ // Forward DATA to local sinks sink_message->next_hop_ = LOCALHOST_ADDR; sink_message->next_port_ = agent_entry->port_; // Add agent to the forwarding list forwarding_history->forwardingToLibrary(agent_entry->port_); ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_); } } delete sink_message; // Step 2: Intermediate Processing // Set reinforcement flags if (msg->last_hop_ != LOCALHOST_ADDR){ setReinforcementFlags(routing_entry, msg->last_hop_, NEW_MESSAGE); } // Forward DATA only to reinforced gradients gradient_itr = routing_entry->gradients_.begin(); gradient_entry = findReinforcedGradients(&routing_entry->gradients_, gradient_itr, &gradient_itr); if (gradient_entry){ while (gradient_entry){ // Found reinforced gradient, forward data message to this // neighbor only if the messages comes from a different neighbor if (gradient_entry->node_addr_ != msg->last_hop_){ msg->next_hop_ = gradient_entry->node_addr_; // Check if we have forwarded the message to this neighbor already if (!forwarding_history->alreadyForwardedToNetwork(msg->next_hop_)){ DiffPrint(DEBUG_NO_DETAILS, "Node%d: Forwarding data using Reinforced Gradient to node %d !\n", ((DiffusionRouting *)dr_)->getNodeId(), gradient_entry->node_addr_); ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_); // Add the node to the forwarding history forwarding_history->forwardingToNetwork(msg->next_hop_); } } // Move to the next one gradient_itr++; gradient_entry = findReinforcedGradients(&routing_entry->gradients_, gradient_itr, &gradient_itr); } } else{ // We could not find a reinforced path, so we send a negative // reinforcement to last_hop if ((!has_sink) && (msg->last_hop_ != LOCALHOST_ADDR)){ negative_reinforcement_msg = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT, 0, 0, routing_entry->attrs_->size(), pkt_count_, random_id_, msg->last_hop_, LOCALHOST_ADDR); negative_reinforcement_msg->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_); DiffPrint(DEBUG_NO_DETAILS, "Sending Negative Reinforcement to node %d !\n", msg->last_hop_); ((DiffusionRouting *)dr_)->sendMessage(negative_reinforcement_msg, filter_handle_); pkt_count_++; delete negative_reinforcement_msg; } }}void GradientFilter::sendPositiveReinforcement(NRAttrVec *reinf_attrs, int32_t data_rdm_id, int32_t data_pkt_num, int32_t destination){ ReinforcementBlob *reinforcement_blob; NRAttribute *reinforcement_attr; TimerCallback *reinforcement_timer; Message *pos_reinf_message; NRAttrVec *attrs; reinforcement_blob = new ReinforcementBlob(data_rdm_id, data_pkt_num); reinforcement_attr = ReinforcementAttr.make(NRAttribute::IS, (void *) reinforcement_blob, sizeof(ReinforcementBlob)); attrs = CopyAttrs(reinf_attrs); attrs->push_back(reinforcement_attr); pos_reinf_message = new Message(DIFFUSION_VERSION, POSITIVE_REINFORCEMENT, 0, 0, attrs->size(), pkt_count_, random_id_, destination, LOCALHOST_ADDR); pos_reinf_message->msg_attr_vec_ = CopyAttrs(attrs); DiffPrint(DEBUG_NO_DETAILS, "Sending Positive Reinforcement to node %d !\n", destination); // Create timer for sending this message reinforcement_timer = new TppMessageSendTimer(this, pos_reinf_message); // Add timer to the event queue ((DiffusionRouting *)dr_)->addTimer(POS_REINFORCEMENT_SEND_DELAY + (int) ((POS_REINFORCEMENT_JITTER * (GetRand() * 1.0 / RAND_MAX) - (POS_REINFORCEMENT_JITTER / 2))), reinforcement_timer); pkt_count_++; ClearAttrs(attrs); delete attrs; delete reinforcement_blob;}GradientEntry * GradientFilter::findReinforcedGradients(GradientList *gradients, GradientList::iterator start, GradientList::iterator *place){ GradientList::iterator gradient_itr; GradientEntry *gradient_entry; for (gradient_itr = start; gradient_itr != gradients->end(); ++gradient_itr){ gradient_entry = *gradient_itr; if (gradient_entry->reinforced_){ *place = gradient_itr; return gradient_entry; } } return NULL;}GradientEntry * GradientFilter::findReinforcedGradient(int32_t node_addr, TppRoutingEntry *routing_entry){ GradientList::iterator gradient_itr; GradientEntry *gradient_entry; gradient_itr = routing_entry->gradients_.begin(); gradient_entry = findReinforcedGradients(&routing_entry->gradients_, gradient_itr, &gradient_itr); if (gradient_entry){ while(gradient_entry){ if (gradient_entry->node_addr_ == node_addr) return gradient_entry; // This is not the gradient we are looking for, keep looking gradient_itr++; gradient_entry = findReinforcedGradients(&routing_entry->gradients_, gradient_itr, &gradient_itr); } } return NULL;}void GradientFilter::deleteGradient(TppRoutingEntry *routing_entry, GradientEntry *gradient_entry){ GradientList::iterator gradient_itr; GradientEntry *current_entry; for (gradient_itr = routing_entry->gradients_.begin(); gradient_itr != routing_entry->gradients_.end(); ++gradient_itr){ current_entry = *gradient_itr; if (current_entry == gradient_entry){ gradient_itr = routing_entry->gradients_.erase(gradient_itr); delete gradient_entry; return; } } DiffPrint(DEBUG_ALWAYS, "Error: deleteGradient could not find gradient to delete !\n");}void GradientFilter::setReinforcementFlags(TppRoutingEntry *routing_entry, int32_t last_hop, int new_message){ DataNeighborList::iterator data_neighbor_itr; DataNeighborEntry *data_neighbor_entry; for (data_neighbor_itr = routing_entry->data_neighbors_.begin(); data_neighbor_itr != routing_entry->data_neighbors_.end(); ++data_neighbor_itr){ data_neighbor_entry = *data_neighbor_itr; if (data_neighbor_entry->neighbor_id_ == last_hop){ if (data_neighbor_entry->data_flag_ > 0) return; data_neighbor_entry->data_flag_ = new_message; return; } } // We need to add a new data neighbor data_neighbor_entry = new DataNeighborEntry(last_hop, new_message); routing_entry->data_neighbors_.push_back(data_neighbor_entry);}void GradientFilter::sendInterest(NRAttrVec *attrs, TppRoutingEntry *routing_entry){ AgentList::iterator agent_itr; AgentEntry *agent_entry; Message *msg = new Message(DIFFUSION_VERSION, INTEREST, 0, 0, attrs->size(), 0, 0, LOCALHOST_ADDR, LOCALHOST_ADDR); msg->msg_attr_vec_ = CopyAttrs(attrs); for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){ agent_entry = *agent_itr; msg->next_port_ = agent_entry->port_; ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_); } delete msg;}void GradientFilter::sendDisinterest(NRAttrVec *attrs, TppRoutingEntry *routing_entry){ NRAttrVec *new_attrs; NRSimpleAttribute<int> *nrclass = NULL; new_attrs = CopyAttrs(attrs); nrclass = NRClassAttr.find(new_attrs); if (!nrclass){ DiffPrint(DEBUG_ALWAYS, "Error: sendDisinterest couldn't find the class attribute !\n"); ClearAttrs(new_attrs); delete new_attrs; return; } // Change the class_key value nrclass->setVal(NRAttribute::DISINTEREST_CLASS); sendInterest(new_attrs, routing_entry); ClearAttrs(new_attrs); delete new_attrs;}void GradientFilter::recv(Message *msg, handle h){ if (h != filter_handle_){ DiffPrint(DEBUG_ALWAYS, "Error: received msg for handle %d, subscribed to handle %d !\n", h, filter_handle_); return; } if (msg->new_message_ == 1) processNewMessage(msg); else processOldMessage(msg);}void GradientFilter::processOldMessage(Message *msg){ TppRoutingEntry *routing_entry; RoutingTable::iterator routing_itr; switch (msg->msg_type_){ case INTEREST: DiffPrint(DEBUG_NO_DETAILS, "Node%d: Received Old Interest !\n", ((DiffusionRouting *)dr_)->getNodeId()); if (msg->last_hop_ == LOCALHOST_ADDR){ // Old interest should not come from local agent DiffPrint(DEBUG_ALWAYS, "Warning: Old Interest from local agent !\n"); break; } // Get the routing entry for these attrs routing_entry = findRoutingEntry(msg->msg_attr_vec_); if (routing_entry) updateGradient(routing_entry, msg->last_hop_, false); break; case DATA: DiffPrint(DEBUG_NO_DETAILS, "Node%d: Received an old Data message !\n", ((DiffusionRouting *)dr_)->getNodeId()); // Find the correct routing entry routing_itr = routing_list_.begin(); routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr, &routing_itr); while (routing_entry){ DiffPrint(DEBUG_NO_DETAILS, "Set flags to %d to OLD_MESSAGE !\n", msg->last_hop_); // Set reinforcement flags if (msg->last_hop_ != LOCALHOST_ADDR){ setReinforcementFlags(routing_entry, msg->last_hop_, OLD_MESSAGE); } // Continue going through other data types routing_itr++; routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr, &routing_itr); } break; case PUSH_EXPLORATORY_DATA: // Just drop it DiffPrint(DEBUG_NO_DETAILS, "Received an old Push Exploratory Data. Loop detected !\n");
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -