📄 rmst_filter.cc
字号:
TimerCallback *send_timer; // Now add a timer to send this and any NAKS. DiffPrint(DEBUG_SOME_DETAILS, " Set a SEND_TIMER for reinforced rmst_no %d\n", rmst_no); send_timer = new RmstTimeout(this, rmst_no, SEND_TIMER); // We check on things every second. send_timer_handle_ = ((DiffusionRouting *)dr_)->addTimer(SEND_INTERVAL, send_timer); send_timer_active_ = true; } }}handle RmstFilter::setupFilter(){ NRAttrVec attrs; handle h; // This is a dummy attribute for filtering that matches everything attrs.push_back(NRClassAttr.make(NRAttribute::IS, NRAttribute::INTEREST_CLASS)); h = ((DiffusionRouting *)dr_)->addFilter(&attrs, RMST_FILTER_PRIORITY, fcb_); ClearAttrs(&attrs); return h;}void RmstFilter::run(){#ifdef NS_DIFFUSION TimerCallback *stat_timer; filter_handle_ = setupFilter(); DiffPrint(DEBUG_LOTS_DETAILS, "RmstFilter:: subscribed to all, received handle %d\n", (int)filter_handle_); DiffPrint(DEBUG_LOTS_DETAILS, "RmstFilter constructor: start cleanup timer\n"); stat_timer = new RmstTimeout(this, -1, CLEANUP_TIMER); stat_timer_handle_ = ((DiffusionRouting *)dr_)->addTimer(CLEANUP_INTERVAL, stat_timer);#else // Doesn't do anything while(1){ sleep(1000); }#endif // NS_DIFFUSION}RmstTimeout::RmstTimeout(RmstFilter *rmst_flt, int no, int type){ filter_ = rmst_flt; rmst_no_ = no; timer_type_ = type;}int RmstTimeout::expire(){ int retval; retval = filter_->processTimer(rmst_no_, timer_type_); if(retval == -1) delete this; return retval;}int RmstFilter::processTimer(int rmst_no, int timer_type){ Rmst *rmst_ptr; void *frag_ptr; int frag_no; Int2Rmst::iterator rmst_iterator; timeval cur_time; GetTime (&cur_time); switch (timer_type){ case SEND_TIMER: DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer SEND_TIMER"); PrintTime(&cur_time); // If we haven't got any NAKs pending, send the next fragment of the Rmst // in progress. If we've sent all fragments of the current Rmst, we cancel // ourself. When we get a NAK, if this timer is active we send NAK responses // here. Otherwise we can send them directly from the NAK response routine. if (!nak_list_.empty()){ Message *nak_resp; NRAttrVec nak_data_attrs; NakMsgData nak_msg_data = nak_list_.front(); rmst_no = nak_msg_data.rmst_no_; frag_no = nak_msg_data.frag_no_; rmst_iterator = rmst_map_.find(rmst_no); if(rmst_iterator != rmst_map_.end()){ rmst_ptr = (*rmst_iterator).second; // We have the fragment, set the last_data_time_ so that we defer // the cleanup of this Rmst, then we send the fragment to last hop. if (rmst_ptr->reinf_){ GetTime(&rmst_ptr->last_data_time_); rmst_ptr->pkts_sent_++; nak_data_attrs.push_back(RmstTargetAttr.make(NRAttribute::IS, rmst_ptr->target_str_)); nak_data_attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, RMST_RESP)); nak_data_attrs.push_back(RmstFragAttr.make(NRAttribute::IS, frag_no)); // We routinely send the packet sent count on NAKs. nak_data_attrs.push_back(RmstPktsSentAttr.make(NRAttribute::IS, rmst_ptr->pkts_sent_)); nak_data_attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_no)); // Add the actual data; the length depends on if it's the last // fragment or not. frag_ptr = rmst_ptr->getFrag(frag_no); if (frag_no == rmst_ptr->max_frag_) nak_data_attrs.push_back(RmstDataAttr.make(NRAttribute::IS, frag_ptr, rmst_ptr->max_frag_len_)); else nak_data_attrs.push_back(RmstDataAttr.make(NRAttribute::IS, frag_ptr, MAX_FRAG_SIZE)); DiffPrint(DEBUG_IMPORTANT, " Filter sending Data for NAKed frag %d of Rmst %d\n", frag_no, rmst_no); nak_resp = new Message(DIFFUSION_VERSION, DATA, 0, 0, nak_data_attrs.size(), pkt_count_, rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR); nak_resp->msg_attr_vec_ = CopyAttrs(&nak_data_attrs); ((DiffusionRouting *)dr_)->sendMessage(nak_resp, filter_handle_); pkt_count_++; delete nak_resp; ClearAttrs(&nak_data_attrs); } else DiffPrint(DEBUG_IMPORTANT, "RmstFilter::processTimer sees non-reinforced path for NAK on rmst %d!\n", rmst_no); } else DiffPrint(DEBUG_IMPORTANT, "RmstFilter::processTimer can't find Rmst %d for NAK!\n", rmst_no); nak_list_.pop_front(); } else if (!send_list_.empty()){ int8_t msg_type; int action = DO_NOTHING; // Get the rmst and frag_no that is in progress. NRAttrVec data_attrs; SendMsgData send_data = send_list_.front(); rmst_no = send_data.rmst_no_; rmst_iterator = rmst_map_.find(rmst_no); if(rmst_iterator == rmst_map_.end()) action = DELETE_FROM_QUEUE; else{ rmst_ptr = (*rmst_iterator).second; if ((send_data.last_frag_sent_ == rmst_ptr->max_frag_) && rmst_ptr->reinf_) action = DELETE_FROM_QUEUE; else if ( (send_data.last_frag_sent_ == send_data.exp_base_) && (!rmst_ptr->reinf_) && (exp_gap_ < 10) ){ action = DO_NOTHING; exp_gap_++; } else action = SEND_NEXT_FRAG; } switch (action){ case(DELETE_FROM_QUEUE): // Delete message data from front. send_list_.pop_front(); break; case(SEND_NEXT_FRAG): send_list_.pop_front(); if (rmst_ptr->reinf_){ send_data.last_frag_sent_++; frag_no = send_data.last_frag_sent_; } else{ frag_no = send_data.exp_base_; send_data.last_frag_sent_ = frag_no; } send_list_.push_front(send_data); rmst_ptr->max_frag_sent_ = frag_no; DiffPrint(DEBUG_IMPORTANT, " Source Filter sending frag %d of Rmst %d\n", frag_no, rmst_no); // Now make a message. data_attrs.push_back(RmstTargetAttr.make(NRAttribute::IS, rmst_ptr->target_str_)); data_attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, RMST_RESP)); data_attrs.push_back(RmstFragAttr.make(NRAttribute::IS, frag_no)); // We send the MaxFragAttr on the first Exploratory packet, // and the PktsSentAttr on the last packet. if(frag_no == 0) data_attrs.push_back(RmstMaxFragAttr.make(NRAttribute::IS, rmst_ptr->max_frag_)); else if (frag_no == rmst_ptr->max_frag_){ rmst_ptr->pkts_sent_++; data_attrs.push_back(RmstPktsSentAttr.make(NRAttribute::IS, rmst_ptr->pkts_sent_)); } else rmst_ptr->pkts_sent_++; data_attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_no)); // Add the actual data; the length depends on if it's the last // fragment or not. frag_ptr = rmst_ptr->getFrag(frag_no); if (rmst_ptr->max_frag_ == frag_no) data_attrs.push_back(RmstDataAttr.make(NRAttribute::IS, frag_ptr, rmst_ptr->max_frag_len_)); else data_attrs.push_back(RmstDataAttr.make(NRAttribute::IS, frag_ptr, MAX_FRAG_SIZE)); if (frag_no == send_data.exp_base_){ ExpLog exp_msg; union LlToInt key; // Insert this Exploratory message in exp_map_. // When we get a reinforcement we'll know what rmst it's for. key.int_val_[0] = pkt_count_; key.int_val_[1] = rdm_id_; DiffPrint(DEBUG_LOTS_DETAILS, " Key = %llx\n", key.ll_val_); exp_msg.rmst_no_ = rmst_no; exp_msg.last_hop_ = LOCALHOST_ADDR; exp_map_.insert(Key2ExpLog::value_type(key.ll_val_, exp_msg)); msg_type = EXPLORATORY_DATA; rmst_ptr->reinf_ = false; rmst_ptr->pkts_sent_ = 0; rmst_ptr->naks_rec_ = 0; exp_gap_ = 0; DiffPrint(DEBUG_IMPORTANT, " Source Filter sending EXPLORATORY frag %d of Rmst %d\n", frag_no, rmst_no); } else msg_type = DATA; Message *new_frag; new_frag = new Message(DIFFUSION_VERSION, msg_type, 0, 0, data_attrs.size(), pkt_count_, rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR); new_frag->msg_attr_vec_ = CopyAttrs(&data_attrs); ((DiffusionRouting *)dr_)->sendMessage(new_frag, filter_handle_); pkt_count_++; delete new_frag; ClearAttrs(&data_attrs); // We sent a fragment, set the last_data_time_ for the cleanup timer. GetTime(&rmst_ptr->last_data_time_); // If this is the last frag, we start an ACK_TIMER. if ( (rmst_ptr->max_frag_ == frag_no) && (rmst_ptr->ack_timer_active_ == false) && (rmst_ptr->local_source_) ){ TimerCallback *rmst_timer; DiffPrint(DEBUG_SOME_DETAILS, " Set an ACK_TIMER at source for rmst_no %d\n", rmst_no); rmst_timer = new RmstTimeout(this, rmst_no, ACK_TIMER); // We check on things every 20 seconds. rmst_ptr->ack_timer_handle_ = ((DiffusionRouting *)dr_)->addTimer(ACK_INTERVAL, rmst_timer); rmst_ptr->ack_timer_active_ = true; } break; case(DO_NOTHING): DiffPrint(DEBUG_LOTS_DETAILS, " Nothing to do\n"); break; } // Switch on Action } if (nak_list_.empty() && send_list_.empty()){ DiffPrint(DEBUG_LOTS_DETAILS, " Cancelling SEND_TIMER, no NAKS or data to send\n"); send_timer_active_ = false; return -1; } else return 0; break; case WATCHDOG_TIMER: DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer WATCHDOG_TIMER for Rmst %d", rmst_no); PrintTime(&cur_time); rmst_iterator = rmst_map_.find(rmst_no); if(rmst_iterator != rmst_map_.end()) rmst_ptr = (*rmst_iterator).second; else{ DiffPrint(DEBUG_IMPORTANT, "RmstFilter::processTimer can't find Rmst %d for WATCHDOG, cancell timer!\n", rmst_no); return -1; } if(rmst_ptr->cancel_watchdog_){ DiffPrint(DEBUG_SOME_DETAILS, " processTimer cancelling WATCHDOG_TIMER for Rmst %d\n", rmst_no); rmst_ptr->watchdog_active_ = false; rmst_ptr->cancel_watchdog_ = false; return -1; } if (rmst_ptr->wait_for_new_path_){ DiffPrint (DEBUG_IMPORTANT, " WATCHDOG_TIMER sees wait_for_new_path_ - suspend NAKs\n"); return 0; } // If we sent an exp request more than 20 seconds ago, // we send it again. if (rmst_ptr->sent_exp_req_){ int exp_time; exp_time = cur_time.tv_sec - rmst_ptr->exp_req_time_.tv_sec; DiffPrint(DEBUG_SOME_DETAILS, " Node sent an EXP_REQ: time since last exp = %d\n", exp_time); if( exp_time > 20){ // Resend an EXP_REQ!!! DiffPrint(DEBUG_IMPORTANT, " Node resends EXP_REQ up blacklisted stream!\n"); sendExpReqUpstream(rmst_ptr); GetTime(&rmst_ptr->exp_req_time_); } else DiffPrint(DEBUG_LOTS_DETAILS, " Node waits to send another EXP_REQ\n"); return 0; } if (rmst_ptr->local_source_){ if(rmst_ptr->acked_){ DiffPrint(DEBUG_IMPORTANT, " WATCHDOG_TIMER Local Source sees acked state - cancel timer\n"); return -1; } else{ DiffPrint(DEBUG_LOTS_DETAILS, " WATCHDOG_TIMER Local Source sees rmst not acked\n"); return 0; } } // Check if we have waited too long for next fragment. if( ((cur_time.tv_sec - rmst_ptr->last_data_time_.tv_sec) > NEXT_FRAG_WAIT) && (!rmst_ptr->rmstComplete()) ){ int newHole = (rmst_ptr->max_frag_rec_)+1; if ( (newHole <= rmst_ptr->max_frag_) && (!rmst_ptr->inHoleMap(newHole)) ){ DiffPrint(DEBUG_SOME_DETAILS, " WATCHDOG_TIMER adds new hole, frag %d\n", newHole); rmst_ptr->putHole(newHole); NakData *nak_ptr = rmst_ptr->getHole(newHole); // Artificially age this hole so it gets naked immediately nak_ptr->tmv.tv_sec -= 4; } } if(rmst_ptr->holeMapEmpty()){ // There aren't any holes! DiffPrint(DEBUG_SOME_DETAILS, " WATCHDOG_TIMER sees No holes\n"); return 0; } else{ // The WATCHDOG_TIMER expired and we have a hole,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -