⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tthreadpool.cc

📁 线程池 线程池 线程池
💻 CC
字号:
//// Project : threadpool// File    : thread_pool.cc// Purpose : class for managing a pool of threads//// Copyright (C) 2003 - Ronald Kriemann//// This library is free software; you can redistribute it and/or// modify it under the terms of the GNU Lesser General Public// License as published by the Free Software Foundation; either// version 2.1 of the License, or (at your option) any later version.//// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the// GNU Lesser General Public License for more details.//// You should have received a copy of the GNU Lesser General Public// License along with this program; if not, write to the Free Software// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA//#include <assert.h>// hack for linux#define __USE_UNIX98#include <pthread.h>#include "TThreadPool.hh"//// global thread-pool//TThreadPool * thread_pool = NULL;/////////////////////////////////////////////////// init global thread pool/////////////////////////////////////////////////intinit_thread_pool ( uint p ){    if ( thread_pool != NULL )        delete thread_pool;        if ((thread_pool = new TThreadPool( p )) == NULL)    {        cout << "(init_thread_pool) no memory available" << endl;        return -1;    }// if    return 0;}/////////////////////////////////////////////////// thread handled by threadpool/////////////////////////////////////////////////TThreadPool::TPoolThr::~TPoolThr (){    // sync with finishing    _del_mutex.lock();    _del_mutex.unlock();}//// running method//voidTThreadPool::TPoolThr::run (){    _del_mutex.lock();        while ( ! _end )    {        //        // wait for work        //                _work_mutex.lock();                while ((_job == NULL) && ! _end )            _work_cond.wait( _work_mutex );        _work_mutex.unlock();        //        // look if we really have a job to do        // and handle it        //                if ( _job != NULL )        {            //_sync_mutex.lock();            // execute job            _job->run( _data_ptr );                        // detach thread from job            _job->set_pool_thr( NULL );            set_job( NULL, NULL );            _sync_mutex.unlock();        }// if        // append thread to idle-list        _pool->append_idle( this );    }// while    _del_mutex.unlock();}/////////////////////////////////////////////////// ThreadPool ///////////////////////////////////////////////////// constructor and destructor//TThreadPool::TThreadPool ( uint max_p ){    //    // create max_p threads for pool    //    _max_parallel = max_p;    _threads.set_size( max_p );    for ( uint i = 0; i < max_p; i++ )    {        _threads[i] = new TPoolThr( i, this );        _idle_threads.append( _threads[i] );        _threads[i]->start( true, true );    }// for    // tell the scheduling system, how many threads to expect    pthread_setconcurrency( max_p + pthread_getconcurrency() );}TThreadPool::~TThreadPool (){    // wait till all threads have finished    sync_all();        // finish all thread    for ( uint i = 0; i < _max_parallel; i++ )    {        _threads[i]->sync_mutex().lock();        _threads[i]->set_end( true );        _threads[i]->set_job( NULL, NULL );                _threads[i]->work_mutex().lock();        _threads[i]->work_cond().signal();        _threads[i]->work_mutex().unlock();                _threads[i]->sync_mutex().unlock();    }// for        // cancel and delete all threads (not really safe !)    for ( uint i = 0; i < _max_parallel; i++ )        delete _threads[i];}/////////////////////////////////////////////////// run, stop and synch with job//voidTThreadPool::run ( TThreadPool::TJob * job, void * ptr ){    assert( job != NULL );#if 0    //    // run in calling thread    //        job->run( ptr );#else    //    // run in concurrent thread    //        TPoolThr * t = get_idle();        //    // and start the job    //        t->sync_mutex().lock();        t->set_job( job, ptr );    // attach thread to job    job->set_pool_thr( t );    t->work_mutex().lock();    t->work_cond().signal();    t->work_mutex().unlock();    //    t->sync_mutex().unlock();#endif}voidTThreadPool::sync ( TJob * job ){    if ( job == NULL )        return;        TPoolThr  * t = job->pool_thr();    // check if job is already released    if ( t == NULL )        return;        //    // look if thread is working and wait for signal    //    t->sync_mutex().lock();    t->set_job( NULL, NULL );    t->sync_mutex().unlock();    // detach job and thread    job->set_pool_thr( NULL );}//// sync with all running threads//voidTThreadPool::sync_all (){    for ( uint i = 0; i < _max_parallel; i++ )    {        if ( _threads[i]->sync_mutex().trylock() )            _threads[i]->sync_mutex().lock();        _threads[i]->sync_mutex().unlock();    }// for}/////////////////////////////////////////////////// manage pool threads////// return idle thread form pool//TThreadPool::TPoolThr *TThreadPool::get_idle (){    while ( true )    {        //        // wait for an idle thread        //        _idle_mutex.lock();            while ( _idle_threads.size() == 0 )            _idle_cond.wait( _idle_mutex );            _idle_mutex.unlock();        //        // get first idle thread        //                _list_mutex.lock();                if ( _idle_threads.size() > 0 )        {            TPoolThr * t = _idle_threads.behead();            _list_mutex.unlock();                        return t;        }// if        _list_mutex.unlock();    }// while}voidTThreadPool::append_idle ( TThreadPool::TPoolThr * t ){    _list_mutex.lock();    //    // check if given thread is already in list    // and only append if not so    //        TSLL< TPoolThr * >::TIterator  iter = _idle_threads.first();    while ( ! iter.eol() )    {        if ( iter() == t )        {//             cout << "(TThreadPool) append_idle : thread " << t->thread_no()//                  << " is already idle" << endl;            _list_mutex.unlock();            return;        }// if        ++iter;    }// while        _idle_threads.append( t );    _list_mutex.unlock();        _idle_mutex.lock();    _idle_cond.broadcast();    _idle_mutex.unlock();}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -