⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rt_corba_workers.cpp

📁 这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用于网络游戏医学图像网关的高qos要求.更详细的内容可阅读相应的材料
💻 CPP
字号:
// RT_CORBA_Workers.cpp,v 1.9 2003/11/01 11:15:25 dhinton Exp
#include "RT_CORBA_Workers.h"

#if defined (ACE_HAS_THREADS)

#include "ace/OS_main.h"
#include "ace/ACE.h"
#include "ace/Get_Opt.h"
#include "ace/High_Res_Timer.h"
#include "ace/Sched_Params.h"
#include "ace/Lock_Adapter_T.h"

// The number of messages that is being processed
static size_t number_of_messages = 100;

// The number of upcall threads
static size_t number_of_workers = 2;

// The size of the message
static size_t message_size = 100;

// Number of threads that are ready to go
static size_t ready_threads = 0;

// Number of input and output threads
static size_t io_threads = 2; // 1 for output and 1 for input

// High resolution test timer
static ACE_High_Res_Timer test_timer;

// Debugging condition
static DEBUGGING_RANGE debug = DEBUG_NONE;

// Data block used by the message blocks
ACE_Data_Block *data_block = 0;

/*******************************************************************/
// Constructor for Synchronisers
Synchronisers::Synchronisers (void)
  : mutex_ (),
    event_ ()
{
}


int
Synchronisers::start_synchronization (void)
{
  // Hold the lock and increment the global variable to indicate
  // number of ready threads
  {
    ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);

    ready_threads ++;

    if (ready_threads == (number_of_workers + io_threads))
      {
        // Reset the ready_threads so that we can wait at the end of
        // runs
        ready_threads = 0;

        if (debug)
          {
            ACE_DEBUG ((LM_DEBUG,
                        "(%P|%t) Ready to signal start \n"));
          }
        // Start the timer
        test_timer.start ();

        // Signal all the threads
        this->event_.signal ();

        // return to do our work;
        return 0;
      }

    // If we are not the last thread, let go off the lock
  }

  if (debug)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "(%P|%t) Ready to wait () on event.. \n"));
    }

  // Wait blisfully till we are woken up
  this->event_.wait ();

  return 0;
}

int
Synchronisers::end_synchronization (void)
{
  // Hold the lock and increment the global variable to indicate
  // number of ready threads
  {
    ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);

    ready_threads ++;

    if (ready_threads == (number_of_workers + io_threads))
      {
        // Reset the ready_threads so that we can wait at the end of
        // runs
        ready_threads = 0;

        // Start the timer
        test_timer.stop ();

        // Signal all the threads
        this->event_.signal ();

        if (debug)
          {
            ACE_DEBUG ((LM_DEBUG,
                        "(%P|%t) Ended peacefully \n"));
          }

        // return to do our work;
        return 0;
      }


    // If we are not the last thread, let go off the lock
  }

  if (debug)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "(%P|%t) Going to wait .. \n"));
    }

  // Wait blisfully till we are woken up
  this->event_.wait ();

  return 0;
}

/*******************************************************************/

Worker_Task::Worker_Task (Message_Queue *mq,
                          Synchronisers &synch)
  : ACE_Task<ACE_MT_SYNCH> (0, mq),
  synch_ (synch),
  messages_processed_ (0)
{
}

int
Worker_Task::svc (void)
{
  // Start synchronization
  (void) this->synch_.start_synchronization ();

  for (;;)
    {
      ACE_Message_Block *mb = 0;
      int result = this->getq (mb);
      if (result == -1)
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             "Worker_Task::svc (%t) -> %p\n",
                             "getq error"),
                            -1);
        }

      // Get the flag in the message blok
      ACE_Message_Block::Message_Flags flag =
        mb->self_flags ();

      // The stop flag
      int stop_flag = 0;

      // Check for the stop flag
      if (ACE_BIT_ENABLED (flag,
                           Synchronisers::MB_STOP_FLAG))
        {
          if (debug)
            {
              ACE_DEBUG ((LM_DEBUG,
                          "(%P|%t) saw flag after [%d] messages\n",
                          this->messages_processed_));
            }

          stop_flag = 1;
        }
      // Release the message block
      mb->release ();

      // Counter.
      ++this->messages_processed_;

      if (debug)
        {
          ACE_DEBUG ((LM_DEBUG,
                      "(%P|%t) dequeued my %d message\n",
                      this->messages_processed_));
        }

      //
      // Process message here.
      //

      for (size_t j = 0; j < message_size; ++j)
        {
          // Eat a little CPU
          /* takes about 40.2 usecs on a 167 MHz Ultra2 */
          u_long n = 11UL;
          ACE::is_prime (n, 2, n / 2);
        }

      // Make a message block for writing onto output queue
      ACE_Message_Block *message_block = 0;
      ACE_NEW_RETURN (message_block,
                      ACE_Message_Block (data_block),
                      -1);

      // Put this message block into the next queue or the output
      // queue
      result = this->put_next (message_block);

      if (result == -1)
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             "Input::svc (%t) -> %p\n",
                             "putq error"),
                            -1);
                            }

      // If the stop_flag is set just break and wait..
      if (stop_flag)
        {
          if (debug)
            ACE_DEBUG ((LM_DEBUG,
                        "(%P|%t) Got stop message after [%d] messages \n",
                        this->messages_processed_));

          break;
        }
    }

  (void) this->synch_.end_synchronization ();
  return 0;
}

int
Worker_Task::processed (void)
{
  return this->messages_processed_;
}

/*******************************************************************/

Input_Task::Input_Task (Message_Queue *mq,
                        Synchronisers &synch)
  : ACE_Task<ACE_MT_SYNCH> (0, mq),
    synch_ (synch)
{
}

int
Input_Task::svc (void)
{
  // Synchronise threads
  (void) this->synch_.start_synchronization ();


  size_t i = 0;
  for (i = 0;
       i < (number_of_messages - number_of_workers);
       ++i)
    {
      // Make a message block
      ACE_Message_Block *message_block = 0;
      ACE_NEW_RETURN (message_block,
                      ACE_Message_Block (data_block),
                      -1);

      int result = this->putq (message_block);

      if (result == -1)
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             "Input::svc (%t) -> %p\n",
                             "putq error"),
                            -1);
        }

      if (debug)
        {
          ACE_DEBUG ((LM_DEBUG,
                      "(%t) Input thread ->  Sent [%d] messages\n",
                      i));
        }
    }

  if (debug)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "(%t) Sending close messages \n"));
    }


  // Stop messages
  for (i = 0;
       i < number_of_workers;
       ++i)
    {
      // Make a message block
      ACE_Message_Block *message_block = 0;
      ACE_NEW_RETURN (message_block,
                      ACE_Message_Block (data_block),
                      -1);

      // Set the stop flag in the message block and not in the datablock
      message_block->set_self_flags (Synchronisers::MB_STOP_FLAG);

      int result = this->putq (message_block);

      if (result == -1)
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             "Input::svc (%t) -> %p\n",
                             "putq error"),
                            -1);
        }
    }


  (void) this->synch_.end_synchronization ();
  return 0;
}

/*******************************************************************/

Output_Task::Output_Task (Message_Queue *mq,
                          Synchronisers &synch)
  : ACE_Task<ACE_MT_SYNCH> (0, mq),
    synch_ (synch)
{
}

int
Output_Task::svc (void)
{
  // Synchronise threads
  (void) this->synch_.start_synchronization ();


  for (size_t i = 0;
       i < number_of_messages;
       ++i)
    {
      // Get the message block from queue
      ACE_Message_Block *mb = 0;
      int result = this->getq (mb);

      // delete the message block
      mb->release ();

      if (debug)
        {
          ACE_DEBUG ((LM_DEBUG,
                      "(%t) Output thread ->  received [%d] message\n",
                      i));
                      }

      if (result == -1)
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             "Input::svc (%t) -> %p\n",
                             "putq error"),
                            -1);
        }
    }

  (void) this->synch_.end_synchronization ();
  return 0;
}

int
Output_Task::put (ACE_Message_Block *mb, ACE_Time_Value *)
{
  /*  if (debug)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "(%P|%t) Sticking message into "
                  " output queue \n"));
                  }*/
  return this->putq (mb);
}

/*******************************************************************/

static int
parse_args (int argc, ACE_TCHAR *argv[])
{
  ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("m:s:t:d:"));
  int c;

  while ((c = get_opt ()) != -1)
    {
      switch (c)
        {
        case 'm':
          number_of_messages = ACE_OS::atoi (get_opt.opt_arg ());
          break;
        case 't':
          number_of_workers = ACE_OS::atoi (get_opt.opt_arg ());
          break;
        case 'd':
          debug = ACE_static_cast (DEBUGGING_RANGE, ACE_OS::atoi (get_opt.opt_arg ()));
          break;
        case 's':
          message_size = ACE_OS::atoi (get_opt.opt_arg ());
          break;
        default:
          ACE_ERROR_RETURN ((LM_ERROR,
                             "usage: %s\n"
                             "\t[-m number of messages]\n"
                             "\t[-s message size]\n"
                             "\t[-w number of workers]\n"
                             "\t[-b burst size]\n"
                             "\t[-t timeout between bursts]\n"
                             "\t[-d debug]\n",
                             argv[0]),
                            -1);
        }
    }

  return 0;
}


/*******************************************************************/
int
ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
  int result = parse_args (argc, argv);
  if (result != 0)
    {
      return result;
    }

  ACE_High_Res_Timer::calibrate ();

  // Create the message queue
  Message_Queue input_message_queue;
  Message_Queue output_message_queue;

  // Create the datablocks. IF we use the default Message Blocks Ctor,
  // it is going to do an extra allocation for the data block
  ACE_NEW_RETURN (data_block,
                  ACE_Locked_Data_Block<ACE_Lock_Adapter<ACE_SYNCH_MUTEX> >,
                  -1);

  // Increment the reference count so that we can share the
  // datablock. This is donw twice the number of messages for the
  // input and output queues.
  size_t i = 0;

  for (i = 0; i < 2*number_of_messages; ++i)
    {
      data_block->duplicate ();
    }

  // Create the Synchronisers
  Synchronisers synch;

  // Workers.
  Worker_Task **workers = 0;
  ACE_NEW_RETURN (workers,
                  Worker_Task *[number_of_workers],
                  -1);

  // Input Task
  Input_Task input_task (&input_message_queue,
                         synch);

  // Output Task
  Output_Task output_task (&output_message_queue,
                           synch);
  int priority =
    ACE_Sched_Params::priority_max (ACE_SCHED_FIFO);


  long flags = THR_SCHED_FIFO | THR_SCOPE_PROCESS;

  // Create and activate the worker threads
  for (i = 0; i < number_of_workers; ++i)
    {
      ACE_NEW_RETURN (workers[i],
                      Worker_Task (&input_message_queue, synch),
                      -1);

      workers[i]->next (&output_task);

      // Activate the workers.
      result = workers[i]->activate (flags,
                                     1,
                                     1,
                                     priority);
      if (result != 0)
        {
          flags = THR_BOUND;
          priority = ACE_Sched_Params::priority_min (ACE_SCHED_OTHER,
                                                     ACE_SCOPE_THREAD);
          result = workers[i]->activate (flags,
                                         1,
                                         1,
                                         priority);
          if (result != 0)
            {
              return result;
            }
        }
    }



  // Activate the input and output threads
  result = input_task.activate (flags,
                                1,
                                1,
                                priority);

  if (result != 0)
    return result;



  // Activate the workers.
  result = output_task.activate (flags,
                                 1,
                                 1,
                                 priority);

  if (result != 0)
    return result;



  // Wait for all threads to terminate.
  result = ACE_Thread_Manager::instance ()->wait ();


  ACE_hrtime_t elapsed_time = 0;

  test_timer.elapsed_time (elapsed_time);

# if !defined (ACE_WIN32)
  double elapsed_time_per_invocation =
    (double) elapsed_time / number_of_messages;

  ACE_DEBUG ((LM_DEBUG,
     "(%P|%t) Throughput is [%f] \n",
     elapsed_time_per_invocation)); 

  ACE_DEBUG ((LM_DEBUG,
              "(%P|%t) Throughput is [%f] \n",
              1000000000/ elapsed_time_per_invocation));

#endif /*ACE_WIN32 */
  for (i = 0; i < number_of_workers; ++i)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "Message process for thread [%d] is [%d] \n",
                  i, workers[i]->processed ()));
      delete workers[i];
    }
  delete[] workers;

  return result;
}

#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)

template class ACE_Locked_Data_Block<ACE_Lock_Adapter<ACE_SYNCH_MUTEX> >;
template class ACE_Lock_Adapter<ACE_SYNCH_MUTEX>;

#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)

#pragma instantiate ACE_Locked_Data_Block<ACE_Lock_Adapter<ACE_SYNCH_MUTEX> >
#pragma instantiate ACE_Lock_Adapter<ACE_SYNCH_MUTEX>

#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

#else /*ACE_HAS_THREADS*/

int
main (int, char *[])
{
  ACE_DEBUG ((LM_DEBUG,
              "(%P|%t) Not supported in single threaded builds \n"));

  return 0;
}


#endif /*ACE_HAS_THREADS*/

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -