⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 leader_follower.cpp

📁 这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用于网络游戏医学图像网关的高qos要求.更详细的内容可阅读相应的材料
💻 CPP
字号:
// leader_follower.cpp,v 1.17 2003/11/01 11:15:25 dhinton Exp

#include "ace/OS_main.h"
#include "ace/ACE.h"
#include "ace/Task.h"
#include "ace/Get_Opt.h"
#include "ace/High_Res_Timer.h"
#include "ace/Sched_Params.h"
#include "ace/Profile_Timer.h"
#include "../Latency_Stats.h"

static size_t number_of_messages = 100;
static size_t message_size = 100;
static size_t number_of_threads = 10;
static size_t burst_size = 10;
static size_t timeout_between_bursts = 1;

static size_t leader_available = 0;
static size_t messages_in_this_burst = 0;
static size_t total_messages_consumed = 0;
static size_t burst = 1;

static ACE_hrtime_t start_of_burst;

enum DEBUGGING_RANGE
{
  DEBUG_NONE = 0,
  DEFAULT = 1,
  PRINT_INDIVIDUAL_LATENCY = 2
};

static DEBUGGING_RANGE debug = DEBUG_NONE;

typedef ACE_Task<ACE_MT_SYNCH> TASK;

class Leader_Follower_Task : public TASK
{
public:
  Leader_Follower_Task (ACE_SYNCH_MUTEX &mutex,
                        ACE_SYNCH_CONDITION &condition);
  int svc (void);

  size_t messages_consumed_;
  ACE_SYNCH_MUTEX &mutex_;
  ACE_SYNCH_CONDITION &condition_;

  Latency_Stats latency_stats_;
  Throughput_Stats throughput_stats_;
};

Leader_Follower_Task::Leader_Follower_Task (ACE_SYNCH_MUTEX &mutex,
                                            ACE_SYNCH_CONDITION &condition)
  : messages_consumed_ (0),
    mutex_ (mutex),
    condition_ (condition)
{
}

int
Leader_Follower_Task::svc (void)
{
  for (;;)
    {
      {
        ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);

        // Wait until there is no leader.
        while (leader_available)
          {
            int result = this->condition_.wait ();
            if (result == -1)
              {
                ACE_ERROR_RETURN ((LM_ERROR,
                                   "Leader_Follower_Task::svc (%t) -> %p\n",
                                   "wait error"),
                                  -1);
              }
          }

        // I am the leader.
        leader_available = 1;

        //
        // We are letting go of the leader follower lock before going
        // in the event loop.
        //
      }

      //
      // It is ok to modify these shared variables without a lock
      // since we are the only leader.
      //

      int exit_loop = 0;
      if (number_of_messages == 0)
        {
          exit_loop = 1;
        }
      else
        {
          if (messages_in_this_burst == burst_size)
            {
              ++burst;
              messages_in_this_burst = 0;
              ACE_Time_Value tv (0, timeout_between_bursts);
              ACE_OS::sleep (tv);
            }

          if (messages_in_this_burst == 0)
            {
              start_of_burst = ACE_OS::gethrtime ();
            }

          --number_of_messages;

          // Burst counter.
          ++messages_in_this_burst;

          // Global counter.
          ++total_messages_consumed;

          // Local counter.
          ++this->messages_consumed_;

          if (debug)
            {
              ACE_DEBUG ((LM_DEBUG,
                          "(%t) burst %d: message %d; overall message %d; message for this thread %d\n",
                          burst,
                          messages_in_this_burst,
                          total_messages_consumed,
                          this->messages_consumed_));
            }
        }

      {
        ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);

        // I am no longer the leader.
        leader_available = 0;

        // Wake up a follower.
        this->condition_.signal ();
      }

      if (exit_loop)
        {
          break;
        }
      else
        {
          //
          // Process message here.
          //

          for (size_t j = 0; j < message_size; ++j)
            {
              // Eat a little CPU
              /* takes about 40.2 usecs on a 167 MHz Ultra2 */
              u_long n = 11UL;
              ACE::is_prime (n, 2, n / 2);
            }

          //
          // Record stats for this message.
          //
          ACE_hrtime_t latency_from_start_of_burst =
            ACE_OS::gethrtime () - start_of_burst;
          this->latency_stats_.sample (latency_from_start_of_burst);

          this->throughput_stats_.sample ();

          if (debug >= PRINT_INDIVIDUAL_LATENCY)
            {
#ifndef ACE_LACKS_LONGLONG_T
              ACE_DEBUG ((LM_DEBUG,
                          "(%t) latency from start of burst: %Q\n",
                          latency_from_start_of_burst));
#else
              ACE_DEBUG ((LM_DEBUG,
                          "(%t) latency from start of burst: %u\n",
                          latency_from_start_of_burst.lo()));
#endif
            }
        }
    }

  return 0;
}

static int
parse_args (int argc, ACE_TCHAR *argv[])
{
  ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("m:s:w:b:t:d:"));
  int c;

  while ((c = get_opt ()) != -1)
    {
      switch (c)
        {
        case 'm':
          number_of_messages = ACE_OS::atoi (get_opt.opt_arg ());
          break;
        case 's':
          message_size = ACE_OS::atoi (get_opt.opt_arg ());
          break;
        case 'w':
          number_of_threads = ACE_OS::atoi (get_opt.opt_arg ());
          break;
        case 'b':
          burst_size = ACE_OS::atoi (get_opt.opt_arg ());
          break;
        case 't':
          timeout_between_bursts = ACE_OS::atoi (get_opt.opt_arg ());
          break;
        case 'd':
          debug = ACE_static_cast (DEBUGGING_RANGE, ACE_OS::atoi (get_opt.opt_arg ()));
          break;
        default:
          ACE_ERROR_RETURN ((LM_ERROR,
                             "usage: %s\n"
                             "\t[-m number of messages]\n"
                             "\t[-s message size]\n"
                             "\t[-w number of threads]\n"
                             "\t[-b burst size]\n"
                             "\t[-t timeout between bursts]\n"
                             "\t[-d debug]\n",
                             argv[0]),
                            -1);
        }
    }

  return 0;
}

int
ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
  int result = parse_args (argc, argv);
  if (result != 0)
    {
      return result;
    }

  move_to_rt_class ();
  ACE_High_Res_Timer::calibrate ();

  ACE_SYNCH_MUTEX mutex;
  ACE_SYNCH_CONDITION condition (mutex);

  // Leader Followers.
  Leader_Follower_Task **leader_followers = 0;
  ACE_NEW_RETURN (leader_followers,
                  Leader_Follower_Task *[number_of_threads],
                  -1);

  ACE_Profile_Timer timer;
  timer.start ();

  int priority =
    (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO) +
     ACE_Sched_Params::priority_max (ACE_SCHED_FIFO)) / 2;

  long flags = THR_BOUND | THR_SCHED_FIFO;

  // Create and activate them.
  size_t i;
  for (i = 0; i < number_of_threads; ++i)
    {
      ACE_NEW_RETURN (leader_followers[i],
                      Leader_Follower_Task (mutex,
                                            condition),
                      -1);

      // Activate the leader_followers.
      result = leader_followers[i]->activate (flags,
                                              1,
                                              1,
                                              priority);
      if (result != 0)
        {
          flags = THR_BOUND;
          priority = ACE_Sched_Params::priority_min (ACE_SCHED_OTHER,
                                                     ACE_SCOPE_THREAD);
          result = leader_followers[i]->activate (flags,
                                                  1,
                                                  1,
                                                  priority);
          if (result != 0)
            {
              return result;
            }
        }
    }

  // Wait for all threads to terminate.
  result = ACE_Thread_Manager::instance ()->wait ();

  timer.stop ();
  ACE_Rusage rusage;
  timer.elapsed_rusage (rusage);

  Latency_Stats latency;
  Throughput_Stats throughput;
  for (i = 0; i < number_of_threads; ++i)
    {
      latency.accumulate (leader_followers[i]->latency_stats_);
      throughput.accumulate (leader_followers[i]->throughput_stats_);
      ACE_DEBUG ((LM_DEBUG, "Thread[%d]: ", i));
      leader_followers[i]->throughput_stats_.dump_results (ACE_TEXT(""), ACE_TEXT(""));
    }

  ACE_DEBUG ((LM_DEBUG, "\nTotals for latency:\n"));
  latency.dump_results (argv[0], ACE_TEXT("latency"));

  ACE_DEBUG ((LM_DEBUG, "\nTotals for throughput:\n"));
  throughput.dump_results (argv[0], ACE_TEXT("throughput"));

#if defined(ACE_HAS_PRUSAGE_T)
  ACE_DEBUG ((LM_DEBUG, "\n(%t) Context switches %d/%d\n",
              rusage.pr_vctx,
              rusage.pr_ictx));
#endif /* ACE_HAS_PRUSAGE_T */

  for (i = 0; i < number_of_threads; ++i)
    {
      delete leader_followers[i];
    }
  delete[] leader_followers;

  return result;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -