⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 one_phase_pull.cc

📁 跑leach需要的
💻 CC
📖 第 1 页 / 共 3 页
字号:
    return true;  }  return false;}void OnePhasePullFilter::addLocalFlowsToMessage(Message *msg){  RoutingTable::iterator routing_itr;  RoundIdList::iterator round_id_itr;  RoutingEntry *routing_entry;  RoundIdEntry *round_id_entry;  FlowIdList local_flows;  int *packed_flows;  // First we loop through our routing entries  for (routing_itr = routing_list_.begin();       routing_itr != routing_list_.end(); routing_itr++){    routing_entry = *routing_itr;    // Now go through each round    for (round_id_itr = routing_entry->round_ids_.begin();	 round_id_itr != routing_entry->round_ids_.end();	 round_id_itr++){      round_id_entry = *round_id_itr;      local_flows.push_back(round_id_entry->round_id_);    }  }    packed_flows = writeFlowsToList(&local_flows);  msg->msg_attr_vec_->push_back(NRFlowAttr.make(NRAttribute::IS,						(void *) packed_flows,						sizeof(int) * local_flows.size()));  // NRFlowAttr.make will copy this, so we must delete it here  delete [] packed_flows;  local_flows.clear();}void OnePhasePullFilter::recv(Message *msg, handle h){  if (h != filter_handle_){    DiffPrint(DEBUG_ALWAYS,	      "Error: received msg for handle %d, subscribed to handle %d !\n",	      h, filter_handle_);    return;  }  if (msg->new_message_ == 1)    processNewMessage(msg);  else    processOldMessage(msg);}void OnePhasePullFilter::processOldMessage(Message *msg){  NRSimpleAttribute<int> *nrsubscription = NULL;  NRAttrVec::iterator attribute_iterator;  RoutingTable::iterator routing_itr;  RoutingEntry *routing_entry;  int32_t round_id;  switch (msg->msg_type_){  case INTEREST:    DiffPrint(DEBUG_NO_DETAILS, "Node%d: Received Old Interest !\n",((DiffusionRouting *)dr_)->getNodeId());    if (msg->last_hop_ == LOCALHOST_ADDR){      // Old interest should not come from local sink      DiffPrint(DEBUG_ALWAYS, "Warning: Old Interest from local sink !\n");      break;    }    // Step 0: Take out the subscription attribute    attribute_iterator = msg->msg_attr_vec_->begin();    nrsubscription = NRSubscriptionAttr.find_from(msg->msg_attr_vec_,						  attribute_iterator,						  &attribute_iterator);    // Return if we cannot find a subscription attribute    if (!nrsubscription){      DiffPrint(DEBUG_ALWAYS,		"Warning: Can't find SUBSCRIPTION attribute in the message !\n");      return;    }    // Delete attribute from the message    msg->msg_attr_vec_->erase(attribute_iterator);    // Get the routing entry for these attrs          routing_entry = findRoutingEntry(msg->msg_attr_vec_);    if (routing_entry){      // Use subscription id for identifying this flow      round_id = nrsubscription->getVal();      // Add gradient to the current round entry      routing_entry->addGradient(msg->last_hop_, round_id, false);    }    // Add the subscription attribute back to the message    msg->msg_attr_vec_->push_back(nrsubscription);    break;  case EXPLORATORY_DATA:  case PUSH_EXPLORATORY_DATA:    DiffPrint(DEBUG_ALWAYS, "Received and OLD EXPLORATORY message !\n");    break;  case DATA:     DiffPrint(DEBUG_NO_DETAILS, "Received an old Data message !\n");    // Find the correct routing entry    routing_itr = routing_list_.begin();    routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,				      &routing_itr);    while (routing_entry){      DiffPrint(DEBUG_NO_DETAILS,		"Set flags to %d to OLD_MESSAGE !\n", msg->last_hop_);      // Set reinforcement flags      if (msg->last_hop_ != LOCALHOST_ADDR)	routing_entry->updateNeighborDataInfo(msg->last_hop_, false);      // Continue going through other data types      routing_itr++;      routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,					&routing_itr);    }    break;   case NEGATIVE_REINFORCEMENT:    DiffPrint(DEBUG_IMPORTANT, "Received an old Negative Reinforcement !\n");    break;  default:    DiffPrint(DEBUG_ALWAYS,	      "Received an unknown message type: %d\n", msg->msg_type_);    break;  }}void OnePhasePullFilter::processNewMessage(Message *msg){  DataForwardingHistory *forwarding_history;  NRSimpleAttribute<int> *nrclass = NULL;  NRSimpleAttribute<int> *nrscope = NULL;  NRSimpleAttribute<int> *nrsubscription = NULL;  RoundIdList::iterator round_id_itr;  RoutingTable::iterator routing_itr;  NRAttrVec::iterator attribute_iterator;  RoundIdEntry *round_id_entry;  RoutingEntry *routing_entry;  SubscriptionEntry *subscription_entry;  Message *my_msg;  TimerCallback *interest_timer, *subscription_timer;  bool new_data_type = false;  int32_t round_id;  switch (msg->msg_type_){  case INTEREST:    DiffPrint(DEBUG_NO_DETAILS, "Received Interest !\n");    nrclass = NRClassAttr.find(msg->msg_attr_vec_);    nrscope = NRScopeAttr.find(msg->msg_attr_vec_);    if (!nrclass || !nrscope){      DiffPrint(DEBUG_ALWAYS,		"Warning: Can't find CLASS/SCOPE attributes in the message !\n");      return;    }    // Step 0: Take out the subscription attribute    attribute_iterator = msg->msg_attr_vec_->begin();    nrsubscription = NRSubscriptionAttr.find_from(msg->msg_attr_vec_,						  attribute_iterator,						  &attribute_iterator);    // Return if we cannot find a subscription attribute    if (!nrsubscription){      DiffPrint(DEBUG_ALWAYS,		"Warning: Can't find SUBSCRIPTION attribute in the message !\n");      return;    }    // Delete attribute from the message    msg->msg_attr_vec_->erase(attribute_iterator);    // Step 1: Look for the same data type    routing_entry = findRoutingEntry(msg->msg_attr_vec_);    if (!routing_entry){      // Create a new routing entry for this data type      routing_entry = new RoutingEntry;      routing_entry->attrs_ = CopyAttrs(msg->msg_attr_vec_);      routing_list_.push_back(routing_entry);      new_data_type = true;    }    // Add the subscription attribute back to the message    msg->msg_attr_vec_->push_back(nrsubscription);    // Use subscription id for identifying this flow    round_id = nrsubscription->getVal();    if (msg->last_hop_ == LOCALHOST_ADDR){      // From local sink      routing_entry->updateSink(msg->source_port_, round_id);    }    else{      // Interest received from the network. Add gradient to our      // last_hop neighbor      // Add gradient to the current round entry      routing_entry->addGradient(msg->last_hop_, round_id, true);    }    if ((nrclass->getVal() == NRAttribute::INTEREST_CLASS) &&	(nrclass->getOp() == NRAttribute::IS)){      // Global interest messages should always be forwarded      if (nrscope->getVal() == NRAttribute::GLOBAL_SCOPE){	interest_timer = new OppInterestForwardTimer(this, CopyMessage(msg));	((DiffusionRouting *)dr_)->addTimer(INTEREST_FORWARD_DELAY +					    (int) ((INTEREST_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (INTEREST_FORWARD_JITTER / 2))),					    interest_timer);      }    }    else{      if ((nrclass->getOp() != NRAttribute::IS) &&	  (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE) &&	  (new_data_type)){	subscription_timer = new OppSubscriptionExpirationTimer(this,							     CopyAttrs(msg->msg_attr_vec_));		((DiffusionRouting *)dr_)->addTimer(SUBSCRIPTION_DELAY +					    (int) (SUBSCRIPTION_DELAY * (GetRand() * 1.0 / RAND_MAX)),					    subscription_timer);      }      // Subscriptions don't have to match other subscriptions      break;    }    // Step 2: Match interest against other subscriptions    routing_itr = routing_list_.begin();    routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,				      &routing_itr);    while (routing_entry){      // Got a match      subscription_entry = findMatchingSubscription(routing_entry,						    msg->msg_attr_vec_);      // Do we already have this subscription      if (subscription_entry){	GetTime(&(subscription_entry->tv_));      }      else{	// Create a new attribute entry, add it to the attribute list	// and send an interest message to the local sink	subscription_entry = new SubscriptionEntry(CopyAttrs(msg->msg_attr_vec_));	routing_entry->subscription_list_.push_back(subscription_entry);	sendInterest(subscription_entry->attrs_, routing_entry);      }      // Move to the next RoutingEntry      routing_itr++;      routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,					&routing_itr);    }      break;  case EXPLORATORY_DATA:  case PUSH_EXPLORATORY_DATA:    DiffPrint(DEBUG_ALWAYS, "Node%d: Received EXPLORATORY Message !\n",((DiffusionRouting *)dr_)->getNodeId());    break;  case DATA:    DiffPrint(DEBUG_NO_DETAILS, "Node%d: Received Data !\n",((DiffusionRouting *)dr_)->getNodeId());    // Create data message forwarding cache    forwarding_history = new DataForwardingHistory;    // If message comes from local source, we include our local flows    if (msg->last_hop_ == LOCALHOST_ADDR){      // From local source      addLocalFlowsToMessage(msg);    }    // Find the correct routing entry    routing_itr = routing_list_.begin();    routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,				      &routing_itr);    while (routing_entry){      forwardData(msg, routing_entry, forwarding_history);      routing_itr++;      routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,					&routing_itr);    }    delete forwarding_history;    break;  case NEGATIVE_REINFORCEMENT:    DiffPrint(DEBUG_NO_DETAILS, "Received a Negative Reinforcement !\n");    // Find matching routing entry    routing_entry = findRoutingEntry(msg->msg_attr_vec_);    if (routing_entry){      // Go through all round ids      for (round_id_itr = routing_entry->round_ids_.begin();	   round_id_itr != routing_entry->round_ids_.end();	   round_id_itr++){	round_id_entry = *round_id_itr;	// Delete gradient to last hop	round_id_entry->deleteGradient(msg->last_hop_);	// Delete round id entry if nothing left	if (round_id_entry->gradients_.size() == 0){	  round_id_itr = routing_entry->round_ids_.erase(round_id_itr);	  delete round_id_entry;	}      }      // If there are no other gradients we need to send our own      // negative reinforcement      if (routing_entry->round_ids_.size() == 0){	my_msg = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT,			     0, 0, routing_entry->attrs_->size(), pkt_count_,			     random_id_, BROADCAST_ADDR, LOCALHOST_ADDR);	my_msg->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_);	DiffPrint(DEBUG_NO_DETAILS,		  "Broadcasting Negative Reinforcement !\n");	((DiffusionRouting *)dr_)->sendMessage(my_msg, filter_handle_);	pkt_count_++;	delete my_msg;      }    }    break;  default:    break;  }}handle OnePhasePullFilter::setupFilter(){  NRAttrVec attrs;  handle h;  // For the One-Phase Pull filter, we set up a filter to receive  // messages using this protocol  attrs.push_back(NRAlgorithmAttr.make(NRAttribute::EQ,				       NRAttribute::ONE_PHASE_PULL_ALGORITHM));  h = ((DiffusionRouting *)dr_)->addFilter(&attrs,					   ONE_PHASE_PULL_FILTER_PRIORITY,					   filter_callback_);  ClearAttrs(&attrs);  return h;}#ifndef NS_DIFFUSIONvoid OnePhasePullFilter::run(){  // Doesn't do anything  while (1){    sleep(1000);  }}#endif // !NS_DIFFUSION#ifdef NS_DIFFUSIONOnePhasePullFilter::OnePhasePullFilter(const char *diffrtg){  DiffAppAgent *agent;#elseOnePhasePullFilter::OnePhasePullFilter(int argc, char **argv){#endif // NS_DIFFUSION  struct timeval tv;  TimerCallback *reinforcement_timer, *gradient_timer;  GetTime(&tv);  SetSeed(&tv);  pkt_count_ = GetRand();  random_id_ = GetRand();  // Create Diffusion Routing class#ifdef NS_DIFFUSION  agent = (DiffAppAgent *)TclObject::lookup(diffrtg);  dr_ = agent->dr();#else  parseCommandLine(argc, argv);  dr_ = NR::createNR(diffusion_port_);#endif // NS_DIFFUSION  // Create callback classes and set up pointers  filter_callback_ = new OnePhasePullFilterReceive(this);  // Set up the filter  filter_handle_ = setupFilter();  // Print filter information  DiffPrint(DEBUG_IMPORTANT, "One-Phase Pull filter received handle %d\n",	    filter_handle_);  // Add timers for keeping state up-to-date  gradient_timer = new OppGradientExpirationCheckTimer(this);  ((DiffusionRouting *)dr_)->addTimer(GRADIENT_DELAY, gradient_timer);  reinforcement_timer = new OppReinforcementCheckTimer(this);  ((DiffusionRouting *)dr_)->addTimer(REINFORCEMENT_DELAY, reinforcement_timer);  GetTime(&tv);  DiffPrint(DEBUG_ALWAYS,	    "One-Phase Pull filter initialized at time %ld:%ld!\n",	    tv.tv_sec, tv.tv_usec);}#ifndef USE_SINGLE_ADDRESS_SPACEint main(int argc, char **argv){  OnePhasePullFilter *app;  // Initialize and run the Gradient Filter  app = new OnePhasePullFilter(argc, argv);  app->run();  return 0;}#endif // !USE_SINGLE_ADDRESS_SPACE

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -