📄 one_phase_pull.cc
字号:
//// one_phase_pull.cc : One-Phase Pull Filter// author : Fabio Silva//// Copyright (C) 2000-2003 by the University of Southern California// $Id: one_phase_pull.cc,v 1.5 2004/01/08 23:05:53 haldar Exp $//// This program is free software; you can redistribute it and/or// modify it under the terms of the GNU General Public License,// version 2, as published by the Free Software Foundation.//// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the// GNU General Public License for more details.//// You should have received a copy of the GNU General Public License along// with this program; if not, write to the Free Software Foundation, Inc.,// 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.////#include "one_phase_pull.hh"#ifdef NS_DIFFUSIONstatic class OnePhasePullFilterClass : public TclClass {public: OnePhasePullFilterClass() : TclClass("Application/DiffApp/OnePhasePullFilter") {} TclObject* create(int argc, const char*const* argv) { if (argc == 5) return(new OnePhasePullFilter(argv[4])); else fprintf(stderr, "Insufficient number of args for creating OnePhasePullFilter"); return (NULL); }} class_one_phase_pull_filter;int OnePhasePullFilter::command(int argc, const char*const* argv) { if (argc == 3) { if (strcasecmp(argv[1], "debug") == 0) { global_debug_level = atoi(argv[2]); if (global_debug_level < 1 || global_debug_level > 10) { global_debug_level = DEBUG_DEFAULT; printf("Error: Debug level outside range(1-10) or missing !\n"); } } } return DiffApp::command(argc, argv);}#endif // NS_DIFFUSIONvoid OnePhasePullFilterReceive::recv(Message *msg, handle h){ filter_->recv(msg, h);}int OppMessageSendTimer::expire(){ // Call timeout function agent_->messageTimeout(msg_); // Do not reschedule this timer delete this; return -1;}int OppInterestForwardTimer::expire(){ // Call timeout function agent_->interestTimeout(msg_); // Do not reschedule this timer delete this; return -1;}int OppSubscriptionExpirationTimer::expire(){ int retval; retval = agent_->subscriptionTimeout(attrs_); // Delete timer if we are not rescheduling it if (retval == -1) delete this; return retval;}int OppGradientExpirationCheckTimer::expire(){ // Call the callback function agent_->gradientTimeout(); // Reschedule this timer return 0;}int OppReinforcementCheckTimer::expire(){ // Call the callback function agent_->reinforcementTimeout(); // Reschedule this timer return 0;}RoundIdEntry * RoutingEntry::findRoundIdEntry(int32_t round_id){ RoundIdList::iterator round_id_itr; RoundIdEntry *round_id_entry; // Iterate through round ids for this routing entry for (round_id_itr = round_ids_.begin(); round_id_itr != round_ids_.end(); round_id_itr++){ round_id_entry = *round_id_itr; // Check if round ids match if (round_id_entry->round_id_ == round_id) return round_id_entry; } // Couldn't find a matching round id entry return NULL;}RoundIdEntry * RoutingEntry::addRoundIdEntry(int32_t round_id){ RoundIdEntry *round_id_entry; // Create a new round id entry round_id_entry = new RoundIdEntry(round_id); // Add it to the round id list round_ids_.push_back(round_id_entry); return round_id_entry;}void RoutingEntry::updateNeighborDataInfo(int32_t node_id, bool new_message){ DataNeighborList::iterator data_neighbor_itr; OPPDataNeighborEntry *data_neighbor_entry; for (data_neighbor_itr = data_neighbors_.begin(); data_neighbor_itr != data_neighbors_.end(); ++data_neighbor_itr){ data_neighbor_entry = *data_neighbor_itr; // Find neighbor if (data_neighbor_entry->node_id_ == node_id){ // Increment message count data_neighbor_entry->messages_++; // If this is a new message, just set flag and return if (new_message){ data_neighbor_entry->new_messages_ = new_message; return; } } } // We need to add a new data neighbor data_neighbor_entry = new OPPDataNeighborEntry(node_id); data_neighbor_entry->new_messages_ = new_message; data_neighbors_.push_back(data_neighbor_entry);}void RoutingEntry::addGradient(int32_t last_hop, int32_t round_id, bool new_gradient){ RoundIdEntry *round_id_entry; OPPGradientEntry *gradient_entry; // Look for an existing routing id entry round_id_entry = findRoundIdEntry(round_id); // Create new entry if not found if (!round_id_entry) round_id_entry = addRoundIdEntry(round_id); if (new_gradient){ // Marks the beginning of a new round round_id_entry->gradients_.clear(); } else{ // Look for a gradient to our last_hop neighbor gradient_entry = round_id_entry->findGradient(last_hop); if (gradient_entry){ // Gradient already in the list, we just update time GetTime(&gradient_entry->tv_); return; } } // Gradient not yet in the list, add this neighbor to the list round_id_entry->addGradient(last_hop);}void RoutingEntry::updateSink(u_int16_t sink_id, int32_t round_id){ RoundIdEntry *round_id_entry; // Lock for an existing round id entry round_id_entry = findRoundIdEntry(round_id); // Create new entry if not found if (!round_id_entry) round_id_entry = addRoundIdEntry(round_id); // Add/Update this sink round_id_entry->updateSink(sink_id);}void RoutingEntry::deleteExpiredRoundIds(){ RoundIdList::iterator round_id_itr; RoundIdEntry *round_id_entry; struct timeval tmv; GetTime(&tmv); // Go through all round ids for (round_id_itr = round_ids_.begin(); round_id_itr != round_ids_.end(); round_id_itr++){ round_id_entry = *round_id_itr; round_id_entry->deleteExpiredSinks(); round_id_entry->deleteExpiredGradients(); // Delete round id if nothing left if (round_id_entry->gradients_.size() == 0 && round_id_entry->sinks_.size() == 0){ // Round Id has expired, delete it from the list DiffPrint(DEBUG_NO_DETAILS, "Delete expired Round Id: %d\n", round_id_entry->round_id_); round_id_itr = round_ids_.erase(round_id_itr); delete round_id_entry; } }}void RoutingEntry::getSinksFromList(FlowIdList *msg_list, FlowIdList *sink_list){ RoundIdList::iterator round_id_itr; RoundIdEntry *round_id_entry; FlowIdList::iterator flow_id_itr; for (round_id_itr = round_ids_.begin(); round_id_itr != round_ids_.end(); round_id_itr++){ round_id_entry = *round_id_itr; flow_id_itr = find(msg_list->begin(), msg_list->end(), round_id_entry->round_id_); if (flow_id_itr != msg_list->end()){ // Flow id in the list if (round_id_entry->sinks_.size() > 0){ sink_list->push_back(round_id_entry->round_id_); } } }}void RoutingEntry::getFlowsFromList(FlowIdList *msg_list, FlowIdList *flow_list){ RoundIdList::iterator round_id_itr; RoundIdEntry *round_id_entry; FlowIdList::iterator flow_id_itr; for (round_id_itr = round_ids_.begin(); round_id_itr != round_ids_.end(); round_id_itr++){ round_id_entry = *round_id_itr; flow_id_itr = find(msg_list->begin(), msg_list->end(), round_id_entry->round_id_); if (flow_id_itr != msg_list->end()){ // Flow id in the list if (round_id_entry->sinks_.size() == 0){ // This is a flow we have no local sink for flow_list->push_back(round_id_entry->round_id_); } } }}int32_t RoutingEntry::getNeighborFromFlow(int32_t flow_id){ RoundIdList::iterator round_id_itr; RoundIdEntry *round_id_entry; OPPGradientEntry *gradient_entry; for (round_id_itr = round_ids_.begin(); round_id_itr != round_ids_.end(); round_id_itr++){ round_id_entry = *round_id_itr; if (round_id_entry->round_id_ == flow_id){ // Flow matches, get 'reinforced neighbor' if (round_id_entry->gradients_.size() > 0){ // Get the first gradient gradient_entry = *round_id_entry->gradients_.begin(); return gradient_entry->node_id_; } DiffPrint(DEBUG_ALWAYS, "Cannot find 'reinforced neighbor !\n"); break; } } // Couldn't find neighbor for this flow return BROADCAST_ADDR;}void RoundIdEntry::deleteExpiredSinks(){ SinkList::iterator sink_itr; SinkEntry *sink_entry; struct timeval tmv; GetTime(&tmv); // Go through all sinks for (sink_itr = sinks_.begin(); sink_itr != sinks_.end(); sink_itr++){ sink_entry = *sink_itr; // Check if expired if (tmv.tv_sec > (sink_entry->tv_.tv_sec + GRADIENT_TIMEOUT)){ // Expired, delete it DiffPrint(DEBUG_NO_DETAILS, "Deleting Gradient to sink %d !\n", sink_entry->port_); sink_itr = sinks_.erase(sink_itr); delete sink_entry; } }}void RoundIdEntry::deleteExpiredGradients(){ GradientList::iterator gradient_itr; OPPGradientEntry *gradient_entry; struct timeval tmv; GetTime(&tmv); // Go through all gradients for (gradient_itr = gradients_.begin(); gradient_itr != gradients_.end(); gradient_itr++){ gradient_entry = *gradient_itr; // Check if expired if (tmv.tv_sec > (gradient_entry->tv_.tv_sec + GRADIENT_TIMEOUT)){ // Expired, delete it DiffPrint(DEBUG_NO_DETAILS, "Deleting gradient to node %d !\n", gradient_entry->node_id_); gradient_itr = gradients_.erase(gradient_itr); delete gradient_entry; } }}void RoundIdEntry::updateSink(u_int16_t sink_id){ SinkList::iterator sink_itr; SinkEntry *sink_entry; // Go through all sinks for (sink_itr = sinks_.begin(); sink_itr != sinks_.end(); ++sink_itr){ sink_entry = *sink_itr; if (sink_entry->port_ == sink_id){ // We already have this guy GetTime(&(sink_entry->tv_)); return; } } // This is a new sink, so we create a new entry on the list sink_entry = new SinkEntry(sink_id); sinks_.push_back(sink_entry);}OPPGradientEntry * RoundIdEntry::findGradient(int32_t node_id){ GradientList::iterator gradient_itr; OPPGradientEntry *gradient_entry; // Go through all gradients for (gradient_itr = gradients_.begin(); gradient_itr != gradients_.end(); gradient_itr++){ gradient_entry = *gradient_itr; // Is this the one we are looking for ? if (gradient_entry->node_id_ == node_id) return gradient_entry; } // Did not find a match return NULL;}void RoundIdEntry::addGradient(int32_t node_id){ OPPGradientEntry *gradient_entry; // Create new gradient gradient_entry = new OPPGradientEntry(node_id); gradients_.push_back(gradient_entry);}void RoundIdEntry::deleteGradient(int32_t node_id){ GradientList::iterator gradient_itr; OPPGradientEntry *gradient_entry; // Go through all gradients for (gradient_itr = gradients_.begin(); gradient_itr != gradients_.end(); gradient_itr++){ gradient_entry = *gradient_itr; // Is this the one we are looking for ? if (gradient_entry->node_id_ == node_id){ DiffPrint(DEBUG_NO_DETAILS, "Deleting gradient to node %d !\n", node_id); // Found. Delete it from the list and return gradient_itr = gradients_.erase(gradient_itr); delete gradient_entry; return; } }}void OnePhasePullFilter::interestTimeout(Message *msg){ DiffPrint(DEBUG_MORE_DETAILS, "Node%d: Interest Timeout !\n", ((DiffusionRouting *)dr_)->getNodeId()); msg->last_hop_ = LOCALHOST_ADDR; msg->next_hop_ = BROADCAST_ADDR; ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);}void OnePhasePullFilter::messageTimeout(Message *msg){ DiffPrint(DEBUG_MORE_DETAILS, "Node%d: Message Timeout !\n", ((DiffusionRouting *)dr_)->getNodeId()); ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);}void OnePhasePullFilter::gradientTimeout(){ RoutingTable::iterator routing_itr; RoutingEntry *routing_entry; DiffPrint(DEBUG_MORE_DETAILS, "Node%d: Gradient Timeout !\n",((DiffusionRouting *)dr_)->getNodeId()); routing_itr = routing_list_.begin(); // Iterate through the routing table
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -