📄 threadpool.cpp
字号:
#include "threadpool.h"
#include <assert.h>
#include "mutex.h"
#include "condition.h"
#ifndef WIN32
#define ETIMEDOUT -1
#include <pthread.h>
#else
#define ETIMEDOUT -1
#endif
ThreadPool::ThreadPool()
:pPoolMutex_(NULL)
,pCondMutex_(NULL)
,pCond_(NULL)
{
}
ThreadPool::~ThreadPool()
{
}
void ThreadPool::CallProc(void *pPara)
{
CallProcPara *cp = static_cast<CallProcPara *>(pPara);
assert(cp != NULL);
cp->_pObj->DoJob(cp->_pPara);
delete cp;
}
void* ThreadPool::Worker(void* pPara)
{
ThreadPool* pObj = reinterpret_cast<ThreadPool*>(pPara);
int retval = 0;
bool must_exit = false;
#ifndef WIN32
struct timespec timeout;
#endif
int& _max = pObj->getMax();
int& _idle = pObj->getIdle();
int& _alive = pObj->getAlive();
pool_state_t& _state = pObj->getState();
Mutex* _pPoolMutex = pObj->getPoolMutex();
Condition* _pCond = pObj->getCond();
int& _idle_timeout = pObj->getIdletimeout();
JobQueue& _JobQueue = pObj->getJobQueue();
JobItem* pJob = NULL;
/* loop looking for work */
for (;;)
{
do
{
MutexApply ma(*_pPoolMutex);
#ifndef WIN32
timeout.tv_sec = time(NULL) + _idle_timeout;
timeout.tv_nsec = 0;
#endif
_idle++;
#ifdef WAITTHRTHREAD
if(_JobQueue.size() > 0 && _state != POOL_EXIT)
#else
if(_JobQueue.size() > 0)
#endif
{
pJob = _JobQueue.front();
_JobQueue.pop();
}
} while(false);
#ifdef WAITTHRTHREAD
if(pJob == NULL && _state != POOL_EXIT)
#else
if(pJob == NULL)
#endif
{
if(_state != POOL_EXIT)
{
#ifndef WIN32
retval = _pCond->timedwait(& timeout);
#endif
if (retval == ETIMEDOUT)
{
must_exit = true;
//break;
}
else
{
MutexApply ma(*_pPoolMutex);
if(_JobQueue.size() > 0)
{
pJob = _JobQueue.front();
_JobQueue.pop();
}
}
}
}
else
{
must_exit = true;
}
do
{
MutexApply ma(*_pPoolMutex);
_idle --;
if (_state == POOL_EXIT)
{
must_exit = true;
}
}
while(false);
if(pJob != NULL)
{
pJob->_job(pJob->_pPara);
delete pJob;
pJob = NULL;
}
else
{
if(must_exit)
{
break;
}
}
}
do
{
MutexApply ma(*_pPoolMutex);
_alive --;
}
while(false);
if(_alive == 0)
{
_pCond->broadcast();
}
return NULL;
}
bool ThreadPool::Init(int iNum /* = 3 */,int idle_timeout /* = 5 */)
{
if(iNum <= 0)
{
return false;
}
bool bInit = true;
max_ = iNum;
idle_ = 0;
alive_ = 0;
idle_timeout_ = idle_timeout;
pPoolMutex_ = new Mutex;
pCondMutex_ = new Mutex;
pCond_ = new Condition(*pCondMutex_);
state_ = POOL_VALID;
return bInit;
}
bool ThreadPool::Destroy()
{
bool bDestory = true;
if(state_ != POOL_VALID)
{
return false;
}
do
{
do
{
MutexApply ma(*pPoolMutex_);
state_ = POOL_EXIT;
} while(false);
if(alive_ > 0)
{
if(!pCond_->broadcast())
{
bDestory = false;
break;
}
}
while(alive_ > 0)
{
if(!pCond_->wait())
{
bDestory = false;
break;
}
}
} while(false);
delete pPoolMutex_;
delete pCondMutex_;
delete pCond_;
pPoolMutex_ = NULL;
pCondMutex_ = NULL;
pCond_ = NULL;
return bDestory;
}
bool ThreadPool::Dispatch(JOB job,void* pParam)
{
bool bDispatch = true;
if(state_ != POOL_VALID)
{
return false;
}
do
{
MutexApply ma(*pPoolMutex_);
JobQueue_.push(new JobItem(job,pParam));
if ((idle_ == 0) &&(alive_ < max_))
{
#ifndef WIN32
pthread_t thr_id;
if (pthread_create(&thr_id, NULL,Worker, this) != 0)
{
bDispatch = false;
break;
}
else
{
alive_++;
pthread_detach(thr_id);
}
#endif
}
}
while(false);
if(bDispatch)
{
return pCond_->signal();
}
else
{
return bDispatch;
}
}
bool ThreadPool::Dispatch(ThreadJob* pJob,void* pParam)
{
return Dispatch(CallProc, new CallProcPara(pJob, pParam));
}
int& ThreadPool::getMax()
{
return max_;
}
int& ThreadPool::getAlive()
{
return alive_;
}
int& ThreadPool::getIdle()
{
return idle_;
}
int& ThreadPool::getIdletimeout()
{
return idle_timeout_;
}
JobQueue& ThreadPool::getJobQueue()
{
return JobQueue_;
}
pool_state_t& ThreadPool::getState()
{
return state_;
}
Mutex* ThreadPool::getPoolMutex()
{
return pPoolMutex_;
}
Condition* ThreadPool::getCond()
{
return pCond_;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -