⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 workers.cpp

📁 ACE自适配通信环境(ADAPTIVE Communication Environment)是可以自由使用、开放源码的面向对象(OO)框架(Framework)
💻 CPP
字号:
// workers.cpp,v 1.20 2005/01/21 02:19:20 ossama Exp#include "ace/OS_main.h"#include "ace/OS_NS_unistd.h"#include "ace/ACE.h"#include "ace/Task_T.h"#include "ace/Get_Opt.h"#include "ace/High_Res_Timer.h"#include "ace/Sched_Params.h"#include "ace/Profile_Timer.h"#include "ace/Lock_Adapter_T.h"#include "../Latency_Stats.h"static size_t number_of_messages = 100;static size_t message_size = 100;static size_t number_of_workers = 10;static size_t burst_size = 10;static size_t timeout_between_bursts = 1;enum DEBUGGING_RANGE{  DEBUG_NONE = 0,  DEFAULT = 1,  PRINT_INDIVIDUAL_LATENCY = 2};static DEBUGGING_RANGE debug = DEBUG_NONE;static ACE_Data_Block *data_block = 0;class Message_Block : public ACE_Message_Block{public:  Message_Block (ACE_Data_Block *data_block,                 ACE_hrtime_t start_of_burst);  ACE_hrtime_t start_of_burst_;};Message_Block::Message_Block (ACE_Data_Block *data_block,                              ACE_hrtime_t start_of_burst)  : ACE_Message_Block (data_block),    start_of_burst_ (start_of_burst){}typedef ACE_Task<ACE_SYNCH> TASK;class Worker_Task : public TASK{public:  Worker_Task (ACE_Message_Queue<ACE_SYNCH> *mq);  int svc (void);  size_t messages_dequeued_;  Latency_Stats latency_stats_;  Throughput_Stats throughput_stats_;};class IO_Task : public TASK{public:  IO_Task (ACE_Message_Queue<ACE_SYNCH> *mq);  int svc (void);};Worker_Task::Worker_Task (ACE_Message_Queue<ACE_SYNCH> *mq)  : TASK (0, mq),    messages_dequeued_ (0){}intWorker_Task::svc (void){  for (;;)    {      ACE_Message_Block *mb = 0;      int result = this->getq (mb);      if (result == -1)        {          ACE_ERROR_RETURN ((LM_ERROR,                             "Worker_Task::svc (%t) -> %p\n",                             "getq error"),                            -1);        }      ACE_Message_Block::ACE_Message_Type message_type =        mb->msg_type ();      // If STOP message, break loop and end the task.      if (message_type == ACE_Message_Block::MB_STOP)        {          if (debug)            {              ACE_DEBUG ((LM_DEBUG,                          "(%t) stop message dequeued after %d data messages\n",                          this->messages_dequeued_));            }          mb->release ();          break;        }      Message_Block *message_block =        dynamic_cast<Message_Block *> (mb);      ACE_hrtime_t start_of_burst_for_this_message_block =        message_block->start_of_burst_;      mb->release ();      // Counter.      ++this->messages_dequeued_;      if (debug)        {          ACE_DEBUG ((LM_DEBUG,                      "(%t) dequeued its %d message\n",                      this->messages_dequeued_));        }      //      // Process message here.      //      for (size_t j = 0; j < message_size; ++j)        {          // Eat a little CPU          /* takes about 40.2 usecs on a 167 MHz Ultra2 */          u_long n = 11UL;          ACE::is_prime (n, 2, n / 2);        }      //      // Record stats for this message.      //      ACE_hrtime_t latency_from_start_of_burst =        ACE_OS::gethrtime () - start_of_burst_for_this_message_block;      this->latency_stats_.sample (latency_from_start_of_burst);      this->throughput_stats_.sample ();      if (debug >= PRINT_INDIVIDUAL_LATENCY)        {#ifndef ACE_LACKS_LONGLONG_T          ACE_DEBUG ((LM_DEBUG,                      "(%t) latency from start of burst: %Q\n",                      latency_from_start_of_burst));#else          ACE_DEBUG ((LM_DEBUG,                      "(%t) latency from start of burst: %u\n",                      latency_from_start_of_burst.lo()));#endif        }    }  return 0;}IO_Task::IO_Task (ACE_Message_Queue<ACE_SYNCH> *mq)  : TASK (0, mq){}intIO_Task::svc (void){  size_t i = 0;  size_t messages_queued = 1;  size_t burst = 1;  // Data messages.  while (number_of_messages > 0)    {      ACE_hrtime_t start_of_burst = ACE_OS::gethrtime ();      for (i = 1;           i <= burst_size && number_of_messages > 0;           ++i, --number_of_messages, ++messages_queued)        {          if (debug)            {              ACE_DEBUG ((LM_DEBUG,                          "(%t) IO thread -> burst %d: message %d; overall message %d\n",                          burst,                          i,                          messages_queued));            }          Message_Block *message_block = 0;          ACE_NEW_RETURN (message_block,                          Message_Block (data_block,                                         start_of_burst),                          -1);          int result = this->putq (message_block);          if (result == -1)            {              ACE_ERROR_RETURN ((LM_ERROR,                                 "IO::svc (%t) -> %p\n",                                 "putq error"),                                -1);            }        }      ++burst;      ACE_Time_Value tv (0, timeout_between_bursts);      ACE_OS::sleep (tv);    }  // Terminate messages.  for (i = 0; i < number_of_workers; ++i)    {      ACE_Message_Block *message_block = 0;      ACE_NEW_RETURN (message_block,                      ACE_Message_Block ((size_t)0,                                         (int)ACE_Message_Block::MB_STOP),                      -1);      int result = this->putq (message_block);      if (result == -1)        {          ACE_ERROR_RETURN ((LM_ERROR,                             "IO::svc (%t) -> %p\n",                             "putq error"),                            -1);        }    }  return 0;}static intparse_args (int argc, ACE_TCHAR *argv[]){  ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("m:s:w:b:t:d:"));  int c;  while ((c = get_opt ()) != -1)    {      switch (c)        {        case 'm':          number_of_messages = ACE_OS::atoi (get_opt.opt_arg ());          break;        case 's':          message_size = ACE_OS::atoi (get_opt.opt_arg ());          break;        case 'w':          number_of_workers = ACE_OS::atoi (get_opt.opt_arg ());          break;        case 'b':          burst_size = ACE_OS::atoi (get_opt.opt_arg ());          break;        case 't':          timeout_between_bursts = ACE_OS::atoi (get_opt.opt_arg ());          break;        case 'd':          debug = static_cast<DEBUGGING_RANGE> (ACE_OS::atoi (get_opt.opt_arg ()));          break;        default:          ACE_ERROR_RETURN ((LM_ERROR,                             "usage: %s\n"                             "\t[-m number of messages]\n"                             "\t[-s message size]\n"                             "\t[-w number of workers]\n"                             "\t[-b burst size]\n"                             "\t[-t timeout between bursts]\n"                             "\t[-d debug]\n",                             argv[0]),                            -1);        }    }  return 0;}intACE_TMAIN (int argc, ACE_TCHAR *argv[]){  int result = parse_args (argc, argv);  if (result != 0)    {      return result;    }  move_to_rt_class ();  ACE_High_Res_Timer::calibrate ();  size_t i = 0;  ACE_NEW_RETURN (data_block,                  ACE_Locked_Data_Block<ACE_Lock_Adapter<ACE_SYNCH_MUTEX> >,                  -1);  for (i = 0; i < number_of_messages; ++i)    {      data_block->duplicate ();    }  ACE_Message_Queue<ACE_SYNCH> message_queue;  // Workers.  Worker_Task **workers = 0;  ACE_NEW_RETURN (workers,                  Worker_Task *[number_of_workers],                  -1);  ACE_Profile_Timer timer;  timer.start ();  int priority =    (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO) +     ACE_Sched_Params::priority_max (ACE_SCHED_FIFO)) / 2;  // priority = ACE_Sched_Params::next_priority (ACE_SCHED_FIFO, priority);  long flags = THR_BOUND | THR_SCHED_FIFO;  // Create and activate them.  for (i = 0; i < number_of_workers; ++i)    {      ACE_NEW_RETURN (workers[i],                      Worker_Task (&message_queue),                      -1);      // Activate the workers.      result = workers[i]->activate (flags,                                     1,                                     1,                                     priority);      if (result != 0)        {          flags = THR_BOUND;          priority = ACE_Sched_Params::priority_min (ACE_SCHED_OTHER,                                                     ACE_SCOPE_THREAD);          result = workers[i]->activate (flags,                                         1,                                         1,                                         priority);          if (result != 0)            {              return result;            }        }    }  // IO Task.  IO_Task io (&message_queue);  // Activate the workers.  priority =    (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO) +     ACE_Sched_Params::priority_max (ACE_SCHED_FIFO)) / 2;  priority = ACE_Sched_Params::next_priority (ACE_SCHED_FIFO, priority);  flags = THR_BOUND | THR_SCHED_FIFO;  result = io.activate (THR_BOUND);  if (result != 0)    {      flags = THR_BOUND;      priority = ACE_Sched_Params::priority_min (ACE_SCHED_OTHER,                                                 ACE_SCOPE_THREAD);      result = io.activate (flags,                            1,                            1,                            priority);      if (result != 0)        {          return result;        }    }  // Wait for all threads to terminate.  result = ACE_Thread_Manager::instance ()->wait ();  timer.stop ();  ACE_Rusage rusage;  timer.elapsed_rusage (rusage);  Latency_Stats latency;  Throughput_Stats throughput;  for (i = 0; i < number_of_workers; ++i)    {      latency.accumulate (workers[i]->latency_stats_);      throughput.accumulate (workers[i]->throughput_stats_);      ACE_DEBUG ((LM_DEBUG, "Thread[%d]: ", i));      workers[i]->throughput_stats_.dump_results (ACE_TEXT(""), ACE_TEXT(""));    }  ACE_DEBUG ((LM_DEBUG, "\nTotals for latency:\n"));  latency.dump_results (argv[0], ACE_TEXT("latency"));  ACE_DEBUG ((LM_DEBUG, "\nTotals for throughput:\n"));  throughput.dump_results (argv[0], ACE_TEXT("throughput"));#if defined(ACE_HAS_PRUSAGE_T)  ACE_DEBUG ((LM_DEBUG, "\n(%t) Context switches %d/%d\n",              rusage.pr_vctx,              rusage.pr_ictx));#endif /* ACE_HAS_PRUSAGE_T */  for (i = 0; i < number_of_workers; ++i)    {      delete workers[i];    }  delete[] workers;  delete data_block;  return result;}#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)template class ACE_Locked_Data_Block<ACE_Lock_Adapter<ACE_SYNCH_MUTEX> >;template class ACE_Lock_Adapter<ACE_SYNCH_MUTEX>;#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)#pragma instantiate ACE_Locked_Data_Block<ACE_Lock_Adapter<ACE_SYNCH_MUTEX> >#pragma instantiate ACE_Lock_Adapter<ACE_SYNCH_MUTEX>#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -