📄 dr.cc
字号:
//// dr.cc : Diffusion Routing Class// authors : John Heidemann and Fabio Silva//// Copyright (C) 2000-2003 by the University of Southern California// $Id: dr.cc,v 1.16 2004/01/08 23:05:53 haldar Exp $//// This program is free software; you can redistribute it and/or// modify it under the terms of the GNU General Public License,// version 2, as published by the Free Software Foundation.//// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the// GNU General Public License for more details.//// You should have received a copy of the GNU General Public License along// with this program; if not, write to the Free Software Foundation, Inc.,// 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.////#include <stdlib.h>#include <stdio.h>#include "dr.hh"class CallbackEntry {public: NR::Callback *cb_; NR::handle subscription_handle_; CallbackEntry(NR::Callback *cb, NR::handle subscription_handle) : cb_(cb), subscription_handle_(subscription_handle) {};};class HandleEntry {public: handle hdl_; bool valid_; NRAttrVec *attrs_; NR::Callback *cb_; struct timeval exploratory_time_; int32_t subscription_id_; // Used for One-Phase Pull HandleEntry() { GetTime(&exploratory_time_); valid_ = true; cb_ = NULL; }; ~HandleEntry(){ ClearAttrs(attrs_); delete attrs_; };};int InterestCallback::expire(){ int retval; // Call the interestTimeout function retval = drt_->interestTimeout(handle_entry_); if (retval < 0) delete this; return retval;}int FilterKeepaliveCallback::expire(){ int retval; // Call the filterTimeout function retval = drt_->filterKeepaliveTimeout(filter_entry_); if (retval < 0) delete this; return retval;}int OldAPITimer::expire(){ int retval; // Call the callback function with the provided API retval = cb_->expire(0, p_); if (retval < 0) delete this; return retval;}#ifdef NS_DIFFUSIONclass DiffEventQueue;int DiffusionRouting::getNodeId() { return node_->address();}int DiffusionRouting::getAgentId(int id) { if (id != -1) agent_id_ = id; return agent_id_;}NR * NR::create_ns_NR(u_int16_t port, DiffAppAgent *da) { return(new DiffusionRouting(port, da));}#elseNR *dr = NULL;#ifdef USE_THREADSvoid * ReceiveThread(void *dr){ // Never returns ((DiffusionRouting *)dr)->run(true, WAIT_FOREVER); return NULL;}#endif // USE_THREADSNR * NR::createNR(u_int16_t port){ // Create Diffusion Routing Class if (dr) return dr; dr = new DiffusionRouting(port);#ifdef USE_THREADS int retval; pthread_t thread; // Fork a thread for receiving Messages retval = pthread_create(&thread, NULL, &ReceiveThread, (void *)dr); if (retval){ DiffPrint(DEBUG_ALWAYS, "Error creating receiving thread ! Aborting...\n"); exit(-1); }#endif // USE_THREADS return dr;}#endif // NS_DIFFUSIONvoid GetLock(pthread_mutex_t *mutex){#ifdef USE_THREADS pthread_mutex_lock(mutex);#endif // USE_THREADS}void ReleaseLock(pthread_mutex_t *mutex){#ifdef USE_THREADS pthread_mutex_unlock(mutex);#endif // USE_THREADS}#ifdef NS_DIFFUSIONDiffusionRouting::DiffusionRouting(u_int16_t port, DiffAppAgent *da){#elseDiffusionRouting::DiffusionRouting(u_int16_t port){#ifdef USE_EMSIM char *sim_id; char *sim_group;#endif // USE_EMSIM#endif // NS_DIFFUSION struct timeval tv; DiffusionIO *device; // Initialize basic stuff next_handle_ = 1; GetTime(&tv); SetSeed(&tv); pkt_count_ = GetRand(); random_id_ = GetRand(); agent_id_ = 0; if (port == 0) port = DEFAULT_DIFFUSION_PORT; diffusion_port_ = port;#ifdef USE_EMSIM // Check if we are running in the emstar simulator sim_id = getenv("SIM_ID"); sim_group = getenv("SIM_GROUP"); // Update diffusion port if running inside the simulator if (sim_id && sim_group){ diffusion_port_ = diffusion_port_ + atoi(sim_id) + (100 * atoi(sim_group)); }#endif // USE_EMSIM // Initialize timer manager timers_manager_ = new TimerManager; // Initialize input device#ifdef NS_DIFFUSION device = new NsLocal(da); local_out_devices_.push_back(device);#endif // NS_DIFFUSION#ifdef UDP device = new UDPLocal(&agent_id_); in_devices_.push_back(device); local_out_devices_.push_back(device);#endif // UDP // Print initialization message DiffPrint(DEBUG_ALWAYS, "Diffusion Routing Agent initializing... Agent Id = %d\n", agent_id_);#ifdef USE_THREADS // Initialize Semaphores dr_mtx_ = new pthread_mutex_t; pthread_mutex_init(dr_mtx_, NULL);#endif // USE_THREADS}DiffusionRouting::~DiffusionRouting(){ HandleList::iterator itr; HandleEntry *current; // Delete all Handles for (itr = sub_list_.begin(); itr != sub_list_.end(); ++itr){ current = *itr; delete current; } for (itr = pub_list_.begin(); itr != pub_list_.end(); ++itr){ current = *itr; delete current; }}handle DiffusionRouting::subscribe(NRAttrVec *subscribe_attrs, NR::Callback *cb){ NRSimpleAttribute<int> *nr_algorithm = NULL; TimerCallback *timer_callback; NRAttribute *scope_attr; HandleEntry *my_handle; // Get lock first GetLock(dr_mtx_); // Check the published attributes if (!checkSubscription(subscribe_attrs)){ DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the subscribe attributes !\n"); ReleaseLock(dr_mtx_); return FAIL; } // Create and Initialize the handle_entry structute my_handle = new HandleEntry; my_handle->hdl_ = next_handle_; next_handle_++; my_handle->cb_ = (NR::Callback *) cb; sub_list_.push_back(my_handle); // Copy the attributes my_handle->attrs_ = CopyAttrs(subscribe_attrs); // For subscriptions, scope is global if not specified if (!hasScope(subscribe_attrs)){ scope_attr = NRScopeAttr.make(NRAttribute::IS, NRAttribute::GLOBAL_SCOPE); my_handle->attrs_->push_back(scope_attr); } // For One-Phase Pull, we need a subscription id nr_algorithm = NRAlgorithmAttr.find(subscribe_attrs); if (nr_algorithm && nr_algorithm->getVal() == NRAttribute::ONE_PHASE_PULL_ALGORITHM){ my_handle->subscription_id_ = GetRand(); my_handle->attrs_->push_back(NRSubscriptionAttr.make(NRAttribute::IS, my_handle->subscription_id_)); } // Create Interest Timer and add it to the queue timer_callback = new InterestCallback(this, my_handle); timers_manager_->addTimer(SMALL_TIMEOUT, timer_callback); // Release lock ReleaseLock(dr_mtx_); return my_handle->hdl_;}int DiffusionRouting::unsubscribe(handle subscription_handle){ HandleEntry *my_handle = NULL; // Get the lock first GetLock(dr_mtx_); my_handle = findHandle(subscription_handle, &sub_list_); if (!my_handle){ // Handle doesn't exist, return FAIL ReleaseLock(dr_mtx_); return FAIL; } // Handle will be destroyed when next interest timeout happens my_handle->valid_ = false; // Release the lock ReleaseLock(dr_mtx_); return OK;}handle DiffusionRouting::publish(NRAttrVec *publish_attrs){ HandleEntry *my_handle; NRAttribute *scope_attr; // Get the lock first GetLock(dr_mtx_); // Check the published attributes if (!checkPublication(publish_attrs)){ DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the publish attributes !\n"); ReleaseLock(dr_mtx_); return FAIL; } // Create and Initialize the handle_entry structute my_handle = new HandleEntry; my_handle->hdl_ = next_handle_; next_handle_++; pub_list_.push_back(my_handle); // Copy the attributes my_handle->attrs_ = CopyAttrs(publish_attrs); // For publications, scope is local if not specified if (!hasScope(publish_attrs)){ scope_attr = NRScopeAttr.make(NRAttribute::IS, NRAttribute::NODE_LOCAL_SCOPE); my_handle->attrs_->push_back(scope_attr); } // Release the lock ReleaseLock(dr_mtx_); return my_handle->hdl_;}int DiffusionRouting::unpublish(handle publication_handle){ HandleEntry *my_handle = NULL; // Get the lock first GetLock(dr_mtx_); my_handle = removeHandle(publication_handle, &pub_list_); if (!my_handle){ // Handle doesn't exist, return FAIL ReleaseLock(dr_mtx_); return FAIL; } // Free structures delete my_handle; // Release the lock ReleaseLock(dr_mtx_); return OK;}int DiffusionRouting::send(handle publication_handle, NRAttrVec *send_attrs){ NRSimpleAttribute<int> *nr_algorithm = NULL; NRSimpleAttribute<int> *rmst_id_attr = NULL; int8_t send_message_type = DATA; struct timeval current_time; HandleEntry *my_handle; Message *my_message; // Get the lock first GetLock(dr_mtx_); // Get attributes associated with handle my_handle = findHandle(publication_handle, &pub_list_); if (!my_handle){ ReleaseLock(dr_mtx_); return FAIL; } // Check the send attributes if (!checkSend(send_attrs)){ DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in send attributes !\n"); ReleaseLock(dr_mtx_); return FAIL; } // Check if it is time to send another exploratory data message GetTime(¤t_time); // Check algorithms nr_algorithm = NRAlgorithmAttr.find(my_handle->attrs_); rmst_id_attr = RmstIdAttr.find(send_attrs); if (!nr_algorithm && !rmst_id_attr || nr_algorithm && nr_algorithm->getVal() != NRAttribute::ONE_PHASE_PULL_ALGORITHM){ // In One-Phase Pull, there are no exploratory messages if (TimevalCmp(¤t_time, &(my_handle->exploratory_time_)) >= 0){ // Check if it is a push data message or a regular data message if (isPushData(my_handle->attrs_)){ // Push data message // Update time for the next push exploratory message GetTime(&(my_handle->exploratory_time_)); my_handle->exploratory_time_.tv_sec += PUSH_EXPLORATORY_DELAY; send_message_type = PUSH_EXPLORATORY_DATA; } else{ // Regular data message // Update time for the next exploratory message GetTime(&(my_handle->exploratory_time_)); my_handle->exploratory_time_.tv_sec += EXPLORATORY_DATA_DELAY; send_message_type = EXPLORATORY_DATA; } } } // Initialize message structure my_message = new Message(DIFFUSION_VERSION, send_message_type, agent_id_, 0, 0, pkt_count_, random_id_, LOCALHOST_ADDR, LOCALHOST_ADDR); // Increment pkt_counter pkt_count_++; // First, we duplicate the 'publish' attributes my_message->msg_attr_vec_ = CopyAttrs(my_handle->attrs_); // Now, we add the send attributes AddAttrs(my_message->msg_attr_vec_, send_attrs); // Compute the total number and size of the joined attribute sets my_message->num_attr_ = my_message->msg_attr_vec_->size(); my_message->data_len_ = CalculateSize(my_message->msg_attr_vec_); // Release the lock ReleaseLock(dr_mtx_); // Send Packet sendMessageToDiffusion(my_message); delete my_message; return OK;}int DiffusionRouting::sendRmst(handle publication_handle, NRAttrVec *send_attrs, int fragment_size){ NRSimpleAttribute<void *> *rmst_data_attr; NRSimpleAttribute<int> *frag_number_attr; NRSimpleAttribute<int> *max_frag_attr; void *frag_ptr, *blob_ptr; char *blob; timeval send_interval; int retval; int id = GetRand() % 500; int size; int num_frag; int max_frag_len; // Find RMST blob to send rmst_data_attr = RmstDataAttr.find(send_attrs); // We must have a RMST data attribute to send if(!rmst_data_attr){ DiffPrint(DEBUG_ALWAYS, "sendRMST - can't find blob to send !\n"); return FAIL; } // Copy RMST blob and calculate number of fragments blob_ptr = rmst_data_attr->getVal(); size = rmst_data_attr->getLen(); blob = new char[size]; memcpy((void *)blob, blob_ptr, size); num_frag = (size + fragment_size - 1) / fragment_size; // We index starting at zero num_frag--; max_frag_len = size - (num_frag * fragment_size); DiffPrint(DEBUG_DETAILS, "sendRMST: rmst num_frag = %d, fragment_size = %d, max_frag_len = %d\n", num_frag, fragment_size, max_frag_len); // Prepare attribute vector with RMST attributes
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -