⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 workers.cpp

📁 这是一个被广泛使用的开源通信项目,完全能够胜任大容量、高并发的通信需求,可广泛用于网络游戏、医学图像、网关等对 QoS 要求较高的场景。更详细的内容请参阅相关资料。
💻 CPP
字号:
// workers.cpp,v 1.16 2003/11/01 11:15:25 dhinton Exp

#include "ace/OS_main.h"
#include "ace/ACE.h"
#include "ace/Task.h"
#include "ace/Get_Opt.h"
#include "ace/High_Res_Timer.h"
#include "ace/Sched_Params.h"
#include "ace/Profile_Timer.h"
#include "ace/Lock_Adapter_T.h"
#include "../Latency_Stats.h"

// Benchmark parameters.  Each one has a built-in default and can be
// overridden from the command line (see parse_args).

// Total number of data messages to enqueue (-m).  IO_Task::svc counts
// this value down to zero as it queues messages.
static size_t number_of_messages = 100;
// Number of ACE::is_prime() calls a worker makes per dequeued message
// to simulate processing work (-s).
static size_t message_size = 100;
// Number of worker tasks, and therefore of MB_STOP messages sent (-w).
static size_t number_of_workers = 10;
// Maximum number of messages enqueued back-to-back in one burst (-b).
static size_t burst_size = 10;
// Pause after each burst (-t); passed as the second (microseconds)
// argument of an ACE_Time_Value in IO_Task::svc.
static size_t timeout_between_bursts = 1;

// Verbosity levels selectable with the -d option.
enum DEBUGGING_RANGE
{
  // No tracing: every "if (debug)" test in this file is false.
  DEBUG_NONE = 0,
  // Trace each message as it is queued and dequeued.
  DEFAULT = 1,
  // As DEFAULT, and workers also print the latency of every message.
  PRINT_INDIVIDUAL_LATENCY = 2
};

// Current debugging level; set by the -d option, off by default.
static DEBUGGING_RANGE debug = DEBUG_NONE;

// Data block handed to every Message_Block created by the IO task.
// It is allocated in ACE_TMAIN and deleted there once all threads
// have finished.
static ACE_Data_Block *data_block = 0;

// A message block that carries the time at which the burst it belongs
// to was started, so the worker that dequeues it can compute the
// latency measured from the start of that burst.
class Message_Block : public ACE_Message_Block
{
public:
  // Forwards <shared_block> to the ACE_Message_Block base and records
  // <burst_start> as this message's burst timestamp.
  Message_Block (ACE_Data_Block *shared_block,
                 ACE_hrtime_t burst_start)
    : ACE_Message_Block (shared_block),
      start_of_burst_ (burst_start)
  {
  }

  // High-resolution timestamp taken when the burst began.
  ACE_hrtime_t start_of_burst_;
};

// Common base of Worker_Task and IO_Task: an ACE_Task parameterized
// with the multi-threaded synchronization traits.
typedef ACE_Task<ACE_MT_SYNCH> TASK;

// Consumer task: its svc() loop dequeues messages from the queue it
// was constructed with, simulates some work for each one and records
// latency and throughput samples until a MB_STOP message arrives.
class Worker_Task : public TASK
{
public:
  // <mq> is the message queue this worker reads from.
  Worker_Task (ACE_Message_Queue<ACE_MT_SYNCH> *mq);
  // Thread entry point; returns 0 after a stop message, -1 if getq fails.
  int svc (void);

  // Number of data messages this worker has dequeued so far.
  size_t messages_dequeued_;

  // Per-worker statistics; ACE_TMAIN accumulates them across all
  // workers once the threads have terminated.
  Latency_Stats latency_stats_;
  Throughput_Stats throughput_stats_;
};

// Producer task: its svc() enqueues the data messages in bursts and
// then one MB_STOP message per worker.
class IO_Task : public TASK
{
public:
  // <mq> is the message queue this task writes to.
  IO_Task (ACE_Message_Queue<ACE_MT_SYNCH> *mq);
  // Thread entry point; returns 0 on success, -1 if a message cannot
  // be allocated or enqueued.
  int svc (void);
};

// Bind the worker to <queue> (passing 0 as the first TASK constructor
// argument) and start with an empty message counter.
Worker_Task::Worker_Task (ACE_Message_Queue<ACE_MT_SYNCH> *queue)
  : TASK (0, queue),
    messages_dequeued_ (0)
{
}

int
Worker_Task::svc (void)
{
  for (;;)
    {
      ACE_Message_Block *mb = 0;
      int result = this->getq (mb);
      if (result == -1)
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             "Worker_Task::svc (%t) -> %p\n",
                             "getq error"),
                            -1);
        }

      ACE_Message_Block::ACE_Message_Type message_type =
        mb->msg_type ();

      // If STOP message, break loop and end the task.
      if (message_type == ACE_Message_Block::MB_STOP)
        {
          if (debug)
            {
              ACE_DEBUG ((LM_DEBUG,
                          "(%t) stop message dequeued after %d data messages\n",
                          this->messages_dequeued_));
            }

          mb->release ();
          break;
        }

      Message_Block *message_block =
        ACE_dynamic_cast (Message_Block *, mb);

      ACE_hrtime_t start_of_burst_for_this_message_block =
        message_block->start_of_burst_;

      mb->release ();

      // Counter.
      ++this->messages_dequeued_;

      if (debug)
        {
          ACE_DEBUG ((LM_DEBUG,
                      "(%t) dequeued its %d message\n",
                      this->messages_dequeued_));
        }

      //
      // Process message here.
      //

      for (size_t j = 0; j < message_size; ++j)
        {
          // Eat a little CPU
          /* takes about 40.2 usecs on a 167 MHz Ultra2 */
          u_long n = 11UL;
          ACE::is_prime (n, 2, n / 2);
        }

      //
      // Record stats for this message.
      //
      ACE_hrtime_t latency_from_start_of_burst =
        ACE_OS::gethrtime () - start_of_burst_for_this_message_block;
      this->latency_stats_.sample (latency_from_start_of_burst);

      this->throughput_stats_.sample ();

      if (debug >= PRINT_INDIVIDUAL_LATENCY)
        {
#ifndef ACE_LACKS_LONGLONG_T
          ACE_DEBUG ((LM_DEBUG,
                      "(%t) latency from start of burst: %Q\n",
                      latency_from_start_of_burst));
#else
          ACE_DEBUG ((LM_DEBUG,
                      "(%t) latency from start of burst: %u\n",
                      latency_from_start_of_burst.lo()));
#endif
        }
    }

  return 0;
}

// Bind the producer to <queue>, the message queue it will write to
// (0 is passed as the first TASK constructor argument).
IO_Task::IO_Task (ACE_Message_Queue<ACE_MT_SYNCH> *queue)
  : TASK (0, queue)
{
}

int
IO_Task::svc (void)
{
  size_t i = 0;
  size_t messages_queued = 1;
  size_t burst = 1;

  // Data messages.
  while (number_of_messages > 0)
    {
      ACE_hrtime_t start_of_burst = ACE_OS::gethrtime ();

      for (i = 1;
           i <= burst_size && number_of_messages > 0;
           ++i, --number_of_messages, ++messages_queued)
        {
          if (debug)
            {
              ACE_DEBUG ((LM_DEBUG,
                          "(%t) IO thread -> burst %d: message %d; overall message %d\n",
                          burst,
                          i,
                          messages_queued));
            }

          Message_Block *message_block = 0;
          ACE_NEW_RETURN (message_block,
                          Message_Block (data_block,
                                         start_of_burst),
                          -1);

          int result = this->putq (message_block);
          if (result == -1)
            {
              ACE_ERROR_RETURN ((LM_ERROR,
                                 "IO::svc (%t) -> %p\n",
                                 "putq error"),
                                -1);
            }
        }

      ++burst;
      ACE_Time_Value tv (0, timeout_between_bursts);
      ACE_OS::sleep (tv);
    }

  // Terminate messages.
  for (i = 0; i < number_of_workers; ++i)
    {
      ACE_Message_Block *message_block = 0;
      ACE_NEW_RETURN (message_block,
                      ACE_Message_Block ((size_t)0,
                                         (int)ACE_Message_Block::MB_STOP),
                      -1);

      int result = this->putq (message_block);
      if (result == -1)
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             "IO::svc (%t) -> %p\n",
                             "putq error"),
                            -1);
        }
    }

  return 0;
}

static int
parse_args (int argc, ACE_TCHAR *argv[])
{
  ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("m:s:w:b:t:d:"));
  int c;

  while ((c = get_opt ()) != -1)
    {
      switch (c)
        {
        case 'm':
          number_of_messages = ACE_OS::atoi (get_opt.opt_arg ());
          break;
        case 's':
          message_size = ACE_OS::atoi (get_opt.opt_arg ());
          break;
        case 'w':
          number_of_workers = ACE_OS::atoi (get_opt.opt_arg ());
          break;
        case 'b':
          burst_size = ACE_OS::atoi (get_opt.opt_arg ());
          break;
        case 't':
          timeout_between_bursts = ACE_OS::atoi (get_opt.opt_arg ());
          break;
        case 'd':
          debug = ACE_static_cast (DEBUGGING_RANGE, ACE_OS::atoi (get_opt.opt_arg ()));
          break;
        default:
          ACE_ERROR_RETURN ((LM_ERROR,
                             "usage: %s\n"
                             "\t[-m number of messages]\n"
                             "\t[-s message size]\n"
                             "\t[-w number of workers]\n"
                             "\t[-b burst size]\n"
                             "\t[-t timeout between bursts]\n"
                             "\t[-d debug]\n",
                             argv[0]),
                            -1);
        }
    }

  return 0;
}

// Program entry point.
//
// Parses the command line, creates the shared data block and message
// queue, starts number_of_workers Worker_Tasks and one IO_Task, waits
// for every thread to finish and then prints the accumulated latency
// and throughput statistics.
//
// Returns the parse_args() status if the command line is invalid, -1
// if an allocation fails, the activate() status if a task cannot be
// started, and otherwise the result of waiting for the threads.
int
ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
  int result = parse_args (argc, argv);
  if (result != 0)
    {
      return result;
    }

  // NOTE(review): move_to_rt_class is not defined in this file;
  // presumably it comes from ../Latency_Stats.h.
  move_to_rt_class ();
  ACE_High_Res_Timer::calibrate ();

  size_t i = 0;

  ACE_NEW_RETURN (data_block,
                  ACE_Locked_Data_Block<ACE_Lock_Adapter<ACE_SYNCH_MUTEX> >,
                  -1);

  // Take one extra reference per message up front: every
  // Message_Block is built directly on data_block and gives a
  // reference back when it is released.
  for (i = 0; i < number_of_messages; ++i)
    {
      data_block->duplicate ();
    }

  ACE_Message_Queue<ACE_MT_SYNCH> message_queue;

  // Workers.
  Worker_Task **workers = 0;
  ACE_NEW_RETURN (workers,
                  Worker_Task *[number_of_workers],
                  -1);

  ACE_Profile_Timer timer;
  timer.start ();

  // Workers run in the middle of the FIFO priority range.
  int priority =
    (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO) +
     ACE_Sched_Params::priority_max (ACE_SCHED_FIFO)) / 2;
  // priority = ACE_Sched_Params::next_priority (ACE_SCHED_FIFO, priority);

  long flags = THR_BOUND | THR_SCHED_FIFO;

  // Create and activate them.
  for (i = 0; i < number_of_workers; ++i)
    {
      ACE_NEW_RETURN (workers[i],
                      Worker_Task (&message_queue),
                      -1);

      // Activate the workers.
      result = workers[i]->activate (flags,
                                     1,
                                     1,
                                     priority);
      if (result != 0)
        {
          // FIFO scheduling was refused: retry without it at the
          // lowest SCHED_OTHER priority.  flags and priority keep
          // these values for the remaining workers as well.
          flags = THR_BOUND;
          priority = ACE_Sched_Params::priority_min (ACE_SCHED_OTHER,
                                                     ACE_SCOPE_THREAD);
          result = workers[i]->activate (flags,
                                         1,
                                         1,
                                         priority);
          if (result != 0)
            {
              return result;
            }
        }
    }

  // IO Task.
  IO_Task io (&message_queue);

  // Activate the IO task one FIFO priority step above the middle of
  // the range that the workers are started at.
  priority =
    (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO) +
     ACE_Sched_Params::priority_max (ACE_SCHED_FIFO)) / 2;
  priority = ACE_Sched_Params::next_priority (ACE_SCHED_FIFO, priority);

  flags = THR_BOUND | THR_SCHED_FIFO;

  // Use the flags and priority computed above; they used to be
  // computed and then ignored in favour of a plain THR_BOUND thread.
  result = io.activate (flags,
                        1,
                        1,
                        priority);
  if (result != 0)
    {
      // Same fallback as for the workers.
      flags = THR_BOUND;
      priority = ACE_Sched_Params::priority_min (ACE_SCHED_OTHER,
                                                 ACE_SCOPE_THREAD);
      result = io.activate (flags,
                            1,
                            1,
                            priority);
      if (result != 0)
        {
          return result;
        }
    }

  // Wait for all threads to terminate.
  result = ACE_Thread_Manager::instance ()->wait ();

  timer.stop ();
  ACE_Rusage rusage;
  timer.elapsed_rusage (rusage);

  // Merge the per-worker statistics and print each worker's
  // throughput as we go.
  Latency_Stats latency;
  Throughput_Stats throughput;
  for (i = 0; i < number_of_workers; ++i)
    {
      latency.accumulate (workers[i]->latency_stats_);
      throughput.accumulate (workers[i]->throughput_stats_);
      // %d consumes an int, so narrow the size_t index explicitly.
      ACE_DEBUG ((LM_DEBUG, "Thread[%d]: ", ACE_static_cast (int, i)));
      workers[i]->throughput_stats_.dump_results (ACE_TEXT(""), ACE_TEXT(""));
    }

  ACE_DEBUG ((LM_DEBUG, "\nTotals for latency:\n"));
  latency.dump_results (argv[0], ACE_TEXT("latency"));

  ACE_DEBUG ((LM_DEBUG, "\nTotals for throughput:\n"));
  throughput.dump_results (argv[0], ACE_TEXT("throughput"));

#if defined(ACE_HAS_PRUSAGE_T)
  ACE_DEBUG ((LM_DEBUG, "\n(%t) Context switches %d/%d\n",
              rusage.pr_vctx,
              rusage.pr_ictx));
#endif /* ACE_HAS_PRUSAGE_T */

  for (i = 0; i < number_of_workers; ++i)
    {
      delete workers[i];
    }
  delete[] workers;
  delete data_block;

  return result;
}

// Instantiate the templates used for the shared data block in
// ACE_TMAIN, using either explicit instantiation or the instantiate
// pragma, depending on which of the two macros below is defined.
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)

template class ACE_Locked_Data_Block<ACE_Lock_Adapter<ACE_SYNCH_MUTEX> >;
template class ACE_Lock_Adapter<ACE_SYNCH_MUTEX>;

#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)

#pragma instantiate ACE_Locked_Data_Block<ACE_Lock_Adapter<ACE_SYNCH_MUTEX> >
#pragma instantiate ACE_Lock_Adapter<ACE_SYNCH_MUTEX>

#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -