⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rt_corba_workers.cpp

📁 ACE自适配通信环境(ADAPTIVE Communication Environment)是可以自由使用、开放源码的面向对象(OO)框架(Framework)
💻 CPP
字号:
// RT_CORBA_Workers.cpp,v 1.10 2005/01/21 02:19:20 ossama Exp#include "RT_CORBA_Workers.h"#if defined (ACE_HAS_THREADS)#include "ace/OS_main.h"#include "ace/ACE.h"#include "ace/Get_Opt.h"#include "ace/High_Res_Timer.h"#include "ace/Sched_Params.h"#include "ace/Lock_Adapter_T.h"// The number of messages that is being processedstatic size_t number_of_messages = 100;// The number of upcall threadsstatic size_t number_of_workers = 2;// The size of the messagestatic size_t message_size = 100;// Number of threads that are ready to gostatic size_t ready_threads = 0;// Number of input and output threadsstatic size_t io_threads = 2; // 1 for output and 1 for input// High resolution test timerstatic ACE_High_Res_Timer test_timer;// Debugging conditionstatic DEBUGGING_RANGE debug = DEBUG_NONE;// Data block used by the message blocksACE_Data_Block *data_block = 0;/*******************************************************************/// Constructor for SynchronisersSynchronisers::Synchronisers (void)  : mutex_ (),    event_ (){}intSynchronisers::start_synchronization (void){  // Hold the lock and increment the global variable to indicate  // number of ready threads  {    ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);    ready_threads ++;    if (ready_threads == (number_of_workers + io_threads))      {        // Reset the ready_threads so that we can wait at the end of        // runs        ready_threads = 0;        if (debug)          {            ACE_DEBUG ((LM_DEBUG,                        "(%P|%t) Ready to signal start \n"));          }        // Start the timer        test_timer.start ();        // Signal all the threads        this->event_.signal ();        // return to do our work;        return 0;      }    // If we are not the last thread, let go off the lock  }  if (debug)    {      ACE_DEBUG ((LM_DEBUG,                  "(%P|%t) Ready to wait () on event.. \n"));    }  // Wait blisfully till we are woken up  this->event_.wait ();  return 0;}intSynchronisers::end_synchronization (void){  // Hold the lock and increment the global variable to indicate  // number of ready threads  {    ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);    ready_threads ++;    if (ready_threads == (number_of_workers + io_threads))      {        // Reset the ready_threads so that we can wait at the end of        // runs        ready_threads = 0;        // Start the timer        test_timer.stop ();        // Signal all the threads        this->event_.signal ();        if (debug)          {            ACE_DEBUG ((LM_DEBUG,                        "(%P|%t) Ended peacefully \n"));          }        // return to do our work;        return 0;      }    // If we are not the last thread, let go off the lock  }  if (debug)    {      ACE_DEBUG ((LM_DEBUG,                  "(%P|%t) Going to wait .. \n"));    }  // Wait blisfully till we are woken up  this->event_.wait ();  return 0;}/*******************************************************************/Worker_Task::Worker_Task (Message_Queue *mq,                          Synchronisers &synch)  : ACE_Task<ACE_MT_SYNCH> (0, mq),  synch_ (synch),  messages_processed_ (0){}intWorker_Task::svc (void){  // Start synchronization  (void) this->synch_.start_synchronization ();  for (;;)    {      ACE_Message_Block *mb = 0;      int result = this->getq (mb);      if (result == -1)        {          ACE_ERROR_RETURN ((LM_ERROR,                             "Worker_Task::svc (%t) -> %p\n",                             "getq error"),                            -1);        }      // Get the flag in the message blok      ACE_Message_Block::Message_Flags flag =        mb->self_flags ();      // The stop flag      int stop_flag = 0;      // Check for the stop flag      if (ACE_BIT_ENABLED (flag,                           Synchronisers::MB_STOP_FLAG))        {          if (debug)            {              ACE_DEBUG ((LM_DEBUG,                          "(%P|%t) saw flag after [%d] messages\n",                          this->messages_processed_));            }          stop_flag = 1;        }      // Release the message block      mb->release ();      // Counter.      ++this->messages_processed_;      if (debug)        {          ACE_DEBUG ((LM_DEBUG,                      "(%P|%t) dequeued my %d message\n",                      this->messages_processed_));        }      //      // Process message here.      //      for (size_t j = 0; j < message_size; ++j)        {          // Eat a little CPU          /* takes about 40.2 usecs on a 167 MHz Ultra2 */          u_long n = 11UL;          ACE::is_prime (n, 2, n / 2);        }      // Make a message block for writing onto output queue      ACE_Message_Block *message_block = 0;      ACE_NEW_RETURN (message_block,                      ACE_Message_Block (data_block),                      -1);      // Put this message block into the next queue or the output      // queue      result = this->put_next (message_block);      if (result == -1)        {          ACE_ERROR_RETURN ((LM_ERROR,                             "Input::svc (%t) -> %p\n",                             "putq error"),                            -1);                            }      // If the stop_flag is set just break and wait..      if (stop_flag)        {          if (debug)            ACE_DEBUG ((LM_DEBUG,                        "(%P|%t) Got stop message after [%d] messages \n",                        this->messages_processed_));          break;        }    }  (void) this->synch_.end_synchronization ();  return 0;}intWorker_Task::processed (void){  return this->messages_processed_;}/*******************************************************************/Input_Task::Input_Task (Message_Queue *mq,                        Synchronisers &synch)  : ACE_Task<ACE_MT_SYNCH> (0, mq),    synch_ (synch){}intInput_Task::svc (void){  // Synchronise threads  (void) this->synch_.start_synchronization ();  size_t i = 0;  for (i = 0;       i < (number_of_messages - number_of_workers);       ++i)    {      // Make a message block      ACE_Message_Block *message_block = 0;      ACE_NEW_RETURN (message_block,                      ACE_Message_Block (data_block),                      -1);      int result = this->putq (message_block);      if (result == -1)        {          ACE_ERROR_RETURN ((LM_ERROR,                             "Input::svc (%t) -> %p\n",                             "putq error"),                            -1);        }      if (debug)        {          ACE_DEBUG ((LM_DEBUG,                      "(%t) Input thread ->  Sent [%d] messages\n",                      i));        }    }  if (debug)    {      ACE_DEBUG ((LM_DEBUG,                  "(%t) Sending close messages \n"));    }  // Stop messages  for (i = 0;       i < number_of_workers;       ++i)    {      // Make a message block      ACE_Message_Block *message_block = 0;      ACE_NEW_RETURN (message_block,                      ACE_Message_Block (data_block),                      -1);      // Set the stop flag in the message block and not in the datablock      message_block->set_self_flags (Synchronisers::MB_STOP_FLAG);      int result = this->putq (message_block);      if (result == -1)        {          ACE_ERROR_RETURN ((LM_ERROR,                             "Input::svc (%t) -> %p\n",                             "putq error"),                            -1);        }    }  (void) this->synch_.end_synchronization ();  return 0;}/*******************************************************************/Output_Task::Output_Task (Message_Queue *mq,                          Synchronisers &synch)  : ACE_Task<ACE_MT_SYNCH> (0, mq),    synch_ (synch){}intOutput_Task::svc (void){  // Synchronise threads  (void) this->synch_.start_synchronization ();  for (size_t i = 0;       i < number_of_messages;       ++i)    {      // Get the message block from queue      ACE_Message_Block *mb = 0;      int result = this->getq (mb);      // delete the message block      mb->release ();      if (debug)        {          ACE_DEBUG ((LM_DEBUG,                      "(%t) Output thread ->  received [%d] message\n",                      i));                      }      if (result == -1)        {          ACE_ERROR_RETURN ((LM_ERROR,                             "Input::svc (%t) -> %p\n",                             "putq error"),                            -1);        }    }  (void) this->synch_.end_synchronization ();  return 0;}intOutput_Task::put (ACE_Message_Block *mb, ACE_Time_Value *){  /*  if (debug)    {      ACE_DEBUG ((LM_DEBUG,                  "(%P|%t) Sticking message into "                  " output queue \n"));                  }*/  return this->putq (mb);}/*******************************************************************/static intparse_args (int argc, ACE_TCHAR *argv[]){  ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("m:s:t:d:"));  int c;  while ((c = get_opt ()) != -1)    {      switch (c)        {        case 'm':          number_of_messages = ACE_OS::atoi (get_opt.opt_arg ());          break;        case 't':          number_of_workers = ACE_OS::atoi (get_opt.opt_arg ());          break;        case 'd':          debug = static_cast<DEBUGGING_RANGE> (ACE_OS::atoi (get_opt.opt_arg ()));          break;        case 's':          message_size = ACE_OS::atoi (get_opt.opt_arg ());          break;        default:          ACE_ERROR_RETURN ((LM_ERROR,                             "usage: %s\n"                             "\t[-m number of messages]\n"                             "\t[-s message size]\n"                             "\t[-w number of workers]\n"                             "\t[-b burst size]\n"                             "\t[-t timeout between bursts]\n"                             "\t[-d debug]\n",                             argv[0]),                            -1);        }    }  return 0;}/*******************************************************************/intACE_TMAIN (int argc, ACE_TCHAR *argv[]){  int result = parse_args (argc, argv);  if (result != 0)    {      return result;    }  ACE_High_Res_Timer::calibrate ();  // Create the message queue  Message_Queue input_message_queue;  Message_Queue output_message_queue;  // Create the datablocks. IF we use the default Message Blocks Ctor,  // it is going to do an extra allocation for the data block  ACE_NEW_RETURN (data_block,                  ACE_Locked_Data_Block<ACE_Lock_Adapter<ACE_SYNCH_MUTEX> >,                  -1);  // Increment the reference count so that we can share the  // datablock. This is donw twice the number of messages for the  // input and output queues.  size_t i = 0;  for (i = 0; i < 2*number_of_messages; ++i)    {      data_block->duplicate ();    }  // Create the Synchronisers  Synchronisers synch;  // Workers.  Worker_Task **workers = 0;  ACE_NEW_RETURN (workers,                  Worker_Task *[number_of_workers],                  -1);  // Input Task  Input_Task input_task (&input_message_queue,                         synch);  // Output Task  Output_Task output_task (&output_message_queue,                           synch);  int priority =    ACE_Sched_Params::priority_max (ACE_SCHED_FIFO);  long flags = THR_SCHED_FIFO | THR_SCOPE_PROCESS;  // Create and activate the worker threads  for (i = 0; i < number_of_workers; ++i)    {      ACE_NEW_RETURN (workers[i],                      Worker_Task (&input_message_queue, synch),                      -1);      workers[i]->next (&output_task);      // Activate the workers.      result = workers[i]->activate (flags,                                     1,                                     1,                                     priority);      if (result != 0)        {          flags = THR_BOUND;          priority = ACE_Sched_Params::priority_min (ACE_SCHED_OTHER,                                                     ACE_SCOPE_THREAD);          result = workers[i]->activate (flags,                                         1,                                         1,                                         priority);          if (result != 0)            {              return result;            }        }    }  // Activate the input and output threads  result = input_task.activate (flags,                                1,                                1,                                priority);  if (result != 0)    return result;  // Activate the workers.  result = output_task.activate (flags,                                 1,                                 1,                                 priority);  if (result != 0)    return result;  // Wait for all threads to terminate.  result = ACE_Thread_Manager::instance ()->wait ();  ACE_hrtime_t elapsed_time = 0;  test_timer.elapsed_time (elapsed_time);# if !defined (ACE_WIN32)  double elapsed_time_per_invocation =    (double) elapsed_time / number_of_messages;  ACE_DEBUG ((LM_DEBUG,     "(%P|%t) Throughput is [%f] \n",     elapsed_time_per_invocation));   ACE_DEBUG ((LM_DEBUG,              "(%P|%t) Throughput is [%f] \n",              1000000000/ elapsed_time_per_invocation));#endif /*ACE_WIN32 */  for (i = 0; i < number_of_workers; ++i)    {      ACE_DEBUG ((LM_DEBUG,                  "Message process for thread [%d] is [%d] \n",                  i, workers[i]->processed ()));      delete workers[i];    }  delete[] workers;  return result;}#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)template class ACE_Locked_Data_Block<ACE_Lock_Adapter<ACE_SYNCH_MUTEX> >;template class ACE_Lock_Adapter<ACE_SYNCH_MUTEX>;#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)#pragma instantiate ACE_Locked_Data_Block<ACE_Lock_Adapter<ACE_SYNCH_MUTEX> >#pragma instantiate ACE_Lock_Adapter<ACE_SYNCH_MUTEX>#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */#else /*ACE_HAS_THREADS*/intmain (int, char *[]){  ACE_DEBUG ((LM_DEBUG,              "(%P|%t) Not supported in single threaded builds \n"));  return 0;}#endif /*ACE_HAS_THREADS*/

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -