📄 rmst_filter.cc
字号:
//// rmst_filter.cc : RmstFilter Class Methods// authors : Fred Stann//// Copyright (C) 2003 by the University of Southern California// $Id: rmst_filter.cc,v 1.2 2003/07/10 21:18:57 haldar Exp $//// This program is free software; you can redistribute it and/or// modify it under the terms of the GNU General Public License,// version 2, as published by the Free Software Foundation.//// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the// GNU General Public License for more details.//// You should have received a copy of the GNU General Public License along// with this program; if not, write to the Free Software Foundation, Inc.,// 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.////#include "rmst_filter.hh"char *rmstmsg_types[] = {"INTEREST", "POSITIVE REINFORCEMENT", "NEGATIVE REINFORCEMENT", "DATA", "EXPLORATORY DATA", "PUSH EXPLORATORY DATA", "CONTROL", "REDIRECT"};#ifdef NS_DIFFUSIONclass DiffAppAgent;#endif // NS_DIFFUSION#ifdef NS_DIFFUSIONstatic class RmstFilterClass : public TclClass {public: RmstFilterClass() : TclClass("Application/DiffApp/RmstFilter") {} TclObject* create(int argc, const char*const* argv) { return(new RmstFilter()); }} class_rmst_filter;int RmstFilter::command(int argc, const char*const* argv) { //Tcl& tcl = Tcl::instance(); if (argc == 2) { if (strcmp(argv[1], "start") == 0) { run(); return (TCL_OK); } } return (DiffApp::command(argc, argv));}#endif // NS_DIFFUSIONclass ReinfMessage {public: int32_t rdm_id_; int32_t pkt_num_;};// RmstFilterCallback::recv// Called by diffusion core when a message is available for this filter.// RmstFilterCallback is derived from the abstract class FilterCallback.// A pointer to the FilterCallback class is required in the API method "addFilter."void RmstFilterCallback::recv(Message *msg, handle h){ app_->recv(msg, h);}// RmstFilter::recv//// Called by the Callback::recv method.void RmstFilter::recv(Message *msg, handle h){ // Process the message handed to us by the core. // If true is returned we forward the message. Otherwise it dies here. if(processMessage(msg)) ((DiffusionRouting *)dr_)->sendMessage(msg, h);}// RmstFilter::processMessage//// Called by the RmstFilter::recv method when this filter gets a message.bool RmstFilter::processMessage(Message *msg){ NRSimpleAttribute<int> *rmst_id_attr = NULL; NRSimpleAttribute<int> *frag_attr = NULL; NRSimpleAttribute<int> *pkts_sent_attr = NULL; NRSimpleAttribute<int> *tsprt_ctl_attr = NULL; NRSimpleAttribute<void *> *reinf_attr = NULL; NRSimpleAttribute<int> *nrscope = NULL; NRSimpleAttribute<int> *nr_class = NULL; NRAttrVec *data; Key2ExpLog::iterator exp_iterator; Int2Rmst::iterator rmst_iterator; int rmst_no; int frag_no; int class_type; int rmst_ctl_type; union LlToInt key; Rmst *rmst_ptr; // If this is a message that uses the transport layer, we process it. // Otherwise we send it back to the core (by returning true). tsprt_ctl_attr = RmstTsprtCtlAttr.find(msg->msg_attr_vec_); if (!tsprt_ctl_attr){ DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter got non-transport message\n"); return true; } rmst_ctl_type = tsprt_ctl_attr->getVal(); DiffPrint(DEBUG_IMPORTANT, "RmstFilter::processMessage got a"); if (msg->new_message_) DiffPrint(DEBUG_IMPORTANT, " new (%d) ", msg->msg_type_); else DiffPrint(DEBUG_IMPORTANT, "n old (%d) ", msg->msg_type_); if (msg->last_hop_ != LOCALHOST_ADDR) DiffPrint(DEBUG_IMPORTANT, "%s message from %d to %d\n", rmstmsg_types[msg->msg_type_], msg->last_hop_, msg->next_hop_); else DiffPrint(DEBUG_IMPORTANT, "%s message from local agent\n", rmstmsg_types[msg->msg_type_]); // We only care about messages we haven't seen before, // but we generally let other filters get them (because they may need them). // However, if this is an old DATA message arriving at a sink, the sink may // negatively reinforce a reinforced path. This is because we withold the // new messages until we get the entire blob. The old message is the result // of a lost ACK when using SMAC with ARQ. if (!msg->new_message_ && msg->msg_type_ == DATA && rmst_ctl_type == RMST_RESP){ DiffPrint(DEBUG_SOME_DETAILS, " Sink got an old DATA message from node %d\n", msg->last_hop_); data = msg->msg_attr_vec_; rmst_id_attr = RmstIdAttr.find(data); if (!rmst_id_attr){ DiffPrint(DEBUG_SOME_DETAILS, " Filter received a bad transport packet!\n"); return false; } rmst_no = rmst_id_attr->getVal(); // Find the rmst. rmst_iterator = rmst_map_.find(rmst_no); if(rmst_iterator == rmst_map_.end()){ DiffPrint(DEBUG_IMPORTANT, " couldn't find DB entry for Rmst %d\n", rmst_no); return false; } else{ rmst_ptr = (*rmst_iterator).second; if ( (local_sink_) && (msg->last_hop_ == rmst_ptr->last_hop_) ){ // This is the case where SMAC sent the same DATA message twice to a sink. // We suppress this message so we don't kill our reinforced path. DiffPrint(DEBUG_IMPORTANT, " We suppress old DATA message from smac retransmission!\n"); return false; } else return true; } } else if (!msg->new_message_) return true; // When we get Rmst Fragments we must sync the local cache! if ( (rmst_ctl_type == RMST_RESP) && ((msg->msg_type_ == DATA) || (msg->msg_type_ == EXPLORATORY_DATA)) ){ rmst_ptr = syncLocalCache(msg); // syncLocalCache will return NULL if the // attribute set doesn't make sense. if (rmst_ptr == NULL) return false; rmst_no = rmst_ptr->rmst_no_; // Mark the time we got some kind of data. GetTime (&last_data_rec_); } // New exploratory messages are entered into the exp_map_, // so that we can find the last hop if it gets reinforced. // Positive reinforcement messages are used to find the // corresponding message in the exp_map_, so we know the // current reinforced path to the source of an rmst. switch (msg->msg_type_){ case(EXPLORATORY_DATA): ExpLog exp_msg; DiffPrint(DEBUG_LOTS_DETAILS, " Exploratory_Msg: ptk_num = %x, rdm_id_ = %x, last_hop = %d\n", msg->pkt_num_, msg->rdm_id_, msg->last_hop_); // Put the ID for this Exploratory message, along with its last hop, // into the exp_map_. If this message gets reinforced, we will be // able to identify the next hop in the back channel. DiffPrint(DEBUG_SOME_DETAILS, " Exploratory message for Reliable transport Id = %d\n", rmst_no); key.int_val_[0] = msg->pkt_num_; key.int_val_[1] = msg->rdm_id_; DiffPrint(DEBUG_LOTS_DETAILS, " Key = %llx\n", key.ll_val_); exp_msg.rmst_no_ = rmst_no; exp_msg.last_hop_ = msg->last_hop_; exp_map_.insert(Key2ExpLog::value_type(key.ll_val_, exp_msg)); // If this is a new exploratory message arriving at a sink, // we assume that this path will get reinforced by the // gradient filter. Sinks don't get positive reinforcement // messages, so we must record last_hop_ now. if (local_sink_){ rmst_ptr->last_hop_ = msg->last_hop_; if (rmst_ptr->reinf_){ DiffPrint(DEBUG_IMPORTANT, " got a new path exploratory msg at sink.\n"); rmst_ptr->wait_for_new_path_ = true; } else{ rmst_ptr->reinf_ = true; DiffPrint(DEBUG_IMPORTANT, " got an initial exploratory msg at sink.\n"); } DiffPrint(DEBUG_IMPORTANT, " set last_hop for rmst %d to %d\n", rmst_no, rmst_ptr->last_hop_); rmst_ptr->pkts_rec_ = 0; rmst_ptr->last_hop_pkts_sent_ = 0; } else{ // If this is not a sink we reset the base fragment that // we look for holes from. DiffPrint(DEBUG_LOTS_DETAILS, " intermediate node resets sync_base_ and reinf_.\n"); frag_attr = RmstFragAttr.find(msg->msg_attr_vec_); frag_no = frag_attr->getVal(); rmst_ptr->sync_base_ = frag_no; if(rmst_ptr->reinf_) rmst_ptr->reinf_ = false; rmst_ptr->last_hop_ = 0; rmst_ptr->pkts_sent_ = 0; rmst_ptr->pkts_rec_ = 0; rmst_ptr->last_hop_pkts_sent_ = 0; rmst_ptr->naks_rec_ = 0; } // If this is not a sink and a watchdog timer is active, we cancel // it because we may not end up on the new reinforced path. We // don't want to look for fragments that will never arrive. if ((rmst_ptr->watchdog_active_) && (!local_sink_) && (!rmst_ptr->local_source_)){ rmst_ptr->cancel_watchdog_ = true; rmst_ptr->cleanHoleMap(); } // We always forward exploratory data. return(true); break; case(DATA): if (rmst_ctl_type != RMST_RESP){ processCtrlMessage(msg); // We don't let Rmst control messages go to the gradient or other filters. return false; } // We have a normal DATA packet. rmst_ptr->pkts_rec_++; // If we got the upstream send count - update it in Rmst. pkts_sent_attr = RmstPktsSentAttr.find(msg->msg_attr_vec_); if (pkts_sent_attr){ rmst_ptr->last_hop_pkts_sent_ = pkts_sent_attr->getVal(); DiffPrint(DEBUG_SOME_DETAILS, "processMessage:: got last_hop_pkts_sent_ = %d packets\n", rmst_ptr->last_hop_pkts_sent_); if ( (rmst_ptr->last_hop_pkts_sent_ > 20) && (rmst_ptr->pkts_rec_ < (rmst_ptr->last_hop_pkts_sent_ * BLACKLIST_THRESHOLD)) ){ Blacklist::iterator black_list_iterator; black_list_iterator = black_list_.begin(); while(black_list_iterator != black_list_.end()){ if(*black_list_iterator == rmst_ptr->last_hop_) break; black_list_iterator++; } if(black_list_iterator == black_list_.end()){ DiffPrint(DEBUG_IMPORTANT, "Adding node %d to black_list_ !!\n", rmst_ptr->last_hop_); black_list_.push_front(rmst_ptr->last_hop_); ((DiffusionRouting *)dr_)->addToBlacklist(rmst_ptr->last_hop_); // Now send an EXP_REQ! sendExpReqUpstream(rmst_ptr); rmst_ptr->sent_exp_req_ = true; GetTime(&rmst_ptr->exp_req_time_); // We need to send a negative reinforcement on blacklisted link! Message *neg_reinf_msg; neg_reinf_msg = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT, 0, 0, interest_attrs_->size(), pkt_count_, rdm_id_, rmst_ptr->last_hop_, LOCALHOST_ADDR); neg_reinf_msg->msg_attr_vec_ = CopyAttrs(interest_attrs_); ((DiffusionRouting *)dr_)->sendMessage(neg_reinf_msg, filter_handle_, 1); pkt_count_++; delete neg_reinf_msg; } } } // We suppress new DATA messages that don't arrive on the // reinforced path. if ( msg->last_hop_ != rmst_ptr->last_hop_ ){ DiffPrint(DEBUG_IMPORTANT, " We suppress new DATA message on non-backchannel path!; backchannel = %d\n", rmst_ptr->last_hop_); msg->new_message_ = 0; return true; } if (rmst_ptr->wait_for_new_path_){ rmst_ptr->wait_for_new_path_ = false; DiffPrint(DEBUG_SOME_DETAILS, " node resets wait_for_new_path_.\n"); } if (local_sink_ && rmst_ptr->sent_exp_req_){ DiffPrint(DEBUG_SOME_DETAILS, " source got a new path, set sent_exp_req_ false.\n"); rmst_ptr->sent_exp_req_ = false; } // We forward DATA if we aren't a source or a sink. // Sources collect all fragments and send them from a timer. // Sinks collect all fragments and send them to the app when they // have all arrived. if(rmst_ptr->local_source_ || local_sink_) return false; else{ rmst_ptr->pkts_sent_++; // We need to alter the RmstPktsSentAttr to reflect this node! if(pkts_sent_attr) pkts_sent_attr->setVal(rmst_ptr->pkts_sent_); return true; } break; case(INTEREST): data = msg->msg_attr_vec_; nr_class = NRClassAttr.find(data); if (nr_class){ class_type = nr_class->getVal(); if (class_type == NRAttribute::DISINTEREST_CLASS) DiffPrint(DEBUG_SOME_DETAILS, " DISINTEREST_CLASS\n"); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -