📄 futures.cpp
字号:
// Futures.cpp,v 1.7 2004/01/12 23:15:25 shuston Exp
#include "ace/OS_NS_string.h"
#include "ace/OS_NS_time.h"
#include "ace/Task.h"
#include "ace/Unbounded_Queue.h"
#include "ace/Synch.h"
#include "ace/SString.h"
#include "ace/Method_Request.h"
#include "ace/Future.h"
#include "ace/Activation_Queue.h"
#define OUTSTANDING_REQUESTS 20
// Listing 2 code/ch16
/**
 * Observer that can be attached to an ACE_Future<ACE_CString*>.
 *
 * When notified it fetches the string carried by the future, logs it and
 * deletes it, i.e. it takes ownership of the delivered ACE_CString.
 */
class CompletionCallBack: public ACE_Future_Observer<ACE_CString*>
{
public:
  /**
   * Fetch the value held by @a future, log it and free it.
   *
   * @param future  The future this observer was notified about.
   *
   * If the value cannot be retrieved, or the future carries a null
   * pointer, an error is logged and nothing is dereferenced or deleted.
   */
  virtual void update (const ACE_Future<ACE_CString*> & future)
  {
    // Start from a known value so a failed get() can never leave us with
    // an indeterminate pointer.
    ACE_CString *result = 0;

    // Work on a non-const copy of the handle (the same copy the original
    // C-style cast produced) so get() can be called on it.
    ACE_Future<ACE_CString*> ready (future);

    // Block for the result.
    if (ready.get (result) == -1 || result == 0)
      {
        ACE_ERROR ((LM_ERROR,
                    ACE_TEXT ("(%t) Unable to retrieve future result\n")));
        return;
      }

    ACE_DEBUG ((LM_INFO, ACE_TEXT("%C\n"), result->c_str ()));
    delete result;
  }
};
// Listing 2
// Listing 1 code/ch16
/**
 * Method request that simulates a slow job and publishes its outcome
 * through an ACE_Future<ACE_CString*>.
 */
class LongWork : public ACE_Method_Request
{
public:
  /**
   * Sleep for one second, then store a heap-allocated completion message
   * in the future.
   *
   * @return 0 on success, -1 if the message could not be allocated.
   *         Note that Worker::svc() in this file treats a -1 result as a
   *         request to leave its processing loop.
   */
  virtual int call (void)
  {
    ACE_TRACE (ACE_TEXT ("LongWork::call"));
    ACE_DEBUG
      ((LM_INFO, ACE_TEXT ("(%t) Attempting long work task\n")));
    ACE_OS::sleep (1);

    // ACE_CString is a narrow-character string, so the text is a plain
    // char literal (not ACE_TEXT, which is for ACE_TCHAR data).  Letting
    // the constructor measure the literal keeps the terminating NUL out
    // of the string's length.
    ACE_CString *msg = 0;
    ACE_NEW_RETURN
      (msg, ACE_CString ("Completed assigned task\n"), -1);

    // Publish the result.  The pointer is deleted by
    // CompletionCallBack::update() when that observer is attached.
    result_.set (msg);
    return 0;
  }

  /// Access the future through which the result is delivered.
  ACE_Future<ACE_CString*> &future (void)
  {
    ACE_TRACE (ACE_TEXT ("LongWork::future"));
    return result_;
  }

  /// Register @a cb as an observer of this request's future.
  void attach (CompletionCallBack *cb)
  {
    result_.attach (cb);
  }

private:
  /// Carries the completion message produced by call().
  ACE_Future<ACE_CString*> result_;
};
// Listing 1
/**
 * Method request that performs no work and always reports -1.
 *
 * Worker::svc() leaves its loop when a request's call() returns -1, so
 * queueing an Exit on a worker ends that worker's service loop.
 */
class Exit : public ACE_Method_Request
{
public:
  /// @return Always -1.
  virtual int call (void)
  {
    ACE_TRACE (ACE_TEXT ("Exit::call"));

    const int stop_code = -1;
    return stop_code;
  }
};
class Worker;

/**
 * Callback interface a Worker uses to report back to whoever manages it.
 */
class IManager
{
public:
  /// Virtual so that an implementation destroyed through an IManager
  /// pointer runs its own destructor.
  virtual ~IManager () {}

  /**
   * Notify the manager that @a worker has finished a request.
   *
   * @param worker  The worker reporting back.
   * @return Implementation defined; Manager in this file returns 0 on
   *         success and -1 if it could not acquire its lock.
   */
  virtual int return_to_work (Worker *worker) = 0;
};
// Listing 3 code/ch16
/**
 * A pool thread.  Requests handed to perform() are queued on the task's
 * own message queue (wrapped by an ACE_Activation_Queue) and executed one
 * at a time by svc().
 */
class Worker: public ACE_Task<ACE_MT_SYNCH>
{
public:
  /**
   * @param manager  Notified through IManager::return_to_work() after
   *                 each request that does not end the service loop.
   */
  Worker (IManager *manager)
    : manager_(manager),
      // Give thread_id() a well-defined value until svc() has recorded
      // the real thread id.
      thread_id_ (ACE_OS::NULL_thread),
      queue_ (msg_queue ())
  { }

  /**
   * Queue @a req for execution by this worker.
   *
   * @return Whatever ACE_Activation_Queue::enqueue() returns.
   */
  int perform (ACE_Method_Request *req)
  {
    ACE_TRACE (ACE_TEXT ("Worker::perform"));
    return this->queue_.enqueue (req);
  }

  /**
   * Service loop: record the running thread's id, then repeatedly dequeue
   * a request and invoke it.
   *
   * @return -1 if a dequeue yields no request; 0 once a request's call()
   *         returned -1 (the exit signal).
   */
  virtual int svc (void)
  {
    thread_id_ = ACE_Thread::self ();
    while (1)
      {
        ACE_Method_Request *request = this->queue_.dequeue();
        if (request == 0)
          return -1;

        // Invoke the request
        int result = request->call ();
        if (result == -1)
          break;

        // Return to work.
        this->manager_->return_to_work (this);
      }
    return 0;
  }

  /// Id recorded by svc(); ACE_OS::NULL_thread before svc() has run.
  ACE_thread_t thread_id (void);

private:
  /// Receives the return_to_work() notifications.
  IManager *manager_;
  /// Id of the thread executing svc().
  ACE_thread_t thread_id_;
  /// Request queue layered on this task's message queue.
  ACE_Activation_Queue queue_;
};
// Listing 3
/// Report the thread id stored in this worker's thread_id_ member
/// (assigned at the start of Worker::svc()).
ACE_thread_t
Worker::thread_id (void)
{
  return this->thread_id_;
}
// Listing 4 code/ch16
class Manager : public ACE_Task_Base, private IManager
{
public:
enum {POOL_SIZE = 5, MAX_TIMEOUT = 5};
Manager ()
: shutdown_(0), workers_lock_(), workers_cond_(workers_lock_)
{
ACE_TRACE (ACE_TEXT ("Manager::TP"));
}
int perform (ACE_Method_Request *req)
{
ACE_TRACE (ACE_TEXT ("Manager::perform"));
return this->queue_.enqueue (req);
}
int svc (void)
{
ACE_TRACE (ACE_TEXT ("Manager::svc"));
ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started\n")));
// Create pool when you get in the first time.
create_worker_pool ();
while (!done ())
{
ACE_Time_Value tv ((long)MAX_TIMEOUT);
tv += ACE_OS::time (0);
// Get the next message
ACE_Method_Request *request = this->queue_.dequeue (&tv);
if (request == 0)
{
shut_down ();
break;
}
// Choose a worker.
Worker *worker = choose_worker ();
// Ask the worker to do the job.
worker->perform (request);
}
return 0;
}
int shut_down (void);
virtual int return_to_work (Worker *worker)
{
ACE_GUARD_RETURN
(ACE_Thread_Mutex, worker_mon, this->workers_lock_, -1);
ACE_DEBUG
((LM_DEBUG, ACE_TEXT ("(%t) Worker returning to work.\n")));
this->workers_.enqueue_tail (worker);
this->workers_cond_.signal ();
return 0;
}
private:
Worker *choose_worker (void)
{
ACE_GUARD_RETURN
(ACE_Thread_Mutex, worker_mon, this->workers_lock_, 0)
while (this->workers_.is_empty ())
workers_cond_.wait ();
Worker *worker;
this->workers_.dequeue_head (worker);
return worker;
}
int create_worker_pool (void)
{
ACE_GUARD_RETURN
(ACE_Thread_Mutex, worker_mon, this->workers_lock_, -1);
for (int i = 0; i < POOL_SIZE; i++)
{
Worker *worker;
ACE_NEW_RETURN (worker, Worker (this), -1);
this->workers_.enqueue_tail (worker);
worker->activate ();
}
return 0;
}
int done (void)
{
return (shutdown_ == 1);
}
ACE_thread_t thread_id (Worker *worker)
{
return worker->thread_id ();
}
private:
int shutdown_;
ACE_Thread_Mutex workers_lock_;
ACE_Condition<ACE_Thread_Mutex> workers_cond_;
ACE_Unbounded_Queue<Worker* > workers_;
ACE_Activation_Queue queue_;
};
// Listing 4
int
Manager::shut_down (void)
{
ACE_TRACE (ACE_TEXT ("Manager::shut_down"));
ACE_Unbounded_Queue<Worker* >::ITERATOR iter = this->workers_.begin ();
Worker **worker_ptr = NULL;
do
{
iter.next (worker_ptr);
Worker *worker = (*worker_ptr);
ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Attempting shutdown of %d\n"),
thread_id (worker)));
Exit *req;
ACE_NEW_RETURN (req, Exit(), -1);
// Send the hangup message
worker->perform (req);
// Wait for the exit.
worker->wait ();
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%t) Worker %d shut down.\n"),
thread_id (worker)));
delete req;
delete worker;
}
while (iter.advance ());
shutdown_ = 1;
return 0;
}
// Listing 5 code/ch16
int ACE_TMAIN (int, ACE_TCHAR *[])
{
Manager tp;
tp.activate ();
ACE_Time_Value tv;
tv.msec (100);
// Wait for a few seconds every time you send a message.
CompletionCallBack cb;
LongWork workArray[OUTSTANDING_REQUESTS];
for (int i = 0; i < OUTSTANDING_REQUESTS; i++)
{
workArray[i].attach (&cb);
ACE_OS::sleep (tv);
tp.perform (&workArray[i]);
}
ACE_Thread_Manager::instance ()->wait ();
return 0;
}
// Listing 5
// Explicit instantiations of the ACE templates this file uses, emitted
// only when the build defines ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION
// (C++ `template class` form) or ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA
// (`#pragma instantiate` form).  Both branches list the same templates.
// NOTE(review): ACE_String_Base<char> is presumably the type behind
// ACE_CString used above — confirm against ace/SString.h.
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Condition<ACE_Thread_Mutex>;
template class ACE_Future<ACE_String_Base<char>*>;
template class ACE_Future_Observer<ACE_String_Base<char>*>;
template class ACE_Future_Rep<ACE_String_Base<char>*>;
template class ACE_Node<Worker*>;
template class ACE_Node<ACE_Future_Observer<ACE_String_Base<char>*>*>;
template class ACE_Unbounded_Queue<Worker*>;
template class ACE_Unbounded_Queue_Iterator<Worker*>;
template class ACE_Unbounded_Set<ACE_Future_Observer<ACE_String_Base<char>*>*>;
template class ACE_Unbounded_Set_Iterator<ACE_Future_Observer<ACE_String_Base<char>*>*>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Condition<ACE_Thread_Mutex>
#pragma instantiate ACE_Future<ACE_String_Base<char>*>
#pragma instantiate ACE_Future_Observer<ACE_String_Base<char>*>
#pragma instantiate ACE_Future_Rep<ACE_String_Base<char>*>
#pragma instantiate ACE_Node<Worker*>
#pragma instantiate ACE_Node<ACE_Future_Observer<ACE_String_Base<char>*>*>
#pragma instantiate ACE_Unbounded_Queue<Worker*>
#pragma instantiate ACE_Unbounded_Queue_Iterator<Worker*>
#pragma instantiate ACE_Unbounded_Set<ACE_Future_Observer<ACE_String_Base<char>*>*>
#pragma instantiate ACE_Unbounded_Set_Iterator<ACE_Future_Observer<ACE_String_Base<char>*>*>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -