📄 rmst_filter.cc
字号:
if( exp_time > 20){ // Resend an EXP_REQ!!! DiffPrint(DEBUG_IMPORTANT, " Node resends EXP_REQ up blacklisted stream!\n"); sendExpReqUpstream(rmst_ptr); GetTime(&rmst_ptr->exp_req_time_); } else DiffPrint(DEBUG_LOTS_DETAILS, " Node waits to send another EXP_REQ\n"); return 0; } if (rmst_ptr->local_source_){ if(rmst_ptr->acked_){ DiffPrint(DEBUG_IMPORTANT, " WATCHDOG_TIMER Local Source sees acked state - cancel timer\n"); return -1; } else{ DiffPrint(DEBUG_LOTS_DETAILS, " WATCHDOG_TIMER Local Source sees rmst not acked\n"); return 0; } } // Check if we have waited too long for next fragment. if( ((cur_time.tv_sec - rmst_ptr->last_data_time_.tv_sec) > NEXT_FRAG_WAIT) && (!rmst_ptr->rmstComplete()) ){ int newHole = (rmst_ptr->max_frag_rec_)+1; if ( (newHole <= rmst_ptr->max_frag_) && (!rmst_ptr->inHoleMap(newHole)) ){ DiffPrint(DEBUG_SOME_DETAILS, " WATCHDOG_TIMER adds new hole, frag %d\n", newHole); rmst_ptr->putHole(newHole); NakData *nak_ptr = rmst_ptr->getHole(newHole); // Artificially age this hole so it gets naked immediately nak_ptr->tmv.tv_sec -= 4; } } if(rmst_ptr->holeMapEmpty()){ // There aren't any holes! DiffPrint(DEBUG_SOME_DETAILS, " WATCHDOG_TIMER sees No holes\n"); return 0; } else{ // The WATCHDOG_TIMER expired and we have a hole, // so we may need to construct a NAK from the hole map // If we've fallen off the reinforced path, we must // stop adding NAKs to the NakList. DiffPrint(DEBUG_SOME_DETAILS, " WATCHDOG_TIMER sees holes - check times.\n"); if (rmst_ptr->reinf_) setupNak(rmst_no); // Reschedule timer with same value. return 0; } break; case ACK_TIMER: DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer ACK_TIMER for Rmst %d", rmst_no); PrintTime(&cur_time); rmst_iterator = rmst_map_.find(rmst_no); if(rmst_iterator != rmst_map_.end()) rmst_ptr = (*rmst_iterator).second; else{ DiffPrint(DEBUG_IMPORTANT, "RmstFilter::processTimer can't find Rmst %d for ACK_TIMER, cancell timer!\n", rmst_no); return -1; } DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer ACK_TIMER, at source for rmst_no %d\n", rmst_no); if (rmst_ptr->acked_){ DiffPrint(DEBUG_IMPORTANT, "RmstFilter::processTimer cancel ACK_TIMER, Rmst %d ACKed\n", rmst_no); rmst_ptr->ack_timer_active_ = false; return -1; } // If there has been no data sent for 30 seconds like a NAK response, we need to resend a packet. if( (cur_time.tv_sec - rmst_ptr->last_data_time_.tv_sec) > ACK_WAIT ){ NRAttrVec attrs; int8_t msg_type; DiffPrint(DEBUG_IMPORTANT, "RmstFilter::processTimer ACK_TIMER, waited too long for Rmst %d ACK!\n", rmst_no); if(rmst_ptr->reinf_ && !rmst_ptr->resent_last_data_){ // We should send the last frag again as an DATA packet. DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer ACK_TIMER, resend last packet as DATA\n"); msg_type = DATA; rmst_ptr->resent_last_data_ = true; } else if(rmst_ptr->resent_last_data_ && !rmst_ptr->resent_last_exp_){ ExpLog exp_msg; union LlToInt key; // We tried resending last frag as data and it didn't work, try as EXP DiffPrint(DEBUG_IMPORTANT, "RmstFilter::processTimer ACK_TIMER, resend last packet as EXPLORATORY_DATA\n"); // Insert this Exploratory message in exp_map_. // When we get a reinforcement we'll know what rmst it's for. key.int_val_[0] = pkt_count_; key.int_val_[1] = rdm_id_; DiffPrint(DEBUG_LOTS_DETAILS, " Key = %llx\n", key.ll_val_); exp_msg.rmst_no_ = rmst_no; exp_msg.last_hop_ = LOCALHOST_ADDR; exp_map_.insert(Key2ExpLog::value_type(key.ll_val_, exp_msg)); msg_type = EXPLORATORY_DATA; rmst_ptr->reinf_ = false; rmst_ptr->naks_rec_ = 0; rmst_ptr->pkts_sent_ = 0; rmst_ptr->resent_last_exp_ = true; } else if(rmst_ptr->resent_last_data_ && rmst_ptr->resent_last_exp_ && rmst_ptr->reinf_){ // We should send the last frag again as an DATA packet. DiffPrint(DEBUG_IMPORTANT, "RmstFilter::processTimer ACK_TIMER, resend last packet on new reinf path as DATA\n"); msg_type = DATA; rmst_ptr->resent_last_data_ = false; rmst_ptr->resent_last_exp_ = false; } else{ ExpLog exp_msg; union LlToInt key; DiffPrint(DEBUG_IMPORTANT, "RmstFilter::processTimer ACK_TIMER, resent last packet as EXP and no reinforced path, Try again!\n"); // Insert this Exploratory message in exp_map_. // When we get a reinforcement we'll know what rmst it's for. key.int_val_[0] = pkt_count_; key.int_val_[1] = rdm_id_; DiffPrint(DEBUG_LOTS_DETAILS, " Key = %llx\n", key.ll_val_); exp_msg.rmst_no_ = rmst_no; exp_msg.last_hop_ = LOCALHOST_ADDR; exp_map_.insert(Key2ExpLog::value_type(key.ll_val_, exp_msg)); msg_type = EXPLORATORY_DATA; } attrs.push_back(RmstTargetAttr.make(NRAttribute::IS, rmst_ptr->target_str_)); attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, RMST_RESP)); attrs.push_back(RmstFragAttr.make(NRAttribute::IS, rmst_ptr->max_frag_)); attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_ptr->rmst_no_)); frag_ptr = rmst_ptr->getFrag(rmst_ptr->max_frag_); attrs.push_back(RmstDataAttr.make(NRAttribute::IS, frag_ptr, rmst_ptr->max_frag_len_)); Message *new_frag; new_frag = new Message(DIFFUSION_VERSION, msg_type, 0, 0, attrs.size(), pkt_count_, rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR); new_frag->msg_attr_vec_ = CopyAttrs(&attrs); ((DiffusionRouting *)dr_)->sendMessage(new_frag, filter_handle_); pkt_count_++; delete new_frag; ClearAttrs(&attrs); // We sent a fragment, set the last_data_time_ for the cleanup timer. GetTime(&rmst_ptr->last_data_time_); } return 0; break; case CLEANUP_TIMER: DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer CLEANUP_TIMER"); PrintTime(&cur_time); DiffPrint(DEBUG_IMPORTANT, " CLEANUP_TIMER called\n"); rmst_iterator = rmst_map_.begin(); while(rmst_iterator != rmst_map_.end()){ rmst_ptr = (*rmst_iterator).second; DiffPrint(DEBUG_SOME_DETAILS, " CLEANUP_TIMER:: rmst_no %d : pkts_sent_ = %d, pkts_rec_ = %d, last_hop_pkts_sent_ = %d\n", rmst_ptr->rmst_no_, rmst_ptr->pkts_sent_, rmst_ptr->pkts_rec_, rmst_ptr->last_hop_pkts_sent_); if((!rmst_ptr->reinf_)&&(!rmst_ptr->acked_)&&(!rmst_ptr->local_source_)&&(!local_sink_)){ if( (cur_time.tv_sec - rmst_ptr->last_data_time_.tv_sec) > LONG_CLEANUP_WAIT ) cleanUpRmst(rmst_ptr); } else if (rmst_ptr->acked_){ if ( ( (cur_time.tv_sec - rmst_ptr->last_data_time_.tv_sec) > SHORT_CLEANUP_WAIT ) && ( (cur_time.tv_sec - rmst_ptr->last_nak_time_.tv_sec) > SHORT_CLEANUP_WAIT ) ) cleanUpRmst(rmst_ptr); } rmst_iterator++; } // Check on the BlackList, in case network is partitioned. if (!black_list_.empty()){ if ( (cur_time.tv_sec - last_data_rec_.tv_sec) > RMST_BLACKLIST_WAIT ){ DiffPrint(DEBUG_IMPORTANT, " clearing black_list_!\n"); ((DiffusionRouting *)dr_)->clearBlacklist(); black_list_.clear(); } } if (local_sink_){ if ( (cur_time.tv_sec - last_sink_time_.tv_sec) > SINK_REFRESH_WAIT ){ DiffPrint(DEBUG_IMPORTANT, " local sink timed out\n"); local_sink_ = false; } else DiffPrint(DEBUG_IMPORTANT, " local sink still alive.\n"); } return 0; break; default: break; } return -1;}void RmstFilter::sendRmstToSink(Rmst *rmst_ptr){ NRAttrVec attrs; Message *rmst_msg; NRSimpleAttribute<void *> *rmst_data_attr; NRSimpleAttribute<int> *frag_number_attr; void *frag_ptr; int size, i; DiffPrint(DEBUG_IMPORTANT, "RmstFilter::sendRmstToSink - sending rmst %d to local sink\n", rmst_ptr->rmst_no_); // Prepare attribute vector attrs.push_back(RmstTargetAttr.make(NRAttribute::IS, rmst_ptr->target_str_)); attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, RMST_RESP)); attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_ptr->rmst_no_)); attrs.push_back(RmstMaxFragAttr.make(NRAttribute::IS, rmst_ptr->max_frag_)); frag_number_attr = RmstFragAttr.make(NRAttribute::IS, 0); attrs.push_back(frag_number_attr); // Add the blob fragment if (rmst_ptr->max_frag_ == 0) size = rmst_ptr->max_frag_len_; else size = MAX_FRAG_SIZE; frag_ptr = rmst_ptr->getFrag(0); rmst_data_attr = RmstDataAttr.make(NRAttribute::IS, frag_ptr, size); attrs.push_back(rmst_data_attr); // Prepare the message rmst_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(), pkt_count_, rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR); rmst_msg->next_hop_ = LOCALHOST_ADDR; rmst_msg->next_port_ = local_sink_port_; rmst_msg->msg_attr_vec_= CopyAttrs(&attrs); ((DiffusionRouting *)dr_)->sendMessage(rmst_msg, filter_handle_, 1); delete rmst_msg; pkt_count_++; // Send all the fragments for (i=1; i <= (rmst_ptr->max_frag_); i++){ frag_number_attr->setVal(i); frag_ptr = rmst_ptr->getFrag(i); if(frag_ptr == NULL) DiffPrint(DEBUG_IMPORTANT, "RmstFilter::sendRmstToSink - got a null frag_ptr for frag!%d\n", i); else{ if (rmst_ptr->max_frag_ == i) rmst_data_attr->setVal(frag_ptr, rmst_ptr->max_frag_len_); else rmst_data_attr->setVal(frag_ptr, MAX_FRAG_SIZE); } rmst_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(), pkt_count_, rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR); rmst_msg->next_hop_ = LOCALHOST_ADDR; rmst_msg->next_port_ = local_sink_port_; rmst_msg->msg_attr_vec_= CopyAttrs(&attrs); ((DiffusionRouting *)dr_)->sendMessage(rmst_msg, filter_handle_, 1); delete rmst_msg; pkt_count_++; } ClearAttrs(&attrs);}void RmstFilter::sendAckToSource(Rmst *rmst_ptr){ NRAttrVec attrs; Message *ack_msg; attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, ACK_RESP)); attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_ptr->rmst_no_)); // New code to send a message to last_hop_ ack_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(), pkt_count_, rdm_id_, rmst_ptr->last_hop_, LOCALHOST_ADDR); ack_msg->msg_attr_vec_ = CopyAttrs(&attrs); DiffPrint(DEBUG_IMPORTANT, " Sending ACK_RESP to node %d\n", rmst_ptr->last_hop_); ((DiffusionRouting *)dr_)->sendMessage(ack_msg, filter_handle_, 1); pkt_count_++; delete ack_msg; ClearAttrs(&attrs);}void RmstFilter::sendExpReqUpstream(Rmst *rmst_ptr){ NRAttrVec attrs; Message *exp_msg; attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, EXP_REQ)); attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_ptr->rmst_no_)); attrs.push_back(RmstFragAttr.make(NRAttribute::IS, rmst_ptr->max_frag_rec_)); exp_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(), pkt_count_, rdm_id_, rmst_ptr->last_hop_, LOCALHOST_ADDR); exp_msg->msg_attr_vec_ = CopyAttrs(&attrs); DiffPrint(DEBUG_IMPORTANT, " Sending EXP_REQ to node %d\n", rmst_ptr->last_hop_); ((DiffusionRouting *)dr_)->sendMessage(exp_msg, filter_handle_, 1); pkt_count_++; delete exp_msg; ClearAttrs(&attrs);}void RmstFilter::sendContToSource(Rmst *rmst_ptr){ NRAttrVec attrs; Message *cont_msg; attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::EQ, RMST_CONT)); attrs.push_back(NRClassAttr.make(NRAttribute::IS, NRAttribute::INTEREST_CLASS)); DiffPrint(DEBUG_IMPORTANT, " Sending a RMST_CONT to source\n"); cont_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(), pkt_count_, rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR); cont_msg->msg_attr_vec_ = CopyAttrs(&attrs); cont_msg->next_hop_ = LOCALHOST_ADDR; cont_msg->next_port_ = rmst_ptr->local_source_port_; ((DiffusionRouting *)dr_)->sendMessage(cont_msg, filter_handle_, 1); pkt_count_++; delete cont_msg; ClearAttrs(&attrs);}void RmstFilter::cleanUpRmst(Rmst *rmst_ptr){ int rmst_no = rmst_ptr->rmst_no_; Int2Rmst::iterator rmst_iterator; Key2ExpLog::iterator exp_iterator; ExpLog exp_msg; rmst_no = rmst_ptr->rmst_no_; DiffPrint(DEBUG_IMPORTANT, " cleanUpRmst called to delete Rmst %d\n", rmst_no); if(rmst_ptr->watchdog_active_) ((DiffusionRouting *)dr_)->removeTimer(rmst_ptr->watchdog_handle_); if(rmst_ptr->ack_timer_active_) ((DiffusionRouting *)dr_)->removeTimer(rmst_ptr
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -