📄 one_phase_pull.cc
字号:
return true; } return false;}void OnePhasePullFilter::addLocalFlowsToMessage(Message *msg){ RoutingTable::iterator routing_itr; RoundIdList::iterator round_id_itr; RoutingEntry *routing_entry; RoundIdEntry *round_id_entry; FlowIdList local_flows; int *packed_flows; // First we loop through our routing entries for (routing_itr = routing_list_.begin(); routing_itr != routing_list_.end(); routing_itr++){ routing_entry = *routing_itr; // Now go through each round for (round_id_itr = routing_entry->round_ids_.begin(); round_id_itr != routing_entry->round_ids_.end(); round_id_itr++){ round_id_entry = *round_id_itr; local_flows.push_back(round_id_entry->round_id_); } } packed_flows = writeFlowsToList(&local_flows); msg->msg_attr_vec_->push_back(NRFlowAttr.make(NRAttribute::IS, (void *) packed_flows, sizeof(int) * local_flows.size())); // NRFlowAttr.make will copy this, so we must delete it here delete [] packed_flows; local_flows.clear();}void OnePhasePullFilter::recv(Message *msg, handle h){ if (h != filter_handle_){ DiffPrint(DEBUG_ALWAYS, "Error: received msg for handle %d, subscribed to handle %d !\n", h, filter_handle_); return; } if (msg->new_message_ == 1) processNewMessage(msg); else processOldMessage(msg);}void OnePhasePullFilter::processOldMessage(Message *msg){ NRSimpleAttribute<int> *nrsubscription = NULL; NRAttrVec::iterator attribute_iterator; RoutingTable::iterator routing_itr; RoutingEntry *routing_entry; int32_t round_id; switch (msg->msg_type_){ case INTEREST: DiffPrint(DEBUG_NO_DETAILS, "Node%d: Received Old Interest !\n",((DiffusionRouting *)dr_)->getNodeId()); if (msg->last_hop_ == LOCALHOST_ADDR){ // Old interest should not come from local sink DiffPrint(DEBUG_ALWAYS, "Warning: Old Interest from local sink !\n"); break; } // Step 0: Take out the subscription attribute attribute_iterator = msg->msg_attr_vec_->begin(); nrsubscription = NRSubscriptionAttr.find_from(msg->msg_attr_vec_, attribute_iterator, &attribute_iterator); // Return if we cannot find a subscription attribute if (!nrsubscription){ DiffPrint(DEBUG_ALWAYS, "Warning: Can't find SUBSCRIPTION attribute in the message !\n"); return; } // Delete attribute from the message msg->msg_attr_vec_->erase(attribute_iterator); // Get the routing entry for these attrs routing_entry = findRoutingEntry(msg->msg_attr_vec_); if (routing_entry){ // Use subscription id for identifying this flow round_id = nrsubscription->getVal(); // Add gradient to the current round entry routing_entry->addGradient(msg->last_hop_, round_id, false); } // Add the subscription attribute back to the message msg->msg_attr_vec_->push_back(nrsubscription); break; case EXPLORATORY_DATA: case PUSH_EXPLORATORY_DATA: DiffPrint(DEBUG_ALWAYS, "Received and OLD EXPLORATORY message !\n"); break; case DATA: DiffPrint(DEBUG_NO_DETAILS, "Received an old Data message !\n"); // Find the correct routing entry routing_itr = routing_list_.begin(); routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr, &routing_itr); while (routing_entry){ DiffPrint(DEBUG_NO_DETAILS, "Set flags to %d to OLD_MESSAGE !\n", msg->last_hop_); // Set reinforcement flags if (msg->last_hop_ != LOCALHOST_ADDR) routing_entry->updateNeighborDataInfo(msg->last_hop_, false); // Continue going through other data types routing_itr++; routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr, &routing_itr); } break; case NEGATIVE_REINFORCEMENT: DiffPrint(DEBUG_IMPORTANT, "Received an old Negative Reinforcement !\n"); break; default: DiffPrint(DEBUG_ALWAYS, "Received an unknown message type: %d\n", msg->msg_type_); break; }}void OnePhasePullFilter::processNewMessage(Message *msg){ DataForwardingHistory *forwarding_history; NRSimpleAttribute<int> *nrclass = NULL; NRSimpleAttribute<int> *nrscope = NULL; NRSimpleAttribute<int> *nrsubscription = NULL; RoundIdList::iterator round_id_itr; RoutingTable::iterator routing_itr; NRAttrVec::iterator attribute_iterator; RoundIdEntry *round_id_entry; RoutingEntry *routing_entry; SubscriptionEntry *subscription_entry; Message *my_msg; TimerCallback *interest_timer, *subscription_timer; bool new_data_type = false; int32_t round_id; switch (msg->msg_type_){ case INTEREST: DiffPrint(DEBUG_NO_DETAILS, "Received Interest !\n"); nrclass = NRClassAttr.find(msg->msg_attr_vec_); nrscope = NRScopeAttr.find(msg->msg_attr_vec_); if (!nrclass || !nrscope){ DiffPrint(DEBUG_ALWAYS, "Warning: Can't find CLASS/SCOPE attributes in the message !\n"); return; } // Step 0: Take out the subscription attribute attribute_iterator = msg->msg_attr_vec_->begin(); nrsubscription = NRSubscriptionAttr.find_from(msg->msg_attr_vec_, attribute_iterator, &attribute_iterator); // Return if we cannot find a subscription attribute if (!nrsubscription){ DiffPrint(DEBUG_ALWAYS, "Warning: Can't find SUBSCRIPTION attribute in the message !\n"); return; } // Delete attribute from the message msg->msg_attr_vec_->erase(attribute_iterator); // Step 1: Look for the same data type routing_entry = findRoutingEntry(msg->msg_attr_vec_); if (!routing_entry){ // Create a new routing entry for this data type routing_entry = new RoutingEntry; routing_entry->attrs_ = CopyAttrs(msg->msg_attr_vec_); routing_list_.push_back(routing_entry); new_data_type = true; } // Add the subscription attribute back to the message msg->msg_attr_vec_->push_back(nrsubscription); // Use subscription id for identifying this flow round_id = nrsubscription->getVal(); if (msg->last_hop_ == LOCALHOST_ADDR){ // From local sink routing_entry->updateSink(msg->source_port_, round_id); } else{ // Interest received from the network. Add gradient to our // last_hop neighbor // Add gradient to the current round entry routing_entry->addGradient(msg->last_hop_, round_id, true); } if ((nrclass->getVal() == NRAttribute::INTEREST_CLASS) && (nrclass->getOp() == NRAttribute::IS)){ // Global interest messages should always be forwarded if (nrscope->getVal() == NRAttribute::GLOBAL_SCOPE){ interest_timer = new OppInterestForwardTimer(this, CopyMessage(msg)); ((DiffusionRouting *)dr_)->addTimer(INTEREST_FORWARD_DELAY + (int) ((INTEREST_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (INTEREST_FORWARD_JITTER / 2))), interest_timer); } } else{ if ((nrclass->getOp() != NRAttribute::IS) && (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE) && (new_data_type)){ subscription_timer = new OppSubscriptionExpirationTimer(this, CopyAttrs(msg->msg_attr_vec_)); ((DiffusionRouting *)dr_)->addTimer(SUBSCRIPTION_DELAY + (int) (SUBSCRIPTION_DELAY * (GetRand() * 1.0 / RAND_MAX)), subscription_timer); } // Subscriptions don't have to match other subscriptions break; } // Step 2: Match interest against other subscriptions routing_itr = routing_list_.begin(); routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr, &routing_itr); while (routing_entry){ // Got a match subscription_entry = findMatchingSubscription(routing_entry, msg->msg_attr_vec_); // Do we already have this subscription if (subscription_entry){ GetTime(&(subscription_entry->tv_)); } else{ // Create a new attribute entry, add it to the attribute list // and send an interest message to the local sink subscription_entry = new SubscriptionEntry(CopyAttrs(msg->msg_attr_vec_)); routing_entry->subscription_list_.push_back(subscription_entry); sendInterest(subscription_entry->attrs_, routing_entry); } // Move to the next RoutingEntry routing_itr++; routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr, &routing_itr); } break; case EXPLORATORY_DATA: case PUSH_EXPLORATORY_DATA: DiffPrint(DEBUG_ALWAYS, "Node%d: Received EXPLORATORY Message !\n",((DiffusionRouting *)dr_)->getNodeId()); break; case DATA: DiffPrint(DEBUG_NO_DETAILS, "Node%d: Received Data !\n",((DiffusionRouting *)dr_)->getNodeId()); // Create data message forwarding cache forwarding_history = new DataForwardingHistory; // If message comes from local source, we include our local flows if (msg->last_hop_ == LOCALHOST_ADDR){ // From local source addLocalFlowsToMessage(msg); } // Find the correct routing entry routing_itr = routing_list_.begin(); routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr, &routing_itr); while (routing_entry){ forwardData(msg, routing_entry, forwarding_history); routing_itr++; routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr, &routing_itr); } delete forwarding_history; break; case NEGATIVE_REINFORCEMENT: DiffPrint(DEBUG_NO_DETAILS, "Received a Negative Reinforcement !\n"); // Find matching routing entry routing_entry = findRoutingEntry(msg->msg_attr_vec_); if (routing_entry){ // Go through all round ids for (round_id_itr = routing_entry->round_ids_.begin(); round_id_itr != routing_entry->round_ids_.end(); round_id_itr++){ round_id_entry = *round_id_itr; // Delete gradient to last hop round_id_entry->deleteGradient(msg->last_hop_); // Delete round id entry if nothing left if (round_id_entry->gradients_.size() == 0){ round_id_itr = routing_entry->round_ids_.erase(round_id_itr); delete round_id_entry; } } // If there are no other gradients we need to send our own // negative reinforcement if (routing_entry->round_ids_.size() == 0){ my_msg = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT, 0, 0, routing_entry->attrs_->size(), pkt_count_, random_id_, BROADCAST_ADDR, LOCALHOST_ADDR); my_msg->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_); DiffPrint(DEBUG_NO_DETAILS, "Broadcasting Negative Reinforcement !\n"); ((DiffusionRouting *)dr_)->sendMessage(my_msg, filter_handle_); pkt_count_++; delete my_msg; } } break; default: break; }}handle OnePhasePullFilter::setupFilter(){ NRAttrVec attrs; handle h; // For the One-Phase Pull filter, we set up a filter to receive // messages using this protocol attrs.push_back(NRAlgorithmAttr.make(NRAttribute::EQ, NRAttribute::ONE_PHASE_PULL_ALGORITHM)); h = ((DiffusionRouting *)dr_)->addFilter(&attrs, ONE_PHASE_PULL_FILTER_PRIORITY, filter_callback_); ClearAttrs(&attrs); return h;}#ifndef NS_DIFFUSIONvoid OnePhasePullFilter::run(){ // Doesn't do anything while (1){ sleep(1000); }}#endif // !NS_DIFFUSION#ifdef NS_DIFFUSIONOnePhasePullFilter::OnePhasePullFilter(const char *diffrtg){ DiffAppAgent *agent;#elseOnePhasePullFilter::OnePhasePullFilter(int argc, char **argv){#endif // NS_DIFFUSION struct timeval tv; TimerCallback *reinforcement_timer, *gradient_timer; GetTime(&tv); SetSeed(&tv); pkt_count_ = GetRand(); random_id_ = GetRand(); // Create Diffusion Routing class#ifdef NS_DIFFUSION agent = (DiffAppAgent *)TclObject::lookup(diffrtg); dr_ = agent->dr();#else parseCommandLine(argc, argv); dr_ = NR::createNR(diffusion_port_);#endif // NS_DIFFUSION // Create callback classes and set up pointers filter_callback_ = new OnePhasePullFilterReceive(this); // Set up the filter filter_handle_ = setupFilter(); // Print filter information DiffPrint(DEBUG_IMPORTANT, "One-Phase Pull filter received handle %d\n", filter_handle_); // Add timers for keeping state up-to-date gradient_timer = new OppGradientExpirationCheckTimer(this); ((DiffusionRouting *)dr_)->addTimer(GRADIENT_DELAY, gradient_timer); reinforcement_timer = new OppReinforcementCheckTimer(this); ((DiffusionRouting *)dr_)->addTimer(REINFORCEMENT_DELAY, reinforcement_timer); GetTime(&tv); DiffPrint(DEBUG_ALWAYS, "One-Phase Pull filter initialized at time %ld:%ld!\n", tv.tv_sec, tv.tv_usec);}#ifndef USE_SINGLE_ADDRESS_SPACEint main(int argc, char **argv){ OnePhasePullFilter *app; // Initialize and run the Gradient Filter app = new OnePhasePullFilter(argc, argv); app->run(); return 0;}#endif // !USE_SINGLE_ADDRESS_SPACE
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -