📄 gradient.cc
字号:
RoutingTable::iterator routing_itr; RoutingEntry *routing_entry; AgentList::iterator agent_itr; AgentEntry *agent_entry; Message *data_msg, *sink_message; TimerType *data_timer; unsigned int key[2]; HashEntry *hash_entry; bool has_sink; // Sink processing routing_itr = routing_list_.begin(); routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr, &routing_itr); sink_message = CopyMessage(msg); while (routing_entry){ // Reset the has_sink flag for each new data type has_sink = false; // Forward message to all local sinks for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){ agent_entry = *agent_itr; // We have sinks for this data message, send reinforcement later has_sink = true; // Send DATA message to local sinks sink_message->next_hop_ = LOCALHOST_ADDR; sink_message->next_port_ = agent_entry->port_; ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_); } if (has_sink){ // Send a positive reinforcement if we have sinks sendPositiveReinforcement(routing_entry->attrs_, msg->rdm_id_, msg->pkt_num_, msg->last_hop_); } // Look for other matching data types routing_itr++; routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr, &routing_itr); } // Delete sink_message after sink processing delete sink_message; // Intermediate node processing // Add message information to the hash table if (msg->last_hop_ != LOCALHOST_ADDR){ key[0] = msg->pkt_num_; key[1] = msg->rdm_id_; hash_entry = new HashEntry(msg->last_hop_); putHash(hash_entry, key[0], key[1]); } // Rebroadcast the exploratory push data message data_msg = CopyMessage(msg); data_msg->next_hop_ = BROADCAST_ADDR; data_timer = new TimerType(MESSAGE_SEND_TIMER); data_timer->param_ = (void *) data_msg; // Add data timer to the queue ((DiffusionRouting *)dr_)->addTimer(PUSH_DATA_FORWARD_DELAY + (int) ((PUSH_DATA_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (PUSH_DATA_FORWARD_JITTER / 2))), (void *) data_timer, timer_callback_);}void GradientFilter::forwardExploratoryData(Message *msg, RoutingEntry *routing_entry){#ifdef USE_BROADCAST_MAC Message *data_msg; TimerType *data_timer;#else GradientList::iterator gradient_itr; GradientEntry *gradient_entry;#endif // USE_BROADCAST_MAC AgentList::iterator agent_itr; AgentEntry *agent_entry; Message *sink_message; unsigned int key[2]; HashEntry *hash_entry; bool has_sink = false; sink_message = CopyMessage(msg); // Step 1: Sink Processing for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){ agent_entry = *agent_itr; has_sink = true; // Forward the data message to local sinks sink_message->next_hop_ = LOCALHOST_ADDR; sink_message->next_port_ = agent_entry->port_; ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_); } delete sink_message; if (routing_entry->agents_.size() > 0){ // Send reinforcement to 'last_hop' sendPositiveReinforcement(routing_entry->attrs_, msg->rdm_id_, msg->pkt_num_, msg->last_hop_); } // Step 3: Intermediate Processing // Set reinforcement flags if (msg->last_hop_ != LOCALHOST_ADDR){ setReinforcementFlags(routing_entry, msg->last_hop_, NEW_MESSAGE); } // Add message information to the hash table if (msg->last_hop_ != LOCALHOST_ADDR){ key[0] = msg->pkt_num_; key[1] = msg->rdm_id_; hash_entry = new HashEntry(msg->last_hop_); putHash(hash_entry, key[0], key[1]); } // Forward the EXPLORATORY message#ifdef USE_BROADCAST_MAC if (routing_entry->gradients_.size() > 0){ // Broadcast DATA message data_msg = CopyMessage(msg); data_msg->next_hop_ = BROADCAST_ADDR; data_timer = new TimerType(MESSAGE_SEND_TIMER); data_timer->param_ = (void *) data_msg; // Add timer for forwarding the data packet ((DiffusionRouting *)dr_)->addTimer(DATA_FORWARD_DELAY + (int) ((DATA_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (DATA_FORWARD_JITTER / 2))), (void *) data_timer, timer_callback_); }#else // Forward DATA to all output gradients for (gradient_itr = routing_entry->gradients_.begin(); gradient_itr != routing_entry->gradients_.end(); ++gradient_itr){ gradient_entry = *gradient_itr; msg->next_hop_ = gradient_entry->node_addr_; ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_); }#endif //USE_BROADCAST_MAC}void GradientFilter::forwardData(Message *msg, RoutingEntry *routing_entry){ GradientList::iterator gradient_itr; AgentList::iterator agent_itr; GradientEntry *gradient_entry; AgentEntry *agent_entry; Message *sink_message, *negative_reinforcement_msg; bool has_sink = false; sink_message = CopyMessage(msg); // Step 1: Sink Processing for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){ agent_entry = *agent_itr; has_sink = true; // Forward DATA to local sinks sink_message->next_hop_ = LOCALHOST_ADDR; sink_message->next_port_ = agent_entry->port_; ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_); } delete sink_message; // Step 2: Intermediate Processing // Set reinforcement flags if (msg->last_hop_ != LOCALHOST_ADDR){ setReinforcementFlags(routing_entry, msg->last_hop_, NEW_MESSAGE); } // Forward DATA only to reinforced gradients gradient_itr = routing_entry->gradients_.begin(); gradient_entry = findReinforcedGradients(&routing_entry->gradients_, gradient_itr, &gradient_itr); if (gradient_entry){ while (gradient_entry){ // Found reinforced path msg->next_hop_ = gradient_entry->node_addr_; DiffPrint(DEBUG_NO_DETAILS, "Forwarding data through Reinforced Gradient to node %d !\n", gradient_entry->node_addr_); ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_); // Move to the next one gradient_itr++; gradient_entry = findReinforcedGradients(&routing_entry->gradients_, gradient_itr, &gradient_itr); } } else{ // We could not find a reinforced path, so we send a negative // reinforcement to last_hop if ((!has_sink) && (msg->last_hop_ != LOCALHOST_ADDR)){ negative_reinforcement_msg = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT, 0, 0, routing_entry->attrs_->size(), pkt_count_, random_id_, msg->last_hop_, LOCALHOST_ADDR); negative_reinforcement_msg->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_); DiffPrint(DEBUG_NO_DETAILS, "Sending Negative Reinforcement to node %d !\n", msg->last_hop_); ((DiffusionRouting *)dr_)->sendMessage(negative_reinforcement_msg, filter_handle_); pkt_count_++; delete negative_reinforcement_msg; } }}void GradientFilter::sendPositiveReinforcement(NRAttrVec *reinf_attrs, int32_t data_rdm_id, int32_t data_pkt_num, int32_t destination){ ReinforcementBlob *reinforcement_blob; NRAttribute *reinforcement_attr; TimerType *reinforcement_timer; Message *pos_reinf_message; NRAttrVec *attrs; reinforcement_blob = new ReinforcementBlob(data_rdm_id, data_pkt_num); reinforcement_attr = ReinforcementAttr.make(NRAttribute::IS, (void *) reinforcement_blob, sizeof(ReinforcementBlob)); attrs = CopyAttrs(reinf_attrs); attrs->push_back(reinforcement_attr); pos_reinf_message = new Message(DIFFUSION_VERSION, POSITIVE_REINFORCEMENT, 0, 0, attrs->size(), pkt_count_, random_id_, destination, LOCALHOST_ADDR); pos_reinf_message->msg_attr_vec_ = CopyAttrs(attrs); DiffPrint(DEBUG_NO_DETAILS, "Sending Positive Reinforcement to node %d !\n", destination); // Create timer for sending this message reinforcement_timer = new TimerType(MESSAGE_SEND_TIMER); reinforcement_timer->param_ = (void *) pos_reinf_message; // Add timer to the event queue ((DiffusionRouting *)dr_)->addTimer(POS_REINFORCEMENT_SEND_DELAY + (int) ((POS_REINFORCEMENT_JITTER * (GetRand() * 1.0 / RAND_MAX) - (POS_REINFORCEMENT_JITTER / 2))), (void *) reinforcement_timer, timer_callback_); pkt_count_++; ClearAttrs(attrs); delete reinforcement_blob;}GradientEntry * GradientFilter::findReinforcedGradients(GradientList *gradients, GradientList::iterator start, GradientList::iterator *place){ GradientList::iterator gradient_itr; GradientEntry *gradient_entry; for (gradient_itr = start; gradient_itr != gradients->end(); ++gradient_itr){ gradient_entry = *gradient_itr; if (gradient_entry->reinforced_){ *place = gradient_itr; return gradient_entry; } } return NULL;}GradientEntry * GradientFilter::findReinforcedGradient(int32_t node_addr, RoutingEntry *routing_entry){ GradientList::iterator gradient_itr; GradientEntry *gradient_entry; gradient_itr = routing_entry->gradients_.begin(); gradient_entry = findReinforcedGradients(&routing_entry->gradients_, gradient_itr, &gradient_itr); if (gradient_entry){ while(gradient_entry){ if (gradient_entry->node_addr_ == node_addr) return gradient_entry; // This is not the gradient we are looking for, keep looking gradient_itr++; gradient_entry = findReinforcedGradients(&routing_entry->gradients_, gradient_itr, &gradient_itr); } } return NULL;}void GradientFilter::deleteGradient(RoutingEntry *routing_entry, GradientEntry *gradient_entry){ GradientList::iterator gradient_itr; GradientEntry *current_entry; for (gradient_itr = routing_entry->gradients_.begin(); gradient_itr != routing_entry->gradients_.end(); ++gradient_itr){ current_entry = *gradient_itr; if (current_entry == gradient_entry){ gradient_itr = routing_entry->gradients_.erase(gradient_itr); delete gradient_entry; return; } } DiffPrint(DEBUG_ALWAYS, "Error: deleteGradient could not find gradient to delete !\n");}void GradientFilter::setReinforcementFlags(RoutingEntry *routing_entry, int32_t last_hop, int new_message){ DataNeighborList::iterator data_neighbor_itr; DataNeighborEntry *data_neighbor_entry; for (data_neighbor_itr = routing_entry->data_neighbors_.begin(); data_neighbor_itr != routing_entry->data_neighbors_.end(); ++data_neighbor_itr){ data_neighbor_entry = *data_neighbor_itr; if (data_neighbor_entry->neighbor_id_ == last_hop){ if (data_neighbor_entry->data_flag_ > 0) return; data_neighbor_entry->data_flag_ = new_message; return; } } // We need to add a new data neighbor data_neighbor_entry = new DataNeighborEntry(last_hop, new_message); routing_entry->data_neighbors_.push_back(data_neighbor_entry);}void GradientFilter::sendInterest(NRAttrVec *attrs, RoutingEntry *routing_entry){ AgentList::iterator agent_itr; AgentEntry *agent_entry; Message *msg = new Message(DIFFUSION_VERSION, INTEREST, 0, 0, attrs->size(), 0, 0, LOCALHOST_ADDR, LOCALHOST_ADDR); msg->msg_attr_vec_ = CopyAttrs(attrs); for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){ agent_entry = *agent_itr; msg->next_port_ = agent_entry->port_; ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_); } delete msg;}void GradientFilter::sendDisinterest(NRAttrVec *attrs, RoutingEntry *routing_entry){ NRAttrVec *newAttrs; NRSimpleAttribute<int> *nrclass = NULL; newAttrs = CopyAttrs(attrs); nrclass = NRClassAttr.find(newAttrs); if (!nrclass){ DiffPrint(DEBUG_ALWAYS, "Error: sendDisinterest couldn't find the class attribute !\n"); ClearAttrs(newAttrs); delete newAttrs; return; } // Change the class_key value nrclass->setVal(NRAttribute::DISINTEREST_CLASS); sendInterest(newAttrs, routing_entry); ClearAttrs(newAttrs); delete newAttrs;}void GradientFilter::recv(Message *msg, handle h){ if (h != filter_handle_){ DiffPrint(DEBUG_ALWAYS, "Error: received msg for handle %d, subscribed to handle %d !\n", h, filter_handle_); return; } if (msg->new_message_ == 1) processNewMessage(msg); else processOldMessage(msg);}void GradientFilter::processOldMessage(Message *msg){ RoutingEntry *routing_entry; RoutingTable::iterator routing_itr; switch (msg->msg_type_){ case INTEREST: DiffPrint(DEBUG_NO_DETAILS, "Received Old Interest !\n"); if (msg->last_hop_ == LOCALHOST_ADDR){ // Old interest should not come from local agent DiffPrint(DEBUG_ALWAYS, "Warning: Old Interest from local agent !\n"); break; } // Get the routing entry for these attrs routing_entry = findRoutingEntry(msg->msg_attr_vec_); if (routing_entry) updateGradient(routing_entry, msg->last_hop_, false); break; case DATA: DiffPrint(DEBUG_NO_DETAILS, "Received an old Data message !\n"); // Find the correct routing entry routing_itr = routing_list_.begin(); routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr, &routing_itr); while (routing_entry){ DiffPrint(DEBUG_NO_DETAILS, "Set flags to %d to OLD_MESSAGE !\n", msg->last_hop_); // Set reinforcement flags
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -