📄 dr.cc
字号:
//// dr.cc : Diffusion Routing Class// authors : John Heidemann and Fabio Silva//// Copyright (C) 2000-2002 by the University of Southern California// $Id: dr.cc,v 1.13 2002/10/08 07:11:32 difa Exp $//// This program is free software; you can redistribute it and/or// modify it under the terms of the GNU General Public License,// version 2, as published by the Free Software Foundation.//// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the// GNU General Public License for more details.//// You should have received a copy of the GNU General Public License along// with this program; if not, write to the Free Software Foundation, Inc.,// 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.////#include <stdlib.h>#include <stdio.h>#include "dr.hh"class CallbackEntry {public: NR::Callback *cb_; NR::handle subscription_handle_; CallbackEntry(NR::Callback *cb, NR::handle subscription_handle) : cb_(cb), subscription_handle_(subscription_handle) {};};class HandleEntry {public: handle hdl_; bool valid_; NRAttrVec *attrs_; NR::Callback *cb_; struct timeval exploratory_time_; HandleEntry() { GetTime(&exploratory_time_); valid_ = true; cb_ = NULL; }; ~HandleEntry(){ ClearAttrs(attrs_); delete attrs_; };};int InterestCallback::expire(){ int retval; // Call the interestTimeout function retval = drt_->interestTimeout(handle_entry_); if (retval < 0) delete this; return retval;}int FilterKeepaliveCallback::expire(){ int retval; // Call the filterTimeout function retval = drt_->filterKeepaliveTimeout(filter_entry_); if (retval < 0) delete this; return retval;}int OldAPITimer::expire(){ int retval; // Call the callback function with the provided API retval = cb_->expire(0, p_); if (retval < 0) delete this; return retval;}#ifdef NS_DIFFUSIONclass DiffEventQueue;int DiffusionRouting::getAgentId(int id /* = -1 */) { if (id != -1) agent_id_ = id; return agent_id_;}NR * NR::create_ns_NR(u_int16_t port, DiffAppAgent *da) { return(new DiffusionRouting(port, da));}#elseNR *dr = NULL;#ifdef USE_THREADSvoid * ReceiveThread(void *dr){ // Never returns ((DiffusionRouting *)dr)->run(true, WAIT_FOREVER); return NULL;}#endif // USE_THREADSNR * NR::createNR(u_int16_t port){ // Create Diffusion Routing Class if (dr) return dr; dr = new DiffusionRouting(port);#ifdef USE_THREADS int retval; pthread_t thread; // Fork a thread for receiving Messages retval = pthread_create(&thread, NULL, &ReceiveThread, (void *)dr); if (retval){ DiffPrint(DEBUG_ALWAYS, "Error creating receiving thread ! Aborting...\n"); exit(-1); }#endif // USE_THREADS return dr;}#endif // NS_DIFFUSIONvoid GetLock(pthread_mutex_t *mutex){#ifdef USE_THREADS pthread_mutex_lock(mutex);#endif // USE_THREADS}void ReleaseLock(pthread_mutex_t *mutex){#ifdef USE_THREADS pthread_mutex_unlock(mutex);#endif // USE_THREADS}#ifdef NS_DIFFUSIONDiffusionRouting::DiffusionRouting(u_int16_t port, DiffAppAgent *da)#elseDiffusionRouting::DiffusionRouting(u_int16_t port)#endif // NS_DIFFUSION{ struct timeval tv; DiffusionIO *device; // Initialize basic stuff next_handle_ = 1; GetTime(&tv); SetSeed(&tv); pkt_count_ = GetRand(); random_id_ = GetRand(); agent_id_ = 0; if (port == 0) port = DEFAULT_DIFFUSION_PORT; diffusion_port_ = port; // Initialize timer manager timers_manager_ = new TimerManager; // Initialize input device#ifdef NS_DIFFUSION device = new NsLocal(da); local_out_devices_.push_back(device);#endif // NS_DIFFUSION#ifdef UDP device = new UDPLocal(&agent_id_); in_devices_.push_back(device); local_out_devices_.push_back(device);#endif // UDP // Print initialization message DiffPrint(DEBUG_ALWAYS, "Diffusion Routing Agent initializing... Agent Id = %d\n", agent_id_);#ifdef USE_THREADS // Initialize Semaphores dr_mtx_ = new pthread_mutex_t; pthread_mutex_init(dr_mtx_, NULL);#endif // USE_THREADS}DiffusionRouting::~DiffusionRouting(){ HandleList::iterator itr; HandleEntry *current; // Delete all Handles for (itr = sub_list_.begin(); itr != sub_list_.end(); ++itr){ current = *itr; delete current; } for (itr = pub_list_.begin(); itr != pub_list_.end(); ++itr){ current = *itr; delete current; }}handle DiffusionRouting::subscribe(NRAttrVec *subscribe_attrs, NR::Callback *cb){ HandleEntry *my_handle; NRAttribute *scope_attr; TimerCallback *timer_callback; // Get lock first GetLock(dr_mtx_); // Check the published attributes if (!checkSubscription(subscribe_attrs)){ DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the subscribe attributes !\n"); ReleaseLock(dr_mtx_); return FAIL; } // Create and Initialize the handle_entry structute my_handle = new HandleEntry; my_handle->hdl_ = next_handle_; next_handle_++; my_handle->cb_ = (NR::Callback *) cb; sub_list_.push_back(my_handle); // Copy the attributes my_handle->attrs_ = CopyAttrs(subscribe_attrs); // For subscriptions, scope is global if not specified if (!hasScope(subscribe_attrs)){ scope_attr = NRScopeAttr.make(NRAttribute::IS, NRAttribute::GLOBAL_SCOPE); my_handle->attrs_->push_back(scope_attr); } // Create Interest Timer and add it to the queue timer_callback = new InterestCallback(this, my_handle); timers_manager_->addTimer(SMALL_TIMEOUT, timer_callback); // Release lock ReleaseLock(dr_mtx_); return my_handle->hdl_;}int DiffusionRouting::unsubscribe(handle subscription_handle){ HandleEntry *my_handle = NULL; // Get the lock first GetLock(dr_mtx_); my_handle = findHandle(subscription_handle, &sub_list_); if (!my_handle){ // Handle doesn't exist, return FAIL ReleaseLock(dr_mtx_); return FAIL; } // Handle will be destroyed when next interest timeout happens my_handle->valid_ = false; // Release the lock ReleaseLock(dr_mtx_); return OK;}handle DiffusionRouting::publish(NRAttrVec *publish_attrs){ HandleEntry *my_handle; NRAttribute *scope_attr; // Get the lock first GetLock(dr_mtx_); // Check the published attributes if (!checkPublication(publish_attrs)){ DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the publish attributes !\n"); ReleaseLock(dr_mtx_); return FAIL; } // Create and Initialize the handle_entry structute my_handle = new HandleEntry; my_handle->hdl_ = next_handle_; next_handle_++; pub_list_.push_back(my_handle); // Copy the attributes my_handle->attrs_ = CopyAttrs(publish_attrs); // For publications, scope is local if not specified if (!hasScope(publish_attrs)){ scope_attr = NRScopeAttr.make(NRAttribute::IS, NRAttribute::NODE_LOCAL_SCOPE); my_handle->attrs_->push_back(scope_attr); } // Release the lock ReleaseLock(dr_mtx_); return my_handle->hdl_;}int DiffusionRouting::unpublish(handle publication_handle){ HandleEntry *my_handle = NULL; // Get the lock first GetLock(dr_mtx_); my_handle = removeHandle(publication_handle, &pub_list_); if (!my_handle){ // Handle doesn't exist, return FAIL ReleaseLock(dr_mtx_); return FAIL; } // Free structures delete my_handle; // Release the lock ReleaseLock(dr_mtx_); return OK;}int DiffusionRouting::send(handle publication_handle, NRAttrVec *send_attrs){ Message *my_message; HandleEntry *my_handle; int8_t send_message_type = DATA; struct timeval current_time; // Get the lock first GetLock(dr_mtx_); // Get attributes associated with handle my_handle = findHandle(publication_handle, &pub_list_); if (!my_handle){ ReleaseLock(dr_mtx_); return FAIL; } // Check the send attributes if (!checkSend(send_attrs)){ DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the send attributes !\n"); ReleaseLock(dr_mtx_); return FAIL; } // Check if it is time to send another exploratory data message GetTime(¤t_time); if (TimevalCmp(¤t_time, &(my_handle->exploratory_time_)) >= 0){ // Check if it is a push data message or a regular data message if (isPushData(my_handle->attrs_)){ // Push data message // Update time for the next push exploratory message GetTime(&(my_handle->exploratory_time_)); my_handle->exploratory_time_.tv_sec += PUSH_EXPLORATORY_DELAY; send_message_type = PUSH_EXPLORATORY_DATA; } else{ // Regular data message // Update time for the next exploratory message GetTime(&(my_handle->exploratory_time_)); my_handle->exploratory_time_.tv_sec += EXPLORATORY_DATA_DELAY; send_message_type = EXPLORATORY_DATA; } } // Initialize message structure my_message = new Message(DIFFUSION_VERSION, send_message_type, agent_id_, 0, 0, pkt_count_, random_id_, LOCALHOST_ADDR, LOCALHOST_ADDR); // Increment pkt_counter pkt_count_++; // First, we duplicate the 'publish' attributes my_message->msg_attr_vec_ = CopyAttrs(my_handle->attrs_); // Now, we add the send attributes AddAttrs(my_message->msg_attr_vec_, send_attrs);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -