⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rmst_filter.cc

📁 跑leach需要的
💻 CC
📖 第 1 页 / 共 5 页
字号:
      //   so we may need to construct a NAK from the hole map      //   If we've fallen off the reinforced path, we must      //   stop adding NAKs to the NakList.      DiffPrint(DEBUG_SOME_DETAILS, "  WATCHDOG_TIMER sees holes - check times.\n");      if (rmst_ptr->reinf_)        setupNak(rmst_no);      // Reschedule timer with same value.      return 0;    }    break;  case ACK_TIMER:    DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer ACK_TIMER for Rmst %d", rmst_no);    PrintTime(&cur_time);    rmst_iterator = rmst_map_.find(rmst_no);    if(rmst_iterator != rmst_map_.end())      rmst_ptr = (*rmst_iterator).second;    else{      DiffPrint(DEBUG_IMPORTANT,        "RmstFilter::processTimer can't find Rmst %d for ACK_TIMER, cancell timer!\n",           rmst_no);      return -1;    }    DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer ACK_TIMER, at source for rmst_no %d\n",      rmst_no);    if (rmst_ptr->acked_){      DiffPrint(DEBUG_IMPORTANT, "RmstFilter::processTimer cancel ACK_TIMER, Rmst %d ACKed\n",        rmst_no);      rmst_ptr->ack_timer_active_ = false;      return -1;    }    // If there has been no data sent for 30 seconds like a NAK response, we need to resend a packet.    if( (cur_time.tv_sec - rmst_ptr->last_data_time_.tv_sec) > ACK_WAIT ){      NRAttrVec attrs;      int8_t msg_type;      DiffPrint(DEBUG_IMPORTANT,         "RmstFilter::processTimer ACK_TIMER, waited too long for Rmst %d ACK!\n", rmst_no);      if(rmst_ptr->reinf_  && !rmst_ptr->resent_last_data_){        // We should send the last frag again as an DATA packet.        DiffPrint(DEBUG_SOME_DETAILS,           "RmstFilter::processTimer ACK_TIMER, resend last packet as DATA\n");        msg_type = DATA;        rmst_ptr->resent_last_data_ = true;      }      else if(rmst_ptr->resent_last_data_ && !rmst_ptr->resent_last_exp_){        ExpLog exp_msg;        union LlToInt key;        // We tried resending last frag as data and it didn't work, try as EXP        DiffPrint(DEBUG_IMPORTANT,          "RmstFilter::processTimer ACK_TIMER, resend last packet as EXPLORATORY_DATA\n");        // Insert this Exploratory message in exp_map_.        // When we get a reinforcement we'll know what rmst it's for.        key.int_val_[0] = pkt_count_;        key.int_val_[1] = rdm_id_;        DiffPrint(DEBUG_LOTS_DETAILS, "  Key = %llx\n", key.ll_val_);        exp_msg.rmst_no_ = rmst_no;        exp_msg.last_hop_ = LOCALHOST_ADDR;        exp_map_.insert(Key2ExpLog::value_type(key.ll_val_, exp_msg));        msg_type = EXPLORATORY_DATA;        rmst_ptr->reinf_ = false;        rmst_ptr->naks_rec_ = 0;        rmst_ptr->pkts_sent_ = 0;        rmst_ptr->resent_last_exp_ = true;      }      else if(rmst_ptr->resent_last_data_ && rmst_ptr->resent_last_exp_ && rmst_ptr->reinf_){        // We should send the last frag again as an DATA packet.        DiffPrint(DEBUG_IMPORTANT,          "RmstFilter::processTimer ACK_TIMER, resend last packet on new reinf path as DATA\n");        msg_type = DATA;        rmst_ptr->resent_last_data_ = false;        rmst_ptr->resent_last_exp_ = false;      }      else{        ExpLog exp_msg;        union LlToInt key;        DiffPrint(DEBUG_IMPORTANT,           "RmstFilter::processTimer ACK_TIMER, resent last packet as EXP and no reinforced path, Try again!\n");        // Insert this Exploratory message in exp_map_.        // When we get a reinforcement we'll know what rmst it's for.        key.int_val_[0] = pkt_count_;        key.int_val_[1] = rdm_id_;        DiffPrint(DEBUG_LOTS_DETAILS, "  Key = %llx\n", key.ll_val_);        exp_msg.rmst_no_ = rmst_no;        exp_msg.last_hop_ = LOCALHOST_ADDR;        exp_map_.insert(Key2ExpLog::value_type(key.ll_val_, exp_msg));        msg_type = EXPLORATORY_DATA;      }      attrs.push_back(RmstTargetAttr.make(NRAttribute::IS, rmst_ptr->target_str_));      attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, RMST_RESP));      attrs.push_back(RmstFragAttr.make(NRAttribute::IS, rmst_ptr->max_frag_));      attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_ptr->rmst_no_));      frag_ptr =  rmst_ptr->getFrag(rmst_ptr->max_frag_);      attrs.push_back(RmstDataAttr.make(NRAttribute::IS,         frag_ptr, rmst_ptr->max_frag_len_));      Message *new_frag;      new_frag = new Message(DIFFUSION_VERSION, msg_type, 0, 0, attrs.size(), pkt_count_,        rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR);      new_frag->msg_attr_vec_ = CopyAttrs(&attrs);      ((DiffusionRouting *)dr_)->sendMessage(new_frag, filter_handle_);      pkt_count_++;      delete new_frag;      ClearAttrs(&attrs);      // We sent a fragment, set the last_data_time_ for the cleanup timer.      GetTime(&rmst_ptr->last_data_time_);    }    return 0;    break;  case CLEANUP_TIMER:    DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer CLEANUP_TIMER");    PrintTime(&cur_time);    DiffPrint(DEBUG_IMPORTANT, "  CLEANUP_TIMER called\n");    rmst_iterator = rmst_map_.begin();    while(rmst_iterator != rmst_map_.end()){      rmst_ptr = (*rmst_iterator).second;      DiffPrint(DEBUG_SOME_DETAILS,         "  CLEANUP_TIMER:: rmst_no %d : pkts_sent_ = %d, pkts_rec_ = %d, last_hop_pkts_sent_ = %d\n",        rmst_ptr->rmst_no_, rmst_ptr->pkts_sent_, rmst_ptr->pkts_rec_, rmst_ptr->last_hop_pkts_sent_);      if((!rmst_ptr->reinf_)&&(!rmst_ptr->acked_)&&(!rmst_ptr->local_source_)&&(!local_sink_)){        if( (cur_time.tv_sec - rmst_ptr->last_data_time_.tv_sec) > LONG_CLEANUP_WAIT )          cleanUpRmst(rmst_ptr);      }      else if (rmst_ptr->acked_){        if ( ( (cur_time.tv_sec - rmst_ptr->last_data_time_.tv_sec) > SHORT_CLEANUP_WAIT ) &&           ( (cur_time.tv_sec - rmst_ptr->last_nak_time_.tv_sec) > SHORT_CLEANUP_WAIT ) )           cleanUpRmst(rmst_ptr);      }                  rmst_iterator++;    }    // Check on the BlackList, in case network is partitioned.    if (!black_list_.empty()){      if ( (cur_time.tv_sec - last_data_rec_.tv_sec) > RMST_BLACKLIST_WAIT ){        DiffPrint(DEBUG_IMPORTANT, "  clearing black_list_!\n");        ((DiffusionRouting *)dr_)->clearBlacklist();        black_list_.clear();      }    }    if (local_sink_){      if ( (cur_time.tv_sec - last_sink_time_.tv_sec) > SINK_REFRESH_WAIT ){        DiffPrint(DEBUG_IMPORTANT, "  local sink timed out\n");        local_sink_ = false;      }      else        DiffPrint(DEBUG_IMPORTANT, "  local sink still alive.\n");    }    return 0;    break;  default:    break;  }  return -1;}void RmstFilter::sendRmstToSink(Rmst *rmst_ptr){  NRAttrVec attrs;  Message *rmst_msg;  NRSimpleAttribute<void *> *rmst_data_attr;  NRSimpleAttribute<int> *frag_number_attr;  void *frag_ptr;  int size, i;  DiffPrint(DEBUG_IMPORTANT, "RmstFilter::sendRmstToSink - sending rmst %d to local sink\n",    rmst_ptr->rmst_no_);  // Prepare attribute vector  attrs.push_back(RmstTargetAttr.make(NRAttribute::IS, rmst_ptr->target_str_));  attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, RMST_RESP));  attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_ptr->rmst_no_));  attrs.push_back(RmstMaxFragAttr.make(NRAttribute::IS, rmst_ptr->max_frag_));  frag_number_attr = RmstFragAttr.make(NRAttribute::IS, 0);  attrs.push_back(frag_number_attr);  // Add the blob fragment  if (rmst_ptr->max_frag_ == 0)    size = rmst_ptr->max_frag_len_;  else    size = MAX_FRAG_SIZE;  frag_ptr =  rmst_ptr->getFrag(0);  rmst_data_attr = RmstDataAttr.make(NRAttribute::IS, frag_ptr, size);  attrs.push_back(rmst_data_attr);  // Prepare the message  rmst_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(),    pkt_count_, rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR);  rmst_msg->next_hop_ = LOCALHOST_ADDR;  rmst_msg->next_port_ = local_sink_port_;  rmst_msg->msg_attr_vec_= CopyAttrs(&attrs);  ((DiffusionRouting *)dr_)->sendMessage(rmst_msg, filter_handle_, 1);  delete rmst_msg;  pkt_count_++;  // Send all the fragments  for (i=1; i <= (rmst_ptr->max_frag_); i++){    frag_number_attr->setVal(i);    frag_ptr =  rmst_ptr->getFrag(i);    if(frag_ptr == NULL)      DiffPrint(DEBUG_IMPORTANT, "RmstFilter::sendRmstToSink - got a null frag_ptr for frag!%d\n",      i);    else{      if (rmst_ptr->max_frag_ == i)        rmst_data_attr->setVal(frag_ptr, rmst_ptr->max_frag_len_);      else        rmst_data_attr->setVal(frag_ptr, MAX_FRAG_SIZE);    }    rmst_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(),      pkt_count_, rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR);    rmst_msg->next_hop_ = LOCALHOST_ADDR;    rmst_msg->next_port_ = local_sink_port_;    rmst_msg->msg_attr_vec_= CopyAttrs(&attrs);    ((DiffusionRouting *)dr_)->sendMessage(rmst_msg, filter_handle_, 1);    delete rmst_msg;    pkt_count_++;  }  ClearAttrs(&attrs);}void RmstFilter::sendAckToSource(Rmst *rmst_ptr){  NRAttrVec attrs;  Message *ack_msg;  attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, ACK_RESP));  attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_ptr->rmst_no_));  // New code to send a message to last_hop_  ack_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0,            attrs.size(), pkt_count_, rdm_id_, rmst_ptr->last_hop_,            LOCALHOST_ADDR);  ack_msg->msg_attr_vec_ = CopyAttrs(&attrs);  DiffPrint(DEBUG_IMPORTANT, "  Sending ACK_RESP to node %d\n", rmst_ptr->last_hop_);  ((DiffusionRouting *)dr_)->sendMessage(ack_msg, filter_handle_, 1);  pkt_count_++;  delete ack_msg;  ClearAttrs(&attrs);}void RmstFilter::sendExpReqUpstream(Rmst *rmst_ptr){  NRAttrVec attrs;  Message *exp_msg;  attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, EXP_REQ));  attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_ptr->rmst_no_));  attrs.push_back(RmstFragAttr.make(NRAttribute::IS, rmst_ptr->max_frag_rec_));  exp_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0,            attrs.size(), pkt_count_, rdm_id_, rmst_ptr->last_hop_,            LOCALHOST_ADDR);  exp_msg->msg_attr_vec_ = CopyAttrs(&attrs);  DiffPrint(DEBUG_IMPORTANT, "  Sending EXP_REQ to node %d\n", rmst_ptr->last_hop_);  ((DiffusionRouting *)dr_)->sendMessage(exp_msg, filter_handle_, 1);  pkt_count_++;  delete exp_msg;  ClearAttrs(&attrs);}void RmstFilter::sendContToSource(Rmst *rmst_ptr){  NRAttrVec attrs;  Message *cont_msg;  attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::EQ, RMST_CONT));  attrs.push_back(NRClassAttr.make(NRAttribute::IS,    NRAttribute::INTEREST_CLASS));  DiffPrint(DEBUG_IMPORTANT, "  Sending a RMST_CONT to source\n");  cont_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(),    pkt_count_, rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR);  cont_msg->msg_attr_vec_ = CopyAttrs(&attrs);  cont_msg->next_hop_ = LOCALHOST_ADDR;  cont_msg->next_port_ = rmst_ptr->local_source_port_;  ((DiffusionRouting *)dr_)->sendMessage(cont_msg, filter_handle_, 1);  pkt_count_++;  delete cont_msg;  ClearAttrs(&attrs);}void RmstFilter::cleanUpRmst(Rmst *rmst_ptr){  int rmst_no = rmst_ptr->rmst_no_;  Int2Rmst::iterator rmst_iterator;  Key2ExpLog::iterator exp_iterator;  ExpLog exp_msg;  rmst_no = rmst_ptr->rmst_no_;  DiffPrint(DEBUG_IMPORTANT, "  cleanUpRmst called to delete Rmst %d\n", rmst_no);  if(rmst_ptr->watchdog_active_)    ((DiffusionRouting *)dr_)->removeTimer(rmst_ptr->watchdog_handle_);  if(rmst_ptr->ack_timer_active_)    ((DiffusionRouting *)dr_)->removeTimer(rmst_ptr->ack_timer_handle_);  rmst_iterator = rmst_map_.find(rmst_no);  if(rmst_iterator != rmst_map_.end()){    rmst_map_.erase(rmst_iterator);  }  delete rmst_ptr;  // clean up the exp_map_ of any entries base on this rmst  exp_iterator = exp_map_.begin();  while(exp_iterator != exp_map_.end()){    exp_msg = (*exp_iterator).second;    if(exp_msg.rmst_no_ == rmst_no){      DiffPrint(DEBUG_LOTS_DETAILS, "  cleanUpRmst deleting exp_map_ entry for Rmst %d\n", rmst_no);      exp_map_.erase(exp_iterator);    }    exp_iterator++;  }}#ifdef NS_DIFFUSIONRmstFilter::RmstFilter(){#elseRmstFilter::RmstFilter(int argc, char **argv){  TimerCallback *stat_timer;  parseCommandLine(argc, argv);  dr_ = NR::createNR(diffusion_port_);#endif // NS_DIFFUSION  fcb_ = new RmstFilterCallback;  fcb_->app_ = this;  rdm_id_ = rand();  pkt_count_ = rand();  local_sink_ = false;  caching_mode_ = false;  send_timer_active_ = false;  DiffPrint(DEBUG_ALWAYS, "RmstFilter constructor: rdm_id_ = %x, pkt_count_ = %x\n",	    rdm_id_, pkt_count_);#ifndef NS_DIFFUSION  filter_handle_ = setupFilter();  DiffPrint(DEBUG_LOTS_DETAILS, "RmstFilter:: subscribed to all, received handle %d\n",	    (int)filter_handle_);    DiffPrint(DEBUG_LOTS_DETAILS, "RmstFilter constructor: start cleanup tim

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -