⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rt_corba_leader_follower.cpp

📁 一个开源的网络开发库ACE
💻 CPP
字号:
// RT_CORBA_Leader_Follower.cpp,v 1.3 2002/03/06 21:48:03 nanbor Exp

#include "RT_CORBA_Leader_Follower.h"



#if defined (ACE_HAS_THREADS)

// We need the following only if we have threads enabled..
#include "ace/Get_Opt.h"
#include "ace/High_Res_Timer.h"
#include "ace/Sched_Params.h"
#include "ace/Profile_Timer.h"

// Number of messages that are used in this experiment
static size_t number_of_messages = 1000;

// Number of messages that are used in this experiment
static size_t number_of_messages_left = 0;

// Number of threads used in this experiment
static size_t number_of_threads = 2;

// Global variable for the availability of the leader
static size_t leader_available = 0;

// Number of threads that are ready to go/dispatch
static size_t ready_threads = 0;

// Work in the upcall
static size_t message_size = 100;

// Debugging condition
static DEBUGGING_RANGE debug = DEBUG_NONE;

// Timer for the test
ACE_High_Res_Timer test_timer;




/*******************************************************************/
// Constructor for Synchronisers
Synchronisers::Synchronisers (void)
  : mutex_ (),
    condition_ (mutex_),
    event_ ()
{
}


int
Synchronisers::start_synchronization (void)
{
  // Hold the lock and increment the global variable to indicate
  // number of ready threads
  {
    ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);

    ready_threads ++;

    if (debug)
      {
        ACE_DEBUG ((LM_DEBUG,
                    "(%P|%t) Ready to go.. \n"));
      }

    if (ready_threads == number_of_threads)
      {

        // Reset the ready_threads so that we can wait at the end of
        // runs
        ready_threads = 0;

        // Start the timer
        test_timer.start ();

        // Signal all the threads
        this->event_.signal ();

        // return to do our work;
        return 0;
      }

    // If we are not the last thread, let go off the lock
  }

  // Wait blisfully till we are woken up
  this->event_.wait ();

  return 0;
}

int
Synchronisers::end_synchronization (void)
{
  // Hold the lock and increment the global variable to indicate
  // number of ready threads
  {
    ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);

    ready_threads ++;

    if (debug)
      {
        ACE_DEBUG ((LM_DEBUG,
                    "(%P|%t) Ready to go.. \n"));
      }

    if (ready_threads == number_of_threads)
      {

        // Reset the ready_threads so that we can wait at the end of
        // runs
        ready_threads = 0;

        // Start the timer
        test_timer.stop ();

        // Signal all the threads
        this->event_.signal ();

        // return to do our work;
        return 0;
      }

    // If we are not the last thread, let go off the lock
  }

  // Wait blisfully till we are woken up
  this->event_.wait ();

  return 0;
}


/*******************************************************************/

Leader_Follower_Task::Leader_Follower_Task (Synchronisers &synch)
  : messages_consumed_ (0),
    synch_ (synch)
{
}

int
Leader_Follower_Task::processed (void)
{
  return this->messages_consumed_;
}

int
Leader_Follower_Task::svc (void)
{
  (void) this->synch_.start_synchronization ();

  for (;;)
    {
      {
        ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->synch_.mutex_, -1);

        // Wait until there is no leader.
        while (leader_available)
          {
            int result = this->synch_.condition_.wait ();
            if (result == -1)
              {
                ACE_ERROR_RETURN ((LM_ERROR,
                                   "Leader_Follower_Task::svc (%t) -> %p\n",
                                   "wait error"),
                                  -1);
              }
          }

        // I am the leader.
        leader_available = 1;

        //
        // We are letting go of the leader follower lock before going
        // in the event loop.
        //
      }

      //
      // It is ok to modify these shared variables without a lock
      // since we are the only leader.
      //

      int exit_loop = 0;
      if (number_of_messages_left == 0)
        {
          exit_loop = 1;
        }
      else
        {
          --number_of_messages_left;

          // Local counter.
          ++this->messages_consumed_;

          if (debug)
            {
              ACE_DEBUG ((LM_DEBUG,
                          "(%t) message for this thread %d\n",
                          this->messages_consumed_));
            }
        }

      {
        ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->synch_.mutex_, -1);

        // I am no longer the leader.
        leader_available = 0;

        // Wake up a follower.
        this->synch_.condition_.signal ();
      }

      if (exit_loop)
        {
          break;
        }
      else
        {
          //
          // Process message here.
          //
          for (size_t j = 0; j < message_size; ++j)
            {
              // Eat a little CPU
              /* takes about 40.2 usecs on a 167 MHz Ultra2 */
              u_long n = 11UL;
              ACE::is_prime (n, 2, n / 2);
            }

        }
    }

  (void) this->synch_.end_synchronization ();

  return 0;
}

static int
parse_args (int argc, ACE_TCHAR *argv[])
{
  ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("m:t:d:s:"));
  int c;

  while ((c = get_opt ()) != -1)
    {
      switch (c)
        {
        case 'm':
          number_of_messages = ACE_OS::atoi (get_opt.opt_arg ());
          break;
        case 't':
          number_of_threads = ACE_OS::atoi (get_opt.opt_arg ());
          break;
        case 'd':
          debug = ACE_static_cast (DEBUGGING_RANGE, ACE_OS::atoi (get_opt.opt_arg ()));
          break;
        case 's':
          message_size = ACE_OS::atoi (get_opt.opt_arg ());
          break;
        default:
          ACE_ERROR_RETURN ((LM_ERROR,
                             "usage: %s\n"
                             "\t[-m number of messages]\n"
                             "\t[-s message size]\n"
                             "\t[-w number of threads]\n"
                             "\t[-b burst size]\n"
                             "\t[-t timeout between bursts]\n"
                             "\t[-d debug]\n",
                             argv[0]),
                            -1);
        }
    }

  return 0;
}

/*******************************************************************/

// Entry point

int
ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
  int result = parse_args (argc, argv);
  if (result != 0)
    {
      return result;
    }

  Synchronisers synch_forms;

  ACE_High_Res_Timer::calibrate ();

  // Leader Followers.
  Leader_Follower_Task **leader_followers = 0;
  ACE_NEW_RETURN (leader_followers,
                  Leader_Follower_Task *[number_of_threads],
                  -1);




  int priority =
    ACE_Sched_Params::priority_max (ACE_SCHED_FIFO);


  long flags = THR_SCOPE_PROCESS;

  // Number of messages left = Number_Of_messages
  number_of_messages_left = number_of_messages;

  size_t i = 0;
  // Create and activate them.
  for (i = 0; i < number_of_threads; ++i)
    {
      ACE_NEW_RETURN (leader_followers[i],
                      Leader_Follower_Task (synch_forms),
                      -1);

      // Activate the leader_followers.
      result = leader_followers[i]->activate (flags,
                                              1,
                                              1,
                                              priority);
      if (result != 0)
        {
          ACE_DEBUG ((LM_DEBUG,
                      "(%P|%t) - Activate failed  for RT class "
                      " - Using default priority for thread [%d]\n",
                      i));

          flags = THR_BOUND;
          priority = ACE_Sched_Params::priority_min (ACE_SCHED_OTHER,
                                                     ACE_SCOPE_THREAD);

          // Activate the leader_followers.
          result = leader_followers[i]->activate (flags,
                                                  1,
                                                  1,
                                                  priority);

          if (result != 0)
            {
              ACE_DEBUG ((LM_DEBUG,
                          "(%P|%t) - Failed again no hope \n"));

              return 0;
            }

        }
    }

  // Wait for all threads to terminate.
  result = ACE_Thread_Manager::instance ()->wait ();

  ACE_hrtime_t elapsed_time = 0;

  test_timer.elapsed_time (elapsed_time);

  double elapsed_time_per_invocation =
    (double) elapsed_time / number_of_messages;

  /*ACE_DEBUG ((LM_DEBUG,
              "(%P|%t) Throughput is [%f] \n",
              elapsed_time_per_invocation));*/

  ACE_DEBUG ((LM_DEBUG,
              "(%P|%t) Throughput is [%f] \n",
              1000000000/ elapsed_time_per_invocation));


  for (i = 0; i < number_of_threads; ++i)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "Message consumed in thread [%d] is [%d] \n",
                  i, leader_followers[i]->processed ()));
      delete leader_followers[i];
    }
  delete[] leader_followers;

  return result;
}




#else /*if defined (ACE_HAS_THREADS)*/

int
main (int , char *[])
{
  ACE_DEBUG ((LM_DEBUG,
              "(%p|%t) Cannot run in SIngle threaded mode \n"));

  return 0;
}
#endif /*ACE_HAS_THREADS*/

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -