📄 two_phase_pull.cc
字号:
//// two_phase_pull.cc : Two-Phase Pull/One-Phase Push Filter// author : Fabio Silva and Chalermek Intanagonwiwat//// Copyright (C) 2000-2003 by the University of Southern California// $Id: two_phase_pull.cc,v 1.5 2004/01/08 23:05:53 haldar Exp $//// This program is free software; you can redistribute it and/or// modify it under the terms of the GNU General Public License,// version 2, as published by the Free Software Foundation.//// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the// GNU General Public License for more details.//// You should have received a copy of the GNU General Public License along// with this program; if not, write to the Free Software Foundation, Inc.,// 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.////#include "two_phase_pull.hh"#ifdef NS_DIFFUSIONstatic class GradientFilterClass : public TclClass {public: GradientFilterClass() : TclClass("Application/DiffApp/GradientFilter") {} TclObject* create(int argc, const char*const* argv) { if (argc == 5) return(new GradientFilter(argv[4])); else fprintf(stderr, "Insufficient number of args for creating GradientFilter"); return (NULL); }} class_gradient_filter;int GradientFilter::command(int argc, const char*const* argv) { if (argc == 3) { if (strcasecmp(argv[1], "debug") == 0) { global_debug_level = atoi(argv[2]); if (global_debug_level < 1 || global_debug_level > 10) { global_debug_level = DEBUG_DEFAULT; printf("Error: Debug level outside range(1-10) or missing !\n"); } } } return DiffApp::command(argc, argv);}#endif // NS_DIFFUSIONvoid GradientFilterReceive::recv(Message *msg, handle h){ app_->recv(msg, h);}int TppMessageSendTimer::expire(){ // Call timeout function agent_->messageTimeout(msg_); // Do not reschedule this timer delete this; return -1;}int TppInterestForwardTimer::expire(){ // Call timeout function agent_->interestTimeout(msg_); // Do not reschedule this timer delete this; return -1;}int TppSubscriptionExpirationTimer::expire(){ int retval; retval = agent_->subscriptionTimeout(attrs_); // Delete timer if we are not rescheduling it if (retval == -1) delete this; return retval;}int TppGradientExpirationCheckTimer::expire(){ // Call the callback function agent_->gradientTimeout(); // Reschedule this timer return 0;}int TppReinforcementCheckTimer::expire(){ // Call the callback function agent_->reinforcementTimeout(); // Reschedule this timer return 0;}void GradientFilter::interestTimeout(Message *msg){ DiffPrint(DEBUG_MORE_DETAILS, "Interest Timeout !\n"); msg->last_hop_ = LOCALHOST_ADDR; msg->next_hop_ = BROADCAST_ADDR; ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);}void GradientFilter::messageTimeout(Message *msg){ DiffPrint(DEBUG_MORE_DETAILS, "Message Timeout !\n"); ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);}void GradientFilter::gradientTimeout(){ RoutingTable::iterator routing_itr; GradientList::iterator grad_itr; AgentList::iterator agent_itr; TppRoutingEntry *routing_entry; GradientEntry *gradient_entry; AgentEntry *agent_entry; struct timeval tmv; DiffPrint(DEBUG_MORE_DETAILS, "Gradient Timeout !\n"); GetTime(&tmv); routing_itr = routing_list_.begin(); while (routing_itr != routing_list_.end()){ routing_entry = *routing_itr; // Step 1: Delete expired gradients grad_itr = routing_entry->gradients_.begin(); while (grad_itr != routing_entry->gradients_.end()){ gradient_entry = *grad_itr; if (tmv.tv_sec > (gradient_entry->tv_.tv_sec + GRADIENT_TIMEOUT)){ DiffPrint(DEBUG_NO_DETAILS, "Deleting Gradient to node %d !\n", gradient_entry->node_addr_); grad_itr = routing_entry->gradients_.erase(grad_itr); delete gradient_entry; } else{ grad_itr++; } } // Step 2: Remove non-active agents agent_itr = routing_entry->agents_.begin(); while (agent_itr != routing_entry->agents_.end()){ agent_entry = *agent_itr; if (tmv.tv_sec > (agent_entry->tv_.tv_sec + GRADIENT_TIMEOUT)){ DiffPrint(DEBUG_NO_DETAILS, "Deleting Gradient to agent %d !\n", agent_entry->port_); agent_itr = routing_entry->agents_.erase(agent_itr); delete agent_entry; } else{ agent_itr++; } } // Remove the Routing Entry if no gradients and no agents if ((routing_entry->gradients_.size() == 0) && (routing_entry->agents_.size() == 0)){ // Deleting Routing Entry DiffPrint(DEBUG_DETAILS, "Nothing left for this data type, cleaning up !\n"); routing_itr = routing_list_.erase(routing_itr); delete routing_entry; } else{ routing_itr++; } }}void GradientFilter::reinforcementTimeout(){ DataNeighborList::iterator data_neighbor_itr; DataNeighborEntry *data_neighbor_entry; RoutingTable::iterator routing_itr; TppRoutingEntry *routing_entry; Message *my_message; DiffPrint(DEBUG_MORE_DETAILS, "Reinforcement Timeout !\n"); routing_itr = routing_list_.begin(); while (routing_itr != routing_list_.end()){ routing_entry = *routing_itr; // Step 1: Delete expired gradients data_neighbor_itr = routing_entry->data_neighbors_.begin(); while (data_neighbor_itr != routing_entry->data_neighbors_.end()){ data_neighbor_entry = *data_neighbor_itr; if (data_neighbor_entry->data_flag_ == OLD_MESSAGE){ my_message = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT, 0, 0, routing_entry->attrs_->size(), pkt_count_, random_id_, data_neighbor_entry->neighbor_id_, LOCALHOST_ADDR); my_message->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_); DiffPrint(DEBUG_NO_DETAILS, "Sending Negative Reinforcement to node %d !\n", data_neighbor_entry->neighbor_id_); ((DiffusionRouting *)dr_)->sendMessage(my_message, filter_handle_); pkt_count_++; delete my_message; // Done. Delete entry data_neighbor_itr = routing_entry->data_neighbors_.erase(data_neighbor_itr); delete data_neighbor_entry; } else{ data_neighbor_itr++; } } // Step 2: Delete data neighbors with no activity, zero flags data_neighbor_itr = routing_entry->data_neighbors_.begin(); while (data_neighbor_itr != routing_entry->data_neighbors_.end()){ data_neighbor_entry = *data_neighbor_itr; if (data_neighbor_entry->data_flag_ == NEW_MESSAGE){ data_neighbor_entry->data_flag_ = 0; data_neighbor_itr++; } else{ // Delete entry data_neighbor_itr = routing_entry->data_neighbors_.erase(data_neighbor_itr); delete data_neighbor_entry; } } // Advance to the next routing entry routing_itr++; }}int GradientFilter::subscriptionTimeout(NRAttrVec *attrs){ AttributeList::iterator attribute_itr; AttributeEntry *attribute_entry; TppRoutingEntry *routing_entry; struct timeval tmv; DiffPrint(DEBUG_MORE_DETAILS, "Subscription Timeout !\n"); GetTime(&tmv); // Find the correct Routing Entry routing_entry = findRoutingEntry(attrs); if (routing_entry){ // Step 1: Check Timeouts attribute_itr = routing_entry->attr_list_.begin(); while (attribute_itr != routing_entry->attr_list_.end()){ attribute_entry = *attribute_itr; if (tmv.tv_sec > (attribute_entry->tv_.tv_sec + SUBSCRIPTION_TIMEOUT)){ sendDisinterest(attribute_entry->attrs_, routing_entry); attribute_itr = routing_entry->attr_list_.erase(attribute_itr); delete attribute_entry; } else{ attribute_itr++; } } } else{ DiffPrint(DEBUG_DETAILS, "Warning: SubscriptionTimeout could't find RE - maybe deleted by GradientTimeout ?\n"); // Cancel Timer return -1; } // Keep Timer return 0;}void GradientFilter::deleteRoutingEntry(TppRoutingEntry *routing_entry){ RoutingTable::iterator routing_itr; TppRoutingEntry *current_entry; for (routing_itr = routing_list_.begin(); routing_itr != routing_list_.end(); ++routing_itr){ current_entry = *routing_itr; if (current_entry == routing_entry){ routing_itr = routing_list_.erase(routing_itr); delete routing_entry; return; } } DiffPrint(DEBUG_ALWAYS, "Error: deleteRoutingEntry could not find entry to delete !\n");}TppRoutingEntry * GradientFilter::matchRoutingEntry(NRAttrVec *attrs, RoutingTable::iterator start, RoutingTable::iterator *place){ RoutingTable::iterator routing_itr; TppRoutingEntry *routing_entry; for (routing_itr = start; routing_itr != routing_list_.end(); ++routing_itr){ routing_entry = *routing_itr; if (MatchAttrs(routing_entry->attrs_, attrs)){ *place = routing_itr; return routing_entry; } } return NULL;}TppRoutingEntry * GradientFilter::findRoutingEntry(NRAttrVec *attrs){ RoutingTable::iterator routing_itr; TppRoutingEntry *routing_entry; for (routing_itr = routing_list_.begin(); routing_itr != routing_list_.end(); ++routing_itr){ routing_entry = *routing_itr; if (PerfectMatch(routing_entry->attrs_, attrs)) return routing_entry; } return NULL;}AttributeEntry * GradientFilter::findMatchingSubscription(TppRoutingEntry *routing_entry, NRAttrVec *attrs){ AttributeList::iterator attribute_itr; AttributeEntry *attribute_entry; for (attribute_itr = routing_entry->attr_list_.begin(); attribute_itr != routing_entry->attr_list_.end(); ++attribute_itr){ attribute_entry = *attribute_itr; if (PerfectMatch(attribute_entry->attrs_, attrs)) return attribute_entry; } return NULL;}void GradientFilter::updateGradient(TppRoutingEntry *routing_entry, int32_t last_hop, bool reinforced){ GradientList::iterator gradient_itr; GradientEntry *gradient_entry; for (gradient_itr = routing_entry->gradients_.begin(); gradient_itr != routing_entry->gradients_.end(); ++gradient_itr){ gradient_entry = *gradient_itr; if (gradient_entry->node_addr_ == last_hop){ GetTime(&(gradient_entry->tv_)); if (reinforced) gradient_entry->reinforced_ = true; return; } } // We need to add a new gradient gradient_entry = new GradientEntry(last_hop); if (reinforced) gradient_entry->reinforced_ = true; routing_entry->gradients_.push_back(gradient_entry);}void GradientFilter::updateAgent(TppRoutingEntry *routing_entry, u_int16_t source_port){ AgentList::iterator agent_itr; AgentEntry *agent_entry; for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){ agent_entry = *agent_itr; if (agent_entry->port_ == source_port){ // We already have this guy GetTime(&(agent_entry->tv_)); return; } } // This is a new agent, so we create a new entry and add it to the // list of known agents agent_entry = new AgentEntry(source_port); routing_entry->agents_.push_back(agent_entry);}void GradientFilter::forwardPushExploratoryData(Message *msg, DataForwardingHistory *forwarding_history){ RoutingTable::iterator routing_itr; TppRoutingEntry *routing_entry; AgentList::iterator agent_itr; AgentEntry *agent_entry; Message *data_msg, *sink_message; TimerCallback *data_timer; unsigned int key[2]; HashEntry *hash_entry; // Sink processing routing_itr = routing_list_.begin(); routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr, &routing_itr); sink_message = CopyMessage(msg); while (routing_entry){ // Forward message to all local sinks for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){ agent_entry = *agent_itr; if (!forwarding_history->alreadyForwardedToLibrary(agent_entry->port_)){ // Send DATA message to local sinks sink_message->next_hop_ = LOCALHOST_ADDR; sink_message->next_port_ = agent_entry->port_; ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_); // Add agent to the forwarding history forwarding_history->forwardingToLibrary(agent_entry->port_); } } if ((!forwarding_history->alreadyReinforced()) && (routing_entry->agents_.size() > 0) && (msg->last_hop_ != LOCALHOST_ADDR)){ // Send a positive reinforcement if we have sinks sendPositiveReinforcement(routing_entry->attrs_, msg->rdm_id_, msg->pkt_num_, msg->last_hop_); // Record reinforcement in the forwarding history so we do it // only once per received data message forwarding_history->sendingReinforcement(); } // Look for other matching data types routing_itr++; routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr, &routing_itr); } // Delete sink_message after sink processing delete sink_message; // Intermediate node processing // Add message information to the hash table if (msg->last_hop_ != LOCALHOST_ADDR){ key[0] = msg->pkt_num_; key[1] = msg->rdm_id_;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -