📄 tthreadpool.cc
字号:
//// Project : threadpool// File : thread_pool.cc// Purpose : class for managing a pool of threads//// Copyright (C) 2003 - Ronald Kriemann//// This library is free software; you can redistribute it and/or// modify it under the terms of the GNU Lesser General Public// License as published by the Free Software Foundation; either// version 2.1 of the License, or (at your option) any later version.//// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the// GNU Lesser General Public License for more details.//// You should have received a copy of the GNU Lesser General Public// License along with this program; if not, write to the Free Software// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA//#include <assert.h>// hack for linux#define __USE_UNIX98#include <pthread.h>#include "TThreadPool.hh"//// global thread-pool//TThreadPool * thread_pool = NULL;/////////////////////////////////////////////////// init global thread pool/////////////////////////////////////////////////intinit_thread_pool ( uint p ){ if ( thread_pool != NULL ) delete thread_pool; if ((thread_pool = new TThreadPool( p )) == NULL) { cout << "(init_thread_pool) no memory available" << endl; return -1; }// if return 0;}/////////////////////////////////////////////////// thread handled by threadpool/////////////////////////////////////////////////TThreadPool::TPoolThr::~TPoolThr (){ // sync with finishing _del_mutex.lock(); _del_mutex.unlock();}//// running method//voidTThreadPool::TPoolThr::run (){ _del_mutex.lock(); while ( ! _end ) { // // wait for work // _work_mutex.lock(); while ((_job == NULL) && ! _end ) _work_cond.wait( _work_mutex ); _work_mutex.unlock(); // // look if we really have a job to do // and handle it // if ( _job != NULL ) { //_sync_mutex.lock(); // execute job _job->run( _data_ptr ); // detach thread from job _job->set_pool_thr( NULL ); set_job( NULL, NULL ); _sync_mutex.unlock(); }// if // append thread to idle-list _pool->append_idle( this ); }// while _del_mutex.unlock();}/////////////////////////////////////////////////// ThreadPool ///////////////////////////////////////////////////// constructor and destructor//TThreadPool::TThreadPool ( uint max_p ){ // // create max_p threads for pool // _max_parallel = max_p; _threads.set_size( max_p ); for ( uint i = 0; i < max_p; i++ ) { _threads[i] = new TPoolThr( i, this ); _idle_threads.append( _threads[i] ); _threads[i]->start( true, true ); }// for // tell the scheduling system, how many threads to expect pthread_setconcurrency( max_p + pthread_getconcurrency() );}TThreadPool::~TThreadPool (){ // wait till all threads have finished sync_all(); // finish all thread for ( uint i = 0; i < _max_parallel; i++ ) { _threads[i]->sync_mutex().lock(); _threads[i]->set_end( true ); _threads[i]->set_job( NULL, NULL ); _threads[i]->work_mutex().lock(); _threads[i]->work_cond().signal(); _threads[i]->work_mutex().unlock(); _threads[i]->sync_mutex().unlock(); }// for // cancel and delete all threads (not really safe !) for ( uint i = 0; i < _max_parallel; i++ ) delete _threads[i];}/////////////////////////////////////////////////// run, stop and synch with job//voidTThreadPool::run ( TThreadPool::TJob * job, void * ptr ){ assert( job != NULL );#if 0 // // run in calling thread // job->run( ptr );#else // // run in concurrent thread // TPoolThr * t = get_idle(); // // and start the job // t->sync_mutex().lock(); t->set_job( job, ptr ); // attach thread to job job->set_pool_thr( t ); t->work_mutex().lock(); t->work_cond().signal(); t->work_mutex().unlock(); // t->sync_mutex().unlock();#endif}voidTThreadPool::sync ( TJob * job ){ if ( job == NULL ) return; TPoolThr * t = job->pool_thr(); // check if job is already released if ( t == NULL ) return; // // look if thread is working and wait for signal // t->sync_mutex().lock(); t->set_job( NULL, NULL ); t->sync_mutex().unlock(); // detach job and thread job->set_pool_thr( NULL );}//// sync with all running threads//voidTThreadPool::sync_all (){ for ( uint i = 0; i < _max_parallel; i++ ) { if ( _threads[i]->sync_mutex().trylock() ) _threads[i]->sync_mutex().lock(); _threads[i]->sync_mutex().unlock(); }// for}/////////////////////////////////////////////////// manage pool threads////// return idle thread form pool//TThreadPool::TPoolThr *TThreadPool::get_idle (){ while ( true ) { // // wait for an idle thread // _idle_mutex.lock(); while ( _idle_threads.size() == 0 ) _idle_cond.wait( _idle_mutex ); _idle_mutex.unlock(); // // get first idle thread // _list_mutex.lock(); if ( _idle_threads.size() > 0 ) { TPoolThr * t = _idle_threads.behead(); _list_mutex.unlock(); return t; }// if _list_mutex.unlock(); }// while}voidTThreadPool::append_idle ( TThreadPool::TPoolThr * t ){ _list_mutex.lock(); // // check if given thread is already in list // and only append if not so // TSLL< TPoolThr * >::TIterator iter = _idle_threads.first(); while ( ! iter.eol() ) { if ( iter() == t ) {// cout << "(TThreadPool) append_idle : thread " << t->thread_no()// << " is already idle" << endl; _list_mutex.unlock(); return; }// if ++iter; }// while _idle_threads.append( t ); _list_mutex.unlock(); _idle_mutex.lock(); _idle_cond.broadcast(); _idle_mutex.unlock();}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -