📄 gradient.cc
字号:
if (msg->last_hop_ != LOCALHOST_ADDR){ setReinforcementFlags(routing_entry, msg->last_hop_, OLD_MESSAGE); } // Continue going through other data types routing_itr++; routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr, &routing_itr); } break; case PUSH_EXPLORATORY_DATA: // Just drop it DiffPrint(DEBUG_NO_DETAILS, "Received an old Push Exploratory Data. Loop detected !\n"); break; case EXPLORATORY_DATA: // Just drop it DiffPrint(DEBUG_NO_DETAILS, "Received an old Exploratory Data. Loop detected !\n"); break; case POSITIVE_REINFORCEMENT: DiffPrint(DEBUG_IMPORTANT, "Received an old Positive Reinforcement !\n"); break; case NEGATIVE_REINFORCEMENT: DiffPrint(DEBUG_IMPORTANT, "Received an old Negative Reinforcement !\n"); DiffPrint(DEBUG_IMPORTANT, "pkt_num = %d, rdm_id = %d !\n", msg->pkt_num_, msg->rdm_id_); break; default: break; }}void GradientFilter::processNewMessage(Message *msg){ NRSimpleAttribute<void *> *reinforcement_attr = NULL; NRSimpleAttribute<int> *nrclass = NULL; NRSimpleAttribute<int> *nrscope = NULL; ReinforcementBlob *reinforcement_blob; RoutingTable::iterator routing_itr; RoutingEntry *routing_entry; GradientList::iterator gradient_itr; GradientEntry *gradient_entry; NRAttrVec::iterator place; HashEntry *hash_entry; AttributeEntry *attribute_entry; Message *my_msg; TimerType *timer; unsigned int key[2]; bool new_data_type = false; switch (msg->msg_type_){ case INTEREST: DiffPrint(DEBUG_NO_DETAILS, "Received Interest !\n"); nrclass = NRClassAttr.find(msg->msg_attr_vec_); nrscope = NRScopeAttr.find(msg->msg_attr_vec_); if (!nrclass || !nrscope){ DiffPrint(DEBUG_ALWAYS, "Warning: Can't find CLASS/SCOPE attributes in the message !\n"); return; } // Step 1: Look for the same data type routing_entry = findRoutingEntry(msg->msg_attr_vec_); if (!routing_entry){ // Create a new routing entry for this data type routing_entry = new RoutingEntry; routing_entry->attrs_ = CopyAttrs(msg->msg_attr_vec_); routing_list_.push_back(routing_entry); new_data_type = true; } if (msg->last_hop_ == LOCALHOST_ADDR){ // From local agent updateAgent(routing_entry, msg->source_port_); } else{ // From outside, we just add the new gradient updateGradient(routing_entry, msg->last_hop_, false); } if ((nrclass->getVal() == NRAttribute::INTEREST_CLASS) && (nrclass->getOp() == NRAttribute::IS)){ // Global interest messages should always be forwarded if (nrscope->getVal() == NRAttribute::GLOBAL_SCOPE){ timer = new TimerType(INTEREST_TIMER); timer->param_ = (void *) CopyMessage(msg); ((DiffusionRouting *)dr_)->addTimer(INTEREST_FORWARD_DELAY + (int) ((INTEREST_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (INTEREST_FORWARD_JITTER / 2))), (void *) timer, timer_callback_); } } else{ if ((nrclass->getOp() != NRAttribute::IS) && (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE) && (new_data_type)){ timer = new TimerType(SUBSCRIPTION_TIMER); timer->param_ = (void *) CopyAttrs(msg->msg_attr_vec_); ((DiffusionRouting *)dr_)->addTimer(SUBSCRIPTION_DELAY + (int) (SUBSCRIPTION_DELAY * (GetRand() * 1.0 / RAND_MAX)), (void *) timer, timer_callback_); } // Subscriptions don't have to match other subscriptions break; } // Step 2: Match other routing tables routing_itr = routing_list_.begin(); routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr, &routing_itr); while (routing_entry){ // Got a match attribute_entry = findMatchingSubscription(routing_entry, msg->msg_attr_vec_); // Do we already have this subscription if (attribute_entry){ GetTime(&(attribute_entry->tv_)); } else{ // Create a new attribute entry, add it to the attribute list // and send an interest message to the local agent attribute_entry = new AttributeEntry(CopyAttrs(msg->msg_attr_vec_)); routing_entry->attr_list_.push_back(attribute_entry); sendInterest(attribute_entry->attrs_, routing_entry); } // Move to the next RoutingEntry routing_itr++; routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr, &routing_itr); } break; case DATA: DiffPrint(DEBUG_NO_DETAILS, "Received Data !\n"); // Find the correct routing entry routing_itr = routing_list_.begin(); routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr, &routing_itr); while (routing_entry){ forwardData(msg, routing_entry); routing_itr++; routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr, &routing_itr); } break; case EXPLORATORY_DATA: DiffPrint(DEBUG_NO_DETAILS, "Received Exploratory Data !\n"); // Find the correct routing entry routing_itr = routing_list_.begin(); routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr, &routing_itr); while (routing_entry){ forwardExploratoryData(msg, routing_entry); routing_itr++; routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr, &routing_itr); } break; case PUSH_EXPLORATORY_DATA: DiffPrint(DEBUG_NO_DETAILS, "Received Push Exploratory Data !\n"); forwardPushExploratoryData(msg); break; case POSITIVE_REINFORCEMENT: DiffPrint(DEBUG_NO_DETAILS, "Received a Positive Reinforcement !\n"); // Step 0: Look for reinforcement attribute place = msg->msg_attr_vec_->begin(); reinforcement_attr = ReinforcementAttr.find_from(msg->msg_attr_vec_, place, &place); if (!reinforcement_attr){ DiffPrint(DEBUG_ALWAYS, "Error: Received an invalid Positive Reinforcement message !\n"); return; } // Step 1: Extract reinforcement blob from message and look for an // entry in our hash table reinforcement_blob = (ReinforcementBlob *) reinforcement_attr->getVal(); key[0] = reinforcement_blob->pkt_num_; key[1] = reinforcement_blob->rdm_id_; hash_entry = getHash(key[0], key[1]); // Step 2: Remove the reinforcement attribute from the message msg->msg_attr_vec_->erase(place); // Step 3: Find a routing entry that matches this message routing_entry = findRoutingEntry(msg->msg_attr_vec_); if (!routing_entry){ // So, if we do not know about this data type, this must be a // reinforcement message to a PUSHED_EXPLORATORY_DATA message // Check for class/scope (all interest message should have it) nrclass = NRClassAttr.find(msg->msg_attr_vec_); nrscope = NRScopeAttr.find(msg->msg_attr_vec_); if (!nrclass || !nrscope){ DiffPrint(DEBUG_ALWAYS, "Warning: Can't find CLASS/SCOPE attributes in the message !\n"); return; } // Create new Routing Entry routing_entry = new RoutingEntry; routing_entry->attrs_ = CopyAttrs(msg->msg_attr_vec_); routing_list_.push_back(routing_entry); } // Add reinforced gradient to last_hop updateGradient(routing_entry, msg->last_hop_, true); // Add the reinforcement attribute back to the message msg->msg_attr_vec_->push_back(reinforcement_attr); // If we have no record of this message it is either because we // originated the message (in which case, no further action is // required) or because we dropped it a long time ago because of // our hashing configuration parameters (in this case, we can't do // anything) if (hash_entry){ msg->next_hop_ = hash_entry->last_hop_; DiffPrint(DEBUG_NO_DETAILS, "Forwarding Positive Reinforcement to node %d !\n", hash_entry->last_hop_); ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_); } break; case NEGATIVE_REINFORCEMENT: DiffPrint(DEBUG_NO_DETAILS, "Received a Negative Reinforcement !\n"); routing_entry = findRoutingEntry(msg->msg_attr_vec_); if (routing_entry){ gradient_entry = findReinforcedGradient(msg->last_hop_, routing_entry); if (gradient_entry){ // Remove reinforced gradient to last_hop deleteGradient(routing_entry, gradient_entry); gradient_entry = findReinforcedGradients(&routing_entry->gradients_, routing_entry->gradients_.begin(), &gradient_itr); // If there are no other reinforced outgoing gradients // we need to send our own negative reinforcement if (!gradient_entry){ my_msg = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT, 0, 0, routing_entry->attrs_->size(), pkt_count_, random_id_, BROADCAST_ADDR, LOCALHOST_ADDR); my_msg->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_); DiffPrint(DEBUG_NO_DETAILS, "Forwarding Negative Reinforcement to ALL !\n"); ((DiffusionRouting *)dr_)->sendMessage(my_msg, filter_handle_); pkt_count_++; delete my_msg; } } } break; default: break; }}HashEntry * GradientFilter::getHash(unsigned int pkt_num, unsigned int rdm_id){ unsigned int key[2]; key[0] = pkt_num; key[1] = rdm_id; Tcl_HashEntry *entryPtr = Tcl_FindHashEntry(&htable_, (char *)key); if (entryPtr == NULL) return NULL; return ((HashEntry *) Tcl_GetHashValue(entryPtr));}void GradientFilter::putHash(HashEntry *new_hash_entry, unsigned int pkt_num, unsigned int rdm_id){ Tcl_HashEntry *tcl_hash_entry; HashEntry *hash_entry; HashList::iterator hash_itr; unsigned int key[2]; int new_hash_key; if (hash_list_.size() == HASH_TABLE_DATA_MAX_SIZE){ // Hash table reached maximum size for (int i = 0; ((i < HASH_TABLE_DATA_REMOVE_AT_ONCE) && (hash_list_.size() > 0)); i++){ hash_itr = hash_list_.begin(); tcl_hash_entry = *hash_itr; hash_entry = (HashEntry *) Tcl_GetHashValue(tcl_hash_entry); delete hash_entry; hash_list_.erase(hash_itr); Tcl_DeleteHashEntry(tcl_hash_entry); } } key[0] = pkt_num; key[1] = rdm_id; tcl_hash_entry = Tcl_CreateHashEntry(&htable_, (char *) key, &new_hash_key); if (new_hash_key == 0){ DiffPrint(DEBUG_IMPORTANT, "Key already exists in hash !\n"); return; } Tcl_SetHashValue(tcl_hash_entry, new_hash_entry); hash_list_.push_back(tcl_hash_entry);}handle GradientFilter::setupFilter(){ NRAttrVec attrs; handle h; // For the gradient filter, we use a single attribute with an "IS" // operator. This causes this filter to match every single packet // getting to diffusion attrs.push_back(NRClassAttr.make(NRAttribute::IS, NRAttribute::INTEREST_CLASS)); h = ((DiffusionRouting *)dr_)->addFilter(&attrs, GRADIENT_FILTER_PRIORITY, filter_callback_); ClearAttrs(&attrs); return h;}#ifndef NS_DIFFUSIONvoid GradientFilter::run(){ // Doesn't do anything while (1){ sleep(1000); }}#endif // !NS_DIFFUSION#ifdef NS_DIFFUSIONGradientFilter::GradientFilter(const char *diffrtg){ DiffAppAgent *agent;#elseGradientFilter::GradientFilter(int argc, char **argv){#endif // NS_DIFFUSION struct timeval tv; TimerType *timer; GetTime(&tv); SetSeed(&tv); pkt_count_ = GetRand(); random_id_ = GetRand(); // Create Diffusion Routing class#ifdef NS_DIFFUSION agent = (DiffAppAgent *)TclObject::lookup(diffrtg); dr_ = agent->dr();#else parseCommandLine(argc, argv); dr_ = NR::createNR(diffusion_port_);#endif // NS_DIFFUSION // Create callback classes and set up pointers filter_callback_ = new GradientFilterReceive(this); timer_callback_ = new GradientTimerReceive(this); // Initialize Hashing structures Tcl_InitHashTable(&htable_, 2); // Set up the filter filter_handle_ = setupFilter(); // Print filter information DiffPrint(DEBUG_IMPORTANT, "Gradient filter subscribed to *, received handle %d\n", filter_handle_); // Add timers for keeping state up-to-date timer = new TimerType(GRADIENT_TIMER); ((DiffusionRouting *)dr_)->addTimer(GRADIENT_DELAY, (void *) timer, timer_callback_); timer = new TimerType(REINFORCEMENT_TIMER); ((DiffusionRouting *)dr_)->addTimer(REINFORCEMENT_DELAY, (void *) timer, timer_callback_); GetTime(&tv); DiffPrint(DEBUG_ALWAYS, "Gradient filter initialized at time %ld:%ld!\n", tv.tv_sec, tv.tv_usec);}#ifndef NS_DIFFUSIONint main(int argc, char **argv){ GradientFilter *app; // Initialize and run the Gradient Filter app = new GradientFilter(argc, argv); app->run(); return 0;}#endif // !NS_DIFFUSION
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -