📄 rmst_filter.cc
字号:
// so we may need to construct a NAK from the hole map // If we've fallen off the reinforced path, we must // stop adding NAKs to the NakList. DiffPrint(DEBUG_SOME_DETAILS, " WATCHDOG_TIMER sees holes - check times.\n"); if (rmst_ptr->reinf_) setupNak(rmst_no); // Reschedule timer with same value. return 0; } break; case ACK_TIMER: DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer ACK_TIMER for Rmst %d", rmst_no); PrintTime(&cur_time); rmst_iterator = rmst_map_.find(rmst_no); if(rmst_iterator != rmst_map_.end()) rmst_ptr = (*rmst_iterator).second; else{ DiffPrint(DEBUG_IMPORTANT, "RmstFilter::processTimer can't find Rmst %d for ACK_TIMER, cancell timer!\n", rmst_no); return -1; } DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer ACK_TIMER, at source for rmst_no %d\n", rmst_no); if (rmst_ptr->acked_){ DiffPrint(DEBUG_IMPORTANT, "RmstFilter::processTimer cancel ACK_TIMER, Rmst %d ACKed\n", rmst_no); rmst_ptr->ack_timer_active_ = false; return -1; } // If there has been no data sent for 30 seconds like a NAK response, we need to resend a packet. if( (cur_time.tv_sec - rmst_ptr->last_data_time_.tv_sec) > ACK_WAIT ){ NRAttrVec attrs; int8_t msg_type; DiffPrint(DEBUG_IMPORTANT, "RmstFilter::processTimer ACK_TIMER, waited too long for Rmst %d ACK!\n", rmst_no); if(rmst_ptr->reinf_ && !rmst_ptr->resent_last_data_){ // We should send the last frag again as an DATA packet. DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer ACK_TIMER, resend last packet as DATA\n"); msg_type = DATA; rmst_ptr->resent_last_data_ = true; } else if(rmst_ptr->resent_last_data_ && !rmst_ptr->resent_last_exp_){ ExpLog exp_msg; union LlToInt key; // We tried resending last frag as data and it didn't work, try as EXP DiffPrint(DEBUG_IMPORTANT, "RmstFilter::processTimer ACK_TIMER, resend last packet as EXPLORATORY_DATA\n"); // Insert this Exploratory message in exp_map_. // When we get a reinforcement we'll know what rmst it's for. key.int_val_[0] = pkt_count_; key.int_val_[1] = rdm_id_; DiffPrint(DEBUG_LOTS_DETAILS, " Key = %llx\n", key.ll_val_); exp_msg.rmst_no_ = rmst_no; exp_msg.last_hop_ = LOCALHOST_ADDR; exp_map_.insert(Key2ExpLog::value_type(key.ll_val_, exp_msg)); msg_type = EXPLORATORY_DATA; rmst_ptr->reinf_ = false; rmst_ptr->naks_rec_ = 0; rmst_ptr->pkts_sent_ = 0; rmst_ptr->resent_last_exp_ = true; } else if(rmst_ptr->resent_last_data_ && rmst_ptr->resent_last_exp_ && rmst_ptr->reinf_){ // We should send the last frag again as an DATA packet. DiffPrint(DEBUG_IMPORTANT, "RmstFilter::processTimer ACK_TIMER, resend last packet on new reinf path as DATA\n"); msg_type = DATA; rmst_ptr->resent_last_data_ = false; rmst_ptr->resent_last_exp_ = false; } else{ ExpLog exp_msg; union LlToInt key; DiffPrint(DEBUG_IMPORTANT, "RmstFilter::processTimer ACK_TIMER, resent last packet as EXP and no reinforced path, Try again!\n"); // Insert this Exploratory message in exp_map_. // When we get a reinforcement we'll know what rmst it's for. key.int_val_[0] = pkt_count_; key.int_val_[1] = rdm_id_; DiffPrint(DEBUG_LOTS_DETAILS, " Key = %llx\n", key.ll_val_); exp_msg.rmst_no_ = rmst_no; exp_msg.last_hop_ = LOCALHOST_ADDR; exp_map_.insert(Key2ExpLog::value_type(key.ll_val_, exp_msg)); msg_type = EXPLORATORY_DATA; } attrs.push_back(RmstTargetAttr.make(NRAttribute::IS, rmst_ptr->target_str_)); attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, RMST_RESP)); attrs.push_back(RmstFragAttr.make(NRAttribute::IS, rmst_ptr->max_frag_)); attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_ptr->rmst_no_)); frag_ptr = rmst_ptr->getFrag(rmst_ptr->max_frag_); attrs.push_back(RmstDataAttr.make(NRAttribute::IS, frag_ptr, rmst_ptr->max_frag_len_)); Message *new_frag; new_frag = new Message(DIFFUSION_VERSION, msg_type, 0, 0, attrs.size(), pkt_count_, rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR); new_frag->msg_attr_vec_ = CopyAttrs(&attrs); ((DiffusionRouting *)dr_)->sendMessage(new_frag, filter_handle_); pkt_count_++; delete new_frag; ClearAttrs(&attrs); // We sent a fragment, set the last_data_time_ for the cleanup timer. GetTime(&rmst_ptr->last_data_time_); } return 0; break; case CLEANUP_TIMER: DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer CLEANUP_TIMER"); PrintTime(&cur_time); DiffPrint(DEBUG_IMPORTANT, " CLEANUP_TIMER called\n"); rmst_iterator = rmst_map_.begin(); while(rmst_iterator != rmst_map_.end()){ rmst_ptr = (*rmst_iterator).second; DiffPrint(DEBUG_SOME_DETAILS, " CLEANUP_TIMER:: rmst_no %d : pkts_sent_ = %d, pkts_rec_ = %d, last_hop_pkts_sent_ = %d\n", rmst_ptr->rmst_no_, rmst_ptr->pkts_sent_, rmst_ptr->pkts_rec_, rmst_ptr->last_hop_pkts_sent_); if((!rmst_ptr->reinf_)&&(!rmst_ptr->acked_)&&(!rmst_ptr->local_source_)&&(!local_sink_)){ if( (cur_time.tv_sec - rmst_ptr->last_data_time_.tv_sec) > LONG_CLEANUP_WAIT ) cleanUpRmst(rmst_ptr); } else if (rmst_ptr->acked_){ if ( ( (cur_time.tv_sec - rmst_ptr->last_data_time_.tv_sec) > SHORT_CLEANUP_WAIT ) && ( (cur_time.tv_sec - rmst_ptr->last_nak_time_.tv_sec) > SHORT_CLEANUP_WAIT ) ) cleanUpRmst(rmst_ptr); } rmst_iterator++; } // Check on the BlackList, in case network is partitioned. if (!black_list_.empty()){ if ( (cur_time.tv_sec - last_data_rec_.tv_sec) > RMST_BLACKLIST_WAIT ){ DiffPrint(DEBUG_IMPORTANT, " clearing black_list_!\n"); ((DiffusionRouting *)dr_)->clearBlacklist(); black_list_.clear(); } } if (local_sink_){ if ( (cur_time.tv_sec - last_sink_time_.tv_sec) > SINK_REFRESH_WAIT ){ DiffPrint(DEBUG_IMPORTANT, " local sink timed out\n"); local_sink_ = false; } else DiffPrint(DEBUG_IMPORTANT, " local sink still alive.\n"); } return 0; break; default: break; } return -1;}void RmstFilter::sendRmstToSink(Rmst *rmst_ptr){ NRAttrVec attrs; Message *rmst_msg; NRSimpleAttribute<void *> *rmst_data_attr; NRSimpleAttribute<int> *frag_number_attr; void *frag_ptr; int size, i; DiffPrint(DEBUG_IMPORTANT, "RmstFilter::sendRmstToSink - sending rmst %d to local sink\n", rmst_ptr->rmst_no_); // Prepare attribute vector attrs.push_back(RmstTargetAttr.make(NRAttribute::IS, rmst_ptr->target_str_)); attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, RMST_RESP)); attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_ptr->rmst_no_)); attrs.push_back(RmstMaxFragAttr.make(NRAttribute::IS, rmst_ptr->max_frag_)); frag_number_attr = RmstFragAttr.make(NRAttribute::IS, 0); attrs.push_back(frag_number_attr); // Add the blob fragment if (rmst_ptr->max_frag_ == 0) size = rmst_ptr->max_frag_len_; else size = MAX_FRAG_SIZE; frag_ptr = rmst_ptr->getFrag(0); rmst_data_attr = RmstDataAttr.make(NRAttribute::IS, frag_ptr, size); attrs.push_back(rmst_data_attr); // Prepare the message rmst_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(), pkt_count_, rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR); rmst_msg->next_hop_ = LOCALHOST_ADDR; rmst_msg->next_port_ = local_sink_port_; rmst_msg->msg_attr_vec_= CopyAttrs(&attrs); ((DiffusionRouting *)dr_)->sendMessage(rmst_msg, filter_handle_, 1); delete rmst_msg; pkt_count_++; // Send all the fragments for (i=1; i <= (rmst_ptr->max_frag_); i++){ frag_number_attr->setVal(i); frag_ptr = rmst_ptr->getFrag(i); if(frag_ptr == NULL) DiffPrint(DEBUG_IMPORTANT, "RmstFilter::sendRmstToSink - got a null frag_ptr for frag!%d\n", i); else{ if (rmst_ptr->max_frag_ == i) rmst_data_attr->setVal(frag_ptr, rmst_ptr->max_frag_len_); else rmst_data_attr->setVal(frag_ptr, MAX_FRAG_SIZE); } rmst_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(), pkt_count_, rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR); rmst_msg->next_hop_ = LOCALHOST_ADDR; rmst_msg->next_port_ = local_sink_port_; rmst_msg->msg_attr_vec_= CopyAttrs(&attrs); ((DiffusionRouting *)dr_)->sendMessage(rmst_msg, filter_handle_, 1); delete rmst_msg; pkt_count_++; } ClearAttrs(&attrs);}void RmstFilter::sendAckToSource(Rmst *rmst_ptr){ NRAttrVec attrs; Message *ack_msg; attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, ACK_RESP)); attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_ptr->rmst_no_)); // New code to send a message to last_hop_ ack_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(), pkt_count_, rdm_id_, rmst_ptr->last_hop_, LOCALHOST_ADDR); ack_msg->msg_attr_vec_ = CopyAttrs(&attrs); DiffPrint(DEBUG_IMPORTANT, " Sending ACK_RESP to node %d\n", rmst_ptr->last_hop_); ((DiffusionRouting *)dr_)->sendMessage(ack_msg, filter_handle_, 1); pkt_count_++; delete ack_msg; ClearAttrs(&attrs);}void RmstFilter::sendExpReqUpstream(Rmst *rmst_ptr){ NRAttrVec attrs; Message *exp_msg; attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, EXP_REQ)); attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_ptr->rmst_no_)); attrs.push_back(RmstFragAttr.make(NRAttribute::IS, rmst_ptr->max_frag_rec_)); exp_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(), pkt_count_, rdm_id_, rmst_ptr->last_hop_, LOCALHOST_ADDR); exp_msg->msg_attr_vec_ = CopyAttrs(&attrs); DiffPrint(DEBUG_IMPORTANT, " Sending EXP_REQ to node %d\n", rmst_ptr->last_hop_); ((DiffusionRouting *)dr_)->sendMessage(exp_msg, filter_handle_, 1); pkt_count_++; delete exp_msg; ClearAttrs(&attrs);}void RmstFilter::sendContToSource(Rmst *rmst_ptr){ NRAttrVec attrs; Message *cont_msg; attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::EQ, RMST_CONT)); attrs.push_back(NRClassAttr.make(NRAttribute::IS, NRAttribute::INTEREST_CLASS)); DiffPrint(DEBUG_IMPORTANT, " Sending a RMST_CONT to source\n"); cont_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(), pkt_count_, rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR); cont_msg->msg_attr_vec_ = CopyAttrs(&attrs); cont_msg->next_hop_ = LOCALHOST_ADDR; cont_msg->next_port_ = rmst_ptr->local_source_port_; ((DiffusionRouting *)dr_)->sendMessage(cont_msg, filter_handle_, 1); pkt_count_++; delete cont_msg; ClearAttrs(&attrs);}void RmstFilter::cleanUpRmst(Rmst *rmst_ptr){ int rmst_no = rmst_ptr->rmst_no_; Int2Rmst::iterator rmst_iterator; Key2ExpLog::iterator exp_iterator; ExpLog exp_msg; rmst_no = rmst_ptr->rmst_no_; DiffPrint(DEBUG_IMPORTANT, " cleanUpRmst called to delete Rmst %d\n", rmst_no); if(rmst_ptr->watchdog_active_) ((DiffusionRouting *)dr_)->removeTimer(rmst_ptr->watchdog_handle_); if(rmst_ptr->ack_timer_active_) ((DiffusionRouting *)dr_)->removeTimer(rmst_ptr->ack_timer_handle_); rmst_iterator = rmst_map_.find(rmst_no); if(rmst_iterator != rmst_map_.end()){ rmst_map_.erase(rmst_iterator); } delete rmst_ptr; // clean up the exp_map_ of any entries base on this rmst exp_iterator = exp_map_.begin(); while(exp_iterator != exp_map_.end()){ exp_msg = (*exp_iterator).second; if(exp_msg.rmst_no_ == rmst_no){ DiffPrint(DEBUG_LOTS_DETAILS, " cleanUpRmst deleting exp_map_ entry for Rmst %d\n", rmst_no); exp_map_.erase(exp_iterator); } exp_iterator++; }}#ifdef NS_DIFFUSIONRmstFilter::RmstFilter(){#elseRmstFilter::RmstFilter(int argc, char **argv){ TimerCallback *stat_timer; parseCommandLine(argc, argv); dr_ = NR::createNR(diffusion_port_);#endif // NS_DIFFUSION fcb_ = new RmstFilterCallback; fcb_->app_ = this; rdm_id_ = rand(); pkt_count_ = rand(); local_sink_ = false; caching_mode_ = false; send_timer_active_ = false; DiffPrint(DEBUG_ALWAYS, "RmstFilter constructor: rdm_id_ = %x, pkt_count_ = %x\n", rdm_id_, pkt_count_);#ifndef NS_DIFFUSION filter_handle_ = setupFilter(); DiffPrint(DEBUG_LOTS_DETAILS, "RmstFilter:: subscribed to all, received handle %d\n", (int)filter_handle_); DiffPrint(DEBUG_LOTS_DETAILS, "RmstFilter constructor: start cleanup tim
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -