📄 dr.cc
字号:
max_frag_attr = RmstMaxFragAttr.make(NRAttribute::IS, num_frag); send_attrs->push_back(max_frag_attr); send_attrs->push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, RMST_RESP)); frag_number_attr = RmstFragAttr.make(NRAttribute::IS, 0); send_attrs->push_back(frag_number_attr); send_attrs->push_back(RmstIdAttr.make(NRAttribute::IS, id)); // Replace the large blob with a blob fragment frag_ptr = (void *)&blob[0]; // The call to setVal will delete the original blob!! if (num_frag == 0) rmst_data_attr->setVal(frag_ptr, max_frag_len); else rmst_data_attr->setVal(frag_ptr, fragment_size); // Send 1st fragment retval = send(publication_handle, send_attrs); // Send other fragments for (int i = 1; i <= num_frag; i++){ // Small delay between sending fragments send_interval.tv_sec = 0; send_interval.tv_usec = 25000; select(0, NULL, NULL, NULL, &send_interval); // Send next fragment frag_number_attr->setVal(i); frag_ptr = (void *)&blob[i * fragment_size]; if (num_frag == i) rmst_data_attr->setVal(frag_ptr, max_frag_len); else rmst_data_attr->setVal(frag_ptr, fragment_size); retval = send(publication_handle, send_attrs); } ClearAttrs(send_attrs); delete blob; return OK;}int DiffusionRouting::addToBlacklist(int32_t node){ ControlMessage *control_blob; NRAttribute *ctrl_msg_attr; Message *my_message; NRAttrVec *attrs; control_blob = new ControlMessage(ADD_TO_BLACKLIST, node, 0); ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS, (void *)control_blob, sizeof(ControlMessage)); attrs = new NRAttrVec; attrs->push_back(ctrl_msg_attr); my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0, 0, pkt_count_, random_id_, LOCALHOST_ADDR, LOCALHOST_ADDR); // Increment pkt_counter pkt_count_++; // Add attributes to the message my_message->msg_attr_vec_ = attrs; my_message->num_attr_ = attrs->size(); my_message->data_len_ = CalculateSize(attrs); // Send Packet sendMessageToDiffusion(my_message); // Delete message delete my_message; delete control_blob; return OK;}int DiffusionRouting::clearBlacklist(){ ControlMessage *control_blob; NRAttribute *ctrl_msg_attr; Message *my_message; NRAttrVec *attrs; control_blob = new ControlMessage(CLEAR_BLACKLIST, 0, 0); ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS, (void *)control_blob, sizeof(ControlMessage)); attrs = new NRAttrVec; attrs->push_back(ctrl_msg_attr); my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0, 0, pkt_count_, random_id_, LOCALHOST_ADDR, LOCALHOST_ADDR); // Increment pkt_counter pkt_count_++; // Add attributes to the message my_message->msg_attr_vec_ = attrs; my_message->num_attr_ = attrs->size(); my_message->data_len_ = CalculateSize(attrs); // Send Packet sendMessageToDiffusion(my_message); // Delete message delete my_message; delete control_blob; return OK;}handle DiffusionRouting::addFilter(NRAttrVec *filter_attrs, u_int16_t priority, FilterCallback *cb){ FilterEntry *filter_entry; NRAttrVec *attrs; NRAttribute *ctrl_msg_attr; ControlMessage *control_blob; Message *my_message; TimerCallback *timer_callback; // Check parameters if (!filter_attrs || !cb || priority < FILTER_MIN_PRIORITY || priority > FILTER_MAX_PRIORITY){ DiffPrint(DEBUG_ALWAYS, "Received invalid parameters when adding filter !\n"); return FAIL; } // Get lock first GetLock(dr_mtx_); // Create and Initialize the handle_entry structute filter_entry = new FilterEntry(next_handle_, priority, agent_id_); next_handle_++; filter_entry->cb_ = (FilterCallback *) cb; filter_list_.push_back(filter_entry); // Copy attributes (keep them for matching later) filter_entry->filter_attrs_ = CopyAttrs(filter_attrs); // Copy the attributes (and add the control attr) attrs = CopyAttrs(filter_attrs); control_blob = new ControlMessage(ADD_UPDATE_FILTER, priority, filter_entry->handle_); ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS, (void *)control_blob, sizeof(ControlMessage)); attrs->push_back(ctrl_msg_attr); // Initialize message structure my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0, 0, pkt_count_, random_id_, LOCALHOST_ADDR, LOCALHOST_ADDR); // Increment pkt_counter pkt_count_++; // Add attributes to the message my_message->msg_attr_vec_ = attrs; my_message->num_attr_ = attrs->size(); my_message->data_len_ = CalculateSize(attrs); // Release the lock ReleaseLock(dr_mtx_); // Send Packet sendMessageToDiffusion(my_message); // Add keepalive timer to the event queue timer_callback = new FilterKeepaliveCallback(this, filter_entry); timers_manager_->addTimer(FILTER_KEEPALIVE_DELAY, timer_callback); // Delete message, attribute set and controlblob delete my_message; delete control_blob; return filter_entry->handle_;}int DiffusionRouting::removeFilter(handle filter_handle){ FilterEntry *filter_entry = NULL; ControlMessage *control_blob; NRAttribute *ctrl_msg_attr; NRAttrVec *attrs; Message *my_message; // Get lock first GetLock(dr_mtx_); filter_entry = findFilter(filter_handle); if (!filter_entry){ // Handle doesn't exist, return FAIL ReleaseLock(dr_mtx_); return FAIL; } control_blob = new ControlMessage(REMOVE_FILTER, filter_entry->handle_, 0); ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS, (void *)control_blob, sizeof(ControlMessage)); attrs = new NRAttrVec; attrs->push_back(ctrl_msg_attr); my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0, 0, pkt_count_, random_id_, LOCALHOST_ADDR, LOCALHOST_ADDR); // Increment pkt_counter pkt_count_++; // Add attributes to the message my_message->msg_attr_vec_ = attrs; my_message->num_attr_ = attrs->size(); my_message->data_len_ = CalculateSize(attrs); // Handle will be destroyed when next keepalive timer happens filter_entry->valid_ = false; // Send Packet sendMessageToDiffusion(my_message); // Release the lock ReleaseLock(dr_mtx_); // Delete message delete my_message; delete control_blob; return OK;}handle DiffusionRouting::addTimer(int timeout, TimerCallback *callback){ return (timers_manager_->addTimer(timeout, callback));}handle DiffusionRouting::addTimer(int timeout, void *p, TimerCallbacks *cb){ TimerCallback *callback; callback = new OldAPITimer(cb, p); return (addTimer(timeout, callback));}bool DiffusionRouting::removeTimer(handle hdl){ return (timers_manager_->removeTimer(hdl));}int DiffusionRouting::filterKeepaliveTimeout(FilterEntry *filter_entry){ FilterEntry *my_entry = NULL; ControlMessage *control_blob; NRAttribute *ctrl_msg_attr; NRAttrVec *attrs; Message *my_message; // Acquire lock first GetLock(dr_mtx_); if (filter_entry->valid_){ // Send keepalive control_blob = new ControlMessage(ADD_UPDATE_FILTER, filter_entry->priority_, filter_entry->handle_); ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS, (void *)control_blob, sizeof(ControlMessage)); attrs = CopyAttrs(filter_entry->filter_attrs_); attrs->push_back(ctrl_msg_attr); my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0, 0, pkt_count_, random_id_, LOCALHOST_ADDR, LOCALHOST_ADDR); // Increment pkt_counter pkt_count_++; // Add attributes to the message my_message->msg_attr_vec_ = attrs; my_message->num_attr_ = attrs->size(); my_message->data_len_ = CalculateSize(attrs); // Send Message sendMessageToDiffusion(my_message); delete my_message; delete control_blob; // Release lock ReleaseLock(dr_mtx_); // Reschedule another filter keepalive timer in event queue return (FILTER_KEEPALIVE_DELAY); } else{ // Filter was removed my_entry = deleteFilter(filter_entry->handle_); // We should have removed the correct handle if (my_entry != filter_entry){ DiffPrint(DEBUG_ALWAYS, "DiffusionRouting::KeepaliveTimeout: Handles should match !\n"); exit(-1); } delete my_entry; // Release lock ReleaseLock(dr_mtx_); return -1; }}int DiffusionRouting::interestTimeout(HandleEntry *handle_entry){ HandleEntry *my_handle = NULL; Message *my_message; // Acquire lock first GetLock(dr_mtx_); if (handle_entry->valid_){ // Send the interest message if entry is still valid my_message = new Message(DIFFUSION_VERSION, INTEREST, agent_id_, 0, 0, pkt_count_, random_id_, LOCALHOST_ADDR, LOCALHOST_ADDR); // Increment pkt_counter pkt_count_++; // Add attributes to the message my_message->msg_attr_vec_ = CopyAttrs(handle_entry->attrs_); my_message->num_attr_ = handle_entry->attrs_->size(); my_message->data_len_ = CalculateSize(handle_entry->attrs_); // Send Packet sendMessageToDiffusion(my_message); delete my_message; // Release lock ReleaseLock(dr_mtx_); // Reschedule this timer in the queue return (int) (floor(-1 * (log(1 - (GetRand() * 1.0 / RAND_MAX))) / INTEREST_LAMBDA)); } else{ // Interest was canceled. Just delete it from the handle_list my_handle = removeHandle(handle_entry->hdl_, &sub_list_); // We should have removed the correct handle if (my_handle != handle_entry){ DiffPrint(DEBUG_ALWAYS, "Error: interestTimeout: Handles should match !\n"); exit(-1); } delete my_handle; // Release lock ReleaseLock(dr_mtx_); // Delete timer from the queue return -1; }}int DiffusionRouting::sendMessage(Message *msg, handle h, u_int16_t priority){ RedirectMessage *original_hdr; NRAttribute *original_attr, *ctrl_msg_attr; ControlMessage *control_blob; NRAttrVec *attrs; Message *my_message; if ((priority < FILTER_MIN_PRIORITY) || (priority > FILTER_KEEP_PRIORITY)) return FAIL; // Create an attribute with the original header original_hdr = new RedirectMessage(msg->new_message_, msg->msg_type_, msg->source_port_, msg->data_len_, msg->num_attr_, msg->rdm_id_, msg->pkt_num_, msg->next_hop_, msg->last_hop_, 0, msg->next_port_); original_attr = OriginalHdrAttr.make(NRAttribute::IS, (void *)original_hdr, sizeof(RedirectMessage)); // Create the attribute with the control message control_blob = new ControlMessage(SEND_MESSAGE, h, priority); ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS, (void *)control_blob, sizeof(ControlMessage)); // Copy Attributes and add originalAttr and controlAttr attrs = CopyAttrs(msg->msg_attr_vec_); attrs->push_back(original_attr); attrs->push_back(ctrl_msg_attr); my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0, 0, pkt_count_, random_id_, LOCALHOST_ADDR, LOCALHOST_ADDR); // Increment pkt_counter pkt_count_++; // Add attributes to the message my_message->msg_attr_vec_ = attrs; my_message->num_attr_ = attrs->size(); my_message->data_len_ = CalculateSize(attrs); // Send Packet sendMessageToDiffusion(my_message); delete my_message; delete control_blob; delete original_hdr; return OK;}#ifndef NS_DIFFUSIONvoid DiffusionRouting::doIt(){ run(true, WAIT_FOREVER);}void DiffusionRouting::doOne(long timeout){ run(false, timeout);}void DiffusionRouting::run(bool wait_condition, long max_timeout){ DeviceList::iterator itr; int status, max_sock, fd; bool flag; DiffPacket in_pkt; fd_set fds; struct timeval tv; struct timeval max_tv; do{ FD_ZERO(&fds); max_sock = 0; // Set the maximum timeout value max_tv.tv_sec = (int) (max_timeout / 1000); max_tv.tv_usec = (int) ((max_timeout % 1000) * 1000); for (itr = in_devices_.begin(); itr != in_devices_.end(); ++itr){ (*itr)->addInFDS(&fds, &max_sock); } // Check for the next timer timers_manager_->nextTimerTime(&tv); if (tv.tv_sec == MAXVALUE){ // If we don't have any timers, we wait for POLLING_INTERVAL if (max_timeout == WAIT_FOREVER){ tv.tv_sec = POLLING_INTERVAL; tv.tv_usec = 0; } else{ tv = max_tv; } } else{ if ((max_timeout != WAIT_FOREVER) && (TimevalCmp(&tv, &max_tv) > 0)){ // max_timeout value is smaller than next timer's time, so we // use themax_timeout value instead tv = max_tv; } } status = select(max_sock+1, &fds, NULL, NULL, &tv); if (status == 0){ // Process all timers that have expired timers_manager_->executeAllExpiredTimers(); } if (status > 0){ do{ flag = false; for (itr = in_devices_.begin(); itr != in_devices_.end(); ++itr){ fd = (*itr)->checkInFDS(&fds); if (fd != -1){ // Message waiting in_pkt = (*itr)->recvPacket(fd); recvPacket(in_pkt); // Clear this fd FD_CLR(fd, &fds); status--; flag = true;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -