📄 thread_pool.hpp
字号:
/* * =========================================================================== * PRODUCTION $Log: thread_pool.hpp,v $ * PRODUCTION Revision 1000.1 2004/06/03 19:28:22 gouriano * PRODUCTION PRODUCTION: UPGRADED [GCC34_MSVC7] Dev-tree R1.12 * PRODUCTION * =========================================================================== */#ifndef THREAD_POOL__HPP#define THREAD_POOL__HPP/* $Id: thread_pool.hpp,v 1000.1 2004/06/03 19:28:22 gouriano Exp $* ===========================================================================** PUBLIC DOMAIN NOTICE* National Center for Biotechnology Information** This software/database is a "United States Government Work" under the* terms of the United States Copyright Act. It was written as part of* the author's official duties as a United States Government employee and* thus cannot be copyrighted. This software/database is freely available* to the public for use. The National Library of Medicine and the U.S.* Government have not placed any restriction on its use or reproduction.** Although all reasonable efforts have been taken to ensure the accuracy* and reliability of the software and data, the NLM and the U.S.* Government do not and cannot warrant the performance or results that* may be obtained by using this software or data. The NLM and the U.S.* Government disclaim all warranties, express or implied, including* warranties of performance, merchantability or fitness for any particular* purpose.** Please cite the author in any work or product based on this material.** ===========================================================================** Author: Aaron Ucko** File Description:* Pools of generic request-handling threads.** TEMPLATES:* CBlockingQueue<> -- queue of requests, with efficiently blocking Get()* CThreadInPool<> -- abstract request-handling thread* CPoolOfThreads<> -- abstract pool of threads sharing a request queue** SPECIALIZATIONS:* CStdRequest -- abstract request type* CStdThreadInPool -- thread handling CStdRequest* CStdPoolOfThreads -- pool of threads handling CStdRequest*/#include <corelib/ncbistd.hpp>#include <corelib/ncbithr.hpp>#include <corelib/ncbi_limits.hpp>#include <util/util_exception.hpp>#include <deque>/** @addtogroup ThreadedPools * * @{ */BEGIN_NCBI_SCOPE///////////////////////////////////////////////////////////////////////////////// TEMPLATES://// CBlockingQueue<> -- queue of requests, with efficiently blocking Get()// CThreadInPool<> -- abstract request-handling thread// CPoolOfThreads<> -- abstract pool of threads sharing a request queue//template <typename TRequest>class CBlockingQueue{public: CBlockingQueue(unsigned int max_size = kMax_UInt) : m_GetSem(0,1), m_PutSem(1,1), m_MaxSize(max_size) {} void Put(const TRequest& data); // Throws exception if full void WaitForRoom(unsigned int timeout_sec = kMax_UInt, unsigned int timeout_nsec = kMax_UInt) const; // Blocks politely if empty TRequest Get(unsigned int timeout_sec = kMax_UInt, unsigned int timeout_nsec = kMax_UInt); unsigned int GetSize(void) const; unsigned int GetMaxSize(void) const { return m_MaxSize; } bool IsEmpty(void) const { return GetSize() == 0; } bool IsFull(void) const { return GetSize() == GetMaxSize(); }protected: // Derived classes should take care to use these members properly. volatile deque<TRequest> m_Queue; CSemaphore m_GetSem; // Raised iff the queue contains data mutable CSemaphore m_PutSem; // Raised iff the queue has room mutable CMutex m_Mutex; // Guards access to queueprivate: unsigned int m_MaxSize;};// Forward declarationtemplate <typename TRequest> class CPoolOfThreads;template <typename TRequest>/* abstract */ class CThreadInPool : public CThread{public: typedef CPoolOfThreads<TRequest> TPool; CThreadInPool(TPool* pool) : m_Pool(pool) {}protected: virtual ~CThreadInPool(void) {} virtual void Init(void) {} // called at beginning of Main() // Called from Main() for each request this thread handles virtual void ProcessRequest(const TRequest& req) = 0; virtual void x_OnExit(void) {} // called by OnExit()private: // to prevent overriding; inherited from CThread virtual void* Main(void); virtual void OnExit(void); TPool* m_Pool;};template <typename TRequest>/* abstract */ class CPoolOfThreads{public: friend class CThreadInPool<TRequest>; typedef CThreadInPool<TRequest> TThread; CPoolOfThreads(unsigned int max_threads, unsigned int queue_size, int spawn_threshold = 1) : m_MaxThreads(max_threads), m_Threshold(spawn_threshold), m_Queue(queue_size) { m_ThreadCount.Set(0); m_Delta.Set(0); } virtual ~CPoolOfThreads(void); void Spawn(unsigned int num_threads); void AcceptRequest(const TRequest& req); void WaitForRoom(void) { m_Queue.WaitForRoom(); } bool IsFull(void) const { return m_Queue.IsFull(); }protected: virtual TThread* NewThread(void) = 0; CAtomicCounter m_ThreadCount; volatile unsigned int m_MaxThreads; CAtomicCounter m_Delta; // # unfinished requests - # threads int m_Threshold; // for delta CMutex m_Mutex; // for m_MaxThreads CBlockingQueue<TRequest> m_Queue;};///////////////////////////////////////////////////////////////////////////////// SPECIALIZATIONS://// CStdRequest -- abstract request type// CStdThreadInPool -- thread handling CStdRequest// CStdPoolOfThreads -- pool of threads handling CStdRequest///* abstract */ class CStdRequest : public CObject{public: virtual ~CStdRequest(void) {} // Called by whichever thread handles this request. virtual void Process(void) = 0;};class NCBI_XUTIL_EXPORT CStdThreadInPool : public CThreadInPool< CRef< CStdRequest > >{public: typedef CThreadInPool< CRef< CStdRequest > > TParent; CStdThreadInPool(TPool* pool) : TParent(pool) {}protected: virtual void ProcessRequest(const CRef<CStdRequest>& req) { const_cast<CStdRequest&>(*req).Process(); } // virtual void Init(void); // called before processing any requests // virtual void x_OnExit(void); // called just before exiting};class NCBI_XUTIL_EXPORT CStdPoolOfThreads : public CPoolOfThreads< CRef< CStdRequest > >{public: typedef CPoolOfThreads< CRef< CStdRequest > > TParent; CStdPoolOfThreads(unsigned int max_threads, unsigned int queue_size, int spawn_threshold = 1) : TParent(max_threads, queue_size, spawn_threshold) {} // void Spawn(unsigned int num_threads); // void AcceptRequest(const TRequest& req); // Causes all threads in the pool to exit cleanly after finishing // all pending requests, optionally waiting for them to die. virtual void KillAllThreads(bool wait);protected: virtual TThread* NewThread(void) { return new CStdThreadInPool(this); }};//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// IMPLEMENTATION of INLINE functions//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// CBlockingQueue<>:://template <typename TRequest>void CBlockingQueue<TRequest>::Put(const TRequest& data){ CMutexGuard guard(m_Mutex); // Having the mutex, we can safely drop "volatile" deque<TRequest>& q = const_cast<deque<TRequest>&>(m_Queue); if (q.size() == m_MaxSize) { m_PutSem.TryWait(); NCBI_THROW(CBlockingQueueException, eFull, "CBlockingQueue<>::Put: " "attempt to insert into a full queue"); } else if (q.empty()) { m_GetSem.Post(); } q.push_back(data);}template <typename TRequest>void CBlockingQueue<TRequest>::WaitForRoom(unsigned int timeout_sec, unsigned int timeout_nsec) const{ // Make sure there's room, but don't actually consume anything if (m_PutSem.TryWait(timeout_sec, timeout_nsec)) { m_PutSem.Post(); } else { NCBI_THROW(CBlockingQueueException, eTimedOut, "CBlockingQueue<>::WaitForRoom: timed out"); }}template <typename TRequest>TRequest CBlockingQueue<TRequest>::Get(unsigned int timeout_sec, unsigned int timeout_nsec){ if ( !m_GetSem.TryWait(timeout_sec, timeout_nsec) ) { NCBI_THROW(CBlockingQueueException, eTimedOut, "CBlockingQueue<>::Get: timed out"); } CMutexGuard guard(m_Mutex); // Having the mutex, we can safely drop "volatile" deque<TRequest>& q = const_cast<deque<TRequest>&>(m_Queue); TRequest result = q.front(); q.pop_front(); if ( ! q.empty() ) { m_GetSem.Post(); } // Get the attention of WaitForRoom() or the like; do this // regardless of queue size because derived classes may want // to insert multiple objects atomically. m_PutSem.TryWait(); m_PutSem.Post(); return result;}template <typename TRequest>unsigned int CBlockingQueue<TRequest>::GetSize(void) const{ CMutexGuard guard(m_Mutex); return const_cast<const deque<TRequest>&>(m_Queue).size();}/////////////////////////////////////////////////////////////////////////////// CThreadInPool<>:://template <typename TRequest>void* CThreadInPool<TRequest>::Main(void){ Detach(); Init(); for (;;) { m_Pool->m_Delta.Add(-1); ProcessRequest(m_Pool->m_Queue.Get()); } return 0; // Unreachable, but necessary for WorkShop build}template <typename TRequest>void CThreadInPool<TRequest>::OnExit(void){ try { x_OnExit(); } catch (...) { // Ignore exceptions; there's nothing useful we can do anyway } m_Pool->m_ThreadCount.Add(-1);}/////////////////////////////////////////////////////////////////////////////// CPoolOfThreads<>:://template <typename TRequest>CPoolOfThreads<TRequest>::~CPoolOfThreads(void){ CAtomicCounter::TValue n = m_ThreadCount.Get(); if (n) { ERR_POST(Warning << "CPoolOfThreads<>::~CPoolOfThreads: " << n << " thread(s) still active"); }}template <typename TRequest>void CPoolOfThreads<TRequest>::Spawn(unsigned int num_threads){ for (unsigned int i = 0; i < num_threads; i++) { m_ThreadCount.Add(1); NewThread()->Run(); }}template <typename TRequest>void CPoolOfThreads<TRequest>::AcceptRequest(const TRequest& req){ bool new_thread = false; {{ CMutexGuard guard(m_Mutex); m_Queue.Put(req); if (static_cast<int>(m_Delta.Add(1)) >= m_Threshold && m_ThreadCount.Get() < m_MaxThreads) { // Add another thread to the pool because they're all busy. m_ThreadCount.Add(1); new_thread = true; } }} if (new_thread) { NewThread()->Run(); }}END_NCBI_SCOPE/* @} *//** ===========================================================================** $Log: thread_pool.hpp,v $* Revision 1000.1 2004/06/03 19:28:22 gouriano* PRODUCTION: UPGRADED [GCC34_MSVC7] Dev-tree R1.12** Revision 1.12 2004/06/02 17:49:08 ucko* CPoolOfThreads: change type of m_Delta and m_ThreadCount to* CAtomicCounter to reduce need for m_Mutex; warn if any threads are* still active when the destructor runs.** Revision 1.11 2003/04/17 17:50:37 siyan* Added doxygen support** Revision 1.10 2003/02/26 21:34:06 gouriano* modify C++ exceptions thrown by this library** Revision 1.9 2002/12/19 14:51:00 dicuccio* Added export specifier for Win32 DLL builds.** Revision 1.8 2002/11/04 21:29:00 grichenk* Fixed usage of const CRef<> and CRef<> constructor** Revision 1.7 2002/09/13 15:16:03 ucko* Give CBlockingQueue<>::{WaitForRoom,Get} optional timeouts (infinite* by default); change exceptions to use new setup.** Revision 1.6 2002/04/18 15:38:19 ucko* Use "deque" instead of "queue" -- more general, and less likely to* yield any name conflicts.* Make most of CBlockingQueue<>'s data protected for the benefit of* derived classes.* Move CVS log to end.** Revision 1.5 2002/04/11 15:12:52 ucko* Added GetSize and GetMaxSize methods to CBlockingQueue and rewrote* Is{Empty,Full} in terms of them.** Revision 1.4 2002/01/25 15:46:06 ucko* Add more methods needed by new threaded-server code.* Minor cleanups.** Revision 1.3 2002/01/24 20:17:49 ucko* Introduce new exception class for full queues* Allow waiting for a full queue to have room again** Revision 1.2 2002/01/07 20:15:06 ucko* Fully initialize thread-pool state.** Revision 1.1 2001/12/11 19:54:44 ucko* Introduce thread pools.** ===========================================================================*/#endif /* THREAD_POOL__HPP */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -