📄 ilmthreadpool.cpp

///////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2005, Industrial Light & Magic, a division of Lucas
// Digital Ltd. LLC
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
// *       Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// *       Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// *       Neither the name of Industrial Light & Magic nor the names of
// its contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
///////////////////////////////////////////////////////////////////////////

//-----------------------------------------------------------------------------
//
//      class Task, class ThreadPool, class TaskGroup
//
//-----------------------------------------------------------------------------

#include "IlmThread.h"
#include "IlmThreadMutex.h"
#include "IlmThreadSemaphore.h"
#include "IlmThreadPool.h"
#include "Iex.h"
#include <list>

using namespace std;

namespace IlmThread {
namespace {

class WorkerThread: public Thread
{
  public:

    WorkerThread (ThreadPool::Data* data);

    virtual void        run ();

  private:

    ThreadPool::Data *  _data;
};

} //namespace


struct TaskGroup::Data
{
     Data ();
    ~Data ();

    void        addTask () ;
    void        removeTask ();

    Semaphore   isEmpty;        // used to signal that the taskgroup is empty
    int         numPending;     // number of pending tasks to still execute
};


struct ThreadPool::Data
{
     Data ();
    ~Data();

    void        finish ();
    bool        stopped () const;
    void        stop ();

    Semaphore taskSemaphore;        // threads wait on this for ready tasks
    Mutex taskMutex;                // mutual exclusion for the tasks list
    list<Task*> tasks;              // the list of tasks to execute
    size_t numTasks;                // fast access to list size
                                    //   (list::size() can be O(n))

    Semaphore threadSemaphore;      // signaled when a thread starts executing
    Mutex threadMutex;              // mutual exclusion for threads list
    list<WorkerThread*> threads;    // the list of all threads
    size_t numThreads;              // fast access to list size

    bool stopping;                  // flag indicating whether to stop threads
    Mutex stopMutex;                // mutual exclusion for stopping flag
};


//
// The global thread pool
//

ThreadPool gThreadPool (0);


//
// class WorkerThread
//

WorkerThread::WorkerThread (ThreadPool::Data* data):
    _data (data)
{
    start();
}


void
WorkerThread::run ()
{
    //
    // Signal that the thread has started executing
    //

    _data->threadSemaphore.post();

    while (true)
    {
        //
        // Wait for a task to become available
        //

        _data->taskSemaphore.wait();

        {
            Lock taskLock (_data->taskMutex);

            //
            // If there is a task pending, pop off the next task in the FIFO
            //

            if (_data->numTasks > 0)
            {
                Task* task = _data->tasks.front();
                TaskGroup* taskGroup = task->group();
                _data->tasks.pop_front();
                _data->numTasks--;

                taskLock.release();
                task->execute();
                taskLock.acquire();

                delete task;
                taskGroup->_data->removeTask();
            }
            else if (_data->stopped())
            {
                break;
            }
        }
    }
}


//
// struct TaskGroup::Data
//

TaskGroup::Data::Data (): isEmpty (1), numPending (0)
{
    // empty
}


TaskGroup::Data::~Data ()
{
    //
    // A TaskGroup acts like an "inverted" semaphore: if the count
    // is above 0 then waiting on the taskgroup will block.  This
    // destructor waits until the taskgroup is empty before returning.
    //

    isEmpty.wait ();
}


void
TaskGroup::Data::addTask ()
{
    //
    // Any access to the taskgroup is protected by a mutex that is
    // held by the threadpool.  Therefore it is safe to access
    // numPending before we wait on the semaphore.
    //

    if (numPending++ == 0)
        isEmpty.wait ();
}


void
TaskGroup::Data::removeTask ()
{
    if (--numPending == 0)
        isEmpty.post ();
}


//
// struct ThreadPool::Data
//

ThreadPool::Data::Data (): numTasks (0), numThreads (0), stopping (false)
{
    // empty
}


ThreadPool::Data::~Data()
{
    Lock lock (threadMutex);
    finish ();
}


void
ThreadPool::Data::finish ()
{
    stop();

    //
    // Signal enough times to allow all threads to stop.
    //
    // Wait until all threads have started their run functions.
    // If we do not wait before we destroy the threads then it's
    // possible that the threads have not yet called their run
    // functions.
    // If this happens then the run function will be called off
    // of an invalid object and we will crash, most likely with
    // an error like: "pure virtual method called"
    //

    for (int i = 0; i < numThreads; i++)
    {
        taskSemaphore.post();
        threadSemaphore.wait();
    }

    //
    // Join all the threads
    //

    for (list<WorkerThread*>::iterator i = threads.begin();
         i != threads.end();
         ++i)
    {
        delete (*i);
    }

    Lock lock1 (taskMutex);
    Lock lock2 (stopMutex);
    threads.clear();
    tasks.clear();
    numThreads = 0;
    numTasks = 0;
    stopping = false;
}


bool
ThreadPool::Data::stopped () const
{
    Lock lock (stopMutex);
    return stopping;
}


void
ThreadPool::Data::stop ()
{
    Lock lock (stopMutex);
    stopping = true;
}


//
// class Task
//

Task::Task (TaskGroup* g): _group(g)
{
    // empty
}


Task::~Task()
{
    // empty
}


TaskGroup*
Task::group ()
{
    return _group;
}


TaskGroup::TaskGroup ():
    _data (new Data())
{
    // empty
}


TaskGroup::~TaskGroup ()
{
    delete _data;
}


//
// class ThreadPool
//

ThreadPool::ThreadPool (unsigned nthreads):
    _data (new Data())
{
    setNumThreads (nthreads);
}


ThreadPool::~ThreadPool ()
{
    delete _data;
}


int
ThreadPool::numThreads () const
{
    Lock lock (_data->threadMutex);
    return _data->numThreads;
}


void
ThreadPool::setNumThreads (int count)
{
    if (count < 0)
        throw Iex::ArgExc ("Attempt to set the number of threads "
                           "in a thread pool to a negative value.");

    //
    // Lock access to thread list and size
    //

    Lock lock (_data->threadMutex);

    if (count > _data->numThreads)
    {
        //
        // Add more threads
        //

        while (_data->numThreads < count)
        {
            _data->threads.push_back (new WorkerThread (_data));
            _data->numThreads++;
        }
    }
    else if (count < _data->numThreads)
    {
        //
        // Wait until all existing threads are finished processing,
        // then delete all threads.
        //

        _data->finish ();

        //
        // Add in new threads
        //

        while (_data->numThreads < count)
        {
            _data->threads.push_back (new WorkerThread (_data));
            _data->numThreads++;
        }
    }
}


void
ThreadPool::addTask (Task* task)
{
    //
    // Lock the threads, needed to access numThreads
    //

    Lock lock (_data->threadMutex);

    if (_data->numThreads == 0)
    {
        task->execute ();
        delete task;
    }
    else
    {
        //
        // Get exclusive access to the tasks queue
        //

        {
            Lock taskLock (_data->taskMutex);

            //
            // Push the new task into the FIFO
            //

            _data->tasks.push_back (task);
            _data->numTasks++;
            task->group()->_data->addTask();
        }

        //
        // Signal that we have a new task to process
        //

        _data->taskSemaphore.post ();
    }
}


ThreadPool&
ThreadPool::globalThreadPool ()
{
    return gThreadPool;
}


void
ThreadPool::addGlobalTask (Task* task)
{
    gThreadPool.addTask (task);
}


} // namespace IlmThread
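
The comments in TaskGroup::Data describe the task group as an "inverted" semaphore: waiting blocks while the pending count is above 0 and returns once it drops to 0. The following is a minimal standalone sketch of that idea, assuming the C++11 standard library (std::mutex and std::condition_variable) in place of IlmThread's Semaphore and Mutex. The names PendingCounter, waitUntilEmpty and runTasksAndWait are hypothetical and do not appear in IlmThread.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper, not part of IlmThread: tracks the number of
// pending tasks and lets a caller block until that number is zero.
class PendingCounter
{
  public:

    void addTask ()
    {
        std::lock_guard<std::mutex> lock (_mutex);
        ++_numPending;
    }

    void removeTask ()
    {
        std::lock_guard<std::mutex> lock (_mutex);
        assert (_numPending > 0);

        if (--_numPending == 0)
            _isEmpty.notify_all ();
    }

    // Blocks while the pending count is above 0.
    void waitUntilEmpty ()
    {
        std::unique_lock<std::mutex> lock (_mutex);
        _isEmpty.wait (lock, [this] { return _numPending == 0; });
    }

    int numPending ()
    {
        std::lock_guard<std::mutex> lock (_mutex);
        return _numPending;
    }

  private:

    std::mutex              _mutex;
    std::condition_variable _isEmpty;
    int                     _numPending = 0;
};

// Runs numTasks trivial tasks, one thread each, and returns how many
// had completed at the moment waitUntilEmpty() returned.
int runTasksAndWait (int numTasks)
{
    PendingCounter group;
    std::mutex completedMutex;
    int completed = 0;
    std::vector<std::thread> workers;

    // Register every task first so the count cannot reach zero
    // before all tasks have been added.
    for (int i = 0; i < numTasks; ++i)
        group.addTask ();

    for (int i = 0; i < numTasks; ++i)
    {
        workers.emplace_back ([&] {
            {
                std::lock_guard<std::mutex> lock (completedMutex);
                ++completed;
            }
            group.removeTask ();
        });
    }

    group.waitUntilEmpty ();

    int seen = 0;
    {
        std::lock_guard<std::mutex> lock (completedMutex);
        seen = completed;
    }

    for (std::thread& w : workers)
        w.join ();

    return seen;
}
```

Calling runTasksAndWait (8) is expected to return 8, because each task increments the completed count before it calls removeTask (). Unlike the listing above, this sketch uses one thread per task rather than a pool, and a condition variable rather than a semaphore.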
