📄 dr.cc
字号:
} } } while ((status > 0) && (flag == true)); } else if (status < 0){ DiffPrint(DEBUG_IMPORTANT, "Select returned %d\n", status); } } while (wait_condition);}#endif // NS_DIFFUSION#ifndef NS_DIFFUSIONvoid DiffusionRouting::sendMessageToDiffusion(Message *msg){ DiffPacket out_pkt = NULL; struct hdr_diff *dfh; char *pos; int len; out_pkt = AllocateBuffer(msg->msg_attr_vec_); dfh = HDR_DIFF(out_pkt); pos = (char *) out_pkt; pos = pos + sizeof(struct hdr_diff); len = PackAttrs(msg->msg_attr_vec_, pos); LAST_HOP(dfh) = htonl(msg->last_hop_); NEXT_HOP(dfh) = htonl(msg->next_hop_); DIFF_VER(dfh) = msg->version_; MSG_TYPE(dfh) = msg->msg_type_; DATA_LEN(dfh) = htons(len); PKT_NUM(dfh) = htonl(msg->pkt_num_); RDM_ID(dfh) = htonl(msg->rdm_id_); NUM_ATTR(dfh) = htons(msg->num_attr_); SRC_PORT(dfh) = htons(msg->source_port_); sendPacketToDiffusion(out_pkt, sizeof(struct hdr_diff) + len, diffusion_port_); delete [] out_pkt;}#elsevoid DiffusionRouting::sendMessageToDiffusion(Message *msg){ Message *my_msg; DeviceList::iterator itr; int len; my_msg = CopyMessage(msg); len = CalculateSize(my_msg->msg_attr_vec_); len = len + sizeof(struct hdr_diff); for (itr = local_out_devices_.begin(); itr != local_out_devices_.end(); ++itr){ (*itr)->sendPacket((DiffPacket) my_msg, len, diffusion_port_); }}#endif // !NS_DIFFUSIONvoid DiffusionRouting::sendPacketToDiffusion(DiffPacket pkt, int len, int dst){ DeviceList::iterator itr; for (itr = local_out_devices_.begin(); itr != local_out_devices_.end(); ++itr){ (*itr)->sendPacket(pkt, len, dst); }}#ifndef NS_DIFFUSIONvoid DiffusionRouting::recvPacket(DiffPacket pkt){ struct hdr_diff *dfh = HDR_DIFF(pkt); Message *rcv_message = NULL; int8_t version, msg_type; u_int16_t data_len, num_attr, source_port; int32_t pkt_num, rdm_id, next_hop, last_hop; // Read header version = DIFF_VER(dfh); msg_type = MSG_TYPE(dfh); source_port = ntohs(SRC_PORT(dfh)); pkt_num = ntohl(PKT_NUM(dfh)); rdm_id = ntohl(RDM_ID(dfh)); num_attr = ntohs(NUM_ATTR(dfh)); next_hop = ntohl(NEXT_HOP(dfh)); last_hop = ntohl(LAST_HOP(dfh)); data_len = ntohs(DATA_LEN(dfh)); // Create a message structure from the incoming packet rcv_message = new Message(version, msg_type, source_port, data_len, num_attr, pkt_num, rdm_id, next_hop, last_hop); // Read all attributes into the Message structure rcv_message->msg_attr_vec_ = UnpackAttrs(pkt, num_attr); // Process the incoming message recvMessage(rcv_message); // We are done delete rcv_message; delete [] pkt;}#endif // !NS_DIFFUSIONvoid DiffusionRouting::recvMessage(Message *msg){ // Check version if (msg->version_ != DIFFUSION_VERSION) return; // Check destination if (msg->next_hop_ != LOCALHOST_ADDR) return; // Process the incoming message if (msg->msg_type_ == REDIRECT) processControlMessage(msg); else processMessage(msg);}void DiffusionRouting::processControlMessage(Message *msg){ NRSimpleAttribute<void *> *original_header_attr = NULL; NRAttrVec::iterator place = msg->msg_attr_vec_->begin(); RedirectMessage *original_header; FilterEntry *entry; handle my_handle; // Find the attribute containing the original packet header original_header_attr = OriginalHdrAttr.find_from(msg->msg_attr_vec_, place, &place); if (!original_header_attr){ DiffPrint(DEBUG_ALWAYS, "Error: Received an invalid REDIRECT message !\n"); return; } // Restore original message header original_header = (RedirectMessage *) original_header_attr->getVal(); my_handle = original_header->handle_; msg->msg_type_ = original_header->msg_type_; msg->source_port_ = original_header->source_port_; msg->pkt_num_ = original_header->pkt_num_; msg->rdm_id_ = original_header->rdm_id_; msg->next_hop_ = original_header->next_hop_; msg->last_hop_ = original_header->last_hop_; msg->num_attr_ = original_header->num_attr_; msg->new_message_ = original_header->new_message_; msg->next_port_ = original_header->next_port_; // Delete attribute from the original set msg->msg_attr_vec_->erase(place); delete original_header_attr; // Find the right callback GetLock(dr_mtx_); entry = findFilter(my_handle); if (entry && entry->valid_){ // Just to confirm if (OneWayMatch(entry->filter_attrs_, msg->msg_attr_vec_)){ ReleaseLock(dr_mtx_); entry->cb_->recv(msg, my_handle); return; } else{ DiffPrint(DEBUG_ALWAYS, "Warning: Filter doesn't match incoming message's attributes !\n"); } } else{ DiffPrint(DEBUG_IMPORTANT, "Report: Cannot find filter (possibly deleted ?)\n"); } ReleaseLock(dr_mtx_);}void DiffusionRouting::processMessage(Message *msg){ NRSimpleAttribute<int> *rmst_id_attr = NULL; CallbackList::iterator cbl_itr; HandleList::iterator sub_itr; NRAttrVec *callback_attrs; HandleEntry *entry; CallbackEntry *aux; CallbackList cbl; // First, acquire the lock GetLock(dr_mtx_); for (sub_itr = sub_list_.begin(); sub_itr != sub_list_.end(); ++sub_itr){ entry = *sub_itr; if ((entry->valid_) && (MatchAttrs(msg->msg_attr_vec_, entry->attrs_))) if (entry->cb_){ aux = new CallbackEntry(entry->cb_, entry->hdl_); cbl.push_back(aux); } } // We can release the lock now ReleaseLock(dr_mtx_); // Check for RMST id attribute rmst_id_attr = RmstIdAttr.find(msg->msg_attr_vec_); cbl_itr = cbl.begin(); // Process RMST fragment if we have callbacks and this message has an RmstId if (rmst_id_attr && (cbl_itr != cbl.end())){ if (!processRmst(msg)){ cbl.clear(); return; } } // Now we just call all callback functions for (cbl_itr = cbl.begin(); cbl_itr != cbl.end(); ++cbl_itr){ // Copy attributes callback_attrs = CopyAttrs(msg->msg_attr_vec_); // Call app-specific callback function aux = *cbl_itr; aux->cb_->recv(callback_attrs, aux->subscription_handle_); delete aux; // Clean up callback attributes ClearAttrs(callback_attrs); delete callback_attrs; } // We are done cbl.clear();}bool DiffusionRouting::processRmst(Message *msg){ NRSimpleAttribute<void *> *data_buf_attr = NULL; NRSimpleAttribute<int> *max_frag_attr = NULL; NRSimpleAttribute<int> *rmst_id_attr = NULL; NRSimpleAttribute<int> *frag_attr = NULL; int rmst_no, frag_no, data_buf_len, count; void *blob_ptr, *tmp_frag_ptr; Int2RecRmst::iterator rmst_iterator; Int2Frag::iterator frag_iterator; char *dstPtr; int dstSize; RecRmst *rmst_ptr; // Read Rmst attributes data_buf_attr = RmstDataAttr.find(msg->msg_attr_vec_); rmst_id_attr = RmstIdAttr.find(msg->msg_attr_vec_); frag_attr = RmstFragAttr.find(msg->msg_attr_vec_); rmst_no = rmst_id_attr->getVal(); frag_no = frag_attr->getVal(); blob_ptr = data_buf_attr->getVal(); data_buf_len = data_buf_attr->getLen(); // See if we are receiving this blob, if not start a new RecRmst rmst_iterator = rec_rmst_map_.find(rmst_no); if (rmst_iterator == rec_rmst_map_.end()){ rmst_ptr = new RecRmst(rmst_no); rec_rmst_map_.insert(Int2RecRmst::value_type(rmst_no, rmst_ptr)); } else rmst_ptr = (*rmst_iterator).second; if (frag_no == 0){ max_frag_attr = RmstMaxFragAttr.find(msg->msg_attr_vec_); rmst_ptr->max_frag_ = max_frag_attr->getVal(); rmst_ptr->mtu_len_ = data_buf_len; } // Copy fragment to map tmp_frag_ptr = new char[data_buf_len]; memcpy(tmp_frag_ptr, blob_ptr, data_buf_len); rmst_ptr->frag_map_.insert(Int2Frag::value_type(frag_no, tmp_frag_ptr)); if (frag_no == rmst_ptr->max_frag_) rmst_ptr->max_frag_len_ = data_buf_len; count = rmst_ptr->frag_map_.size(); // If this is the last rmst fragment, create the entire rmst if (count == (rmst_ptr->max_frag_ + 1)){ DiffPrint(DEBUG_DETAILS, "RMST #%d is complete, creating big blob !\n", rmst_no); // Allocate memory for the big blob dstSize = rmst_ptr->max_frag_ * rmst_ptr->mtu_len_ + rmst_ptr->max_frag_len_; dstPtr = new char[dstSize]; // Copy all but last fragment to a buffer for (int i = 0; i < rmst_ptr->max_frag_; i++){ frag_iterator = rmst_ptr->frag_map_.find(i); tmp_frag_ptr = (*frag_iterator).second; memcpy((void *)&dstPtr[i * rmst_ptr->mtu_len_], (void *)tmp_frag_ptr, rmst_ptr->mtu_len_); } // Now, copy the last fragment to the buffer frag_iterator = rmst_ptr->frag_map_.find(rmst_ptr->max_frag_); tmp_frag_ptr = (*frag_iterator).second; memcpy((void *)&dstPtr[rmst_ptr->max_frag_ * rmst_ptr->mtu_len_], (void *)tmp_frag_ptr, rmst_ptr->max_frag_len_); // Since we copied everything from the map - clean it up rec_rmst_map_.erase(rmst_iterator); delete rmst_ptr; // Now we substitute the last fragment with the reconstructed blob data_buf_attr->setVal(dstPtr, dstSize); // Deliver this to the application return true; } // We don't have the entire blob return false;}HandleEntry * DiffusionRouting::removeHandle(handle my_handle, HandleList *hl){ HandleList::iterator itr; HandleEntry *entry; for (itr = hl->begin(); itr != hl->end(); ++itr){ entry = *itr; if (entry->hdl_ == my_handle){ hl->erase(itr); return entry; } } return NULL;}HandleEntry * DiffusionRouting::findHandle(handle my_handle, HandleList *hl){ HandleList::iterator itr; HandleEntry *entry; for (itr = hl->begin(); itr != hl->end(); ++itr){ entry = *itr; if (entry->hdl_ == my_handle) return entry; } return NULL;}FilterEntry * DiffusionRouting::deleteFilter(handle my_handle){ FilterList::iterator itr; FilterEntry *entry; for (itr = filter_list_.begin(); itr != filter_list_.end(); ++itr){ entry = *itr; if (entry->handle_ == my_handle){ filter_list_.erase(itr); return entry; } } return NULL;}FilterEntry * DiffusionRouting::findFilter(handle my_handle){ FilterList::iterator itr; FilterEntry *entry; for (itr = filter_list_.begin(); itr != filter_list_.end(); ++itr){ entry = *itr; if (entry->handle_ == my_handle) return entry; } return NULL;}bool DiffusionRouting::hasScope(NRAttrVec *attrs){ NRAttribute *temp = NULL; temp = NRScopeAttr.find(attrs); if (temp) return true; return false;}bool DiffusionRouting::checkSubscription(NRAttrVec *attrs){ NRSimpleAttribute<int> *nrclass = NULL; NRSimpleAttribute<int> *nrscope = NULL; // We first try to locate both class and scope attributes nrclass = NRClassAttr.find(attrs); nrscope = NRScopeAttr.find(attrs); // There must be a class attribute in subscriptions if (!nrclass) return false; if (nrscope){ // This subcription has both class and scope attribute. So, we // check if class/scope attributes comply with the Diffusion // Routing API // Must check scope's operator. The API requires it to be "IS" if (nrscope->getOp() != NRAttribute::IS) return false; // Ok, so first check if this is a global subscription if ((nrscope->getVal() == NRAttribute::GLOBAL_SCOPE) && (nrclass->getVal() == NRAttribute::INTEREST_CLASS) && (nrclass->getOp() == NRAttribute::IS)) return true; // Check for local subscriptions if (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE) return true; // Just to be sure we did not miss any case return false; } // If there is no scope attribute, we will insert one later if this // subscription looks like a global subscription if ((nrclass->getVal() == NRAttribute::INTEREST_CLASS) && (nrclass->getOp() == NRAttribute::IS)) return true; return false;}bool DiffusionRouting::checkPublication(NRAttrVec *attrs){ NRSimpleAttribute<int> *nrclass = NULL; NRSimpleAttribute<int> *nrscope = NULL; // We first try to locate both class and scope attributes nrclass = NRClassAttr.find(attrs); nrscope = NRScopeAttr.find(attrs); // There must be a class attribute in the publication if (!nrclass) return false; // In addition, the Diffusion Routing API requires the class // attribute to be set to "IS DATA_CLASS" if ((nrclass->getVal() != NRAttribute::DATA_CLASS) || (nrclass->getOp() != NRAttribute::IS)) return false; if (nrscope){ // Ok, so this publication has both class and scope attributes. We // now have to check if they comply to the Diffusion Routing API // semantics for publish // Must check scope's operator. The API requires it to be "IS" if (nrscope->getOp() != NRAttribute::IS) return false; // We accept both global and local scope data messages if ((nrscope->getVal() == NRAttribute::GLOBAL_SCOPE) || (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE)) return true; // Just not to miss any case return false; } // A publish without a scope attribute is fine, we will include a // default NODE_LOCAL_SCOPE attribute later return true;}bool DiffusionRouting::checkSend(NRAttrVec *attrs){ NRSimpleAttribute<int> *nrclass = NULL; NRSimpleAttribute<int> *nrscope = NULL; // Currently only checks for Class and Scope attributes nrclass = NRClassAttr.find(attrs); nrscope = NRScopeAttr.find(attrs); if (nrclass || nrscope) return false; return true;}bool DiffusionRouting::isPushData(NRAttrVec *attrs){ NRSimpleAttribute<int> *nrclass = NULL; NRSimpleAttribute<int> *nrscope = NULL; // Currently only checks for Class and Scope attributes nrclass = NRClassAttr.find(attrs); nrscope = NRScopeAttr.find(attrs); // We should have both class and scope if (nrclass && nrscope){ if (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE) return false; return true; } else{ DiffPrint(DEBUG_ALWAYS, "Error: Cannot find class/scope attributes !\n"); return false; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -