⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 futures.cpp

📁 ACE自适配通信环境(ADAPTIVE Communication Environment)是可以自由使用、开放源码的面向对象(OO)框架(Framework)
💻 CPP
字号:
// Futures.cpp,v 1.11 2005/04/23 05:52:26 ossama Exp#include "ace/config-lite.h"#if defined (ACE_HAS_THREADS)#include "ace/OS_NS_string.h"#include "ace/OS_NS_time.h"#include "ace/Task.h"#include "ace/Unbounded_Queue.h"#include "ace/Synch.h"#include "ace/SString.h"#include "ace/Method_Request.h"#include "ace/Future.h"#include "ace/Activation_Queue.h"#define OUTSTANDING_REQUESTS 20// Listing 2 code/ch16class CompletionCallBack: public ACE_Future_Observer<ACE_CString*>{public:  virtual void update (const ACE_Future<ACE_CString*> & future)  {    ACE_CString *result = 0;    // Block for the result.    future.get (result);    ACE_DEBUG ((LM_INFO, ACE_TEXT("%C\n"), result->c_str ()));    delete result;  }};// Listing 2// Listing 1 code/ch16class LongWork : public ACE_Method_Request{public:  virtual int call (void)  {    ACE_TRACE (ACE_TEXT ("LongWork::call"));    ACE_DEBUG      ((LM_INFO, ACE_TEXT ("(%t) Attempting long work task\n")));    ACE_OS::sleep (1);    char buf[1024];    ACE_OS::strcpy (buf, "Completed assigned task\n");    ACE_CString *msg;    ACE_NEW_RETURN      (msg, ACE_CString (buf, ACE_OS::strlen (buf) + 1), -1);    result_.set (msg);    return 0;  }  ACE_Future<ACE_CString*> &future (void)  {    ACE_TRACE (ACE_TEXT ("LongWork::future"));    return result_;  }  void attach (CompletionCallBack *cb)  {    result_.attach (cb);  }private:  ACE_Future<ACE_CString*> result_;};// Listing 1class Exit : public ACE_Method_Request{public:  virtual int call (void)  {    ACE_TRACE (ACE_TEXT ("Exit::call"));    return -1;  }};class Worker;class IManager{public:  virtual ~IManager (void) { }  virtual int return_to_work (Worker *worker) = 0;};// Listing 3 code/ch16class Worker: public ACE_Task<ACE_MT_SYNCH>{public:  Worker (IManager *manager)    : manager_(manager), queue_ (msg_queue ())  { }  int perform (ACE_Method_Request *req)  {    ACE_TRACE (ACE_TEXT ("Worker::perform"));    return this->queue_.enqueue (req);  }  virtual int svc (void)  {    thread_id_ = ACE_Thread::self ();    while (1)      {        ACE_Method_Request *request = this->queue_.dequeue();        if (request == 0)          return -1;        // Invoke the request        int result = request->call ();        if (result == -1)          break;        // Return to work.        this->manager_->return_to_work (this);      }    return 0;  }  ACE_thread_t thread_id (void);private:  IManager *manager_;  ACE_thread_t thread_id_;  ACE_Activation_Queue queue_;};// Listing 3ACE_thread_t Worker::thread_id (void){  return thread_id_;}// Listing 4 code/ch16class Manager : public ACE_Task_Base, private IManager{public:  enum {POOL_SIZE = 5, MAX_TIMEOUT = 5};  Manager ()    : shutdown_(0), workers_lock_(), workers_cond_(workers_lock_)  {    ACE_TRACE (ACE_TEXT ("Manager::TP"));  }  int perform (ACE_Method_Request *req)  {    ACE_TRACE (ACE_TEXT ("Manager::perform"));    return this->queue_.enqueue (req);  }  int svc (void)  {    ACE_TRACE (ACE_TEXT ("Manager::svc"));    ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started\n")));    // Create pool when you get in the first time.    create_worker_pool ();    while (!done ())      {        ACE_Time_Value tv ((long)MAX_TIMEOUT);        tv += ACE_OS::time (0);        // Get the next message        ACE_Method_Request *request = this->queue_.dequeue (&tv);        if (request == 0)          {            shut_down ();            break;          }        // Choose a worker.        Worker *worker = choose_worker ();        // Ask the worker to do the job.        worker->perform (request);      }    return 0;  }  int shut_down (void);  virtual int return_to_work (Worker *worker)  {    ACE_GUARD_RETURN      (ACE_Thread_Mutex, worker_mon, this->workers_lock_, -1);    ACE_DEBUG      ((LM_DEBUG, ACE_TEXT ("(%t) Worker returning to work.\n")));    this->workers_.enqueue_tail (worker);    this->workers_cond_.signal ();    return 0;  }private:  Worker *choose_worker (void)  {    ACE_GUARD_RETURN      (ACE_Thread_Mutex, worker_mon, this->workers_lock_, 0)      while (this->workers_.is_empty ())        workers_cond_.wait ();    Worker *worker = 0;    this->workers_.dequeue_head (worker);    return worker;  }  int create_worker_pool (void)  {    ACE_GUARD_RETURN      (ACE_Thread_Mutex, worker_mon, this->workers_lock_, -1);    for (int i = 0; i < POOL_SIZE; i++)      {        Worker *worker;        ACE_NEW_RETURN (worker, Worker (this), -1);        this->workers_.enqueue_tail (worker);        worker->activate ();      }    return 0;  }  int done (void)  {    return (shutdown_ == 1);  }  ACE_thread_t thread_id (Worker *worker)  {    return worker->thread_id ();  }private:  int shutdown_;  ACE_Thread_Mutex workers_lock_;  ACE_Condition<ACE_Thread_Mutex> workers_cond_;  ACE_Unbounded_Queue<Worker* > workers_;  ACE_Activation_Queue queue_;};// Listing 4intManager::shut_down (void){  ACE_TRACE (ACE_TEXT ("Manager::shut_down"));  ACE_Unbounded_Queue<Worker* >::ITERATOR iter = this->workers_.begin ();  Worker **worker_ptr = 0;  do    {      iter.next (worker_ptr);      Worker *worker = (*worker_ptr);      ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Attempting shutdown of %d\n"),                  thread_id (worker)));      Exit *req;      ACE_NEW_RETURN (req, Exit(), -1);      // Send the hangup message      worker->perform (req);      // Wait for the exit.      worker->wait ();      ACE_DEBUG ((LM_DEBUG,                  ACE_TEXT ("(%t) Worker %d shut down.\n"),                  thread_id (worker)));      delete req;      delete worker;    }  while (iter.advance ());  shutdown_ = 1;  return 0;}// Listing 5 code/ch16int ACE_TMAIN (int, ACE_TCHAR *[]){  Manager tp;  tp.activate ();  ACE_Time_Value tv;  tv.msec (100);  // Wait for a few seconds every time you send a message.  CompletionCallBack cb;  LongWork workArray[OUTSTANDING_REQUESTS];  for (int i = 0; i < OUTSTANDING_REQUESTS; i++)    {      workArray[i].attach (&cb);      ACE_OS::sleep (tv);      tp.perform (&workArray[i]);    }  ACE_Thread_Manager::instance ()->wait ();  return 0;}// Listing 5#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)template class ACE_Condition<ACE_Thread_Mutex>;template class ACE_Future<ACE_String_Base<char>*>;template class ACE_Future_Observer<ACE_String_Base<char>*>;template class ACE_Future_Rep<ACE_String_Base<char>*>;template class ACE_Node<Worker*>;template class ACE_Node<ACE_Future_Observer<ACE_String_Base<char>*>*>;template class ACE_Unbounded_Queue<Worker*>;template class ACE_Unbounded_Queue_Iterator<Worker*>;template class ACE_Unbounded_Set<ACE_Future_Observer<ACE_String_Base<char>*>*>;template class ACE_Unbounded_Set_Iterator<ACE_Future_Observer<ACE_String_Base<char>*>*>;#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)#pragma instantiate ACE_Condition<ACE_Thread_Mutex>#pragma instantiate ACE_Future<ACE_String_Base<char>*>#pragma instantiate ACE_Future_Observer<ACE_String_Base<char>*>#pragma instantiate ACE_Future_Rep<ACE_String_Base<char>*>#pragma instantiate ACE_Node<Worker*>#pragma instantiate ACE_Node<ACE_Future_Observer<ACE_String_Base<char>*>*>#pragma instantiate ACE_Unbounded_Queue<Worker*>#pragma instantiate ACE_Unbounded_Queue_Iterator<Worker*>#pragma instantiate ACE_Unbounded_Set<ACE_Future_Observer<ACE_String_Base<char>*>*>#pragma instantiate ACE_Unbounded_Set_Iterator<ACE_Future_Observer<ACE_String_Base<char>*>*>#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */#else#include "ace/OS_main.h"#include "ace/OS_NS_stdio.h"int ACE_TMAIN (int, ACE_TCHAR *[]){  ACE_OS::puts (ACE_TEXT ("This example requires threads."));  return 0;}#endif /* ACE_HAS_THREADS */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -