⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 threadpool.cpp

📁 ACE自适配通信环境(ADAPTIVE Communication Environment)是可以自由使用、开放源码的面向对象(OO)框架(Framework)
💻 CPP
字号:
// ThreadPool.cpp,v 1.11 2005/04/23 05:52:26 ossama Exp#include "ace/config-lite.h"#if defined (ACE_HAS_THREADS)#include "ace/OS_NS_string.h"#include "ace/OS_NS_time.h"#include "ace/Task.h"#include "ace/Containers.h"#include "ace/Synch.h"#include "ace/SString.h"#include "ace/Method_Request.h"#include "ace/Future.h"#include "ace/Activation_Queue.h"class Worker;class IManager{public:  virtual ~IManager (void) { }  virtual int return_to_work (Worker *worker) = 0;};// Listing 2 code/ch16class Worker : public ACE_Task<ACE_MT_SYNCH>{public:  Worker (IManager *manager) : manager_(manager) { }  virtual int svc (void)  {    thread_id_ = ACE_Thread::self ();    while (1)      {        ACE_Message_Block *mb = 0;        if (this->getq (mb) == -1)          ACE_ERROR_BREAK            ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("getq")));        if (mb->msg_type () == ACE_Message_Block::MB_HANGUP)          {            ACE_DEBUG ((LM_INFO,                        ACE_TEXT ("(%t) Shutting down\n")));            mb->release ();            break;          }        // Process the message.        process_message (mb);        // Return to work.        this->manager_->return_to_work (this);      }    return 0;  }  // Listing 2  ACE_thread_t thread_id (void)  {    return thread_id_;  }private:  void process_message (ACE_Message_Block *mb)  {    ACE_TRACE (ACE_TEXT ("Worker::process_message"));    int msgId;    ACE_OS::memcpy (&msgId, mb->rd_ptr (), sizeof(int));    mb->release ();    ACE_DEBUG ((LM_DEBUG,                ACE_TEXT ("(%t) Started processing message %d\n"),                msgId));    ACE_OS::sleep (3);    ACE_DEBUG ((LM_DEBUG,                ACE_TEXT ("(%t) Finished processing message %d\n"),                msgId));  }  IManager *manager_;  ACE_thread_t thread_id_;};// Listing 1 code/ch16class Manager: public ACE_Task<ACE_MT_SYNCH>, private IManager{public:  enum {POOL_SIZE = 5, MAX_TIMEOUT = 5};  Manager ()    : shutdown_(0), workers_lock_(), workers_cond_(workers_lock_)  {    ACE_TRACE (ACE_TEXT ("Manager::Manager"));  }  int svc (void)  {    ACE_TRACE (ACE_TEXT ("Manager::svc"));    ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started\n")));    // Create pool.    create_worker_pool ();    while (!done ())      {        ACE_Message_Block *mb = 0;        ACE_Time_Value tv ((long)MAX_TIMEOUT);        tv += ACE_OS::time (0);        // Get a message request.        if (this->getq (mb, &tv) < 0)          {            shut_down ();            break;          }        // Choose a worker.        Worker *worker = 0;        {          ACE_GUARD_RETURN (ACE_Thread_Mutex,                            worker_mon, this->workers_lock_, -1);          while (this->workers_.is_empty ())            workers_cond_.wait ();          this->workers_.dequeue_head (worker);        }        // Ask the worker to do the job.        worker->putq (mb);      }    return 0;  }  int shut_down (void);  ACE_thread_t thread_id (Worker *worker);  virtual int return_to_work (Worker *worker)  {    ACE_GUARD_RETURN (ACE_Thread_Mutex,                      worker_mon, this->workers_lock_, -1);    ACE_DEBUG ((LM_DEBUG,                ACE_TEXT ("(%t) Worker %d returning to work.\n"),                worker->thr_mgr ()->thr_self ()));    this->workers_.enqueue_tail (worker);    this->workers_cond_.signal ();    return 0;  }private:  int create_worker_pool (void)  {    ACE_GUARD_RETURN (ACE_Thread_Mutex,                      worker_mon,                      this->workers_lock_,                      -1);    for (int i = 0; i < POOL_SIZE; i++)      {        Worker *worker;        ACE_NEW_RETURN (worker, Worker (this), -1);        this->workers_.enqueue_tail (worker);        worker->activate ();      }    return 0;  }  int done (void);private:  int shutdown_;  ACE_Thread_Mutex workers_lock_;  ACE_Condition<ACE_Thread_Mutex> workers_cond_;  ACE_Unbounded_Queue<Worker* > workers_;};// Listing 1int Manager::done (void){  return (shutdown_ == 1);}intManager::shut_down (void){  ACE_TRACE (ACE_TEXT ("Manager::shut_down"));  ACE_Unbounded_Queue<Worker* >::ITERATOR iter =    this->workers_.begin ();  Worker **worker_ptr = 0;  do    {      iter.next (worker_ptr);      Worker *worker = (*worker_ptr);      ACE_DEBUG ((LM_DEBUG,                 ACE_TEXT ("(%t) Attempting shutdown of %d\n"),                 thread_id (worker)));      // Send the hangup message.      ACE_Message_Block *mb;      ACE_NEW_RETURN        (mb,         ACE_Message_Block(0,                           ACE_Message_Block::MB_HANGUP),         -1);      worker->putq (mb);      // Wait for the exit.      worker->wait ();      ACE_ASSERT (worker->msg_queue ()->is_empty ());      ACE_DEBUG ((LM_DEBUG,                  ACE_TEXT ("(%t) Worker %d shut down.\n)"),                  thread_id (worker)));      delete worker;    }  while (iter.advance ());  shutdown_ = 1;  return 0;}ACE_thread_tManager::thread_id (Worker *worker){  return worker->thread_id ();}int ACE_TMAIN (int, ACE_TCHAR *[]){  Manager tp;  tp.activate ();  // Wait for a moment every time you send a message.  ACE_Time_Value tv;  tv.msec (100);  ACE_Message_Block *mb;  for (int i = 0; i < 30; i++)    {      ACE_NEW_RETURN        (mb, ACE_Message_Block(sizeof(int)), -1);      ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int));      ACE_OS::sleep (tv);      // Add a new work item.      tp.putq (mb);    }  ACE_Thread_Manager::instance ()->wait ();  return 0;}#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)template class ACE_Condition<ACE_Thread_Mutex>;template class ACE_Node<Worker*>;template class ACE_Unbounded_Queue<Worker*>;template class ACE_Unbounded_Queue_Iterator<Worker*>;#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)#pragma instantiate ACE_Condition<ACE_Thread_Mutex>#pragma instantiate ACE_Node<Worker*>#pragma instantiate ACE_Unbounded_Queue<Worker*>#pragma instantiate ACE_Unbounded_Queue_Iterator<Worker*>#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */#else#include "ace/OS_main.h"#include "ace/OS_NS_stdio.h"int ACE_TMAIN (int, ACE_TCHAR *[]){  ACE_OS::puts (ACE_TEXT ("This example requires threads."));  return 0;}#endif /* ACE_HAS_THREADS */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -