⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pool_core.hpp

📁 linux 环境下比较好的一个线程池实现源代码
💻 HPP
字号:
/*! \file* \brief Thread pool core.** This file contains the threadpool's core class: pool<Task, SchedulingPolicy>.** Thread pools are a mechanism for asynchronous and parallel processing * within the same process. The pool class provides a convenient way * for dispatching asynchronous tasks as functions objects. The scheduling* of these tasks can be easily controlled by using customized schedulers. ** Copyright (c) 2005-2007 Philipp Henkel** Use, modification, and distribution are  subject to the* Boost Software License, Version 1.0. (See accompanying  file* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)** http://threadpool.sourceforge.net**/#ifndef THREADPOOL_POOL_CORE_HPP_INCLUDED#define THREADPOOL_POOL_CORE_HPP_INCLUDED#include "locking_ptr.hpp"#include "worker_thread.hpp"#include "../task_adaptors.hpp"#include <boost/thread.hpp>#include <boost/thread/exceptions.hpp>#include <boost/thread/mutex.hpp>#include <boost/thread/condition.hpp>#include <boost/smart_ptr.hpp>#include <boost/bind.hpp>#include <boost/static_assert.hpp>#include <boost/type_traits.hpp>#include <vector>/// The namespace threadpool contains a thread pool and related utility classes.namespace boost { namespace threadpool { namespace detail {  /*! \brief Thread pool.   *  * Thread pools are a mechanism for asynchronous and parallel processing   * within the same process. The pool class provides a convenient way   * for dispatching asynchronous tasks as functions objects. The scheduling  * of these tasks can be easily controlled by using customized schedulers.   * A task must not throw an exception.  *  * A pool_impl is DefaultConstructible and NonCopyable.  *  * \param Task A function object which implements the operator 'void operator() (void) const'. The operator () is called by the pool to execute the task. Exceptions are ignored.  * \param Scheduler A task container which determines how tasks are scheduled. It is guaranteed that this container is accessed only by one thread at a time. The scheduler shall not throw exceptions.  *  * \remarks The pool class is thread-safe.  *   * \see Tasks: task_func, prio_task_func  * \see Scheduling policies: fifo_scheduler, lifo_scheduler, prio_scheduler  */   template <    typename Task,     template <typename> class SchedulingPolicy,    template <typename> class SizePolicy,    template <typename> class SizePolicyController,    template <typename> class ShutdownPolicy  >   class pool_core  : public enable_shared_from_this< pool_core<Task, SchedulingPolicy, SizePolicy, SizePolicyController, ShutdownPolicy > >   , private noncopyable  {  public: // Type definitions    typedef Task task_type;                                 //!< Indicates the task's type.    typedef SchedulingPolicy<task_type> scheduler_type;     //!< Indicates the scheduler's type.    typedef pool_core<Task,                       SchedulingPolicy,                       SizePolicy,                      SizePolicyController,                      ShutdownPolicy > pool_type;           //!< Indicates the thread pool's type.    typedef SizePolicy<pool_type> size_policy_type;         //!< Indicates the sizer's type.    //typedef typename size_policy_type::size_controller size_controller_type;    typedef SizePolicyController<pool_type> size_controller_type;//    typedef SizePolicy<pool_type>::size_controller size_controller_type;    typedef ShutdownPolicy<pool_type> shutdown_policy_type;//!< Indicates the shutdown policy's type.      typedef worker_thread<pool_type> worker_type;    // The task is required to be a nullary function.    BOOST_STATIC_ASSERT(function_traits<task_type()>::arity == 0);    // The task function's result type is required to be void.    BOOST_STATIC_ASSERT(is_void<typename result_of<task_type()>::type >::value);  private:  // Friends     friend class worker_thread<pool_type>;#if defined(__SUNPRO_CC) && (__SUNPRO_CC <= 0x580)  // Tested with CC: Sun C++ 5.8 Patch 121018-08 2006/12/06   friend class SizePolicy;   friend class ShutdownPolicy;#else   friend class SizePolicy<pool_type>;   friend class ShutdownPolicy<pool_type>;#endif  private: // The following members may be accessed by _multiple_ threads at the same time:    volatile size_t m_worker_count;	    volatile size_t m_target_worker_count;	    volatile size_t m_active_worker_count;        private: // The following members are accessed only by _one_ thread at the same time:    scheduler_type  m_scheduler;    scoped_ptr<size_policy_type> m_size_policy; // is never null        bool  m_terminate_all_workers;								// Indicates if termination of all workers was triggered.    std::vector<shared_ptr<worker_type> > m_terminated_workers; // List of workers which are terminated but not fully destructed.      private: // The following members are implemented thread-safe:    mutable recursive_mutex  m_monitor;    mutable condition m_worker_idle_or_terminated_event;	// A worker is idle or was terminated.    mutable condition m_task_or_terminate_workers_event;  // Task is available OR total worker count should be reduced.  public:    /// Constructor.    pool_core()      : m_worker_count(0)       , m_target_worker_count(0)      , m_active_worker_count(0)      , m_terminate_all_workers(false)    {      pool_type volatile & self_ref = *this;      m_size_policy.reset(new size_policy_type(self_ref));      m_scheduler.clear();    }    /// Destructor.    ~pool_core()    {    }    /*! Gets the size controller which manages the number of threads in the pool.     * \return The size controller.    * \see SizePolicy    */    size_controller_type size_controller()    {      return size_controller_type(*m_size_policy, this->shared_from_this());    }    /*! Gets the number of threads in the pool.    * \return The number of threads.    */    size_t size()	const volatile    {      return m_worker_count;    }// TODO is only called once    void shutdown()    {      ShutdownPolicy<pool_type>::shutdown(*this);    }    /*! Schedules a task for asynchronous execution. The task will be executed once only.    * \param task The task function object. It should not throw execeptions.    * \return true, if the task could be scheduled and false otherwise.     */      bool schedule(task_type const & task) volatile    {	      locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);             if(lockedThis->m_scheduler.push(task))      {        lockedThis->m_task_or_terminate_workers_event.notify_one();        return true;      }      else      {        return false;      }    }	    /*! Returns the number of tasks which are currently executed.    * \return The number of active tasks.     */      size_t active() const volatile    {      return m_active_worker_count;    }    /*! Returns the number of tasks which are ready for execution.        * \return The number of pending tasks.     */      size_t pending() const volatile    {      locking_ptr<const pool_type, recursive_mutex> lockedThis(*this, m_monitor);      return lockedThis->m_scheduler.size();    }    /*! Removes all pending tasks from the pool's scheduler.    */      void clear() volatile    {       locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);      lockedThis->m_scheduler.clear();    }        /*! Indicates that there are no tasks pending.     * \return true if there are no tasks ready for execution.	    * \remarks This function is more efficient that the check 'pending() == 0'.    */       bool empty() const volatile    {      locking_ptr<const pool_type, recursive_mutex> lockedThis(*this, m_monitor);      return lockedThis->m_scheduler.empty();    }	    /*! The current thread of execution is blocked until the sum of all active    *  and pending tasks is equal or less than a given threshold.     * \param task_threshold The maximum number of tasks in pool and scheduler.    */         void wait(size_t const task_threshold = 0) const volatile    {      const pool_type* self = const_cast<const pool_type*>(this);      recursive_mutex::scoped_lock lock(self->m_monitor);      if(0 == task_threshold)      {        while(0 != self->m_active_worker_count || !self->m_scheduler.empty())        {           self->m_worker_idle_or_terminated_event.wait(lock);        }      }      else      {        while(task_threshold < self->m_active_worker_count + self->m_scheduler.size())        {           self->m_worker_idle_or_terminated_event.wait(lock);        }      }    }	    /*! The current thread of execution is blocked until the timestamp is met    * or the sum of all active and pending tasks is equal or less     * than a given threshold.      * \param timestamp The time when function returns at the latest.    * \param task_threshold The maximum number of tasks in pool and scheduler.    * \return true if the task sum is equal or less than the threshold, false otherwise.    */           bool wait(xtime const & timestamp, size_t const task_threshold = 0) const volatile    {      const pool_type* self = const_cast<const pool_type*>(this);      recursive_mutex::scoped_lock lock(self->m_monitor);      if(0 == task_threshold)      {        while(0 != self->m_active_worker_count || !self->m_scheduler.empty())        {           if(!self->m_worker_idle_or_terminated_event.timed_wait(lock, timestamp)) return false;        }      }      else      {        while(task_threshold < self->m_active_worker_count + self->m_scheduler.size())        {           if(!self->m_worker_idle_or_terminated_event.timed_wait(lock, timestamp)) return false;        }      }      return true;    }  private:	    void terminate_all_workers(bool const wait) volatile    {      pool_type* self = const_cast<pool_type*>(this);      recursive_mutex::scoped_lock lock(self->m_monitor);      self->m_terminate_all_workers = true;      m_target_worker_count = 0;      self->m_task_or_terminate_workers_event.notify_all();      if(wait)      {        while(m_active_worker_count > 0)        {          self->m_worker_idle_or_terminated_event.wait(lock);        }        for(typename std::vector<shared_ptr<worker_type> >::iterator it = self->m_terminated_workers.begin();          it != self->m_terminated_workers.end();          ++it)        {          (*it)->join();        }        self->m_terminated_workers.clear();      }    }    /*! Changes the number of worker threads in the pool. The resizing     *  is handled by the SizePolicy.    * \param threads The new number of worker threads.    * \return true, if pool will be resized and false if not.     */    bool resize(size_t const worker_count) volatile    {      locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);       if(!m_terminate_all_workers)      {        m_target_worker_count = worker_count;      }      else      {         return false;      }      if(m_worker_count <= m_target_worker_count)      { // increase worker count        while(m_worker_count < m_target_worker_count)        {          try          {            worker_thread<pool_type>::create_and_attach(lockedThis->shared_from_this());            m_worker_count++;            m_active_worker_count++;	          }          catch(thread_resource_error)          {            return false;          }        }      }      else      { // decrease worker count        lockedThis->m_task_or_terminate_workers_event.notify_all();   // TODO: Optimize number of notified workers      }      return true;    }    // worker died with unhandled exception    void worker_died_unexpectedly(shared_ptr<worker_type> worker) volatile    {      locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);      m_worker_count--;      m_active_worker_count--;      lockedThis->m_worker_idle_or_terminated_event.notify_all();	      if(m_terminate_all_workers)      {        lockedThis->m_terminated_workers.push_back(worker);      }      else      {        lockedThis->m_size_policy->worker_died_unexpectedly(m_worker_count);      }    }    void worker_destructed(shared_ptr<worker_type> worker) volatile    {      locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);      m_worker_count--;      m_active_worker_count--;      lockedThis->m_worker_idle_or_terminated_event.notify_all();	      if(m_terminate_all_workers)      {        lockedThis->m_terminated_workers.push_back(worker);      }    }    bool execute_task() volatile    {      function0<void> task;      { // fetch task        pool_type* lockedThis = const_cast<pool_type*>(this);        recursive_mutex::scoped_lock lock(lockedThis->m_monitor);        // decrease number of threads if necessary        if(m_worker_count > m_target_worker_count)        {	          return false;	// terminate worker        }        // wait for tasks        while(lockedThis->m_scheduler.empty())        {	          // decrease number of workers if necessary          if(m_worker_count > m_target_worker_count)          {	            return false;	// terminate worker          }          else          {            m_active_worker_count--;            lockedThis->m_worker_idle_or_terminated_event.notify_all();	            lockedThis->m_task_or_terminate_workers_event.wait(lock);            m_active_worker_count++;          }        }        task = lockedThis->m_scheduler.top();        lockedThis->m_scheduler.pop();      }      // call task function      if(task)      {        task();      }       //guard->disable();      return true;    }  };} } } // namespace boost::threadpool::detail#endif // THREADPOOL_POOL_CORE_HPP_INCLUDED

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -