⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rmst_filter.cc

📁 跑leach需要的
💻 CC
📖 第 1 页 / 共 5 页
字号:
      nrscope = NRScopeAttr.find(msg->msg_attr_vec_);      if(nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE)        DiffPrint(DEBUG_SOME_DETAILS, "  rmst LOCAL_SCOPE Interest Message\n");      else if (msg->last_hop_ == LOCALHOST_ADDR){        DiffPrint(DEBUG_SOME_DETAILS, "  rmst Interest Message from local SINK\n");        local_sink_ = true;        local_sink_port_ = msg->source_port_;        GetTime (&last_sink_time_);        if (interest_attrs_ == NULL)          interest_attrs_ = CopyAttrs(msg->msg_attr_vec_);      }      else{        DiffPrint(DEBUG_SOME_DETAILS, "  rmst Interest Message from non-local node\n");          if (interest_attrs_ == NULL)            interest_attrs_ = CopyAttrs(msg->msg_attr_vec_);      }      break;    case(POSITIVE_REINFORCEMENT):      ReinfMessage *reinf_msg;      ExpLog exp_log;      DiffPrint(DEBUG_IMPORTANT, "  Positive Reinf arrived\n");      reinf_attr = ReinforcementAttr.find(msg->msg_attr_vec_);      reinf_msg = (ReinfMessage*)reinf_attr->getVal();      DiffPrint(DEBUG_LOTS_DETAILS, "  Pos_Reinf: ptk_num = %x, rdm_id_ = %x\n",        reinf_msg->pkt_num_, reinf_msg->rdm_id_);      key.int_val_[0] = reinf_msg->pkt_num_;      key.int_val_[1] = reinf_msg->rdm_id_;      exp_iterator = exp_map_.find(key.ll_val_);      if(exp_iterator != exp_map_.end()){        exp_log = (*exp_iterator).second;        DiffPrint(DEBUG_SOME_DETAILS, "  Reinforcement for rmst_no = %d, last_hop_ = %d\n",          exp_log.rmst_no_, exp_log.last_hop_);        // Here is where we must update the rmst with back-channel        // last hop.        rmst_no = exp_log.rmst_no_;        rmst_iterator = rmst_map_.find(rmst_no);        if(rmst_iterator != rmst_map_.end()){          rmst_ptr = (*rmst_iterator).second;          rmst_ptr->last_hop_ = exp_log.last_hop_;          rmst_ptr->fwd_hop_ = msg->last_hop_;          DiffPrint(DEBUG_SOME_DETAILS, "  Setting rmst_no %d last_hop_ = %d, fwd_hop_ = %d\n",            rmst_no, rmst_ptr->last_hop_, rmst_ptr->fwd_hop_);          if(!rmst_ptr->reinf_){            rmst_ptr->reinf_ = true;            if(rmst_ptr->local_source_)              DiffPrint(DEBUG_LOTS_DETAILS, "  Local source got a Reinf\n");          }        }        else{          DiffPrint(DEBUG_IMPORTANT, "  Reinforcement cant't find rmst_no\n");          break;        }        // We are on the reinforced path, so we must start a timer        // if one hasn't already been started. Sinks don't get        // reinforced, so they start a WATCHDOG in syncLocalCache.        if( (rmst_ptr->watchdog_active_ == false) && (caching_mode_) ){          TimerCallback *rmst_timer;          DiffPrint(DEBUG_IMPORTANT,            "  Set a WATCHDOG_TIMER at caching node for reinforced rmst_no %d\n",            rmst_no);          rmst_timer = new RmstTimeout(this, rmst_no, WATCHDOG_TIMER);          // We check on things every 10 seconds.          rmst_ptr->watchdog_handle_ =             ((DiffusionRouting *)dr_)->addTimer(WATCHDOG_INTERVAL, rmst_timer);          rmst_ptr->watchdog_active_ = true;        }        if (rmst_ptr->wait_for_new_path_){          DiffPrint(DEBUG_SOME_DETAILS, "  Resetting wait_for_new_path_ for rmst_no %d\n", rmst_no);          rmst_ptr->wait_for_new_path_ = false;        }        if (rmst_ptr->sent_exp_req_){          DiffPrint(DEBUG_SOME_DETAILS,            "  intermediate node got a new path, set sent_exp_req_ false.\n");          rmst_ptr->sent_exp_req_ = false;        }      }      else{        if(!rmst_ptr->local_source_)          DiffPrint(DEBUG_IMPORTANT, "  Reinforcement matches no Exploratory msg\n");      }      break;    case(NEGATIVE_REINFORCEMENT):      bool ret_val;      if (tsprt_ctl_attr){        DiffPrint(DEBUG_SOME_DETAILS,            "  NEGATIVE_REINFORCEMENT, last_hop_ = %d, rmst_ctl_type = %d\n",             msg->last_hop_, rmst_ctl_type);      }      // We need to check if we got a NEGATIVE REINFORCEMENT from a node that is the      // next node in the forward direction (downstream).  If so, and we are the source      // we must send a new EXPLORATORY message;  else if we are not the source,      // we must send and exp request upstream.      ret_val = true;      rmst_iterator = rmst_map_.begin();      while(rmst_iterator != rmst_map_.end()){        rmst_ptr = (*rmst_iterator).second;        DiffPrint(DEBUG_SOME_DETAILS,            "  searching rmsts - rmst_no_ %d: fwd_hop_ = %d, reinf_ = %d, acked = %d\n",            rmst_ptr->rmst_no_, rmst_ptr->fwd_hop_, rmst_ptr->reinf_, rmst_ptr->acked_);        if (rmst_ptr->local_source_ && rmst_ptr->reinf_            && (rmst_ptr->fwd_hop_ == msg->last_hop_)            && !rmst_ptr->acked_){          // If we are reinforced then we never got and EXP_REQ!!          DiffPrint(DEBUG_SOME_DETAILS, "  local source sees NEG_REINF\n");          processExpReq(rmst_ptr, rmst_ptr->max_frag_sent_);        }        else if (!rmst_ptr->local_source_ && (rmst_ptr->fwd_hop_ == msg->last_hop_)                 && rmst_ptr->reinf_ && !rmst_ptr->acked_){          DiffPrint(DEBUG_SOME_DETAILS, "  intermediate node sees NEG_REINF from reinforced node\n");          DiffPrint(DEBUG_SOME_DETAILS, "  send Exp Request upstream!\n");          ret_val = false;          sendExpReqUpstream(rmst_ptr);        }        else{          DiffPrint(DEBUG_SOME_DETAILS,            "  node sees NEG_REINF from non-reinforced node - let routing layer see it\n");	  ret_val = true;	}        rmst_iterator++;      }      if (!ret_val)        return false;      break;    default:      break;  } // switch msg->type  return true;}// RmstFilter::syncLocalCache//// This routine adds new transport data messages to the local data base.Rmst* RmstFilter::syncLocalCache (Message *msg){  NRSimpleAttribute<int> *rmst_id_attr = NULL;  NRSimpleAttribute<int> *frag_attr = NULL;  NRSimpleAttribute<int> *max_frag_attr = NULL;  NRSimpleAttribute<void *> *data_buf_attr = NULL;  NRAttrVec *data = msg->msg_attr_vec_;  Int2Rmst::iterator rmst_iterator;  int rmst_no;  int frag_no;  int max_frag_no;  void *blob_ptr;  int blob_len;  void *tmp_frag_ptr;  Rmst *rmst_ptr;  rmst_id_attr = RmstIdAttr.find(data);  frag_attr = RmstFragAttr.find(data);  max_frag_attr = RmstMaxFragAttr.find(data);  data_buf_attr = RmstDataAttr.find(data);  if (! (rmst_id_attr && frag_attr && data_buf_attr) ){    DiffPrint(DEBUG_IMPORTANT, "  Filter received a BAD transport packet!\n");    return NULL;  }  rmst_no = rmst_id_attr->getVal();  frag_no = frag_attr->getVal();  if(max_frag_attr)    max_frag_no = max_frag_attr->getVal();  else    max_frag_no = 0;  blob_ptr = data_buf_attr->getVal();  blob_len = data_buf_attr->getLen();  // Here is where I consuslt the Data Base and possibly add a new map,  // or add to an existing map.  rmst_iterator = rmst_map_.find(rmst_no);  if(rmst_iterator == rmst_map_.end()){    DiffPrint(DEBUG_IMPORTANT, "  creating a new DB entry for Rmst %d\n", rmst_no);    DiffPrint(DEBUG_SOME_DETAILS, "  Max Fragment number = %d\n", max_frag_no);    rmst_ptr = new Rmst(rmst_no);    rmst_ptr->max_frag_ = max_frag_no;    rmst_map_.insert(Int2Rmst::value_type(rmst_no, rmst_ptr));    // Artificially initialize last_nak_time_ so it's older    // than any Naks we may get.    GetTime(&rmst_ptr->last_nak_time_);    // Several decisions in this routine relate to messages that emanated    // from a local source.    if (msg->last_hop_ == LOCALHOST_ADDR) {      rmst_ptr->local_source_ = true;      rmst_ptr->local_source_port_ = msg->source_port_;      // This is the first fragment of an rmst from a local source.      // The message will be marked exploratory by this filter.      // We need to mark the last hop as LOCALHOST_ADDR.      rmst_ptr->last_hop_ = LOCALHOST_ADDR;    }    // We need to capture the RmstTargetAttr for concantenation on sendMessage.    if((rmst_ptr->local_source_)||(local_sink_)||(caching_mode_)){      NRSimpleAttribute<char *> *rmst_tgt_attr = NULL;      rmst_tgt_attr = RmstTargetAttr.find(msg->msg_attr_vec_);      if (rmst_tgt_attr){        char *tmp_str = rmst_tgt_attr->getVal();        rmst_ptr->target_str_ = new char[strlen(tmp_str)+1];        strcpy (rmst_ptr->target_str_, tmp_str);        DiffPrint(DEBUG_IMPORTANT, "  RmstTargetAttr = %s\n", rmst_ptr->target_str_);      }      else        DiffPrint(DEBUG_IMPORTANT, "  no RmstTargetAttr Rmst %d !\n", rmst_no);    }  }  else    rmst_ptr = (*rmst_iterator).second;  if(!rmst_ptr->local_source_)    DiffPrint(DEBUG_IMPORTANT, "  Got a blob, rmstId = %d, frag_no = %d\n", rmst_no, frag_no);  // Update the time we last saw data for this Rmst.  GetTime(&rmst_ptr->last_data_time_);  // We cache the fragment at the sink and source,  // or in caching mode at each node that receives it.  if((rmst_ptr->local_source_)||(local_sink_)||(caching_mode_)){    tmp_frag_ptr = rmst_ptr->getFrag(frag_no);    if (tmp_frag_ptr == NULL){      if(!rmst_ptr->local_source_)        DiffPrint(DEBUG_SOME_DETAILS, "  creating a new frag %d entry for Rmst %d\n",          frag_no, rmst_no);      if (frag_no == rmst_ptr->max_frag_)      rmst_ptr->max_frag_len_ = blob_len;      tmp_frag_ptr = new char[blob_len];      memcpy(tmp_frag_ptr, blob_ptr, blob_len);      rmst_ptr->putFrag(frag_no, tmp_frag_ptr);      // Check to see if this fragment was NAKed.      // If so, delete from the hole map.      if(!rmst_ptr->local_source_){        if ( rmst_ptr->inHoleMap(frag_no) ){          // We need to see if we sent a NAK for this frag.          NakData *nak_ptr = rmst_ptr->getHole(frag_no);          if(nak_ptr->nak_sent_)            DiffPrint(DEBUG_SOME_DETAILS, "  We sent a NAK_REQ for this fragment.\n");          DiffPrint(DEBUG_SOME_DETAILS, "  filter removing hole %d from hole_map_\n",frag_no);          rmst_ptr->delHole(frag_no);        }      }      // We start a WATCHDOG for an rmst here if: this is a local sink,      // we haven't started a timer, and this is not the initial fragment.      // Intermediate nodes (in caching mode) start a timer if they are on the      // reinforced path, which is checked in processMessage.      if((!rmst_ptr->local_source_)&&(local_sink_)&&(rmst_ptr->watchdog_active_ == false)        && (frag_no>0)){        TimerCallback *rmst_timer;        DiffPrint(DEBUG_IMPORTANT, "  Set a WATCHDOG_TIMER at sink for rmst_no %d\n", rmst_no);        rmst_timer = new RmstTimeout(this, rmst_no, WATCHDOG_TIMER);        // We check on things every 10 seconds.        rmst_ptr->watchdog_handle_ = ((DiffusionRouting *)dr_)->addTimer(WATCHDOG_INTERVAL,          rmst_timer);        rmst_ptr->watchdog_active_ = true;      }    }    else      DiffPrint(DEBUG_SOME_DETAILS, "  got a duplicate frag %d for blob %d\n",        frag_no, rmst_no);    // If we have still have a hole in the fragment map, update the hole map.    if ((!rmst_ptr->local_source_) && (rmst_ptr->holeInFragMap()))      rmst_ptr->syncHoleMap();    // If the Rmst is complete, cancell timer, stop timer,    // send Ack, give to local sinks.    if(rmst_ptr->rmstComplete()){      if ((rmst_ptr->watchdog_active_) && (!rmst_ptr->local_source_)){        DiffPrint(DEBUG_SOME_DETAILS,           "  Rmst #%d is complete set cancel_watchdog_ to stop WATCHDOG\n",          rmst_no);        rmst_ptr->cancel_watchdog_ = true;      }      // Send this Rmst to any local sink      if(local_sink_ && !(rmst_ptr->acked_)){        sendRmstToSink(rmst_ptr);        // We mark the rmst acked at the sink so it will clean up.        rmst_ptr->acked_ = true;      }      // If this is a source, we only send out the fragments when we've got      // them all from the application. If the Rmst is complete we add the      // Rmst to the send_list_, and if there is no send timer we start one.      if(rmst_ptr->local_source_){        SendMsgData new_send_msg;        // The Rmst is complete and this is a source - put in send list.        new_send_msg.rmst_no_ = rmst_no;        new_send_msg.last_frag_sent_ = -1;        new_send_msg.exp_base_ = 0;        send_list_.push_back(new_send_msg);        if(!send_timer_active_){          TimerCallback *send_timer;          // Now add a timer to send this and any NAKS.          DiffPrint(DEBUG_SOME_DETAILS,            "  Rmst %d ready to send - Set a SEND_TIMER\n", rmst_no);          send_timer = new RmstTimeout(this, -1, SEND_TIMER);          // We check on things every second.          send_timer_handle_ =             ((DiffusionRouting *)dr_)->addTimer(SEND_INTERVAL, send_timer);          send_timer_active_ = true;        }      }      else        // We must let upstream nodes know that we got the whole blob.        sendAckToSource(rmst_ptr);    }  }  else{    rmst_ptr->max_frag_rec_ = frag_no;    DiffPrint(DEBUG_LOTS_DETAILS, "  Not caching frag %d entry for Rmst %d\n", frag_no, rmst_no);  }  return rmst_ptr;}void RmstFilter::processCtrlMessage(Message *msg){  NRSimpleAttribute<int> *tsprt_ctl_attr = NULL;  NRSimpleAttribute<int> *rmst_id_attr = NULL;  NRSimpleAttribute<int> *frag_attr = NULL;  NRAttrVec *data;  NRAttrVec attrs;  Int2Rmst::iterator rmst_iterator;  int rmst_no;   int frag_no;  int rmst_ctl_type;  Rmst *rmst_ptr;  void *frag_ptr;  Message *nak_msg;  NRAttrVec::iterator place;  bool forwarding_nak = false;  data = msg->msg_attr_vec_;  tsprt_ctl_attr = RmstTsprtCtlAttr.find(data);  rmst_ctl_type = tsprt_ctl_attr->getVal();  rmst_id_attr = RmstIdAttr.find(data);  if(!rmst_id_attr) {    DiffPrint(DEBUG_SOME_DETAILS, "  Node got a bad Rmst control msg - no RmstIdAttr!\n");    return;  }  rmst_no = rmst_id_attr->getVal();  // Let's make sure we have this rmst

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -