📄 rmst_filter.cc
字号:
nrscope = NRScopeAttr.find(msg->msg_attr_vec_); if(nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE) DiffPrint(DEBUG_SOME_DETAILS, " rmst LOCAL_SCOPE Interest Message\n"); else if (msg->last_hop_ == LOCALHOST_ADDR){ DiffPrint(DEBUG_SOME_DETAILS, " rmst Interest Message from local SINK\n"); local_sink_ = true; local_sink_port_ = msg->source_port_; GetTime (&last_sink_time_); if (interest_attrs_ == NULL) interest_attrs_ = CopyAttrs(msg->msg_attr_vec_); } else{ DiffPrint(DEBUG_SOME_DETAILS, " rmst Interest Message from non-local node\n"); if (interest_attrs_ == NULL) interest_attrs_ = CopyAttrs(msg->msg_attr_vec_); } break; case(POSITIVE_REINFORCEMENT): ReinfMessage *reinf_msg; ExpLog exp_log; DiffPrint(DEBUG_IMPORTANT, " Positive Reinf arrived\n"); reinf_attr = ReinforcementAttr.find(msg->msg_attr_vec_); reinf_msg = (ReinfMessage*)reinf_attr->getVal(); DiffPrint(DEBUG_LOTS_DETAILS, " Pos_Reinf: ptk_num = %x, rdm_id_ = %x\n", reinf_msg->pkt_num_, reinf_msg->rdm_id_); key.int_val_[0] = reinf_msg->pkt_num_; key.int_val_[1] = reinf_msg->rdm_id_; exp_iterator = exp_map_.find(key.ll_val_); if(exp_iterator != exp_map_.end()){ exp_log = (*exp_iterator).second; DiffPrint(DEBUG_SOME_DETAILS, " Reinforcement for rmst_no = %d, last_hop_ = %d\n", exp_log.rmst_no_, exp_log.last_hop_); // Here is where we must update the rmst with back-channel // last hop. rmst_no = exp_log.rmst_no_; rmst_iterator = rmst_map_.find(rmst_no); if(rmst_iterator != rmst_map_.end()){ rmst_ptr = (*rmst_iterator).second; rmst_ptr->last_hop_ = exp_log.last_hop_; rmst_ptr->fwd_hop_ = msg->last_hop_; DiffPrint(DEBUG_SOME_DETAILS, " Setting rmst_no %d last_hop_ = %d, fwd_hop_ = %d\n", rmst_no, rmst_ptr->last_hop_, rmst_ptr->fwd_hop_); if(!rmst_ptr->reinf_){ rmst_ptr->reinf_ = true; if(rmst_ptr->local_source_) DiffPrint(DEBUG_LOTS_DETAILS, " Local source got a Reinf\n"); } } else{ DiffPrint(DEBUG_IMPORTANT, " Reinforcement cant't find rmst_no\n"); break; } // We are on the reinforced path, so we must start a timer // if one hasn't already been started. Sinks don't get // reinforced, so they start a WATCHDOG in syncLocalCache. if( (rmst_ptr->watchdog_active_ == false) && (caching_mode_) ){ TimerCallback *rmst_timer; DiffPrint(DEBUG_IMPORTANT, " Set a WATCHDOG_TIMER at caching node for reinforced rmst_no %d\n", rmst_no); rmst_timer = new RmstTimeout(this, rmst_no, WATCHDOG_TIMER); // We check on things every 10 seconds. rmst_ptr->watchdog_handle_ = ((DiffusionRouting *)dr_)->addTimer(WATCHDOG_INTERVAL, rmst_timer); rmst_ptr->watchdog_active_ = true; } if (rmst_ptr->wait_for_new_path_){ DiffPrint(DEBUG_SOME_DETAILS, " Resetting wait_for_new_path_ for rmst_no %d\n", rmst_no); rmst_ptr->wait_for_new_path_ = false; } if (rmst_ptr->sent_exp_req_){ DiffPrint(DEBUG_SOME_DETAILS, " intermediate node got a new path, set sent_exp_req_ false.\n"); rmst_ptr->sent_exp_req_ = false; } } else{ if(!rmst_ptr->local_source_) DiffPrint(DEBUG_IMPORTANT, " Reinforcement matches no Exploratory msg\n"); } break; case(NEGATIVE_REINFORCEMENT): bool ret_val; if (tsprt_ctl_attr){ DiffPrint(DEBUG_SOME_DETAILS, " NEGATIVE_REINFORCEMENT, last_hop_ = %d, rmst_ctl_type = %d\n", msg->last_hop_, rmst_ctl_type); } // We need to check if we got a NEGATIVE REINFORCEMENT from a node that is the // next node in the forward direction (downstream). If so, and we are the source // we must send a new EXPLORATORY message; else if we are not the source, // we must send and exp request upstream. ret_val = true; rmst_iterator = rmst_map_.begin(); while(rmst_iterator != rmst_map_.end()){ rmst_ptr = (*rmst_iterator).second; DiffPrint(DEBUG_SOME_DETAILS, " searching rmsts - rmst_no_ %d: fwd_hop_ = %d, reinf_ = %d, acked = %d\n", rmst_ptr->rmst_no_, rmst_ptr->fwd_hop_, rmst_ptr->reinf_, rmst_ptr->acked_); if (rmst_ptr->local_source_ && rmst_ptr->reinf_ && (rmst_ptr->fwd_hop_ == msg->last_hop_) && !rmst_ptr->acked_){ // If we are reinforced then we never got and EXP_REQ!! DiffPrint(DEBUG_SOME_DETAILS, " local source sees NEG_REINF\n"); processExpReq(rmst_ptr, rmst_ptr->max_frag_sent_); } else if (!rmst_ptr->local_source_ && (rmst_ptr->fwd_hop_ == msg->last_hop_) && rmst_ptr->reinf_ && !rmst_ptr->acked_){ DiffPrint(DEBUG_SOME_DETAILS, " intermediate node sees NEG_REINF from reinforced node\n"); DiffPrint(DEBUG_SOME_DETAILS, " send Exp Request upstream!\n"); ret_val = false; sendExpReqUpstream(rmst_ptr); } else{ DiffPrint(DEBUG_SOME_DETAILS, " node sees NEG_REINF from non-reinforced node - let routing layer see it\n"); ret_val = true; } rmst_iterator++; } if (!ret_val) return false; break; default: break; } // switch msg->type return true;}// RmstFilter::syncLocalCache//// This routine adds new transport data messages to the local data base.Rmst* RmstFilter::syncLocalCache (Message *msg){ NRSimpleAttribute<int> *rmst_id_attr = NULL; NRSimpleAttribute<int> *frag_attr = NULL; NRSimpleAttribute<int> *max_frag_attr = NULL; NRSimpleAttribute<void *> *data_buf_attr = NULL; NRAttrVec *data = msg->msg_attr_vec_; Int2Rmst::iterator rmst_iterator; int rmst_no; int frag_no; int max_frag_no; void *blob_ptr; int blob_len; void *tmp_frag_ptr; Rmst *rmst_ptr; rmst_id_attr = RmstIdAttr.find(data); frag_attr = RmstFragAttr.find(data); max_frag_attr = RmstMaxFragAttr.find(data); data_buf_attr = RmstDataAttr.find(data); if (! (rmst_id_attr && frag_attr && data_buf_attr) ){ DiffPrint(DEBUG_IMPORTANT, " Filter received a BAD transport packet!\n"); return NULL; } rmst_no = rmst_id_attr->getVal(); frag_no = frag_attr->getVal(); if(max_frag_attr) max_frag_no = max_frag_attr->getVal(); else max_frag_no = 0; blob_ptr = data_buf_attr->getVal(); blob_len = data_buf_attr->getLen(); // Here is where I consuslt the Data Base and possibly add a new map, // or add to an existing map. rmst_iterator = rmst_map_.find(rmst_no); if(rmst_iterator == rmst_map_.end()){ DiffPrint(DEBUG_IMPORTANT, " creating a new DB entry for Rmst %d\n", rmst_no); DiffPrint(DEBUG_SOME_DETAILS, " Max Fragment number = %d\n", max_frag_no); rmst_ptr = new Rmst(rmst_no); rmst_ptr->max_frag_ = max_frag_no; rmst_map_.insert(Int2Rmst::value_type(rmst_no, rmst_ptr)); // Artificially initialize last_nak_time_ so it's older // than any Naks we may get. GetTime(&rmst_ptr->last_nak_time_); // Several decisions in this routine relate to messages that emanated // from a local source. if (msg->last_hop_ == LOCALHOST_ADDR) { rmst_ptr->local_source_ = true; rmst_ptr->local_source_port_ = msg->source_port_; // This is the first fragment of an rmst from a local source. // The message will be marked exploratory by this filter. // We need to mark the last hop as LOCALHOST_ADDR. rmst_ptr->last_hop_ = LOCALHOST_ADDR; } // We need to capture the RmstTargetAttr for concantenation on sendMessage. if((rmst_ptr->local_source_)||(local_sink_)||(caching_mode_)){ NRSimpleAttribute<char *> *rmst_tgt_attr = NULL; rmst_tgt_attr = RmstTargetAttr.find(msg->msg_attr_vec_); if (rmst_tgt_attr){ char *tmp_str = rmst_tgt_attr->getVal(); rmst_ptr->target_str_ = new char[strlen(tmp_str)+1]; strcpy (rmst_ptr->target_str_, tmp_str); DiffPrint(DEBUG_IMPORTANT, " RmstTargetAttr = %s\n", rmst_ptr->target_str_); } else DiffPrint(DEBUG_IMPORTANT, " no RmstTargetAttr Rmst %d !\n", rmst_no); } } else rmst_ptr = (*rmst_iterator).second; if(!rmst_ptr->local_source_) DiffPrint(DEBUG_IMPORTANT, " Got a blob, rmstId = %d, frag_no = %d\n", rmst_no, frag_no); // Update the time we last saw data for this Rmst. GetTime(&rmst_ptr->last_data_time_); // We cache the fragment at the sink and source, // or in caching mode at each node that receives it. if((rmst_ptr->local_source_)||(local_sink_)||(caching_mode_)){ tmp_frag_ptr = rmst_ptr->getFrag(frag_no); if (tmp_frag_ptr == NULL){ if(!rmst_ptr->local_source_) DiffPrint(DEBUG_SOME_DETAILS, " creating a new frag %d entry for Rmst %d\n", frag_no, rmst_no); if (frag_no == rmst_ptr->max_frag_) rmst_ptr->max_frag_len_ = blob_len; tmp_frag_ptr = new char[blob_len]; memcpy(tmp_frag_ptr, blob_ptr, blob_len); rmst_ptr->putFrag(frag_no, tmp_frag_ptr); // Check to see if this fragment was NAKed. // If so, delete from the hole map. if(!rmst_ptr->local_source_){ if ( rmst_ptr->inHoleMap(frag_no) ){ // We need to see if we sent a NAK for this frag. NakData *nak_ptr = rmst_ptr->getHole(frag_no); if(nak_ptr->nak_sent_) DiffPrint(DEBUG_SOME_DETAILS, " We sent a NAK_REQ for this fragment.\n"); DiffPrint(DEBUG_SOME_DETAILS, " filter removing hole %d from hole_map_\n",frag_no); rmst_ptr->delHole(frag_no); } } // We start a WATCHDOG for an rmst here if: this is a local sink, // we haven't started a timer, and this is not the initial fragment. // Intermediate nodes (in caching mode) start a timer if they are on the // reinforced path, which is checked in processMessage. if((!rmst_ptr->local_source_)&&(local_sink_)&&(rmst_ptr->watchdog_active_ == false) && (frag_no>0)){ TimerCallback *rmst_timer; DiffPrint(DEBUG_IMPORTANT, " Set a WATCHDOG_TIMER at sink for rmst_no %d\n", rmst_no); rmst_timer = new RmstTimeout(this, rmst_no, WATCHDOG_TIMER); // We check on things every 10 seconds. rmst_ptr->watchdog_handle_ = ((DiffusionRouting *)dr_)->addTimer(WATCHDOG_INTERVAL, rmst_timer); rmst_ptr->watchdog_active_ = true; } } else DiffPrint(DEBUG_SOME_DETAILS, " got a duplicate frag %d for blob %d\n", frag_no, rmst_no); // If we have still have a hole in the fragment map, update the hole map. if ((!rmst_ptr->local_source_) && (rmst_ptr->holeInFragMap())) rmst_ptr->syncHoleMap(); // If the Rmst is complete, cancell timer, stop timer, // send Ack, give to local sinks. if(rmst_ptr->rmstComplete()){ if ((rmst_ptr->watchdog_active_) && (!rmst_ptr->local_source_)){ DiffPrint(DEBUG_SOME_DETAILS, " Rmst #%d is complete set cancel_watchdog_ to stop WATCHDOG\n", rmst_no); rmst_ptr->cancel_watchdog_ = true; } // Send this Rmst to any local sink if(local_sink_ && !(rmst_ptr->acked_)){ sendRmstToSink(rmst_ptr); // We mark the rmst acked at the sink so it will clean up. rmst_ptr->acked_ = true; } // If this is a source, we only send out the fragments when we've got // them all from the application. If the Rmst is complete we add the // Rmst to the send_list_, and if there is no send timer we start one. if(rmst_ptr->local_source_){ SendMsgData new_send_msg; // The Rmst is complete and this is a source - put in send list. new_send_msg.rmst_no_ = rmst_no; new_send_msg.last_frag_sent_ = -1; new_send_msg.exp_base_ = 0; send_list_.push_back(new_send_msg); if(!send_timer_active_){ TimerCallback *send_timer; // Now add a timer to send this and any NAKS. DiffPrint(DEBUG_SOME_DETAILS, " Rmst %d ready to send - Set a SEND_TIMER\n", rmst_no); send_timer = new RmstTimeout(this, -1, SEND_TIMER); // We check on things every second. send_timer_handle_ = ((DiffusionRouting *)dr_)->addTimer(SEND_INTERVAL, send_timer); send_timer_active_ = true; } } else // We must let upstream nodes know that we got the whole blob. sendAckToSource(rmst_ptr); } } else{ rmst_ptr->max_frag_rec_ = frag_no; DiffPrint(DEBUG_LOTS_DETAILS, " Not caching frag %d entry for Rmst %d\n", frag_no, rmst_no); } return rmst_ptr;}void RmstFilter::processCtrlMessage(Message *msg){ NRSimpleAttribute<int> *tsprt_ctl_attr = NULL; NRSimpleAttribute<int> *rmst_id_attr = NULL; NRSimpleAttribute<int> *frag_attr = NULL; NRAttrVec *data; NRAttrVec attrs; Int2Rmst::iterator rmst_iterator; int rmst_no; int frag_no; int rmst_ctl_type; Rmst *rmst_ptr; void *frag_ptr; Message *nak_msg; NRAttrVec::iterator place; bool forwarding_nak = false; data = msg->msg_attr_vec_; tsprt_ctl_attr = RmstTsprtCtlAttr.find(data); rmst_ctl_type = tsprt_ctl_attr->getVal(); rmst_id_attr = RmstIdAttr.find(data); if(!rmst_id_attr) { DiffPrint(DEBUG_SOME_DETAILS, " Node got a bad Rmst control msg - no RmstIdAttr!\n"); return; } rmst_no = rmst_id_attr->getVal(); // Let's make sure we have this rmst
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -