⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pool.hpp

📁 threadpool is a cross-platform C++ thread pool library
💻 HPP
字号:
/*! \file * \brief Thread pool core. * * This file contains the threadpool's core class: pool<Task, Scheduler>. * * Thread pools are a mechanism for asynchronous and parallel processing  * within the same process. The pool class provides a convenient way  * for dispatching asynchronous tasks as functions objects. The scheduling * of these tasks can be easily controlled by using customized schedulers.  * * Copyright (c) 2005-2006 Philipp Henkel * * Distributed under the Boost Software License, Version 1.0. (See * accompanying file LICENSE_1_0.txt or copy at * http://www.boost.org/LICENSE_1_0.txt) * * http://threadpool.sourceforge.net * */#ifndef THREADPOOL_POOL_HPP_INCLUDED#define THREADPOOL_POOL_HPP_INCLUDED#include <boost/thread.hpp>#include <boost/thread/mutex.hpp>#include <boost/thread/condition.hpp>#include <boost/smart_ptr.hpp>#include <boost/bind.hpp>#include "schedulers.hpp"#include "task_adaptors.hpp"#ifdef _MSC_VER#pragma warning(push)#pragma warning(disable : 4127) // Conditional expression is constant#endif/// The namespace threadpool contains a thread pool and related utility classes.namespace threadpool{/*! \brief Thread pool.  * * Thread pools are a mechanism for asynchronous and parallel processing  * within the same process. The pool class provides a convenient way  * for dispatching asynchronous tasks as functions objects. The scheduling * of these tasks can be easily controlled by using customized schedulers.  * A task must not throw an exception. * * \param Task A function object which implements the operator 'void operator() (void) const'. The operator () is called by the pool to execute the task. Exceptions are ignored. * \param Scheduler A task container which determines how tasks are scheduled. It is guaranteed that this container is accessed only by one thread at a time. * * \remarks The pool class is thread-safe. *  * \see Tasks: thread_func, prio_thread_func * \see Schedulers: fifo_scheduler, lifo_scheduler, prio_scheduler */ template <class Task = thread_func, class Scheduler = fifo_scheduler<Task> > class pool : private boost::noncopyable {public:  typedef pool<Task, Scheduler> pool_type;  //!< Provides the thread pool's type.  typedef Task task_type;                   //!< Provides the tasks' type.  typedef Scheduler scheduler_type;         //!< Provides the scheduler's type.public:  /*! Constructs a new thread pool.   * \param threads The initial number of threads in the pool.   * \return A new pool instance.   */  static boost::shared_ptr<pool_type> create_pool(size_t threads)  {    // construct new pool (with own deleter because destructor is private)    boost::shared_ptr<pool_type> px(new pool_type, scalar_deleter<pool_type>());    px->store_self_ptr(px);    px->resize(threads);    return px;  }  /*! Gets the number of threads in the pool.   * \return The number of threads.   */  size_t size()	const  {    return m_thread_count;  }  /*! Changes the number of threads in the pool.   * \param threads The new number of the threads.   * \return true, if pool can be resized and false if a new thread of execution cannot be started.    */  bool resize(size_t threads)  {    boost::mutex::scoped_lock lock(m_monitor);        bool result = true;    boost::shared_ptr<pool_type> self = m_self_ptr.lock();    m_target_thread_count = threads;    while(m_thread_count < m_target_thread_count)    {      try      {        boost::shared_ptr<threadpool_worker> worker(new threadpool_worker(self));      	boost::thread* thread_ptr = m_threads.create_thread(boost::bind(&threadpool_worker::run, worker));      	worker->set_thread_ptr(thread_ptr);      	      	m_thread_count++;      	m_running_thread_count++;	      }      catch(boost::thread_resource_error)      {      	result = false;      	break;      }    }    m_thread_is_idle.notify_all();        return result;  }  /*! Schedules a task for asynchronous execution.   * \param task The task function object.   */    void schedule(const task_type &task)  {	    boost::mutex::scoped_lock lock(m_monitor);    m_scheduler.push(task);    m_task_is_available.notify_one();  }	    /*! Returns the number of tasks which are ready for execution.   * \return The number of pending tasks.    */    size_t pending() const  {    return m_scheduler.size();  }    /*! Indicates that there are no tasks pending.    * \return true if there are no tasks ready for execution.	   * \remarks This function is more efficient that the check 'pending() == 0'.   */     bool empty() const  {    return m_scheduler.empty();  }	  /*! The current thread of execution blocks until all running   *  and pending tasks are finished.    */       void join()  {    boost::mutex::scoped_lock lock(m_monitor);    while(0 != m_running_thread_count || !m_scheduler.empty())    {       m_thread_is_idle.wait(lock);    }  }	  /*! The current thread of execution blocks until all running   *  and pending tasks are finished or the timestamp is met.    * \param timestamp The time when function returns at the latest.   */         bool join(const boost::xtime& timestamp)  {    boost::mutex::scoped_lock lock(m_monitor);    while(0 != m_running_thread_count || !m_scheduler.empty())    {       if(!m_thread_is_idle.timed_wait(lock, timestamp)) return false;    }    return true;  }private:  // Constructor  pool()  : m_thread_count(0)   , m_target_thread_count(0)  , m_running_thread_count(0)  {  }  // Destructor  ~pool()  {  }  template<class T>  struct scalar_deleter  {    void operator()(T* p)    {      delete p;    }  };    private:	  bool execute_task()  {    boost::function0<void> task;        { // fetch task	      boost::mutex::scoped_lock lock(m_monitor);      // decrease number of threads if necessary      if(m_thread_count > m_target_thread_count)      {	        m_thread_count--;        m_running_thread_count--;        m_thread_is_idle.notify_all();		        return false;	// terminate thread      }      // wait for tasks      while(m_scheduler.empty())      {	        m_thread_is_idle.notify_all();		        // decrease number of threads if necessary        if(m_thread_count > m_target_thread_count)        {          m_thread_count--;          m_running_thread_count--;          return false;	// terminate thread        }        else        {          m_running_thread_count--;          m_task_is_available.wait(lock);          m_running_thread_count++;        }      }      task = m_scheduler.top();      m_scheduler.pop();    }    // call task function    try    {      if(task)      {        task();      }    }    catch(...)    {      // ignore all exceptions from task()    }    return true;  }  void store_self_ptr(boost::shared_ptr<pool_type> self)  {    m_self_ptr = boost::weak_ptr<pool_type>(self);  }	  boost::weak_ptr<pool_type> m_self_ptr;  size_t m_thread_count;	  size_t m_target_thread_count;	  size_t m_running_thread_count;  boost::thread_group m_threads; // TODO remove threads from thread_group!  scheduler_type m_scheduler;  boost::mutex m_monitor;  boost::condition m_thread_is_idle;	  boost::condition m_task_is_available;  /// Worker executes tasks.  class threadpool_worker  {  public:    /*! Constructs a new worker.     * \param pool Pointer to it's parent pool.     */    threadpool_worker(boost::shared_ptr<pool_type> pool)    : m_pool(pool)    , m_thread_ptr(0)    {    }    /*! Executes pool's tasks sequentially.     */    void run() const     {       while(true)      {        boost::shared_ptr<pool_type> pool = m_pool.lock();        if(pool)        {          if(!pool->execute_task())          {            break;          }        }        else        {          break;        }      }    	  // unregister thread from pool's thread_group	      boost::shared_ptr<pool_type> pool = m_pool.lock();      if(pool && m_thread_ptr)      {      	pool->m_threads.remove_thread(m_thread_ptr);      }    }        /*! Stores the pointer to the thread whichs executes the run loop.     * \param thread_ptr Pointer to executing thread.     * \remarks The pointer is used to unregister the thread from the pool's thread_group when the run loop terminates.     */    void set_thread_ptr(boost::thread* thread_ptr)    {      m_thread_ptr = thread_ptr;    }  private:    boost::weak_ptr<pool_type> m_pool;  ///< Weak pointer to the pool which owns the worker.    boost::thread* m_thread_ptr;        ///< Pointer of the thread which executes the run loop and is registered in the pool's thread_group.  };	};/*! \brief Fifo pool. * * The pool's tasks are fifo scheduled thread_func functors. * */ typedef pool<thread_func, fifo_scheduler<thread_func> > fifo_pool;/*! \brief Lifo pool. * * The pool's tasks are lifo scheduled thread_func functors. * */ typedef pool<thread_func, lifo_scheduler<thread_func> > lifo_pool;/*! \brief Pool for prioritized task. * * The pool's tasks are prioritized prio_thread_func functors. * */ typedef pool<prio_thread_func, prio_scheduler<prio_thread_func > > prio_pool;#ifdef _MSC_VER#pragma warning(pop)#endif} // namespace threadpool#endif // THREADPOOL_POOL_HPP_INCLUDED

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -