📄 diffusion.cc
字号:
{ Message *send_message; int len; int32_t dst; DeviceList::iterator device_itr; send_message = CopyMessage(msg); len = CalculateSize(send_message->msg_attr_vec_); len = len + sizeof(struct hdr_diff); dst = send_message->next_hop_; for (device_itr = out_devices_.begin(); device_itr != out_devices_.end(); ++device_itr){ (*device_itr)->sendPacket((DiffPacket) send_message, len, dst); }}#endif // !NS_DIFFUSIONvoid DiffusionCoreAgent::sendPacketToLibrary(DiffPacket pkt, int len, u_int16_t dst){ DeviceList::iterator device_itr; for (device_itr = local_out_devices_.begin(); device_itr != local_out_devices_.end(); ++device_itr){ (*device_itr)->sendPacket(pkt, len, dst); }}void DiffusionCoreAgent::sendPacketToNetwork(DiffPacket pkt, int len, int dst){ DeviceList::iterator device_itr; for (device_itr = out_devices_.begin(); device_itr != out_devices_.end(); ++device_itr){ (*device_itr)->sendPacket(pkt, len, dst); }}void DiffusionCoreAgent::updateNeighbors(int id){ NeighborList::iterator neighbor_itr; NeighborEntry *neighbor_entry; if (id == LOCALHOST_ADDR || id == my_id_) return; for (neighbor_itr = neighbor_list_.begin(); neighbor_itr != neighbor_list_.end(); ++neighbor_itr){ if ((*neighbor_itr)->id == id) break; } if (neighbor_itr == neighbor_list_.end()){ // This is a new neighbor neighbor_entry = new NeighborEntry(id); neighbor_list_.push_front(neighbor_entry); } else{ // Just update the neighbor timeout GetTime(&((*neighbor_itr)->tmv)); }}FilterEntry * DiffusionCoreAgent::findFilter(int16_t handle, u_int16_t agent){ FilterList::iterator filter_itr; FilterEntry *filter_entry; for (filter_itr = filter_list_.begin(); filter_itr != filter_list_.end(); ++filter_itr){ filter_entry = *filter_itr; if (handle != filter_entry->handle_ || agent != filter_entry->agent_) continue; // Found return filter_entry; } return NULL;}FilterEntry * DiffusionCoreAgent::deleteFilter(int16_t handle, u_int16_t agent){ FilterList::iterator filter_itr = filter_list_.begin(); FilterEntry *filter_entry = NULL; while (filter_itr != filter_list_.end()){ filter_entry = *filter_itr; if (handle == filter_entry->handle_ && agent == filter_entry->agent_){ filter_list_.erase(filter_itr); break; } filter_entry = NULL; filter_itr++; } return filter_entry;}bool DiffusionCoreAgent::addFilter(NRAttrVec *attrs, u_int16_t agent, int16_t handle, u_int16_t priority){ FilterList::iterator filter_itr; FilterEntry *filter_entry; filter_itr = filter_list_.begin(); while (filter_itr != filter_list_.end()){ filter_entry = *filter_itr; if (filter_entry->priority_ == priority) return false; filter_itr++; } filter_entry = new FilterEntry(handle, priority, agent); // Copy the Attribute Vector filter_entry->filter_attrs_ = CopyAttrs(attrs); // Add this filter to the filter list filter_list_.push_back(filter_entry); return true;}FilterList::iterator DiffusionCoreAgent::findMatchingFilter(NRAttrVec *attrs, FilterList::iterator filter_itr){ FilterEntry *filter_entry; for (;filter_itr != filter_list_.end(); ++filter_itr){ filter_entry = *filter_itr; if (OneWayMatch(filter_entry->filter_attrs_, attrs)){ // That's a match ! break; } } return filter_itr;}bool DiffusionCoreAgent::restoreOriginalHeader(Message *msg){ NRAttrVec::iterator attr_itr = msg->msg_attr_vec_->begin(); NRSimpleAttribute<void *> *original_header_attr = NULL; RedirectMessage *original_hdr; // Find original Header original_header_attr = OriginalHdrAttr.find_from(msg->msg_attr_vec_, attr_itr, &attr_itr); if (!original_header_attr){ DiffPrint(DEBUG_ALWAYS, "Error: DiffusionCoreAgent::ProcessControlMessage couldn't find the OriginalHdrAttr !\n"); return false; } // Restore original Header original_hdr = (RedirectMessage *) original_header_attr->getVal(); msg->msg_type_ = original_hdr->msg_type_; msg->source_port_ = original_hdr->source_port_; msg->pkt_num_ = original_hdr->pkt_num_; msg->rdm_id_ = original_hdr->rdm_id_; msg->next_hop_ = original_hdr->next_hop_; msg->last_hop_ = original_hdr->last_hop_; msg->new_message_ = original_hdr->new_message_; msg->num_attr_ = original_hdr->num_attr_; msg->data_len_ = original_hdr->data_len_; msg->next_port_ = original_hdr->next_port_; // Delete attribute from original set msg->msg_attr_vec_->erase(attr_itr); delete original_header_attr; return true;}FilterList * DiffusionCoreAgent::getFilterList(NRAttrVec *attrs){ FilterList *matching_filter_list = new FilterList; FilterList::iterator known_filters_itr, filter_list_itr; FilterEntry *matching_filter_entry, *filter_entry; // We need to come up with a list of filters to call // F1 will be called before F2 if F1->priority > F2->priority known_filters_itr = findMatchingFilter(attrs, filter_list_.begin()); while (known_filters_itr != filter_list_.end()){ // We have a match ! matching_filter_entry = *known_filters_itr; for (filter_list_itr = matching_filter_list->begin(); filter_list_itr != matching_filter_list->end(); ++filter_list_itr){ filter_entry = *filter_list_itr; // Figure out where to insert if (matching_filter_entry->priority_ > filter_entry->priority_) break; } // Insert matching filter in the list matching_filter_list->insert(filter_list_itr, matching_filter_entry); // Continue the search known_filters_itr++; known_filters_itr = findMatchingFilter(attrs, known_filters_itr); } return matching_filter_list;}u_int16_t DiffusionCoreAgent::getNextFilterPriority(int16_t handle, u_int16_t priority, u_int16_t agent){ FilterList::iterator filter_itr; FilterEntry *filter_entry; if ((priority < FILTER_MIN_PRIORITY) || (priority > FILTER_KEEP_PRIORITY)) return FILTER_INVALID_PRIORITY; if (priority < FILTER_KEEP_PRIORITY) return (priority - 1); filter_itr = filter_list_.begin(); while (filter_itr != filter_list_.end()){ filter_entry = *filter_itr; if ((filter_entry->handle_ == handle) && (filter_entry->agent_ == agent)){ // Found this filter return (filter_entry->priority_ - 1); } filter_itr++; } return FILTER_INVALID_PRIORITY;}void DiffusionCoreAgent::processMessage(Message *msg){ FilterList *filter_list; FilterList::iterator filter_list_itr; FilterEntry *filter_entry; filter_list = getFilterList(msg->msg_attr_vec_); // Ok, we have a list of Filters to call. Send this message // to the first filter on this list if (filter_list->size() > 0){ filter_list_itr = filter_list->begin(); filter_entry = *filter_list_itr; forwardMessage(msg, filter_entry); filter_list->clear(); } delete filter_list;}void DiffusionCoreAgent::processControlMessage(Message *msg){ NRSimpleAttribute<void *> *ctrl_msg_attr = NULL; NRAttrVec::iterator attr_itr; ControlMessage *control_blob = NULL; FilterList *filter_list; FilterList::iterator filter_list_itr; FilterEntry *filter_entry; int command, param1, param2; u_int16_t priority, source_port, new_priority; int16_t handle; bool filter_is_last = false; // Control messages should not come from other nodes if (msg->last_hop_ != LOCALHOST_ADDR){ DiffPrint(DEBUG_ALWAYS, "Error: Received control message from another node !\n"); return; } // Find the control attribute attr_itr = msg->msg_attr_vec_->begin(); ctrl_msg_attr = ControlMsgAttr.find_from(msg->msg_attr_vec_, attr_itr, &attr_itr); if (!ctrl_msg_attr){ // Control message is invalid DiffPrint(DEBUG_ALWAYS, "Error: Control message received is invalid !\n"); return; } // Extract the control message info control_blob = (ControlMessage *) ctrl_msg_attr->getVal(); command = control_blob->command_; param1 = control_blob->param1_; param2 = control_blob->param2_; // Filter API definitions // // command = ADD_UPDATE_FILTER // param1 = priority // param2 = handle // attrs = other attrs specify the filter // // Remarks: If this filter is already present for this module, // we don't create a new one. A filter is identified // by the handle and the originating agent. The filter // gets refreshed if it already exists. If attrs and // handle are the same, we update the priority. // // // command = REMOVE_FILTER // param1 = handle // // Remarks: Remove the filter identified by (agent, handle) // If it's not found, a warning message is generated. // // // Remarks: Send message from a local App to another App or // a neighbor. If agent_id is zero, the packet goes // out to the network. Otherwise, it goes to the // agent_id located on this node. // // // command = SEND_MESSAGE // param1 = handle // param2 = priority // // Remarks: Send this message to the next filter or to a local // application. We have to assemble the list again // and figure out the current agent's position on the // list. Then, we send to the next guy. If there is // no other filter in the list, we try to send it to // the network, if next_hop contains a node address. logControlMessage(msg, command, param1, param2); // First we remove the control attribute from the message msg->msg_attr_vec_->erase(attr_itr); delete ctrl_msg_attr; switch(command){ case ADD_UPDATE_FILTER: priority = param1; handle = param2; filter_entry = findFilter(handle, msg->source_port_); if (filter_entry){ // Filter already present, must be an update message if (PerfectMatch(filter_entry->filter_attrs_, msg->msg_attr_vec_)){ // Attrs also match, let's update the filter's timeout GetTime(&(filter_entry->tmv_)); // Check if the priority has changed... if (priority == filter_entry->priority_){ // Nothing to do ! DiffPrint(DEBUG_SOME_DETAILS, "Filter %d, %d, %d refreshed.\n", filter_entry->agent_, filter_entry->handle_, filter_entry->priority_); } else{ // Update the priority DiffPrint(DEBUG_NO_DETAILS, "Updated priority of filter %d, %d, %d to %d\n", msg->source_port_, handle, filter_entry->priority_, priority); filter_entry->priority_ = priority; } break; } else{ // Filter attributes have changed ! This is not allowed ! DiffPrint(DEBUG_ALWAYS, "Filter attributes cannot change during an update !\n"); break; } } else{ // This is a new filter if (!addFilter(msg->msg_attr_vec_, msg->source_port_, handle, priority)){ DiffPrint(DEBUG_ALWAYS, "Failed to add filter %d, %d, %d\n", msg->source_port_, handle, priority); } else{ DiffPrint(DEBUG_NO_DETAILS, "Adding filter %d, %d, %d\n", msg->source_port_, handle, priority); } } break; case REMOVE_FILTER: handle = param1; filter_entry = deleteFilter(handle, msg->source_port_); if (filter_entry){ // Filter deleted DiffPrint(DEBUG_NO_DETAILS, "Filter %d, %d, %d deleted.\n", filter_entry->agent_, filter_entry->handle_, filter_entry->priority_); delete filter_entry; } else{ DiffPrint(DEBUG_ALWAYS, "Couldn't find filter to delete !\n"); } break; case SEND_MESSAGE: handle = param1; priority = param2; source_port = msg->source_port_; if (!restoreOriginalHeader(msg)) break; new_priority = getNextFilterPriority(handle, priority, source_port); if (new_priority == FILTER_INVALID_PRIORITY) break; // Now process the incoming message filter_list = getFilterList(msg->msg_attr_vec_); // Find the filter after the 'current' filter on the list if (filter_list->size() > 0){ for (filter_list_itr = filter_list->begin(); filter_list_itr != filter_list->end(); ++filter_list_itr){ filter_entry = *filter_list_itr;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -