⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rmst_filter.cc

📁 跑leach需要的
💻 CC
📖 第 1 页 / 共 5 页
字号:
  rmst_iterator = rmst_map_.begin();  rmst_iterator = rmst_map_.find(rmst_no);  if(rmst_iterator != rmst_map_.end())    rmst_ptr = (*rmst_iterator).second;  else{    DiffPrint(DEBUG_IMPORTANT, "  Filter can't find Rmst %d for Rmst control msg\n", rmst_no);    return;  }  switch (rmst_ctl_type){  case(ACK_RESP):    DiffPrint(DEBUG_IMPORTANT, "  Got an ACK_RESP\n");    // For now we automatically forward ACKs if we're not the source.    rmst_ptr->acked_ = true;    if(!rmst_ptr->local_source_){      Message *ack_msg;      // If we got an ACK and we aren't the source, we must be an      // intermediate node (Sinks don't get ACKs, they send them).      // We need to forward ACK toward source if possible.      if (rmst_ptr->reinf_) {        DiffPrint(DEBUG_SOME_DETAILS, "  forwarding ACK to %d\n", rmst_ptr->last_hop_);        attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, ACK_RESP));        attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_no));        ack_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(),          pkt_count_, rdm_id_, rmst_ptr->last_hop_, LOCALHOST_ADDR);        ack_msg->msg_attr_vec_ = CopyAttrs(&attrs);        ((DiffusionRouting *)dr_)->sendMessage(ack_msg, filter_handle_, 1);        pkt_count_++;        delete ack_msg;        ClearAttrs(&attrs);      }      else        DiffPrint(DEBUG_IMPORTANT, "  intermediate node can't forward ACK for Rmst %d\n", rmst_no);    }    else{      DiffPrint(DEBUG_IMPORTANT, "  Source got ACK for Rmst %d\n", rmst_no);      sendContToSource(rmst_ptr);    }    break;  case(NAK_REQ):    // Mark the time we got this NAK for cleanup timer.    GetTime(&rmst_ptr->last_nak_time_);    rmst_ptr->naks_rec_++;    DiffPrint(DEBUG_IMPORTANT, "  Got a NAK_REQ; number = %d\n", rmst_ptr->naks_rec_);    if ((rmst_ptr->naks_rec_ > 10) && (rmst_ptr->naks_rec_ > (.30 * rmst_ptr->max_frag_)) &&            rmst_ptr->local_source_){      DiffPrint(DEBUG_IMPORTANT, "  Too many NAKs - send an EXPLORATORY msg!\n");      processExpReq(rmst_ptr, rmst_ptr->max_frag_sent_);      return;    }    // If we sent an exp request more than 30 seconds ago,    // we send it again.    if (rmst_ptr->sent_exp_req_){      int exp_time = rmst_ptr->last_nak_time_.tv_sec - rmst_ptr->exp_req_time_.tv_sec;      DiffPrint(DEBUG_SOME_DETAILS,         "  Node that sent an EXP_REQ got a NAK: time since last exp = %d\n", exp_time);      if( (rmst_ptr->last_nak_time_.tv_sec - rmst_ptr->exp_req_time_.tv_sec) > 30){        // Resend an EXP_REQ!!!        DiffPrint(DEBUG_IMPORTANT, "  Node resends EXP_REQ up blacklisted stream!\n");        sendExpReqUpstream(rmst_ptr);        GetTime(&rmst_ptr->exp_req_time_);        // Send another negative reinforcement on blacklisted link!        Message *neg_reinf_msg;        neg_reinf_msg = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT,          0, 0, interest_attrs_->size(), pkt_count_, rdm_id_,           rmst_ptr->last_hop_, LOCALHOST_ADDR);        neg_reinf_msg->msg_attr_vec_ = CopyAttrs(interest_attrs_);        ((DiffusionRouting *)dr_)->sendMessage(neg_reinf_msg, filter_handle_, 1);        pkt_count_++;        delete neg_reinf_msg;      }      return;    }    // We need to send the naked fragments if we are the source,    // or a caching node.    place = data->begin();    for(;;){      frag_attr = RmstFragAttr.find_from(data, place, &place);      if (!frag_attr)        break;      frag_no = frag_attr->getVal();      DiffPrint(DEBUG_IMPORTANT, "  Filter received a NAK_REQ for Rmst %d, frag %d\n",        rmst_no, frag_no);      // Check if we have this fragment.      // If not forward NAK toward source if possible.      frag_ptr =  rmst_ptr->getFrag(frag_no);      if (frag_ptr == NULL){        DiffPrint(DEBUG_SOME_DETAILS, "  Filter can't find frag %d of Rmst %d for NAK\n",        frag_no, rmst_no);        // We need to forward NAK toward source if possible.        if ( (rmst_ptr->reinf_) && (rmst_ptr->last_hop_ != LOCALHOST_ADDR) ){          forwarding_nak = true;          DiffPrint(DEBUG_IMPORTANT, "  forwarding NAK to %d\n", rmst_ptr->last_hop_);          attrs.push_back(RmstFragAttr.make(NRAttribute::IS, frag_no));          if(caching_mode_){            // We need to add this fragment to our hole map!            rmst_ptr->putHole(frag_no);            NakData *nak_ptr = rmst_ptr->getHole(frag_no);            // Artificially age this hole so it gets            // naked immediately.            nak_ptr->tmv.tv_sec -= 1;          }        }        else          DiffPrint(DEBUG_IMPORTANT, "  not forwarding NAK! - no place to send it!\n");      }      else{        // We have this fragment so add it to the NakList for sending.        NakMsgData nak_msg_data;        NakList::iterator nak_list_iterator;        nak_list_iterator = nak_list_.begin();        while(nak_list_iterator != nak_list_.end()){          if((nak_list_iterator->rmst_no_ == rmst_no) &&            (nak_list_iterator->frag_no_ == frag_no))            break;          nak_list_iterator++;        }        if(nak_list_iterator == nak_list_.end()){          DiffPrint(DEBUG_SOME_DETAILS,            "  adding NAK for rmst %d frag %d to nak_list_\n", rmst_no, frag_no);          nak_msg_data.rmst_no_ = rmst_no;          nak_msg_data.frag_no_ = frag_no;          nak_list_.push_back(nak_msg_data);          if(!send_timer_active_){            TimerCallback *send_timer;            // Now add a timer to send this and any NAKS.            DiffPrint(DEBUG_LOTS_DETAILS,               "  Set a SEND_TIMER for reinforced rmst_no %d\n", rmst_no);            send_timer = new RmstTimeout(this, rmst_no, SEND_TIMER);            // We check on things every second.            send_timer_handle_ =               ((DiffusionRouting *)dr_)->addTimer(SEND_INTERVAL, send_timer);            send_timer_active_ = true;          }        }      }      place++;    }    if (forwarding_nak){      attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, NAK_REQ));      attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_no));      nak_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0,      attrs.size(), pkt_count_, rdm_id_, rmst_ptr->last_hop_,        LOCALHOST_ADDR);      nak_msg->msg_attr_vec_ = CopyAttrs(&attrs);      ((DiffusionRouting *)dr_)->sendMessage(nak_msg, filter_handle_, 1);      pkt_count_++;      delete nak_msg;      ClearAttrs(&attrs);    }    break;  case(EXP_REQ):    DiffPrint(DEBUG_IMPORTANT, "  Got an EXP_REQ\n");    if(!rmst_ptr->local_source_){      DiffPrint(DEBUG_SOME_DETAILS, "  Filter forwarding EXP_REQ for Rmst %d\n", rmst_no);      // We need to forward EXP_REQ toward source if possible.      if (rmst_ptr->reinf_)        sendExpReqUpstream(rmst_ptr);    }    else{      // We need to call a routine that will clean the NAK list of       // outstanding NAK responses for this Rmst, put a new expBase      // in the send list (lowest of nak or send Lists), and set this      // rmst as not reinforced.      frag_attr = RmstFragAttr.find(msg->msg_attr_vec_);      frag_no = frag_attr->getVal();      DiffPrint(DEBUG_IMPORTANT, "  Source got EXP request for Rmst %d\n", rmst_no);      if (rmst_ptr->reinf_)          processExpReq(rmst_ptr, frag_no);      else          DiffPrint(DEBUG_IMPORTANT, "  EXP request for non-reinforced Rmst %d\n", rmst_no);    }    break;  default:    break;  }  // switch (rmst_ctl_type)  return;}void RmstFilter::setupNak(int rmst_id){  NRAttrVec attrs;  int frag_id;  NakData *nak_ptr;  Rmst *rmst_ptr;  int nak_count = 0;  Int2Rmst::iterator rmst_iterator = rmst_map_.find(rmst_id);  if(rmst_iterator != rmst_map_.end())    rmst_ptr = (*rmst_iterator).second;  else{    DiffPrint(DEBUG_IMPORTANT, "setupNak - can't find Rmst %d\n", rmst_id);    return;  }  Int2Nak::iterator hole_iter = rmst_ptr->hole_map_.begin();  bool send_new_nak = false;  timeval cur_time;  // We now have an iterator to look at each hole (hole_iter),  // a Rmst Id (rmst_id), a fragment Id ((*hole_iter).first),  // a NakData pointer ((*hole_iter).second), and an Rmst  // pointer (rmst_ptr).  GetTime (&cur_time);  // The first pass finds holes that haven't been NAKed and should be.  while(hole_iter != rmst_ptr->hole_map_.end()){    frag_id = (*hole_iter).first;    nak_ptr = (*hole_iter).second;    DiffPrint(DEBUG_SOME_DETAILS,      "  setupNak - found hole rmst_id %d, frag %d\n", rmst_id, frag_id);    // If we never NAKed this fragment and it's past due,    // mark it so it gets NAKed.    if (!nak_ptr->nak_sent_){      if ( (cur_time.tv_sec - nak_ptr->tmv.tv_sec) > 3 ){        nak_ptr->nak_sent_ = true;        nak_ptr->send_nak_ = true;        send_new_nak = true;      }      else        DiffPrint(DEBUG_SOME_DETAILS,          "  setupNak - hole %d not old enough to NAK\n", frag_id);    }    // If we NAKed this fragment and the NAK response is past due,    // NAK it again.    else if ( (cur_time.tv_sec - nak_ptr->tmv.tv_sec) > NAK_RESPONSE_WAIT ){      DiffPrint(DEBUG_SOME_DETAILS, "  setupNak - hole %d has an overdue NAK\n", frag_id);      nak_ptr->send_nak_ = true;      send_new_nak = true;    }    hole_iter++;  }  if (send_new_nak){    Message *nak_msg;    if ( rmst_ptr->last_hop_ == LOCALHOST_ADDR ){      DiffPrint(DEBUG_IMPORTANT, "  can't send NAK, no last_hop_!\n");      return;    }    // The second pass adds all holes that should be NAKed to vector.    hole_iter = rmst_ptr->hole_map_.begin();    while( (hole_iter != rmst_ptr->hole_map_.end()) && (nak_count <= 10) ){      frag_id = (*hole_iter).first;      nak_ptr = (*hole_iter).second;      if ( nak_ptr->send_nak_ ){        nak_ptr->send_nak_ = false;        DiffPrint(DEBUG_SOME_DETAILS,          "  setupNak - adding a NAK for frag_id %d to attrs\n", frag_id);        attrs.push_back(RmstFragAttr.make(NRAttribute::IS, frag_id));        GetTime(&(nak_ptr->tmv));        nak_count++;      }      hole_iter++;    }    attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, NAK_REQ));    attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_id));    // Code to send a message to last_hop_    nak_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(),      pkt_count_, rdm_id_, rmst_ptr->last_hop_, LOCALHOST_ADDR);    pkt_count_++;    nak_msg->msg_attr_vec_ = CopyAttrs(&attrs);    DiffPrint(DEBUG_IMPORTANT, "  Sending NAK_REQ to node %d\n", rmst_ptr->last_hop_);    ((DiffusionRouting *)dr_)->sendMessage(nak_msg, filter_handle_, 1);    delete nak_msg;    ClearAttrs(&attrs);    // Mark the time we sent this NAK for cleanup timer.    GetTime(&rmst_ptr->last_nak_time_);  }  else    DiffPrint(DEBUG_SOME_DETAILS, "  setupNak - no need for a new NAK for rmst_id %d\n", rmst_id);  return;}void RmstFilter::processExpReq(Rmst *rmst_ptr, int frag_no){  NakList::iterator nak_list_iterator;  SendList::iterator send_list_iterator;  int rmst_no = rmst_ptr->rmst_no_;  DiffPrint(DEBUG_IMPORTANT, "  processExpReq called for rmstId %d, frag_no %d\n", rmst_no, frag_no);  // Indicate that Rmst is not reinforced.  rmst_ptr->reinf_ = false;  rmst_ptr->pkts_sent_ = 0;  // If we have an ACK_TIMER active cancel it. We want to resend some packets.  if(rmst_ptr->ack_timer_active_){    ((DiffusionRouting *)dr_)->removeTimer(rmst_ptr->ack_timer_handle_);    rmst_ptr->ack_timer_active_ = false;  }  // Erase any NAKs. We are about to establish a new path.  nak_list_iterator = nak_list_.begin();  while (nak_list_iterator != nak_list_.end()){    if (nak_list_iterator->rmst_no_ == rmst_no){      DiffPrint(DEBUG_SOME_DETAILS,        "  processExpReq erasing frag_no %d from nak_list_\n", nak_list_iterator->frag_no_);      nak_list_iterator = nak_list_.erase(nak_list_iterator);    }    else      nak_list_iterator++;  }  DiffPrint(DEBUG_LOTS_DETAILS, "  processExpReq done with nak_list_ for rmstId %d\n", rmst_no);  // If we are being told to start by resending the last packet, back up by one.  // When a sink gets an exploratory message, they don't start re-NAKing until they  // know they are reinforced. Sinks only know they are reinforced when they get DATA.  if ( (frag_no == rmst_ptr->max_frag_) && (rmst_ptr->max_frag_ > 0) ){    frag_no--;    DiffPrint(DEBUG_IMPORTANT, "  processExpReq decrements frag_no to %d\n", frag_no);  }  // Update send_list_ entry or add one.  send_list_iterator = send_list_.begin();  while (send_list_iterator != send_list_.end()){    if (send_list_iterator->rmst_no_ == rmst_no)      break;    send_list_iterator++;  }  if (send_list_iterator != send_list_.end()){    send_list_iterator->exp_base_ = frag_no;    DiffPrint(DEBUG_SOME_DETAILS, "  processExpReq sets send_list_ expBase to %d\n", frag_no);    send_list_iterator->last_frag_sent_ = frag_no-1;  }  else{    SendMsgData new_send_msg;    DiffPrint(DEBUG_SOME_DETAILS,      "  processExpReq creating new send_list_ entry for rmstId %d\n", rmst_no);    DiffPrint(DEBUG_SOME_DETAILS,      "  processExpReq sets send_list_ expBase to %d\n", frag_no);    new_send_msg.rmst_no_ = rmst_no;    new_send_msg.exp_base_ = frag_no;    new_send_msg.last_frag_sent_ = frag_no-1;    send_list_.push_front(new_send_msg);    if(!send_timer_active_){

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -