📄 rt_corba_leader_follower.cpp
字号:
// RT_CORBA_Leader_Follower.cpp,v 1.9 2005/01/21 02:19:20 ossama Exp#include "RT_CORBA_Leader_Follower.h"#if defined (ACE_HAS_THREADS)// We need the following only if we have threads enabled..#include "ace/OS_main.h"#include "ace/ACE.h"#include "ace/Get_Opt.h"#include "ace/High_Res_Timer.h"#include "ace/Sched_Params.h"#include "ace/Profile_Timer.h"// Number of messages that are used in this experimentstatic size_t number_of_messages = 1000;// Number of messages that are used in this experimentstatic size_t number_of_messages_left = 0;// Number of threads used in this experimentstatic size_t number_of_threads = 2;// Global variable for the availability of the leaderstatic size_t leader_available = 0;// Number of threads that are ready to go/dispatchstatic size_t ready_threads = 0;// Work in the upcallstatic size_t message_size = 100;// Debugging conditionstatic DEBUGGING_RANGE debug = DEBUG_NONE;// Timer for the testACE_High_Res_Timer test_timer;/*******************************************************************/// Constructor for SynchronisersSynchronisers::Synchronisers (void) : mutex_ (), condition_ (mutex_), event_ (){}intSynchronisers::start_synchronization (void){ // Hold the lock and increment the global variable to indicate // number of ready threads { ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); ready_threads ++; if (debug) { ACE_DEBUG ((LM_DEBUG, "(%P|%t) Ready to go.. \n")); } if (ready_threads == number_of_threads) { // Reset the ready_threads so that we can wait at the end of // runs ready_threads = 0; // Start the timer test_timer.start (); // Signal all the threads this->event_.signal (); // return to do our work; return 0; } // If we are not the last thread, let go off the lock } // Wait blisfully till we are woken up this->event_.wait (); return 0;}intSynchronisers::end_synchronization (void){ // Hold the lock and increment the global variable to indicate // number of ready threads { ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); ready_threads ++; if (debug) { ACE_DEBUG ((LM_DEBUG, "(%P|%t) Ready to go.. \n")); } if (ready_threads == number_of_threads) { // Reset the ready_threads so that we can wait at the end of // runs ready_threads = 0; // Start the timer test_timer.stop (); // Signal all the threads this->event_.signal (); // return to do our work; return 0; } // If we are not the last thread, let go off the lock } // Wait blisfully till we are woken up this->event_.wait (); return 0;}/*******************************************************************/Leader_Follower_Task::Leader_Follower_Task (Synchronisers &synch) : messages_consumed_ (0), synch_ (synch){}intLeader_Follower_Task::processed (void){ return this->messages_consumed_;}intLeader_Follower_Task::svc (void){ (void) this->synch_.start_synchronization (); for (;;) { { ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->synch_.mutex_, -1); // Wait until there is no leader. while (leader_available) { int result = this->synch_.condition_.wait (); if (result == -1) { ACE_ERROR_RETURN ((LM_ERROR, "Leader_Follower_Task::svc (%t) -> %p\n", "wait error"), -1); } } // I am the leader. leader_available = 1; // // We are letting go of the leader follower lock before going // in the event loop. // } // // It is ok to modify these shared variables without a lock // since we are the only leader. // int exit_loop = 0; if (number_of_messages_left == 0) { exit_loop = 1; } else { --number_of_messages_left; // Local counter. ++this->messages_consumed_; if (debug) { ACE_DEBUG ((LM_DEBUG, "(%t) message for this thread %d\n", this->messages_consumed_)); } } { ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->synch_.mutex_, -1); // I am no longer the leader. leader_available = 0; // Wake up a follower. this->synch_.condition_.signal (); } if (exit_loop) { break; } else { // // Process message here. // for (size_t j = 0; j < message_size; ++j) { // Eat a little CPU /* takes about 40.2 usecs on a 167 MHz Ultra2 */ u_long n = 11UL; ACE::is_prime (n, 2, n / 2); } } } (void) this->synch_.end_synchronization (); return 0;}static intparse_args (int argc, ACE_TCHAR *argv[]){ ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("m:t:d:s:")); int c; while ((c = get_opt ()) != -1) { switch (c) { case 'm': number_of_messages = ACE_OS::atoi (get_opt.opt_arg ()); break; case 't': number_of_threads = ACE_OS::atoi (get_opt.opt_arg ()); break; case 'd': debug = static_cast<DEBUGGING_RANGE> (ACE_OS::atoi (get_opt.opt_arg ())); break; case 's': message_size = ACE_OS::atoi (get_opt.opt_arg ()); break; default: ACE_ERROR_RETURN ((LM_ERROR, "usage: %s\n" "\t[-m number of messages]\n" "\t[-s message size]\n" "\t[-w number of threads]\n" "\t[-b burst size]\n" "\t[-t timeout between bursts]\n" "\t[-d debug]\n", argv[0]), -1); } } return 0;}/*******************************************************************/// Entry pointintACE_TMAIN (int argc, ACE_TCHAR *argv[]){ int result = parse_args (argc, argv); if (result != 0) { return result; } Synchronisers synch_forms; ACE_High_Res_Timer::calibrate (); // Leader Followers. Leader_Follower_Task **leader_followers = 0; ACE_NEW_RETURN (leader_followers, Leader_Follower_Task *[number_of_threads], -1); int priority = ACE_Sched_Params::priority_max (ACE_SCHED_FIFO); long flags = THR_SCOPE_PROCESS; // Number of messages left = Number_Of_messages number_of_messages_left = number_of_messages; size_t i = 0; // Create and activate them. for (i = 0; i < number_of_threads; ++i) { ACE_NEW_RETURN (leader_followers[i], Leader_Follower_Task (synch_forms), -1); // Activate the leader_followers. result = leader_followers[i]->activate (flags, 1, 1, priority); if (result != 0) { ACE_DEBUG ((LM_DEBUG, "(%P|%t) - Activate failed for RT class " " - Using default priority for thread [%d]\n", i)); flags = THR_BOUND; priority = ACE_Sched_Params::priority_min (ACE_SCHED_OTHER, ACE_SCOPE_THREAD); // Activate the leader_followers. result = leader_followers[i]->activate (flags, 1, 1, priority); if (result != 0) { ACE_DEBUG ((LM_DEBUG, "(%P|%t) - Failed again no hope \n")); return 0; } } } // Wait for all threads to terminate. result = ACE_Thread_Manager::instance ()->wait (); ACE_hrtime_t elapsed_time = 0; test_timer.elapsed_time (elapsed_time); double elapsed_time_per_invocation = static_cast<double> ( ACE_UINT64_DBLCAST_ADAPTER (elapsed_time / number_of_messages) ); ACE_DEBUG ((LM_DEBUG, "(%P|%t) Throughput is [%f] \n", 1000000000/ elapsed_time_per_invocation)); for (i = 0; i < number_of_threads; ++i) { ACE_DEBUG ((LM_DEBUG, "Message consumed in thread [%d] is [%d] \n", i, leader_followers[i]->processed ())); delete leader_followers[i]; } delete[] leader_followers; return result;}#else /*if defined (ACE_HAS_THREADS)*/intmain (int , char *[]){ ACE_DEBUG ((LM_DEBUG, "(%p|%t) Cannot run in SIngle threaded mode \n")); return 0;}#endif /*ACE_HAS_THREADS*/
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -