⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rmst_filter.cc

📁 一款用来进行网络模拟的软件
💻 CC
📖 第 1 页 / 共 5 页
字号:
      if( exp_time > 20){        // Resend an EXP_REQ!!!        DiffPrint(DEBUG_IMPORTANT, "  Node resends EXP_REQ up blacklisted stream!\n");        sendExpReqUpstream(rmst_ptr);        GetTime(&rmst_ptr->exp_req_time_);      }      else        DiffPrint(DEBUG_LOTS_DETAILS, "  Node waits to send another EXP_REQ\n");      return 0;    }    if (rmst_ptr->local_source_){      if(rmst_ptr->acked_){        DiffPrint(DEBUG_IMPORTANT,           "  WATCHDOG_TIMER Local Source sees acked state - cancel timer\n");        return -1;      }      else{        DiffPrint(DEBUG_LOTS_DETAILS, "  WATCHDOG_TIMER Local Source sees rmst not acked\n");        return 0;      }    }    // Check if we have waited too long for next fragment.    if( ((cur_time.tv_sec - rmst_ptr->last_data_time_.tv_sec) > NEXT_FRAG_WAIT) &&      (!rmst_ptr->rmstComplete()) ){      int newHole = (rmst_ptr->max_frag_rec_)+1;      if ( (newHole <= rmst_ptr->max_frag_) && (!rmst_ptr->inHoleMap(newHole)) ){        DiffPrint(DEBUG_SOME_DETAILS, "  WATCHDOG_TIMER adds new hole, frag %d\n",          newHole);        rmst_ptr->putHole(newHole);        NakData *nak_ptr = rmst_ptr->getHole(newHole);        // Artificially age this hole so it gets naked immediately        nak_ptr->tmv.tv_sec -= 4;      }    }    if(rmst_ptr->holeMapEmpty()){      // There aren't any holes!      DiffPrint(DEBUG_SOME_DETAILS, "  WATCHDOG_TIMER sees No holes\n");      return 0;    }    else{      // The WATCHDOG_TIMER expired and we have a hole,      //   so we may need to construct a NAK from the hole map      //   If we've fallen off the reinforced path, we must      //   stop adding NAKs to the NakList.      DiffPrint(DEBUG_SOME_DETAILS, "  WATCHDOG_TIMER sees holes - check times.\n");      if (rmst_ptr->reinf_)        setupNak(rmst_no);      // Reschedule timer with same value.      return 0;    }    break;  case ACK_TIMER:    DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer ACK_TIMER for Rmst %d", rmst_no);    PrintTime(&cur_time);    rmst_iterator = rmst_map_.find(rmst_no);    if(rmst_iterator != rmst_map_.end())      rmst_ptr = (*rmst_iterator).second;    else{      DiffPrint(DEBUG_IMPORTANT,        "RmstFilter::processTimer can't find Rmst %d for ACK_TIMER, cancell timer!\n",           rmst_no);      return -1;    }    DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer ACK_TIMER, at source for rmst_no %d\n",      rmst_no);    if (rmst_ptr->acked_){      DiffPrint(DEBUG_IMPORTANT, "RmstFilter::processTimer cancel ACK_TIMER, Rmst %d ACKed\n",        rmst_no);      rmst_ptr->ack_timer_active_ = false;      return -1;    }    // If there has been no data sent for 30 seconds like a NAK response, we need to resend a packet.    if( (cur_time.tv_sec - rmst_ptr->last_data_time_.tv_sec) > ACK_WAIT ){      NRAttrVec attrs;      int8_t msg_type;      DiffPrint(DEBUG_IMPORTANT,         "RmstFilter::processTimer ACK_TIMER, waited too long for Rmst %d ACK!\n", rmst_no);      if(rmst_ptr->reinf_  && !rmst_ptr->resent_last_data_){        // We should send the last frag again as an DATA packet.        DiffPrint(DEBUG_SOME_DETAILS,           "RmstFilter::processTimer ACK_TIMER, resend last packet as DATA\n");        msg_type = DATA;        rmst_ptr->resent_last_data_ = true;      }      else if(rmst_ptr->resent_last_data_ && !rmst_ptr->resent_last_exp_){        ExpLog exp_msg;        union LlToInt key;        // We tried resending last frag as data and it didn't work, try as EXP        DiffPrint(DEBUG_IMPORTANT,          "RmstFilter::processTimer ACK_TIMER, resend last packet as EXPLORATORY_DATA\n");        // Insert this Exploratory message in exp_map_.        // When we get a reinforcement we'll know what rmst it's for.        key.int_val_[0] = pkt_count_;        key.int_val_[1] = rdm_id_;        DiffPrint(DEBUG_LOTS_DETAILS, "  Key = %llx\n", key.ll_val_);        exp_msg.rmst_no_ = rmst_no;        exp_msg.last_hop_ = LOCALHOST_ADDR;        exp_map_.insert(Key2ExpLog::value_type(key.ll_val_, exp_msg));        msg_type = EXPLORATORY_DATA;        rmst_ptr->reinf_ = false;        rmst_ptr->naks_rec_ = 0;        rmst_ptr->pkts_sent_ = 0;        rmst_ptr->resent_last_exp_ = true;      }      else if(rmst_ptr->resent_last_data_ && rmst_ptr->resent_last_exp_ && rmst_ptr->reinf_){        // We should send the last frag again as an DATA packet.        DiffPrint(DEBUG_IMPORTANT,          "RmstFilter::processTimer ACK_TIMER, resend last packet on new reinf path as DATA\n");        msg_type = DATA;        rmst_ptr->resent_last_data_ = false;        rmst_ptr->resent_last_exp_ = false;      }      else{        ExpLog exp_msg;        union LlToInt key;        DiffPrint(DEBUG_IMPORTANT,           "RmstFilter::processTimer ACK_TIMER, resent last packet as EXP and no reinforced path, Try again!\n");        // Insert this Exploratory message in exp_map_.        // When we get a reinforcement we'll know what rmst it's for.        key.int_val_[0] = pkt_count_;        key.int_val_[1] = rdm_id_;        DiffPrint(DEBUG_LOTS_DETAILS, "  Key = %llx\n", key.ll_val_);        exp_msg.rmst_no_ = rmst_no;        exp_msg.last_hop_ = LOCALHOST_ADDR;        exp_map_.insert(Key2ExpLog::value_type(key.ll_val_, exp_msg));        msg_type = EXPLORATORY_DATA;      }      attrs.push_back(RmstTargetAttr.make(NRAttribute::IS, rmst_ptr->target_str_));      attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, RMST_RESP));      attrs.push_back(RmstFragAttr.make(NRAttribute::IS, rmst_ptr->max_frag_));      attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_ptr->rmst_no_));      frag_ptr =  rmst_ptr->getFrag(rmst_ptr->max_frag_);      attrs.push_back(RmstDataAttr.make(NRAttribute::IS,         frag_ptr, rmst_ptr->max_frag_len_));      Message *new_frag;      new_frag = new Message(DIFFUSION_VERSION, msg_type, 0, 0, attrs.size(), pkt_count_,        rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR);      new_frag->msg_attr_vec_ = CopyAttrs(&attrs);      ((DiffusionRouting *)dr_)->sendMessage(new_frag, filter_handle_);      pkt_count_++;      delete new_frag;      ClearAttrs(&attrs);      // We sent a fragment, set the last_data_time_ for the cleanup timer.      GetTime(&rmst_ptr->last_data_time_);    }    return 0;    break;  case CLEANUP_TIMER:    DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer CLEANUP_TIMER");    PrintTime(&cur_time);    DiffPrint(DEBUG_IMPORTANT, "  CLEANUP_TIMER called\n");    rmst_iterator = rmst_map_.begin();    while(rmst_iterator != rmst_map_.end()){      rmst_ptr = (*rmst_iterator).second;      DiffPrint(DEBUG_SOME_DETAILS,         "  CLEANUP_TIMER:: rmst_no %d : pkts_sent_ = %d, pkts_rec_ = %d, last_hop_pkts_sent_ = %d\n",        rmst_ptr->rmst_no_, rmst_ptr->pkts_sent_, rmst_ptr->pkts_rec_, rmst_ptr->last_hop_pkts_sent_);      if((!rmst_ptr->reinf_)&&(!rmst_ptr->acked_)&&(!rmst_ptr->local_source_)&&(!local_sink_)){        if( (cur_time.tv_sec - rmst_ptr->last_data_time_.tv_sec) > LONG_CLEANUP_WAIT )          cleanUpRmst(rmst_ptr);      }      else if (rmst_ptr->acked_){        if ( ( (cur_time.tv_sec - rmst_ptr->last_data_time_.tv_sec) > SHORT_CLEANUP_WAIT ) &&           ( (cur_time.tv_sec - rmst_ptr->last_nak_time_.tv_sec) > SHORT_CLEANUP_WAIT ) )           cleanUpRmst(rmst_ptr);      }                  rmst_iterator++;    }    // Check on the BlackList, in case network is partitioned.    if (!black_list_.empty()){      if ( (cur_time.tv_sec - last_data_rec_.tv_sec) > RMST_BLACKLIST_WAIT ){        DiffPrint(DEBUG_IMPORTANT, "  clearing black_list_!\n");        ((DiffusionRouting *)dr_)->clearBlacklist();        black_list_.clear();      }    }    if (local_sink_){      if ( (cur_time.tv_sec - last_sink_time_.tv_sec) > SINK_REFRESH_WAIT ){        DiffPrint(DEBUG_IMPORTANT, "  local sink timed out\n");        local_sink_ = false;      }      else        DiffPrint(DEBUG_IMPORTANT, "  local sink still alive.\n");    }    return 0;    break;  default:    break;  }  return -1;}void RmstFilter::sendRmstToSink(Rmst *rmst_ptr){  NRAttrVec attrs;  Message *rmst_msg;  NRSimpleAttribute<void *> *rmst_data_attr;  NRSimpleAttribute<int> *frag_number_attr;  void *frag_ptr;  int size, i;  DiffPrint(DEBUG_IMPORTANT, "RmstFilter::sendRmstToSink - sending rmst %d to local sink\n",    rmst_ptr->rmst_no_);  // Prepare attribute vector  attrs.push_back(RmstTargetAttr.make(NRAttribute::IS, rmst_ptr->target_str_));  attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, RMST_RESP));  attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_ptr->rmst_no_));  attrs.push_back(RmstMaxFragAttr.make(NRAttribute::IS, rmst_ptr->max_frag_));  frag_number_attr = RmstFragAttr.make(NRAttribute::IS, 0);  attrs.push_back(frag_number_attr);  // Add the blob fragment  if (rmst_ptr->max_frag_ == 0)    size = rmst_ptr->max_frag_len_;  else    size = MAX_FRAG_SIZE;  frag_ptr =  rmst_ptr->getFrag(0);  rmst_data_attr = RmstDataAttr.make(NRAttribute::IS, frag_ptr, size);  attrs.push_back(rmst_data_attr);  // Prepare the message  rmst_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(),    pkt_count_, rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR);  rmst_msg->next_hop_ = LOCALHOST_ADDR;  rmst_msg->next_port_ = local_sink_port_;  rmst_msg->msg_attr_vec_= CopyAttrs(&attrs);  ((DiffusionRouting *)dr_)->sendMessage(rmst_msg, filter_handle_, 1);  delete rmst_msg;  pkt_count_++;  // Send all the fragments  for (i=1; i <= (rmst_ptr->max_frag_); i++){    frag_number_attr->setVal(i);    frag_ptr =  rmst_ptr->getFrag(i);    if(frag_ptr == NULL)      DiffPrint(DEBUG_IMPORTANT, "RmstFilter::sendRmstToSink - got a null frag_ptr for frag!%d\n",      i);    else{      if (rmst_ptr->max_frag_ == i)        rmst_data_attr->setVal(frag_ptr, rmst_ptr->max_frag_len_);      else        rmst_data_attr->setVal(frag_ptr, MAX_FRAG_SIZE);    }    rmst_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(),      pkt_count_, rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR);    rmst_msg->next_hop_ = LOCALHOST_ADDR;    rmst_msg->next_port_ = local_sink_port_;    rmst_msg->msg_attr_vec_= CopyAttrs(&attrs);    ((DiffusionRouting *)dr_)->sendMessage(rmst_msg, filter_handle_, 1);    delete rmst_msg;    pkt_count_++;  }  ClearAttrs(&attrs);}void RmstFilter::sendAckToSource(Rmst *rmst_ptr){  NRAttrVec attrs;  Message *ack_msg;  attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, ACK_RESP));  attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_ptr->rmst_no_));  // New code to send a message to last_hop_  ack_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0,            attrs.size(), pkt_count_, rdm_id_, rmst_ptr->last_hop_,            LOCALHOST_ADDR);  ack_msg->msg_attr_vec_ = CopyAttrs(&attrs);  DiffPrint(DEBUG_IMPORTANT, "  Sending ACK_RESP to node %d\n", rmst_ptr->last_hop_);  ((DiffusionRouting *)dr_)->sendMessage(ack_msg, filter_handle_, 1);  pkt_count_++;  delete ack_msg;  ClearAttrs(&attrs);}void RmstFilter::sendExpReqUpstream(Rmst *rmst_ptr){  NRAttrVec attrs;  Message *exp_msg;  attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, EXP_REQ));  attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_ptr->rmst_no_));  attrs.push_back(RmstFragAttr.make(NRAttribute::IS, rmst_ptr->max_frag_rec_));  exp_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0,            attrs.size(), pkt_count_, rdm_id_, rmst_ptr->last_hop_,            LOCALHOST_ADDR);  exp_msg->msg_attr_vec_ = CopyAttrs(&attrs);  DiffPrint(DEBUG_IMPORTANT, "  Sending EXP_REQ to node %d\n", rmst_ptr->last_hop_);  ((DiffusionRouting *)dr_)->sendMessage(exp_msg, filter_handle_, 1);  pkt_count_++;  delete exp_msg;  ClearAttrs(&attrs);}void RmstFilter::sendContToSource(Rmst *rmst_ptr){  NRAttrVec attrs;  Message *cont_msg;  attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::EQ, RMST_CONT));  attrs.push_back(NRClassAttr.make(NRAttribute::IS,    NRAttribute::INTEREST_CLASS));  DiffPrint(DEBUG_IMPORTANT, "  Sending a RMST_CONT to source\n");  cont_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(),    pkt_count_, rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR);  cont_msg->msg_attr_vec_ = CopyAttrs(&attrs);  cont_msg->next_hop_ = LOCALHOST_ADDR;  cont_msg->next_port_ = rmst_ptr->local_source_port_;  ((DiffusionRouting *)dr_)->sendMessage(cont_msg, filter_handle_, 1);  pkt_count_++;  delete cont_msg;  ClearAttrs(&attrs);}void RmstFilter::cleanUpRmst(Rmst *rmst_ptr){  int rmst_no = rmst_ptr->rmst_no_;  Int2Rmst::iterator rmst_iterator;  Key2ExpLog::iterator exp_iterator;  ExpLog exp_msg;  rmst_no = rmst_ptr->rmst_no_;  DiffPrint(DEBUG_IMPORTANT, "  cleanUpRmst called to delete Rmst %d\n", rmst_no);  if(rmst_ptr->watchdog_active_)    ((DiffusionRouting *)dr_)->removeTimer(rmst_ptr->watchdog_handle_);  if(rmst_ptr->ack_timer_active_)    ((DiffusionRouting *)dr_)->removeTimer(rmst_ptr

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -