📄 rmst_filter.cc
字号:
rmst_iterator = rmst_map_.begin(); rmst_iterator = rmst_map_.find(rmst_no); if(rmst_iterator != rmst_map_.end()) rmst_ptr = (*rmst_iterator).second; else{ DiffPrint(DEBUG_IMPORTANT, " Filter can't find Rmst %d for Rmst control msg\n", rmst_no); return; } switch (rmst_ctl_type){ case(ACK_RESP): DiffPrint(DEBUG_IMPORTANT, " Got an ACK_RESP\n"); // For now we automatically forward ACKs if we're not the source. rmst_ptr->acked_ = true; if(!rmst_ptr->local_source_){ Message *ack_msg; // If we got an ACK and we aren't the source, we must be an // intermediate node (Sinks don't get ACKs, they send them). // We need to forward ACK toward source if possible. if (rmst_ptr->reinf_) { DiffPrint(DEBUG_SOME_DETAILS, " forwarding ACK to %d\n", rmst_ptr->last_hop_); attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, ACK_RESP)); attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_no)); ack_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(), pkt_count_, rdm_id_, rmst_ptr->last_hop_, LOCALHOST_ADDR); ack_msg->msg_attr_vec_ = CopyAttrs(&attrs); ((DiffusionRouting *)dr_)->sendMessage(ack_msg, filter_handle_, 1); pkt_count_++; delete ack_msg; ClearAttrs(&attrs); } else DiffPrint(DEBUG_IMPORTANT, " intermediate node can't forward ACK for Rmst %d\n", rmst_no); } else{ DiffPrint(DEBUG_IMPORTANT, " Source got ACK for Rmst %d\n", rmst_no); sendContToSource(rmst_ptr); } break; case(NAK_REQ): // Mark the time we got this NAK for cleanup timer. GetTime(&rmst_ptr->last_nak_time_); rmst_ptr->naks_rec_++; DiffPrint(DEBUG_IMPORTANT, " Got a NAK_REQ; number = %d\n", rmst_ptr->naks_rec_); if ((rmst_ptr->naks_rec_ > 10) && (rmst_ptr->naks_rec_ > (.30 * rmst_ptr->max_frag_)) && rmst_ptr->local_source_){ DiffPrint(DEBUG_IMPORTANT, " Too many NAKs - send an EXPLORATORY msg!\n"); processExpReq(rmst_ptr, rmst_ptr->max_frag_sent_); return; } // If we sent an exp request more than 30 seconds ago, // we send it again. if (rmst_ptr->sent_exp_req_){ int exp_time = rmst_ptr->last_nak_time_.tv_sec - rmst_ptr->exp_req_time_.tv_sec; DiffPrint(DEBUG_SOME_DETAILS, " Node that sent an EXP_REQ got a NAK: time since last exp = %d\n", exp_time); if( (rmst_ptr->last_nak_time_.tv_sec - rmst_ptr->exp_req_time_.tv_sec) > 30){ // Resend an EXP_REQ!!! DiffPrint(DEBUG_IMPORTANT, " Node resends EXP_REQ up blacklisted stream!\n"); sendExpReqUpstream(rmst_ptr); GetTime(&rmst_ptr->exp_req_time_); // Send another negative reinforcement on blacklisted link! Message *neg_reinf_msg; neg_reinf_msg = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT, 0, 0, interest_attrs_->size(), pkt_count_, rdm_id_, rmst_ptr->last_hop_, LOCALHOST_ADDR); neg_reinf_msg->msg_attr_vec_ = CopyAttrs(interest_attrs_); ((DiffusionRouting *)dr_)->sendMessage(neg_reinf_msg, filter_handle_, 1); pkt_count_++; delete neg_reinf_msg; } return; } // We need to send the naked fragments if we are the source, // or a caching node. place = data->begin(); for(;;){ frag_attr = RmstFragAttr.find_from(data, place, &place); if (!frag_attr) break; frag_no = frag_attr->getVal(); DiffPrint(DEBUG_IMPORTANT, " Filter received a NAK_REQ for Rmst %d, frag %d\n", rmst_no, frag_no); // Check if we have this fragment. // If not forward NAK toward source if possible. frag_ptr = rmst_ptr->getFrag(frag_no); if (frag_ptr == NULL){ DiffPrint(DEBUG_SOME_DETAILS, " Filter can't find frag %d of Rmst %d for NAK\n", frag_no, rmst_no); // We need to forward NAK toward source if possible. if ( (rmst_ptr->reinf_) && (rmst_ptr->last_hop_ != LOCALHOST_ADDR) ){ forwarding_nak = true; DiffPrint(DEBUG_IMPORTANT, " forwarding NAK to %d\n", rmst_ptr->last_hop_); attrs.push_back(RmstFragAttr.make(NRAttribute::IS, frag_no)); if(caching_mode_){ // We need to add this fragment to our hole map! rmst_ptr->putHole(frag_no); NakData *nak_ptr = rmst_ptr->getHole(frag_no); // Artificially age this hole so it gets // naked immediately. nak_ptr->tmv.tv_sec -= 1; } } else DiffPrint(DEBUG_IMPORTANT, " not forwarding NAK! - no place to send it!\n"); } else{ // We have this fragment so add it to the NakList for sending. NakMsgData nak_msg_data; NakList::iterator nak_list_iterator; nak_list_iterator = nak_list_.begin(); while(nak_list_iterator != nak_list_.end()){ if((nak_list_iterator->rmst_no_ == rmst_no) && (nak_list_iterator->frag_no_ == frag_no)) break; nak_list_iterator++; } if(nak_list_iterator == nak_list_.end()){ DiffPrint(DEBUG_SOME_DETAILS, " adding NAK for rmst %d frag %d to nak_list_\n", rmst_no, frag_no); nak_msg_data.rmst_no_ = rmst_no; nak_msg_data.frag_no_ = frag_no; nak_list_.push_back(nak_msg_data); if(!send_timer_active_){ TimerCallback *send_timer; // Now add a timer to send this and any NAKS. DiffPrint(DEBUG_LOTS_DETAILS, " Set a SEND_TIMER for reinforced rmst_no %d\n", rmst_no); send_timer = new RmstTimeout(this, rmst_no, SEND_TIMER); // We check on things every second. send_timer_handle_ = ((DiffusionRouting *)dr_)->addTimer(SEND_INTERVAL, send_timer); send_timer_active_ = true; } } } place++; } if (forwarding_nak){ attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, NAK_REQ)); attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_no)); nak_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(), pkt_count_, rdm_id_, rmst_ptr->last_hop_, LOCALHOST_ADDR); nak_msg->msg_attr_vec_ = CopyAttrs(&attrs); ((DiffusionRouting *)dr_)->sendMessage(nak_msg, filter_handle_, 1); pkt_count_++; delete nak_msg; ClearAttrs(&attrs); } break; case(EXP_REQ): DiffPrint(DEBUG_IMPORTANT, " Got an EXP_REQ\n"); if(!rmst_ptr->local_source_){ DiffPrint(DEBUG_SOME_DETAILS, " Filter forwarding EXP_REQ for Rmst %d\n", rmst_no); // We need to forward EXP_REQ toward source if possible. if (rmst_ptr->reinf_) sendExpReqUpstream(rmst_ptr); } else{ // We need to call a routine that will clean the NAK list of // outstanding NAK responses for this Rmst, put a new expBase // in the send list (lowest of nak or send Lists), and set this // rmst as not reinforced. frag_attr = RmstFragAttr.find(msg->msg_attr_vec_); frag_no = frag_attr->getVal(); DiffPrint(DEBUG_IMPORTANT, " Source got EXP request for Rmst %d\n", rmst_no); if (rmst_ptr->reinf_) processExpReq(rmst_ptr, frag_no); else DiffPrint(DEBUG_IMPORTANT, " EXP request for non-reinforced Rmst %d\n", rmst_no); } break; default: break; } // switch (rmst_ctl_type) return;}void RmstFilter::setupNak(int rmst_id){ NRAttrVec attrs; int frag_id; NakData *nak_ptr; Rmst *rmst_ptr; int nak_count = 0; Int2Rmst::iterator rmst_iterator = rmst_map_.find(rmst_id); if(rmst_iterator != rmst_map_.end()) rmst_ptr = (*rmst_iterator).second; else{ DiffPrint(DEBUG_IMPORTANT, "setupNak - can't find Rmst %d\n", rmst_id); return; } Int2Nak::iterator hole_iter = rmst_ptr->hole_map_.begin(); bool send_new_nak = false; timeval cur_time; // We now have an iterator to look at each hole (hole_iter), // a Rmst Id (rmst_id), a fragment Id ((*hole_iter).first), // a NakData pointer ((*hole_iter).second), and an Rmst // pointer (rmst_ptr). GetTime (&cur_time); // The first pass finds holes that haven't been NAKed and should be. while(hole_iter != rmst_ptr->hole_map_.end()){ frag_id = (*hole_iter).first; nak_ptr = (*hole_iter).second; DiffPrint(DEBUG_SOME_DETAILS, " setupNak - found hole rmst_id %d, frag %d\n", rmst_id, frag_id); // If we never NAKed this fragment and it's past due, // mark it so it gets NAKed. if (!nak_ptr->nak_sent_){ if ( (cur_time.tv_sec - nak_ptr->tmv.tv_sec) > 3 ){ nak_ptr->nak_sent_ = true; nak_ptr->send_nak_ = true; send_new_nak = true; } else DiffPrint(DEBUG_SOME_DETAILS, " setupNak - hole %d not old enough to NAK\n", frag_id); } // If we NAKed this fragment and the NAK response is past due, // NAK it again. else if ( (cur_time.tv_sec - nak_ptr->tmv.tv_sec) > NAK_RESPONSE_WAIT ){ DiffPrint(DEBUG_SOME_DETAILS, " setupNak - hole %d has an overdue NAK\n", frag_id); nak_ptr->send_nak_ = true; send_new_nak = true; } hole_iter++; } if (send_new_nak){ Message *nak_msg; if ( rmst_ptr->last_hop_ == LOCALHOST_ADDR ){ DiffPrint(DEBUG_IMPORTANT, " can't send NAK, no last_hop_!\n"); return; } // The second pass adds all holes that should be NAKed to vector. hole_iter = rmst_ptr->hole_map_.begin(); while( (hole_iter != rmst_ptr->hole_map_.end()) && (nak_count <= 10) ){ frag_id = (*hole_iter).first; nak_ptr = (*hole_iter).second; if ( nak_ptr->send_nak_ ){ nak_ptr->send_nak_ = false; DiffPrint(DEBUG_SOME_DETAILS, " setupNak - adding a NAK for frag_id %d to attrs\n", frag_id); attrs.push_back(RmstFragAttr.make(NRAttribute::IS, frag_id)); GetTime(&(nak_ptr->tmv)); nak_count++; } hole_iter++; } attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, NAK_REQ)); attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_id)); // Code to send a message to last_hop_ nak_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(), pkt_count_, rdm_id_, rmst_ptr->last_hop_, LOCALHOST_ADDR); pkt_count_++; nak_msg->msg_attr_vec_ = CopyAttrs(&attrs); DiffPrint(DEBUG_IMPORTANT, " Sending NAK_REQ to node %d\n", rmst_ptr->last_hop_); ((DiffusionRouting *)dr_)->sendMessage(nak_msg, filter_handle_, 1); delete nak_msg; ClearAttrs(&attrs); // Mark the time we sent this NAK for cleanup timer. GetTime(&rmst_ptr->last_nak_time_); } else DiffPrint(DEBUG_SOME_DETAILS, " setupNak - no need for a new NAK for rmst_id %d\n", rmst_id); return;}void RmstFilter::processExpReq(Rmst *rmst_ptr, int frag_no){ NakList::iterator nak_list_iterator; SendList::iterator send_list_iterator; int rmst_no = rmst_ptr->rmst_no_; DiffPrint(DEBUG_IMPORTANT, " processExpReq called for rmstId %d, frag_no %d\n", rmst_no, frag_no); // Indicate that Rmst is not reinforced. rmst_ptr->reinf_ = false; rmst_ptr->pkts_sent_ = 0; // If we have an ACK_TIMER active cancel it. We want to resend some packets. if(rmst_ptr->ack_timer_active_){ ((DiffusionRouting *)dr_)->removeTimer(rmst_ptr->ack_timer_handle_); rmst_ptr->ack_timer_active_ = false; } // Erase any NAKs. We are about to establish a new path. nak_list_iterator = nak_list_.begin(); while (nak_list_iterator != nak_list_.end()){ if (nak_list_iterator->rmst_no_ == rmst_no){ DiffPrint(DEBUG_SOME_DETAILS, " processExpReq erasing frag_no %d from nak_list_\n", nak_list_iterator->frag_no_); nak_list_iterator = nak_list_.erase(nak_list_iterator); } else nak_list_iterator++; } DiffPrint(DEBUG_LOTS_DETAILS, " processExpReq done with nak_list_ for rmstId %d\n", rmst_no); // If we are being told to start by resending the last packet, back up by one. // When a sink gets an exploratory message, they don't start re-NAKing until they // know they are reinforced. Sinks only know they are reinforced when they get DATA. if ( (frag_no == rmst_ptr->max_frag_) && (rmst_ptr->max_frag_ > 0) ){ frag_no--; DiffPrint(DEBUG_IMPORTANT, " processExpReq decrements frag_no to %d\n", frag_no); } // Update send_list_ entry or add one. send_list_iterator = send_list_.begin(); while (send_list_iterator != send_list_.end()){ if (send_list_iterator->rmst_no_ == rmst_no) break; send_list_iterator++; } if (send_list_iterator != send_list_.end()){ send_list_iterator->exp_base_ = frag_no; DiffPrint(DEBUG_SOME_DETAILS, " processExpReq sets send_list_ expBase to %d\n", frag_no); send_list_iterator->last_frag_sent_ = frag_no-1; } else{ SendMsgData new_send_msg; DiffPrint(DEBUG_SOME_DETAILS, " processExpReq creating new send_list_ entry for rmstId %d\n", rmst_no); DiffPrint(DEBUG_SOME_DETAILS, " processExpReq sets send_list_ expBase to %d\n", frag_no); new_send_msg.rmst_no_ = rmst_no; new_send_msg.exp_base_ = frag_no; new_send_msg.last_frag_sent_ = frag_no-1; send_list_.push_front(new_send_msg); if(!send_timer_active_){
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -