⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 one_phase_pull.cc

📁 跑leach需要的
💻 CC
📖 第 1 页 / 共 3 页
字号:
  for (routing_itr = routing_list_.begin();       routing_itr != routing_list_.end(); routing_itr++){    routing_entry = *routing_itr;    // Step 1: Delete expired round ids    routing_entry->deleteExpiredRoundIds();    // Step 2: Remove the routing entry if no round ids left    if (routing_entry->round_ids_.size() == 0){      // Deleting Routing Entry      DiffPrint(DEBUG_DETAILS,		"Nothing left for this data type, cleaning up !\n");      routing_itr = routing_list_.erase(routing_itr);      delete routing_entry;    }  }}void OnePhasePullFilter::reinforcementTimeout(){  DataNeighborList::iterator data_neighbor_itr;  OPPDataNeighborEntry *data_neighbor_entry;  RoutingTable::iterator routing_itr;  RoutingEntry *routing_entry;  Message *my_message;  DiffPrint(DEBUG_MORE_DETAILS, "Reinforcement Timeout !\n");  routing_itr = routing_list_.begin();  while (routing_itr != routing_list_.end()){    routing_entry = *routing_itr;    // Step 1: Delete expired gradients    data_neighbor_itr = routing_entry->data_neighbors_.begin();    while (data_neighbor_itr != routing_entry->data_neighbors_.end()){      data_neighbor_entry = *data_neighbor_itr;      if ((!data_neighbor_entry->new_messages_) &&	  (data_neighbor_entry->messages_ > 0)){	my_message = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT,				 0, 0, routing_entry->attrs_->size(), pkt_count_,				 random_id_, data_neighbor_entry->node_id_,				 LOCALHOST_ADDR);	my_message->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_);	DiffPrint(DEBUG_NO_DETAILS,		  "Node%d: Sending Negative Reinforcement to node %d !\n",		  ((DiffusionRouting *)dr_)->getNodeId(), data_neighbor_entry->node_id_);	((DiffusionRouting *)dr_)->sendMessage(my_message, filter_handle_);	pkt_count_++;	delete my_message;	// Done. Delete entry	data_neighbor_itr = routing_entry->data_neighbors_.erase(data_neighbor_itr);	delete data_neighbor_entry;      }      else{	data_neighbor_itr++;      }    }    // Step 2: Delete data neighbors with no activity, zero flags    data_neighbor_itr = routing_entry->data_neighbors_.begin();    while (data_neighbor_itr != routing_entry->data_neighbors_.end()){      data_neighbor_entry = *data_neighbor_itr;      if (data_neighbor_entry->messages_ > 0){	data_neighbor_entry->messages_ = 0;	data_neighbor_entry->new_messages_ = false;	data_neighbor_itr++;      }      else{	// Delete entry	data_neighbor_itr = routing_entry->data_neighbors_.erase(data_neighbor_itr);	delete data_neighbor_entry;      }    }    // Advance to the next routing entry    routing_itr++;  }}int OnePhasePullFilter::subscriptionTimeout(NRAttrVec *attrs){  SubscriptionList::iterator subscription_itr;  SubscriptionEntry *subscription_entry;  RoutingEntry *routing_entry;  struct timeval tmv;  DiffPrint(DEBUG_MORE_DETAILS, "Subscription Timeout !\n");  GetTime(&tmv);  // Find the correct Routing entry  routing_entry = findRoutingEntry(attrs);  if (routing_entry){    // Routing entry found    subscription_itr = routing_entry->subscription_list_.begin();    // Go through all attributes    while (subscription_itr != routing_entry->subscription_list_.end()){      subscription_entry = *subscription_itr;      // Check timeouts      if (tmv.tv_sec > (subscription_entry->tv_.tv_sec + SUBSCRIPTION_TIMEOUT)){	// Time expired, send disinterest message	sendDisinterest(subscription_entry->attrs_, routing_entry);	subscription_itr = routing_entry->subscription_list_.erase(subscription_itr);	delete subscription_entry;      }      else{	subscription_itr++;      }    }  }  else{    DiffPrint(DEBUG_DETAILS, "Warning: Could't find subscription entry - maybe deleted by GradientTimeout ?\n");    // Cancel Timer    return -1;  }  // Keep Timer  return 0;}void OnePhasePullFilter::deleteRoutingEntry(RoutingEntry *routing_entry){  RoutingTable::iterator routing_itr;  RoutingEntry *current_entry;  // Go through the routing table  for (routing_itr = routing_list_.begin(); routing_itr != routing_list_.end(); ++routing_itr){    current_entry = *routing_itr;    // Is this the entry we are looking for ?    if (current_entry == routing_entry){      routing_itr = routing_list_.erase(routing_itr);      delete routing_entry;      return;    }  }  DiffPrint(DEBUG_ALWAYS, "Error: Could not find entry to delete !\n");}RoutingEntry * OnePhasePullFilter::matchRoutingEntry(NRAttrVec *attrs, RoutingTable::iterator start, RoutingTable::iterator *place){  RoutingTable::iterator routing_itr;  RoutingEntry *routing_entry;  for (routing_itr = start; routing_itr != routing_list_.end(); ++routing_itr){    routing_entry = *routing_itr;    if (MatchAttrs(routing_entry->attrs_, attrs)){      *place = routing_itr;      return routing_entry;    }  }  return NULL;}RoutingEntry * OnePhasePullFilter::findRoutingEntry(NRAttrVec *attrs){  RoutingTable::iterator routing_itr;  RoutingEntry *routing_entry;  for (routing_itr = routing_list_.begin(); routing_itr != routing_list_.end(); ++routing_itr){    routing_entry = *routing_itr;    if (PerfectMatch(routing_entry->attrs_, attrs))      return routing_entry;  }  return NULL;}SubscriptionEntry * OnePhasePullFilter::findMatchingSubscription(RoutingEntry *routing_entry,								 NRAttrVec *attrs){  SubscriptionList::iterator subscription_itr;  SubscriptionEntry *subscription_entry;  for (subscription_itr = routing_entry->subscription_list_.begin(); subscription_itr != routing_entry->subscription_list_.end(); ++subscription_itr){    subscription_entry = *subscription_itr;    if (PerfectMatch(subscription_entry->attrs_, attrs))      return subscription_entry;  }  return NULL;}void OnePhasePullFilter::forwardData(Message *msg,				     RoutingEntry *routing_entry,				     DataForwardingHistory *forwarding_history){  NRSimpleAttribute<void *> *nr_data_attr = NULL;  NRAttrVec::iterator attribute_iterator;  FlowIdList msg_flow_list, sinks_flow_list, local_flow_list;  FlowIdList out_flow_list;  int32_t out_neighbor;  int *packed_flows;  FlowIdList::iterator flow_id_itr;  RoundIdList::iterator round_id_itr;  SinkList::iterator sink_itr;  RoundIdEntry *round_id_entry;  SinkEntry *sink_entry;  Message *sink_message, *out_message;  // Step 0: Read flows from message  // Find NRFlowAttr and remove from the message  attribute_iterator = msg->msg_attr_vec_->begin();  nr_data_attr = NRFlowAttr.find_from(msg->msg_attr_vec_,				      attribute_iterator,				      &attribute_iterator);  if (!nr_data_attr){    DiffPrint(DEBUG_ALWAYS, "Cannot find NRFlowAttr !\n");    return;  }  msg->msg_attr_vec_->erase(attribute_iterator);  // Read flow ids from list  readFlowsFromList(nr_data_attr->getLen() / sizeof(int),		    &msg_flow_list, nr_data_attr->getVal());  // Fill lists of sinks and flows  routing_entry->getSinksFromList(&msg_flow_list, &sinks_flow_list);  routing_entry->getFlowsFromList(&msg_flow_list, &local_flow_list);  // Step 1: Sink Processing  if (sinks_flow_list.size() > 0){    // Copy original message so we can change it    sink_message = CopyMessage(msg);    // Go through all rounds    for (round_id_itr = routing_entry->round_ids_.begin();	 round_id_itr != routing_entry->round_ids_.end();	 round_id_itr++){      round_id_entry = *round_id_itr;      flow_id_itr = find(sinks_flow_list.begin(), sinks_flow_list.end(),			 round_id_entry->round_id_);      if (flow_id_itr != sinks_flow_list.end()){	// Flows match ! Send message to sink	for (sink_itr = round_id_entry->sinks_.begin();	     sink_itr != round_id_entry->sinks_.end(); ++sink_itr){	  sink_entry = *sink_itr;	  if (!forwarding_history->alreadyForwardedToLibrary(sink_entry->port_)){	    // Forward DATA to local sinks	    sink_message->next_hop_ = LOCALHOST_ADDR;	    sink_message->next_port_ = sink_entry->port_;	    // Add sink to the forwarding list	    forwarding_history->forwardingToLibrary(sink_entry->port_);	    ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_);	  }	}	// Remove sink from the flow_list	if (!removeFlowFromList(&msg_flow_list, round_id_entry->round_id_)){	  // We should not get here	  DiffPrint(DEBUG_ALWAYS, "Cannot remove flow from msg_flow_list !\n");	}      }    }    // Delete sink message    delete sink_message;  }  // Step 2: Intermediate Processing  DiffPrint(DEBUG_NO_DETAILS, "Node%d: Forwarding Data\n", ((DiffusionRouting *)dr_)->getNodeId());  // Set reinforcement flags  if (msg->last_hop_ != LOCALHOST_ADDR)    routing_entry->updateNeighborDataInfo(msg->last_hop_, true);  // Work on local list until we finish processing all flows  while (local_flow_list.size() > 0){    // Initialize out_flow_list    out_flow_list.clear();    // Move first flow from the local flow list to out_flow_list    out_flow_list.push_back(*(local_flow_list.begin()));    local_flow_list.erase(local_flow_list.begin());    // Remove flow from the flow_list    if (!removeFlowFromList(&msg_flow_list, *(out_flow_list.begin()))){      // We should not get here      DiffPrint(DEBUG_ALWAYS, "Cannot remove flow from msg_flow_list !\n");    }    // Select output_neighbor    out_neighbor = routing_entry->getNeighborFromFlow(*(out_flow_list.begin()));    // Must have a valid neighbor    if (out_neighbor == BROADCAST_ADDR)      continue;        // Go through all other local flows    for (flow_id_itr = local_flow_list.begin();	 flow_id_itr != local_flow_list.end(); flow_id_itr++){      // Check if output neighbor for this flow matches current      if (routing_entry->getNeighborFromFlow(*flow_id_itr) == out_neighbor){	// Yes it does !	// Remove flow from the flow_list	if (!removeFlowFromList(&msg_flow_list, *flow_id_itr)){	  // We should not get here	  DiffPrint(DEBUG_ALWAYS, "Cannot remove flow from msg_flow_list !\n");	}	// Aggregate both in a single message	out_flow_list.push_back(*flow_id_itr);	flow_id_itr = local_flow_list.erase(flow_id_itr);      }    }    // out_flow_list should have a list of flow for out_neighbor    out_message = CopyMessage(msg);    out_message->next_hop_ = out_neighbor;    packed_flows = writeFlowsToList(&out_flow_list);    out_message->msg_attr_vec_->push_back(NRFlowAttr.make(NRAttribute::IS,							  (void *) packed_flows,							  sizeof(int) * out_flow_list.size()));    // NRFlowAttr.make will copy this, so we must delete it    delete [] packed_flows;    // Send it out    DiffPrint(DEBUG_NO_DETAILS, "Forwarding data to node %d !\n",	      out_neighbor);    ((DiffusionRouting *)dr_)->sendMessage(out_message, filter_handle_);    // Delete message    delete out_message;  }  // Done processing for this data type, we replace the NRFlowAttr  // with the (possibly) shorter msg_flow_list list  packed_flows = writeFlowsToList(&msg_flow_list);  nr_data_attr->setVal((void *) packed_flows, sizeof(int) * msg_flow_list.size());  msg->msg_attr_vec_->push_back(nr_data_attr);  // setVal makes a copy of this, so we must delete it  delete [] packed_flows;}void OnePhasePullFilter::sendInterest(NRAttrVec *attrs,				      RoutingEntry *routing_entry){  RoundIdList::iterator round_id_itr;  RoundIdEntry *round_id_entry;  SinkList::iterator sink_itr;  SinkEntry *sink_entry;  Message *msg = new Message(DIFFUSION_VERSION, INTEREST, 0, 0,			     attrs->size(), 0, 0, LOCALHOST_ADDR,			     LOCALHOST_ADDR);  msg->msg_attr_vec_ = CopyAttrs(attrs);  // Go through all round ids  for (round_id_itr = routing_entry->round_ids_.begin();       round_id_itr != routing_entry->round_ids_.end(); round_id_itr++){    round_id_entry = *round_id_itr;    // Send interest message to all local sinks    for (sink_itr = round_id_entry->sinks_.begin();	 sink_itr != round_id_entry->sinks_.end(); ++sink_itr){      sink_entry = *sink_itr;      msg->next_port_ = sink_entry->port_;      ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);    }  }  delete msg;}void OnePhasePullFilter::sendDisinterest(NRAttrVec *attrs,					 RoutingEntry *routing_entry){  NRAttrVec *new_attrs;  NRSimpleAttribute<int> *nrclass = NULL;  new_attrs = CopyAttrs(attrs);  nrclass = NRClassAttr.find(new_attrs);  if (!nrclass){    DiffPrint(DEBUG_ALWAYS,	      "Error: sendDisinterest couldn't find the class attribute !\n");    ClearAttrs(new_attrs);    delete new_attrs;    return;  }  // Change the class_key value  nrclass->setVal(NRAttribute::DISINTEREST_CLASS);  sendInterest(new_attrs, routing_entry);     ClearAttrs(new_attrs);  delete new_attrs;}void OnePhasePullFilter::readFlowsFromList(int number_of_flows,					   FlowIdList *flow_list,					   void *source_blob){  int *current_flow;  // Point to the beginning of the list  current_flow = (int *) source_blob;  for (int i = 0; i < number_of_flows; i++){    flow_list->push_back(*current_flow);    // Advance to next flow    current_flow++;  }}int * OnePhasePullFilter::writeFlowsToList(FlowIdList *flow_list){  FlowIdList::iterator flow_itr;  int number_of_flows;  int *flows, *current;;  number_of_flows = flow_list->size();  flows = new int[number_of_flows];  current = flows;  for (flow_itr = flow_list->begin();       flow_itr != flow_list->end(); flow_itr++){    *current = *flow_itr;    current++;  }  return flows;}bool OnePhasePullFilter::removeFlowFromList(FlowIdList *flow_list,					    int32_t flow){  FlowIdList::iterator flow_itr;  flow_itr = find(flow_list->begin(), flow_list->end(), flow);  if (flow_itr != flow_list->end()){    flow_itr = flow_list->erase(flow_itr);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -