📄 rt_corba_workers.cpp
字号:
// RT_CORBA_Workers.cpp,v 1.10 2005/01/21 02:19:20 ossama Exp#include "RT_CORBA_Workers.h"#if defined (ACE_HAS_THREADS)#include "ace/OS_main.h"#include "ace/ACE.h"#include "ace/Get_Opt.h"#include "ace/High_Res_Timer.h"#include "ace/Sched_Params.h"#include "ace/Lock_Adapter_T.h"// The number of messages that is being processedstatic size_t number_of_messages = 100;// The number of upcall threadsstatic size_t number_of_workers = 2;// The size of the messagestatic size_t message_size = 100;// Number of threads that are ready to gostatic size_t ready_threads = 0;// Number of input and output threadsstatic size_t io_threads = 2; // 1 for output and 1 for input// High resolution test timerstatic ACE_High_Res_Timer test_timer;// Debugging conditionstatic DEBUGGING_RANGE debug = DEBUG_NONE;// Data block used by the message blocksACE_Data_Block *data_block = 0;/*******************************************************************/// Constructor for SynchronisersSynchronisers::Synchronisers (void) : mutex_ (), event_ (){}intSynchronisers::start_synchronization (void){ // Hold the lock and increment the global variable to indicate // number of ready threads { ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); ready_threads ++; if (ready_threads == (number_of_workers + io_threads)) { // Reset the ready_threads so that we can wait at the end of // runs ready_threads = 0; if (debug) { ACE_DEBUG ((LM_DEBUG, "(%P|%t) Ready to signal start \n")); } // Start the timer test_timer.start (); // Signal all the threads this->event_.signal (); // return to do our work; return 0; } // If we are not the last thread, let go off the lock } if (debug) { ACE_DEBUG ((LM_DEBUG, "(%P|%t) Ready to wait () on event.. \n")); } // Wait blisfully till we are woken up this->event_.wait (); return 0;}intSynchronisers::end_synchronization (void){ // Hold the lock and increment the global variable to indicate // number of ready threads { ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); ready_threads ++; if (ready_threads == (number_of_workers + io_threads)) { // Reset the ready_threads so that we can wait at the end of // runs ready_threads = 0; // Start the timer test_timer.stop (); // Signal all the threads this->event_.signal (); if (debug) { ACE_DEBUG ((LM_DEBUG, "(%P|%t) Ended peacefully \n")); } // return to do our work; return 0; } // If we are not the last thread, let go off the lock } if (debug) { ACE_DEBUG ((LM_DEBUG, "(%P|%t) Going to wait .. \n")); } // Wait blisfully till we are woken up this->event_.wait (); return 0;}/*******************************************************************/Worker_Task::Worker_Task (Message_Queue *mq, Synchronisers &synch) : ACE_Task<ACE_MT_SYNCH> (0, mq), synch_ (synch), messages_processed_ (0){}intWorker_Task::svc (void){ // Start synchronization (void) this->synch_.start_synchronization (); for (;;) { ACE_Message_Block *mb = 0; int result = this->getq (mb); if (result == -1) { ACE_ERROR_RETURN ((LM_ERROR, "Worker_Task::svc (%t) -> %p\n", "getq error"), -1); } // Get the flag in the message blok ACE_Message_Block::Message_Flags flag = mb->self_flags (); // The stop flag int stop_flag = 0; // Check for the stop flag if (ACE_BIT_ENABLED (flag, Synchronisers::MB_STOP_FLAG)) { if (debug) { ACE_DEBUG ((LM_DEBUG, "(%P|%t) saw flag after [%d] messages\n", this->messages_processed_)); } stop_flag = 1; } // Release the message block mb->release (); // Counter. ++this->messages_processed_; if (debug) { ACE_DEBUG ((LM_DEBUG, "(%P|%t) dequeued my %d message\n", this->messages_processed_)); } // // Process message here. // for (size_t j = 0; j < message_size; ++j) { // Eat a little CPU /* takes about 40.2 usecs on a 167 MHz Ultra2 */ u_long n = 11UL; ACE::is_prime (n, 2, n / 2); } // Make a message block for writing onto output queue ACE_Message_Block *message_block = 0; ACE_NEW_RETURN (message_block, ACE_Message_Block (data_block), -1); // Put this message block into the next queue or the output // queue result = this->put_next (message_block); if (result == -1) { ACE_ERROR_RETURN ((LM_ERROR, "Input::svc (%t) -> %p\n", "putq error"), -1); } // If the stop_flag is set just break and wait.. if (stop_flag) { if (debug) ACE_DEBUG ((LM_DEBUG, "(%P|%t) Got stop message after [%d] messages \n", this->messages_processed_)); break; } } (void) this->synch_.end_synchronization (); return 0;}intWorker_Task::processed (void){ return this->messages_processed_;}/*******************************************************************/Input_Task::Input_Task (Message_Queue *mq, Synchronisers &synch) : ACE_Task<ACE_MT_SYNCH> (0, mq), synch_ (synch){}intInput_Task::svc (void){ // Synchronise threads (void) this->synch_.start_synchronization (); size_t i = 0; for (i = 0; i < (number_of_messages - number_of_workers); ++i) { // Make a message block ACE_Message_Block *message_block = 0; ACE_NEW_RETURN (message_block, ACE_Message_Block (data_block), -1); int result = this->putq (message_block); if (result == -1) { ACE_ERROR_RETURN ((LM_ERROR, "Input::svc (%t) -> %p\n", "putq error"), -1); } if (debug) { ACE_DEBUG ((LM_DEBUG, "(%t) Input thread -> Sent [%d] messages\n", i)); } } if (debug) { ACE_DEBUG ((LM_DEBUG, "(%t) Sending close messages \n")); } // Stop messages for (i = 0; i < number_of_workers; ++i) { // Make a message block ACE_Message_Block *message_block = 0; ACE_NEW_RETURN (message_block, ACE_Message_Block (data_block), -1); // Set the stop flag in the message block and not in the datablock message_block->set_self_flags (Synchronisers::MB_STOP_FLAG); int result = this->putq (message_block); if (result == -1) { ACE_ERROR_RETURN ((LM_ERROR, "Input::svc (%t) -> %p\n", "putq error"), -1); } } (void) this->synch_.end_synchronization (); return 0;}/*******************************************************************/Output_Task::Output_Task (Message_Queue *mq, Synchronisers &synch) : ACE_Task<ACE_MT_SYNCH> (0, mq), synch_ (synch){}intOutput_Task::svc (void){ // Synchronise threads (void) this->synch_.start_synchronization (); for (size_t i = 0; i < number_of_messages; ++i) { // Get the message block from queue ACE_Message_Block *mb = 0; int result = this->getq (mb); // delete the message block mb->release (); if (debug) { ACE_DEBUG ((LM_DEBUG, "(%t) Output thread -> received [%d] message\n", i)); } if (result == -1) { ACE_ERROR_RETURN ((LM_ERROR, "Input::svc (%t) -> %p\n", "putq error"), -1); } } (void) this->synch_.end_synchronization (); return 0;}intOutput_Task::put (ACE_Message_Block *mb, ACE_Time_Value *){ /* if (debug) { ACE_DEBUG ((LM_DEBUG, "(%P|%t) Sticking message into " " output queue \n")); }*/ return this->putq (mb);}/*******************************************************************/static intparse_args (int argc, ACE_TCHAR *argv[]){ ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("m:s:t:d:")); int c; while ((c = get_opt ()) != -1) { switch (c) { case 'm': number_of_messages = ACE_OS::atoi (get_opt.opt_arg ()); break; case 't': number_of_workers = ACE_OS::atoi (get_opt.opt_arg ()); break; case 'd': debug = static_cast<DEBUGGING_RANGE> (ACE_OS::atoi (get_opt.opt_arg ())); break; case 's': message_size = ACE_OS::atoi (get_opt.opt_arg ()); break; default: ACE_ERROR_RETURN ((LM_ERROR, "usage: %s\n" "\t[-m number of messages]\n" "\t[-s message size]\n" "\t[-w number of workers]\n" "\t[-b burst size]\n" "\t[-t timeout between bursts]\n" "\t[-d debug]\n", argv[0]), -1); } } return 0;}/*******************************************************************/intACE_TMAIN (int argc, ACE_TCHAR *argv[]){ int result = parse_args (argc, argv); if (result != 0) { return result; } ACE_High_Res_Timer::calibrate (); // Create the message queue Message_Queue input_message_queue; Message_Queue output_message_queue; // Create the datablocks. IF we use the default Message Blocks Ctor, // it is going to do an extra allocation for the data block ACE_NEW_RETURN (data_block, ACE_Locked_Data_Block<ACE_Lock_Adapter<ACE_SYNCH_MUTEX> >, -1); // Increment the reference count so that we can share the // datablock. This is donw twice the number of messages for the // input and output queues. size_t i = 0; for (i = 0; i < 2*number_of_messages; ++i) { data_block->duplicate (); } // Create the Synchronisers Synchronisers synch; // Workers. Worker_Task **workers = 0; ACE_NEW_RETURN (workers, Worker_Task *[number_of_workers], -1); // Input Task Input_Task input_task (&input_message_queue, synch); // Output Task Output_Task output_task (&output_message_queue, synch); int priority = ACE_Sched_Params::priority_max (ACE_SCHED_FIFO); long flags = THR_SCHED_FIFO | THR_SCOPE_PROCESS; // Create and activate the worker threads for (i = 0; i < number_of_workers; ++i) { ACE_NEW_RETURN (workers[i], Worker_Task (&input_message_queue, synch), -1); workers[i]->next (&output_task); // Activate the workers. result = workers[i]->activate (flags, 1, 1, priority); if (result != 0) { flags = THR_BOUND; priority = ACE_Sched_Params::priority_min (ACE_SCHED_OTHER, ACE_SCOPE_THREAD); result = workers[i]->activate (flags, 1, 1, priority); if (result != 0) { return result; } } } // Activate the input and output threads result = input_task.activate (flags, 1, 1, priority); if (result != 0) return result; // Activate the workers. result = output_task.activate (flags, 1, 1, priority); if (result != 0) return result; // Wait for all threads to terminate. result = ACE_Thread_Manager::instance ()->wait (); ACE_hrtime_t elapsed_time = 0; test_timer.elapsed_time (elapsed_time);# if !defined (ACE_WIN32) double elapsed_time_per_invocation = (double) elapsed_time / number_of_messages; ACE_DEBUG ((LM_DEBUG, "(%P|%t) Throughput is [%f] \n", elapsed_time_per_invocation)); ACE_DEBUG ((LM_DEBUG, "(%P|%t) Throughput is [%f] \n", 1000000000/ elapsed_time_per_invocation));#endif /*ACE_WIN32 */ for (i = 0; i < number_of_workers; ++i) { ACE_DEBUG ((LM_DEBUG, "Message process for thread [%d] is [%d] \n", i, workers[i]->processed ())); delete workers[i]; } delete[] workers; return result;}#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)template class ACE_Locked_Data_Block<ACE_Lock_Adapter<ACE_SYNCH_MUTEX> >;template class ACE_Lock_Adapter<ACE_SYNCH_MUTEX>;#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)#pragma instantiate ACE_Locked_Data_Block<ACE_Lock_Adapter<ACE_SYNCH_MUTEX> >#pragma instantiate ACE_Lock_Adapter<ACE_SYNCH_MUTEX>#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */#else /*ACE_HAS_THREADS*/intmain (int, char *[]){ ACE_DEBUG ((LM_DEBUG, "(%P|%t) Not supported in single threaded builds \n")); return 0;}#endif /*ACE_HAS_THREADS*/
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -