📄 one_phase_pull.cc
字号:
for (routing_itr = routing_list_.begin(); routing_itr != routing_list_.end(); routing_itr++){ routing_entry = *routing_itr; // Step 1: Delete expired round ids routing_entry->deleteExpiredRoundIds(); // Step 2: Remove the routing entry if no round ids left if (routing_entry->round_ids_.size() == 0){ // Deleting Routing Entry DiffPrint(DEBUG_DETAILS, "Nothing left for this data type, cleaning up !\n"); routing_itr = routing_list_.erase(routing_itr); delete routing_entry; } }}void OnePhasePullFilter::reinforcementTimeout(){ DataNeighborList::iterator data_neighbor_itr; OPPDataNeighborEntry *data_neighbor_entry; RoutingTable::iterator routing_itr; RoutingEntry *routing_entry; Message *my_message; DiffPrint(DEBUG_MORE_DETAILS, "Reinforcement Timeout !\n"); routing_itr = routing_list_.begin(); while (routing_itr != routing_list_.end()){ routing_entry = *routing_itr; // Step 1: Delete expired gradients data_neighbor_itr = routing_entry->data_neighbors_.begin(); while (data_neighbor_itr != routing_entry->data_neighbors_.end()){ data_neighbor_entry = *data_neighbor_itr; if ((!data_neighbor_entry->new_messages_) && (data_neighbor_entry->messages_ > 0)){ my_message = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT, 0, 0, routing_entry->attrs_->size(), pkt_count_, random_id_, data_neighbor_entry->node_id_, LOCALHOST_ADDR); my_message->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_); DiffPrint(DEBUG_NO_DETAILS, "Node%d: Sending Negative Reinforcement to node %d !\n", ((DiffusionRouting *)dr_)->getNodeId(), data_neighbor_entry->node_id_); ((DiffusionRouting *)dr_)->sendMessage(my_message, filter_handle_); pkt_count_++; delete my_message; // Done. Delete entry data_neighbor_itr = routing_entry->data_neighbors_.erase(data_neighbor_itr); delete data_neighbor_entry; } else{ data_neighbor_itr++; } } // Step 2: Delete data neighbors with no activity, zero flags data_neighbor_itr = routing_entry->data_neighbors_.begin(); while (data_neighbor_itr != routing_entry->data_neighbors_.end()){ data_neighbor_entry = *data_neighbor_itr; if (data_neighbor_entry->messages_ > 0){ data_neighbor_entry->messages_ = 0; data_neighbor_entry->new_messages_ = false; data_neighbor_itr++; } else{ // Delete entry data_neighbor_itr = routing_entry->data_neighbors_.erase(data_neighbor_itr); delete data_neighbor_entry; } } // Advance to the next routing entry routing_itr++; }}int OnePhasePullFilter::subscriptionTimeout(NRAttrVec *attrs){ SubscriptionList::iterator subscription_itr; SubscriptionEntry *subscription_entry; RoutingEntry *routing_entry; struct timeval tmv; DiffPrint(DEBUG_MORE_DETAILS, "Subscription Timeout !\n"); GetTime(&tmv); // Find the correct Routing entry routing_entry = findRoutingEntry(attrs); if (routing_entry){ // Routing entry found subscription_itr = routing_entry->subscription_list_.begin(); // Go through all attributes while (subscription_itr != routing_entry->subscription_list_.end()){ subscription_entry = *subscription_itr; // Check timeouts if (tmv.tv_sec > (subscription_entry->tv_.tv_sec + SUBSCRIPTION_TIMEOUT)){ // Time expired, send disinterest message sendDisinterest(subscription_entry->attrs_, routing_entry); subscription_itr = routing_entry->subscription_list_.erase(subscription_itr); delete subscription_entry; } else{ subscription_itr++; } } } else{ DiffPrint(DEBUG_DETAILS, "Warning: Could't find subscription entry - maybe deleted by GradientTimeout ?\n"); // Cancel Timer return -1; } // Keep Timer return 0;}void OnePhasePullFilter::deleteRoutingEntry(RoutingEntry *routing_entry){ RoutingTable::iterator routing_itr; RoutingEntry *current_entry; // Go through the routing table for (routing_itr = routing_list_.begin(); routing_itr != routing_list_.end(); ++routing_itr){ current_entry = *routing_itr; // Is this the entry we are looking for ? if (current_entry == routing_entry){ routing_itr = routing_list_.erase(routing_itr); delete routing_entry; return; } } DiffPrint(DEBUG_ALWAYS, "Error: Could not find entry to delete !\n");}RoutingEntry * OnePhasePullFilter::matchRoutingEntry(NRAttrVec *attrs, RoutingTable::iterator start, RoutingTable::iterator *place){ RoutingTable::iterator routing_itr; RoutingEntry *routing_entry; for (routing_itr = start; routing_itr != routing_list_.end(); ++routing_itr){ routing_entry = *routing_itr; if (MatchAttrs(routing_entry->attrs_, attrs)){ *place = routing_itr; return routing_entry; } } return NULL;}RoutingEntry * OnePhasePullFilter::findRoutingEntry(NRAttrVec *attrs){ RoutingTable::iterator routing_itr; RoutingEntry *routing_entry; for (routing_itr = routing_list_.begin(); routing_itr != routing_list_.end(); ++routing_itr){ routing_entry = *routing_itr; if (PerfectMatch(routing_entry->attrs_, attrs)) return routing_entry; } return NULL;}SubscriptionEntry * OnePhasePullFilter::findMatchingSubscription(RoutingEntry *routing_entry, NRAttrVec *attrs){ SubscriptionList::iterator subscription_itr; SubscriptionEntry *subscription_entry; for (subscription_itr = routing_entry->subscription_list_.begin(); subscription_itr != routing_entry->subscription_list_.end(); ++subscription_itr){ subscription_entry = *subscription_itr; if (PerfectMatch(subscription_entry->attrs_, attrs)) return subscription_entry; } return NULL;}void OnePhasePullFilter::forwardData(Message *msg, RoutingEntry *routing_entry, DataForwardingHistory *forwarding_history){ NRSimpleAttribute<void *> *nr_data_attr = NULL; NRAttrVec::iterator attribute_iterator; FlowIdList msg_flow_list, sinks_flow_list, local_flow_list; FlowIdList out_flow_list; int32_t out_neighbor; int *packed_flows; FlowIdList::iterator flow_id_itr; RoundIdList::iterator round_id_itr; SinkList::iterator sink_itr; RoundIdEntry *round_id_entry; SinkEntry *sink_entry; Message *sink_message, *out_message; // Step 0: Read flows from message // Find NRFlowAttr and remove from the message attribute_iterator = msg->msg_attr_vec_->begin(); nr_data_attr = NRFlowAttr.find_from(msg->msg_attr_vec_, attribute_iterator, &attribute_iterator); if (!nr_data_attr){ DiffPrint(DEBUG_ALWAYS, "Cannot find NRFlowAttr !\n"); return; } msg->msg_attr_vec_->erase(attribute_iterator); // Read flow ids from list readFlowsFromList(nr_data_attr->getLen() / sizeof(int), &msg_flow_list, nr_data_attr->getVal()); // Fill lists of sinks and flows routing_entry->getSinksFromList(&msg_flow_list, &sinks_flow_list); routing_entry->getFlowsFromList(&msg_flow_list, &local_flow_list); // Step 1: Sink Processing if (sinks_flow_list.size() > 0){ // Copy original message so we can change it sink_message = CopyMessage(msg); // Go through all rounds for (round_id_itr = routing_entry->round_ids_.begin(); round_id_itr != routing_entry->round_ids_.end(); round_id_itr++){ round_id_entry = *round_id_itr; flow_id_itr = find(sinks_flow_list.begin(), sinks_flow_list.end(), round_id_entry->round_id_); if (flow_id_itr != sinks_flow_list.end()){ // Flows match ! Send message to sink for (sink_itr = round_id_entry->sinks_.begin(); sink_itr != round_id_entry->sinks_.end(); ++sink_itr){ sink_entry = *sink_itr; if (!forwarding_history->alreadyForwardedToLibrary(sink_entry->port_)){ // Forward DATA to local sinks sink_message->next_hop_ = LOCALHOST_ADDR; sink_message->next_port_ = sink_entry->port_; // Add sink to the forwarding list forwarding_history->forwardingToLibrary(sink_entry->port_); ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_); } } // Remove sink from the flow_list if (!removeFlowFromList(&msg_flow_list, round_id_entry->round_id_)){ // We should not get here DiffPrint(DEBUG_ALWAYS, "Cannot remove flow from msg_flow_list !\n"); } } } // Delete sink message delete sink_message; } // Step 2: Intermediate Processing DiffPrint(DEBUG_NO_DETAILS, "Node%d: Forwarding Data\n", ((DiffusionRouting *)dr_)->getNodeId()); // Set reinforcement flags if (msg->last_hop_ != LOCALHOST_ADDR) routing_entry->updateNeighborDataInfo(msg->last_hop_, true); // Work on local list until we finish processing all flows while (local_flow_list.size() > 0){ // Initialize out_flow_list out_flow_list.clear(); // Move first flow from the local flow list to out_flow_list out_flow_list.push_back(*(local_flow_list.begin())); local_flow_list.erase(local_flow_list.begin()); // Remove flow from the flow_list if (!removeFlowFromList(&msg_flow_list, *(out_flow_list.begin()))){ // We should not get here DiffPrint(DEBUG_ALWAYS, "Cannot remove flow from msg_flow_list !\n"); } // Select output_neighbor out_neighbor = routing_entry->getNeighborFromFlow(*(out_flow_list.begin())); // Must have a valid neighbor if (out_neighbor == BROADCAST_ADDR) continue; // Go through all other local flows for (flow_id_itr = local_flow_list.begin(); flow_id_itr != local_flow_list.end(); flow_id_itr++){ // Check if output neighbor for this flow matches current if (routing_entry->getNeighborFromFlow(*flow_id_itr) == out_neighbor){ // Yes it does ! // Remove flow from the flow_list if (!removeFlowFromList(&msg_flow_list, *flow_id_itr)){ // We should not get here DiffPrint(DEBUG_ALWAYS, "Cannot remove flow from msg_flow_list !\n"); } // Aggregate both in a single message out_flow_list.push_back(*flow_id_itr); flow_id_itr = local_flow_list.erase(flow_id_itr); } } // out_flow_list should have a list of flow for out_neighbor out_message = CopyMessage(msg); out_message->next_hop_ = out_neighbor; packed_flows = writeFlowsToList(&out_flow_list); out_message->msg_attr_vec_->push_back(NRFlowAttr.make(NRAttribute::IS, (void *) packed_flows, sizeof(int) * out_flow_list.size())); // NRFlowAttr.make will copy this, so we must delete it delete [] packed_flows; // Send it out DiffPrint(DEBUG_NO_DETAILS, "Forwarding data to node %d !\n", out_neighbor); ((DiffusionRouting *)dr_)->sendMessage(out_message, filter_handle_); // Delete message delete out_message; } // Done processing for this data type, we replace the NRFlowAttr // with the (possibly) shorter msg_flow_list list packed_flows = writeFlowsToList(&msg_flow_list); nr_data_attr->setVal((void *) packed_flows, sizeof(int) * msg_flow_list.size()); msg->msg_attr_vec_->push_back(nr_data_attr); // setVal makes a copy of this, so we must delete it delete [] packed_flows;}void OnePhasePullFilter::sendInterest(NRAttrVec *attrs, RoutingEntry *routing_entry){ RoundIdList::iterator round_id_itr; RoundIdEntry *round_id_entry; SinkList::iterator sink_itr; SinkEntry *sink_entry; Message *msg = new Message(DIFFUSION_VERSION, INTEREST, 0, 0, attrs->size(), 0, 0, LOCALHOST_ADDR, LOCALHOST_ADDR); msg->msg_attr_vec_ = CopyAttrs(attrs); // Go through all round ids for (round_id_itr = routing_entry->round_ids_.begin(); round_id_itr != routing_entry->round_ids_.end(); round_id_itr++){ round_id_entry = *round_id_itr; // Send interest message to all local sinks for (sink_itr = round_id_entry->sinks_.begin(); sink_itr != round_id_entry->sinks_.end(); ++sink_itr){ sink_entry = *sink_itr; msg->next_port_ = sink_entry->port_; ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_); } } delete msg;}void OnePhasePullFilter::sendDisinterest(NRAttrVec *attrs, RoutingEntry *routing_entry){ NRAttrVec *new_attrs; NRSimpleAttribute<int> *nrclass = NULL; new_attrs = CopyAttrs(attrs); nrclass = NRClassAttr.find(new_attrs); if (!nrclass){ DiffPrint(DEBUG_ALWAYS, "Error: sendDisinterest couldn't find the class attribute !\n"); ClearAttrs(new_attrs); delete new_attrs; return; } // Change the class_key value nrclass->setVal(NRAttribute::DISINTEREST_CLASS); sendInterest(new_attrs, routing_entry); ClearAttrs(new_attrs); delete new_attrs;}void OnePhasePullFilter::readFlowsFromList(int number_of_flows, FlowIdList *flow_list, void *source_blob){ int *current_flow; // Point to the beginning of the list current_flow = (int *) source_blob; for (int i = 0; i < number_of_flows; i++){ flow_list->push_back(*current_flow); // Advance to next flow current_flow++; }}int * OnePhasePullFilter::writeFlowsToList(FlowIdList *flow_list){ FlowIdList::iterator flow_itr; int number_of_flows; int *flows, *current;; number_of_flows = flow_list->size(); flows = new int[number_of_flows]; current = flows; for (flow_itr = flow_list->begin(); flow_itr != flow_list->end(); flow_itr++){ *current = *flow_itr; current++; } return flows;}bool OnePhasePullFilter::removeFlowFromList(FlowIdList *flow_list, int32_t flow){ FlowIdList::iterator flow_itr; flow_itr = find(flow_list->begin(), flow_list->end(), flow); if (flow_itr != flow_list->end()){ flow_itr = flow_list->erase(flow_itr);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -