⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dr.cc

📁 ns-2的文件包。多多下载
💻 CC
📖 第 1 页 / 共 3 页
字号:
//// dr.cc           : Diffusion Routing Class// authors         : John Heidemann and Fabio Silva//// Copyright (C) 2000-2003 by the University of Southern California// $Id: dr.cc,v 1.16 2004/01/08 23:05:53 haldar Exp $//// This program is free software; you can redistribute it and/or// modify it under the terms of the GNU General Public License,// version 2, as published by the Free Software Foundation.//// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the// GNU General Public License for more details.//// You should have received a copy of the GNU General Public License along// with this program; if not, write to the Free Software Foundation, Inc.,// 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.////#include <stdlib.h>#include <stdio.h>#include "dr.hh"class CallbackEntry {public:  NR::Callback *cb_;  NR::handle subscription_handle_;  CallbackEntry(NR::Callback *cb, NR::handle subscription_handle) :    cb_(cb), subscription_handle_(subscription_handle) {};};class HandleEntry {public:  handle hdl_;  bool valid_;  NRAttrVec *attrs_;  NR::Callback *cb_;  struct timeval exploratory_time_;  int32_t subscription_id_; // Used for One-Phase Pull  HandleEntry()  {    GetTime(&exploratory_time_);    valid_ = true;    cb_ = NULL;  };  ~HandleEntry(){    ClearAttrs(attrs_);    delete attrs_;  };};int InterestCallback::expire(){  int retval;  // Call the interestTimeout function  retval = drt_->interestTimeout(handle_entry_);  if (retval < 0)    delete this;  return retval;}int FilterKeepaliveCallback::expire(){  int retval;  // Call the filterTimeout function  retval = drt_->filterKeepaliveTimeout(filter_entry_);  if (retval < 0)    delete this;  return retval;}int OldAPITimer::expire(){  int retval;  // Call the callback function with the provided API  retval = cb_->expire(0, p_);  if (retval < 0)    delete this;  return retval;}#ifdef NS_DIFFUSIONclass DiffEventQueue;int DiffusionRouting::getNodeId() {  return node_->address();}int DiffusionRouting::getAgentId(int id) {  if (id != -1)    agent_id_ = id;  return agent_id_;}NR * NR::create_ns_NR(u_int16_t port, DiffAppAgent *da) {  return(new DiffusionRouting(port, da));}#elseNR *dr = NULL;#ifdef USE_THREADSvoid * ReceiveThread(void *dr){  // Never returns  ((DiffusionRouting *)dr)->run(true, WAIT_FOREVER);  return NULL;}#endif // USE_THREADSNR * NR::createNR(u_int16_t port){  // Create Diffusion Routing Class  if (dr)    return dr;  dr = new DiffusionRouting(port);#ifdef USE_THREADS  int retval;  pthread_t thread;  // Fork a thread for receiving Messages  retval = pthread_create(&thread, NULL, &ReceiveThread, (void *)dr);  if (retval){    DiffPrint(DEBUG_ALWAYS, "Error creating receiving thread ! Aborting...\n");    exit(-1);  }#endif // USE_THREADS  return dr;}#endif // NS_DIFFUSIONvoid GetLock(pthread_mutex_t *mutex){#ifdef USE_THREADS  pthread_mutex_lock(mutex);#endif // USE_THREADS}void ReleaseLock(pthread_mutex_t *mutex){#ifdef USE_THREADS  pthread_mutex_unlock(mutex);#endif // USE_THREADS}#ifdef NS_DIFFUSIONDiffusionRouting::DiffusionRouting(u_int16_t port, DiffAppAgent *da){#elseDiffusionRouting::DiffusionRouting(u_int16_t port){#ifdef USE_EMSIM  char *sim_id;  char *sim_group;#endif // USE_EMSIM#endif // NS_DIFFUSION  struct timeval tv;  DiffusionIO *device;  // Initialize basic stuff  next_handle_ = 1;  GetTime(&tv);  SetSeed(&tv);  pkt_count_ = GetRand();  random_id_ = GetRand();  agent_id_ = 0;  if (port == 0)    port = DEFAULT_DIFFUSION_PORT;  diffusion_port_ = port;#ifdef USE_EMSIM  // Check if we are running in the emstar simulator  sim_id = getenv("SIM_ID");  sim_group = getenv("SIM_GROUP");  // Update diffusion port if running inside the simulator  if (sim_id && sim_group){    diffusion_port_ = diffusion_port_ + atoi(sim_id) + (100 * atoi(sim_group));  }#endif // USE_EMSIM  // Initialize timer manager  timers_manager_ = new TimerManager;  // Initialize input device#ifdef NS_DIFFUSION  device = new NsLocal(da);  local_out_devices_.push_back(device);#endif // NS_DIFFUSION#ifdef UDP  device = new UDPLocal(&agent_id_);  in_devices_.push_back(device);  local_out_devices_.push_back(device);#endif // UDP  // Print initialization message  DiffPrint(DEBUG_ALWAYS,	    "Diffusion Routing Agent initializing... Agent Id = %d\n",	    agent_id_);#ifdef USE_THREADS  // Initialize Semaphores  dr_mtx_ = new pthread_mutex_t;  pthread_mutex_init(dr_mtx_, NULL);#endif // USE_THREADS}DiffusionRouting::~DiffusionRouting(){  HandleList::iterator itr;  HandleEntry *current;  // Delete all Handles  for (itr = sub_list_.begin(); itr != sub_list_.end(); ++itr){    current = *itr;    delete current;  }  for (itr = pub_list_.begin(); itr != pub_list_.end(); ++itr){    current = *itr;    delete current;  }}handle DiffusionRouting::subscribe(NRAttrVec *subscribe_attrs, NR::Callback *cb){  NRSimpleAttribute<int> *nr_algorithm = NULL;  TimerCallback *timer_callback;  NRAttribute *scope_attr;  HandleEntry *my_handle;  // Get lock first  GetLock(dr_mtx_);  // Check the published attributes  if (!checkSubscription(subscribe_attrs)){    DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the subscribe attributes !\n");    ReleaseLock(dr_mtx_);    return FAIL;  }  // Create and Initialize the handle_entry structute  my_handle = new HandleEntry;  my_handle->hdl_ = next_handle_;  next_handle_++;  my_handle->cb_ = (NR::Callback *) cb;  sub_list_.push_back(my_handle);  // Copy the attributes  my_handle->attrs_ = CopyAttrs(subscribe_attrs);  // For subscriptions, scope is global if not specified  if (!hasScope(subscribe_attrs)){    scope_attr = NRScopeAttr.make(NRAttribute::IS, NRAttribute::GLOBAL_SCOPE);    my_handle->attrs_->push_back(scope_attr);  }  // For One-Phase Pull, we need a subscription id  nr_algorithm = NRAlgorithmAttr.find(subscribe_attrs);  if (nr_algorithm &&      nr_algorithm->getVal() == NRAttribute::ONE_PHASE_PULL_ALGORITHM){    my_handle->subscription_id_ = GetRand();    my_handle->attrs_->push_back(NRSubscriptionAttr.make(NRAttribute::IS,							 my_handle->subscription_id_));  }  // Create Interest Timer and add it to the queue  timer_callback = new InterestCallback(this, my_handle);  timers_manager_->addTimer(SMALL_TIMEOUT, timer_callback);  // Release lock  ReleaseLock(dr_mtx_);  return my_handle->hdl_;}int DiffusionRouting::unsubscribe(handle subscription_handle){  HandleEntry *my_handle = NULL;  // Get the lock first  GetLock(dr_mtx_);  my_handle = findHandle(subscription_handle, &sub_list_);  if (!my_handle){    // Handle doesn't exist, return FAIL    ReleaseLock(dr_mtx_);    return FAIL;  }  // Handle will be destroyed when next interest timeout happens  my_handle->valid_ = false;  // Release the lock  ReleaseLock(dr_mtx_);  return OK;}handle DiffusionRouting::publish(NRAttrVec *publish_attrs){  HandleEntry *my_handle;  NRAttribute *scope_attr;  // Get the lock first  GetLock(dr_mtx_);  // Check the published attributes  if (!checkPublication(publish_attrs)){    DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the publish attributes !\n");    ReleaseLock(dr_mtx_);    return FAIL;  }  // Create and Initialize the handle_entry structute  my_handle = new HandleEntry;  my_handle->hdl_ = next_handle_;  next_handle_++;  pub_list_.push_back(my_handle);  // Copy the attributes  my_handle->attrs_ = CopyAttrs(publish_attrs);  // For publications, scope is local if not specified  if (!hasScope(publish_attrs)){    scope_attr = NRScopeAttr.make(NRAttribute::IS, NRAttribute::NODE_LOCAL_SCOPE);    my_handle->attrs_->push_back(scope_attr);  }  // Release the lock  ReleaseLock(dr_mtx_);  return my_handle->hdl_;}int DiffusionRouting::unpublish(handle publication_handle){  HandleEntry *my_handle = NULL;  // Get the lock first  GetLock(dr_mtx_);  my_handle = removeHandle(publication_handle, &pub_list_);  if (!my_handle){    // Handle doesn't exist, return FAIL    ReleaseLock(dr_mtx_);    return FAIL;  }  // Free structures  delete my_handle;  // Release the lock  ReleaseLock(dr_mtx_);  return OK;}int DiffusionRouting::send(handle publication_handle,			   NRAttrVec *send_attrs){  NRSimpleAttribute<int> *nr_algorithm = NULL;  NRSimpleAttribute<int> *rmst_id_attr = NULL;  int8_t send_message_type = DATA;  struct timeval current_time;  HandleEntry *my_handle;  Message *my_message;  // Get the lock first  GetLock(dr_mtx_);  // Get attributes associated with handle  my_handle = findHandle(publication_handle, &pub_list_);  if (!my_handle){    ReleaseLock(dr_mtx_);    return FAIL;  }  // Check the send attributes  if (!checkSend(send_attrs)){    DiffPrint(DEBUG_ALWAYS,	      "Error : Invalid class/scope attributes in send attributes !\n");    ReleaseLock(dr_mtx_);    return FAIL;  }  // Check if it is time to send another exploratory data message  GetTime(&current_time);  // Check algorithms  nr_algorithm = NRAlgorithmAttr.find(my_handle->attrs_);  rmst_id_attr = RmstIdAttr.find(send_attrs);  if (!nr_algorithm && !rmst_id_attr || nr_algorithm &&      nr_algorithm->getVal() != NRAttribute::ONE_PHASE_PULL_ALGORITHM){    // In One-Phase Pull, there are no exploratory messages    if (TimevalCmp(&current_time, &(my_handle->exploratory_time_)) >= 0){      // Check if it is a push data message or a regular data message      if (isPushData(my_handle->attrs_)){	// Push data message	// Update time for the next push exploratory message	GetTime(&(my_handle->exploratory_time_));	my_handle->exploratory_time_.tv_sec += PUSH_EXPLORATORY_DELAY;	send_message_type = PUSH_EXPLORATORY_DATA;      }      else{	// Regular data message	// Update time for the next exploratory message	GetTime(&(my_handle->exploratory_time_));	my_handle->exploratory_time_.tv_sec += EXPLORATORY_DATA_DELAY;    	send_message_type = EXPLORATORY_DATA;      }    }  }  // Initialize message structure  my_message = new Message(DIFFUSION_VERSION, send_message_type, agent_id_,			   0, 0, pkt_count_, random_id_, LOCALHOST_ADDR,			   LOCALHOST_ADDR);  // Increment pkt_counter  pkt_count_++;  // First, we duplicate the 'publish' attributes  my_message->msg_attr_vec_ = CopyAttrs(my_handle->attrs_);  // Now, we add the send attributes  AddAttrs(my_message->msg_attr_vec_, send_attrs);  // Compute the total number and size of the joined attribute sets  my_message->num_attr_ = my_message->msg_attr_vec_->size();  my_message->data_len_ = CalculateSize(my_message->msg_attr_vec_);  // Release the lock  ReleaseLock(dr_mtx_);  // Send Packet  sendMessageToDiffusion(my_message);  delete my_message;  return OK;}int DiffusionRouting::sendRmst(handle publication_handle,			       NRAttrVec *send_attrs, int fragment_size){  NRSimpleAttribute<void *> *rmst_data_attr;  NRSimpleAttribute<int> *frag_number_attr;  NRSimpleAttribute<int> *max_frag_attr;  void *frag_ptr, *blob_ptr;  char *blob;  timeval send_interval;  int retval;  int id = GetRand() % 500;  int size;  int num_frag;  int max_frag_len;  // Find RMST blob to send  rmst_data_attr = RmstDataAttr.find(send_attrs);  // We must have a RMST data attribute to send  if(!rmst_data_attr){    DiffPrint(DEBUG_ALWAYS, "sendRMST - can't find blob to send !\n");    return FAIL;  }  // Copy RMST blob and calculate number of fragments  blob_ptr = rmst_data_attr->getVal();  size = rmst_data_attr->getLen();  blob = new char[size];  memcpy((void *)blob, blob_ptr, size);  num_frag = (size + fragment_size - 1) / fragment_size;  // We index starting at zero  num_frag--;  max_frag_len = size - (num_frag * fragment_size);  DiffPrint(DEBUG_DETAILS,	    "sendRMST: rmst num_frag = %d, fragment_size = %d, max_frag_len = %d\n",	    num_frag, fragment_size, max_frag_len);  // Prepare attribute vector with RMST attributes

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -