📄 pool.hpp
字号:
/*! \file * \brief Thread pool core. * * This file contains the threadpool's core class: pool<Task, Scheduler>. * * Thread pools are a mechanism for asynchronous and parallel processing * within the same process. The pool class provides a convenient way * for dispatching asynchronous tasks as functions objects. The scheduling * of these tasks can be easily controlled by using customized schedulers. * * Copyright (c) 2005-2006 Philipp Henkel * * Distributed under the Boost Software License, Version 1.0. (See * accompanying file LICENSE_1_0.txt or copy at * http://www.boost.org/LICENSE_1_0.txt) * * http://threadpool.sourceforge.net * */#ifndef THREADPOOL_POOL_HPP_INCLUDED#define THREADPOOL_POOL_HPP_INCLUDED#include <boost/thread.hpp>#include <boost/thread/mutex.hpp>#include <boost/thread/condition.hpp>#include <boost/smart_ptr.hpp>#include <boost/bind.hpp>#include "schedulers.hpp"#include "task_adaptors.hpp"#ifdef _MSC_VER#pragma warning(push)#pragma warning(disable : 4127) // Conditional expression is constant#endif/// The namespace threadpool contains a thread pool and related utility classes.namespace threadpool{/*! \brief Thread pool. * * Thread pools are a mechanism for asynchronous and parallel processing * within the same process. The pool class provides a convenient way * for dispatching asynchronous tasks as functions objects. The scheduling * of these tasks can be easily controlled by using customized schedulers. * A task must not throw an exception. * * \param Task A function object which implements the operator 'void operator() (void) const'. The operator () is called by the pool to execute the task. Exceptions are ignored. * \param Scheduler A task container which determines how tasks are scheduled. It is guaranteed that this container is accessed only by one thread at a time. * * \remarks The pool class is thread-safe. * * \see Tasks: thread_func, prio_thread_func * \see Schedulers: fifo_scheduler, lifo_scheduler, prio_scheduler */ template <class Task = thread_func, class Scheduler = fifo_scheduler<Task> > class pool : private boost::noncopyable {public: typedef pool<Task, Scheduler> pool_type; //!< Provides the thread pool's type. typedef Task task_type; //!< Provides the tasks' type. typedef Scheduler scheduler_type; //!< Provides the scheduler's type.public: /*! Constructs a new thread pool. * \param threads The initial number of threads in the pool. * \return A new pool instance. */ static boost::shared_ptr<pool_type> create_pool(size_t threads) { // construct new pool (with own deleter because destructor is private) boost::shared_ptr<pool_type> px(new pool_type, scalar_deleter<pool_type>()); px->store_self_ptr(px); px->resize(threads); return px; } /*! Gets the number of threads in the pool. * \return The number of threads. */ size_t size() const { return m_thread_count; } /*! Changes the number of threads in the pool. * \param threads The new number of the threads. * \return true, if pool can be resized and false if a new thread of execution cannot be started. */ bool resize(size_t threads) { boost::mutex::scoped_lock lock(m_monitor); bool result = true; boost::shared_ptr<pool_type> self = m_self_ptr.lock(); m_target_thread_count = threads; while(m_thread_count < m_target_thread_count) { try { boost::shared_ptr<threadpool_worker> worker(new threadpool_worker(self)); boost::thread* thread_ptr = m_threads.create_thread(boost::bind(&threadpool_worker::run, worker)); worker->set_thread_ptr(thread_ptr); m_thread_count++; m_running_thread_count++; } catch(boost::thread_resource_error) { result = false; break; } } m_thread_is_idle.notify_all(); return result; } /*! Schedules a task for asynchronous execution. * \param task The task function object. */ void schedule(const task_type &task) { boost::mutex::scoped_lock lock(m_monitor); m_scheduler.push(task); m_task_is_available.notify_one(); } /*! Returns the number of tasks which are ready for execution. * \return The number of pending tasks. */ size_t pending() const { return m_scheduler.size(); } /*! Indicates that there are no tasks pending. * \return true if there are no tasks ready for execution. * \remarks This function is more efficient that the check 'pending() == 0'. */ bool empty() const { return m_scheduler.empty(); } /*! The current thread of execution blocks until all running * and pending tasks are finished. */ void join() { boost::mutex::scoped_lock lock(m_monitor); while(0 != m_running_thread_count || !m_scheduler.empty()) { m_thread_is_idle.wait(lock); } } /*! The current thread of execution blocks until all running * and pending tasks are finished or the timestamp is met. * \param timestamp The time when function returns at the latest. */ bool join(const boost::xtime& timestamp) { boost::mutex::scoped_lock lock(m_monitor); while(0 != m_running_thread_count || !m_scheduler.empty()) { if(!m_thread_is_idle.timed_wait(lock, timestamp)) return false; } return true; }private: // Constructor pool() : m_thread_count(0) , m_target_thread_count(0) , m_running_thread_count(0) { } // Destructor ~pool() { } template<class T> struct scalar_deleter { void operator()(T* p) { delete p; } }; private: bool execute_task() { boost::function0<void> task; { // fetch task boost::mutex::scoped_lock lock(m_monitor); // decrease number of threads if necessary if(m_thread_count > m_target_thread_count) { m_thread_count--; m_running_thread_count--; m_thread_is_idle.notify_all(); return false; // terminate thread } // wait for tasks while(m_scheduler.empty()) { m_thread_is_idle.notify_all(); // decrease number of threads if necessary if(m_thread_count > m_target_thread_count) { m_thread_count--; m_running_thread_count--; return false; // terminate thread } else { m_running_thread_count--; m_task_is_available.wait(lock); m_running_thread_count++; } } task = m_scheduler.top(); m_scheduler.pop(); } // call task function try { if(task) { task(); } } catch(...) { // ignore all exceptions from task() } return true; } void store_self_ptr(boost::shared_ptr<pool_type> self) { m_self_ptr = boost::weak_ptr<pool_type>(self); } boost::weak_ptr<pool_type> m_self_ptr; size_t m_thread_count; size_t m_target_thread_count; size_t m_running_thread_count; boost::thread_group m_threads; // TODO remove threads from thread_group! scheduler_type m_scheduler; boost::mutex m_monitor; boost::condition m_thread_is_idle; boost::condition m_task_is_available; /// Worker executes tasks. class threadpool_worker { public: /*! Constructs a new worker. * \param pool Pointer to it's parent pool. */ threadpool_worker(boost::shared_ptr<pool_type> pool) : m_pool(pool) , m_thread_ptr(0) { } /*! Executes pool's tasks sequentially. */ void run() const { while(true) { boost::shared_ptr<pool_type> pool = m_pool.lock(); if(pool) { if(!pool->execute_task()) { break; } } else { break; } } // unregister thread from pool's thread_group boost::shared_ptr<pool_type> pool = m_pool.lock(); if(pool && m_thread_ptr) { pool->m_threads.remove_thread(m_thread_ptr); } } /*! Stores the pointer to the thread whichs executes the run loop. * \param thread_ptr Pointer to executing thread. * \remarks The pointer is used to unregister the thread from the pool's thread_group when the run loop terminates. */ void set_thread_ptr(boost::thread* thread_ptr) { m_thread_ptr = thread_ptr; } private: boost::weak_ptr<pool_type> m_pool; ///< Weak pointer to the pool which owns the worker. boost::thread* m_thread_ptr; ///< Pointer of the thread which executes the run loop and is registered in the pool's thread_group. }; };/*! \brief Fifo pool. * * The pool's tasks are fifo scheduled thread_func functors. * */ typedef pool<thread_func, fifo_scheduler<thread_func> > fifo_pool;/*! \brief Lifo pool. * * The pool's tasks are lifo scheduled thread_func functors. * */ typedef pool<thread_func, lifo_scheduler<thread_func> > lifo_pool;/*! \brief Pool for prioritized task. * * The pool's tasks are prioritized prio_thread_func functors. * */ typedef pool<prio_thread_func, prio_scheduler<prio_thread_func > > prio_pool;#ifdef _MSC_VER#pragma warning(pop)#endif} // namespace threadpool#endif // THREADPOOL_POOL_HPP_INCLUDED
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -