📄 ilmthreadpool.cpp
字号:
/////////////////////////////////////////////////////////////////////////////// Copyright (c) 2005, Industrial Light & Magic, a division of Lucas// Digital Ltd. LLC// // All rights reserved.// // Redistribution and use in source and binary forms, with or without// modification, are permitted provided that the following conditions are// met:// * Redistributions of source code must retain the above copyright// notice, this list of conditions and the following disclaimer.// * Redistributions in binary form must reproduce the above// copyright notice, this list of conditions and the following disclaimer// in the documentation and/or other materials provided with the// distribution.// * Neither the name of Industrial Light & Magic nor the names of// its contributors may be used to endorse or promote products derived// from this software without specific prior written permission. // // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.///////////////////////////////////////////////////////////////////////////////-----------------------------------------------------------------------------//// class Task, class ThreadPool, class TaskGroup////-----------------------------------------------------------------------------#include "IlmThread.h"#include "IlmThreadMutex.h"#include "IlmThreadSemaphore.h"#include "IlmThreadPool.h"#include "Iex.h"#include <list>using namespace std;namespace IlmThread {namespace {class WorkerThread: public Thread{ public: WorkerThread (ThreadPool::Data* data); virtual void run (); private: ThreadPool::Data * _data;};} //namespacestruct TaskGroup::Data{ Data (); ~Data (); void addTask () ; void removeTask (); Semaphore isEmpty; // used to signal that the taskgroup is empty int numPending; // number of pending tasks to still execute};struct ThreadPool::Data{ Data (); ~Data(); void finish (); bool stopped () const; void stop (); Semaphore taskSemaphore; // threads wait on this for ready tasks Mutex taskMutex; // mutual exclusion for the tasks list list<Task*> tasks; // the list of tasks to execute size_t numTasks; // fast access to list size // (list::size() can be O(n)) Semaphore threadSemaphore; // signaled when a thread starts executing Mutex threadMutex; // mutual exclusion for threads list list<WorkerThread*> threads; // the list of all threads size_t numThreads; // fast access to list size bool stopping; // flag indicating whether to stop threads Mutex stopMutex; // mutual exclusion for stopping flag};//// The global thread pool//ThreadPool gThreadPool (0);//// class WorkerThread//WorkerThread::WorkerThread (ThreadPool::Data* data): _data (data){ start();}voidWorkerThread::run (){ // // Signal that the thread has started executing // _data->threadSemaphore.post(); while (true) { // // Wait for a task to become available // _data->taskSemaphore.wait(); { Lock taskLock (_data->taskMutex); // // If there is a task pending, pop off the next task in the FIFO // if (_data->numTasks > 0) { Task* task = _data->tasks.front(); TaskGroup* taskGroup = task->group(); _data->tasks.pop_front(); _data->numTasks--; taskLock.release(); task->execute(); taskLock.acquire(); delete task; taskGroup->_data->removeTask(); } else if (_data->stopped()) { break; } } }}//// struct TaskGroup::Data//TaskGroup::Data::Data (): isEmpty (1), numPending (0){ // empty}TaskGroup::Data::~Data (){ // // A TaskGroup acts like an "inverted" semaphore: if the count // is above 0 then waiting on the taskgroup will block. This // destructor waits until the taskgroup is empty before returning. // isEmpty.wait ();}voidTaskGroup::Data::addTask () { // // Any access to the taskgroup is protected by a mutex that is // held by the threadpool. Therefore it is safe to access // numPending before we wait on the semaphore. // if (numPending++ == 0) isEmpty.wait ();}voidTaskGroup::Data::removeTask (){ if (--numPending == 0) isEmpty.post ();} //// struct ThreadPool::Data//ThreadPool::Data::Data (): numTasks (0), numThreads (0), stopping (false){ // empty}ThreadPool::Data::~Data(){ Lock lock (threadMutex); finish ();}voidThreadPool::Data::finish (){ stop(); // // Signal enough times to allow all threads to stop. // // Wait until all threads have started their run functions. // If we do not wait before we destroy the threads then it's // possible that the threads have not yet called their run // functions. // If this happens then the run function will be called off // of an invalid object and we will crash, most likely with // an error like: "pure virtual method called" // for (int i = 0; i < numThreads; i++) { taskSemaphore.post(); threadSemaphore.wait(); } // // Join all the threads // for (list<WorkerThread*>::iterator i = threads.begin(); i != threads.end(); ++i) { delete (*i); } Lock lock1 (taskMutex); Lock lock2 (stopMutex); threads.clear(); tasks.clear(); numThreads = 0; numTasks = 0; stopping = false;}boolThreadPool::Data::stopped () const{ Lock lock (stopMutex); return stopping;}voidThreadPool::Data::stop (){ Lock lock (stopMutex); stopping = true;}//// class Task//Task::Task (TaskGroup* g): _group(g){ // empty}Task::~Task(){ // empty}TaskGroup*Task::group (){ return _group;}TaskGroup::TaskGroup (): _data (new Data()){ // empty}TaskGroup::~TaskGroup (){ delete _data;}//// class ThreadPool//ThreadPool::ThreadPool (unsigned nthreads): _data (new Data()){ setNumThreads (nthreads);}ThreadPool::~ThreadPool (){ delete _data;}intThreadPool::numThreads () const{ Lock lock (_data->threadMutex); return _data->numThreads;}voidThreadPool::setNumThreads (int count){ if (count < 0) throw Iex::ArgExc ("Attempt to set the number of threads " "in a thread pool to a negative value."); // // Lock access to thread list and size // Lock lock (_data->threadMutex); if (count > _data->numThreads) { // // Add more threads // while (_data->numThreads < count) { _data->threads.push_back (new WorkerThread (_data)); _data->numThreads++; } } else if (count < _data->numThreads) { // // Wait until all existing threads are finished processing, // then delete all threads. // _data->finish (); // // Add in new threads // while (_data->numThreads < count) { _data->threads.push_back (new WorkerThread (_data)); _data->numThreads++; } }}voidThreadPool::addTask (Task* task) { // // Lock the threads, needed to access numThreads // Lock lock (_data->threadMutex); if (_data->numThreads == 0) { task->execute (); delete task; } else { // // Get exclusive access to the tasks queue // { Lock taskLock (_data->taskMutex); // // Push the new task into the FIFO // _data->tasks.push_back (task); _data->numTasks++; task->group()->_data->addTask(); } // // Signal that we have a new task to process // _data->taskSemaphore.post (); }}ThreadPool&ThreadPool::globalThreadPool (){ return gThreadPool;}voidThreadPool::addGlobalTask (Task* task){ gThreadPool.addTask (task);}} // namespace IlmThread
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -