⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 threadpool.cpp

📁 linux下的线程池,其中使用了条件变量,互斥锁等保持线程同步的变量!
💻 CPP
字号:
#include "threadpool.h"
#include <assert.h>
#include "mutex.h"
#include "condition.h"
#ifndef WIN32
#define ETIMEDOUT -1
#include <pthread.h>
#else
#define ETIMEDOUT -1
#endif

/**
 * Constructs an un-initialised pool.
 *
 * No synchronisation objects are allocated here; Init() must be called before
 * the pool can accept jobs. The counters and the state are given defined
 * values so that Dispatch() and Destroy(), which both test state_ first,
 * reliably return false on a pool that was never initialised instead of
 * reading an indeterminate value.
 */
ThreadPool::ThreadPool()
:pPoolMutex_(NULL)
,pCondMutex_(NULL)
,pCond_(NULL)
{
	// Assigned in the body (not the initialiser list) so the result does not
	// depend on the member declaration order in the header.
	max_ = 0;
	idle_ = 0;
	alive_ = 0;
	idle_timeout_ = 0;
	state_ = POOL_EXIT;	// anything other than POOL_VALID means "not usable"
}

/**
 * Destructor.
 *
 * Performs no cleanup of its own: the mutexes and the condition allocated by
 * Init() are released by Destroy(), not here.
 */
ThreadPool::~ThreadPool()
{
	// Nothing to do.
}


void ThreadPool::CallProc(void *pPara)
{
	
	CallProcPara *cp = static_cast<CallProcPara *>(pPara);
	assert(cp != NULL);	
	cp->_pObj->DoJob(cp->_pPara);
	delete cp;

}

/**
 * Thread entry point run by every pool worker.
 *
 * Each iteration of the loop:
 *   1. under the pool mutex, marks the worker idle and tries to take the job
 *      at the front of the queue;
 *   2. if no job was available and the pool is not shutting down, waits on
 *      the pool condition until the idle deadline (non-WIN32 builds only),
 *      then looks at the queue again;
 *   3. under the pool mutex, clears the idle mark and notes a shutdown;
 *   4. runs and deletes the job, or leaves the loop when it has no job and
 *      either the wait timed out or the pool state is POOL_EXIT.
 *
 * On the way out the worker decrements the alive counter, and the worker that
 * brings it to zero broadcasts on the pool condition (Destroy() waits on it).
 *
 * @param pPara  the owning ThreadPool, passed as void* by Dispatch()
 * @return always NULL
 */
void* ThreadPool::Worker(void* pPara)
{
	ThreadPool* pObj = static_cast<ThreadPool*>(pPara);
	int retval = 0;
	bool must_exit = false;
#ifndef WIN32
	struct timespec timeout;
#endif

	int& _idle				= pObj->getIdle();
	int& _alive				= pObj->getAlive();
	pool_state_t& _state	= pObj->getState();
	Mutex* _pPoolMutex		= pObj->getPoolMutex();
	Condition* _pCond		= pObj->getCond();
	int& _idle_timeout		= pObj->getIdletimeout();
	JobQueue& _JobQueue		= pObj->getJobQueue();

	JobItem* pJob = NULL;

	/* loop looking for work */
	for (;;)
	{
		// The exit decision belongs to this iteration only. Without the
		// reset, a flag set in an earlier pass would make the worker quit
		// on the next wake-up that finds the queue empty.
		must_exit = false;

		{
			MutexApply ma(*_pPoolMutex);
#ifndef WIN32
			// Absolute deadline for the idle wait below.
			timeout.tv_sec = time(NULL) + _idle_timeout;
			timeout.tv_nsec = 0;
#endif
			_idle++;
#ifdef      WAITTHRTHREAD
			if(_JobQueue.size() > 0 && _state != POOL_EXIT)
#else
			if(_JobQueue.size() > 0)
#endif
			{
				pJob  = _JobQueue.front();
				_JobQueue.pop();
			}
		}

		// NOTE(review): _state is read here without the pool mutex, exactly
		// as before; closing that window needs the wait and the state to
		// share one mutex, which depends on the Condition class.
#ifdef      WAITTHRTHREAD
		if(pJob == NULL && _state != POOL_EXIT)
#else
		if(pJob == NULL)
#endif
		{
			if(_state != POOL_EXIT)
			{
#ifndef WIN32
				retval = _pCond->timedwait(& timeout);
#endif
				const bool timed_out = (retval == ETIMEDOUT);

				// Look at the queue even after a timeout: a job may have
				// been queued while this worker was counted as idle but
				// was not yet waiting, in which case its signal was missed.
				MutexApply ma(*_pPoolMutex);
				if(_JobQueue.size() > 0)
				{
					pJob  = _JobQueue.front();
					_JobQueue.pop();
				}
				else if(timed_out)
				{
					must_exit = true;
				}
			}
		}
		else if(pJob == NULL)
		{
			// Only reachable with WAITTHRTHREAD: the pool is shutting down
			// and queued jobs are deliberately not picked up.
			must_exit = true;
		}

		{
			MutexApply ma(*_pPoolMutex);
			_idle --;
			if (_state == POOL_EXIT)
			{
				must_exit = true;
			}
		}

		if(pJob != NULL)
		{
			// A job in hand is always executed, even during shutdown.
			pJob->_job(pJob->_pPara);
			delete pJob;
			pJob = NULL;
		}
		else if(must_exit)
		{
			break;
		}
	}

	// Decide "am I the last one" while holding the lock so that exactly one
	// worker issues the final broadcast.
	bool last_worker = false;
	{
		MutexApply ma(*_pPoolMutex);
		_alive --;
		last_worker = (_alive == 0);
	}

	if(last_worker)
	{
		_pCond->broadcast();
	}

	return NULL;
}


/**
 * Prepares the pool for use: resets the counters, allocates the pool mutex,
 * the condition mutex and the condition, and marks the pool POOL_VALID.
 * No worker thread is started here; Dispatch() creates them on demand.
 *
 * @param iNum          maximum number of worker threads; must be > 0
 * @param idle_timeout  number of seconds added to the current time to form
 *                      the deadline of a worker's idle wait
 * @return true on success; false if iNum is not positive or if the pool
 *         already owns synchronisation objects (Init() without a completed
 *         Destroy() in between)
 */
bool ThreadPool::Init(int iNum /* = 3 */,int idle_timeout /* = 5 */)
{
	if(iNum <= 0)
	{
		return false;
	}

	// Re-initialising a live pool would leak the existing objects and replace
	// them underneath any worker that still holds pointers to them.
	if(pPoolMutex_ != NULL || pCondMutex_ != NULL || pCond_ != NULL)
	{
		return false;
	}

	max_ = iNum;
	idle_ = 0;
	alive_ = 0;
	idle_timeout_ = idle_timeout;
	pPoolMutex_ = new Mutex;
	pCondMutex_ = new Mutex;
	pCond_ = new Condition(*pCondMutex_);
	state_ = POOL_VALID;

	return true;
}


/**
 * Shuts the pool down.
 *
 * Switches the state to POOL_EXIT, wakes the workers with a broadcast, waits
 * on the pool condition until the alive counter reaches zero, and then frees
 * the condition and both mutexes.
 *
 * If the broadcast or a wait fails, workers may still be running and still
 * hold pointers to the synchronisation objects; in that case the objects are
 * deliberately left allocated and false is returned, because freeing them
 * would let those workers touch deleted memory.
 *
 * NOTE(review): jobs still sitting in JobQueue_ are not touched here.
 *
 * @return true if all workers exited and the resources were released;
 *         false if the pool was not POOL_VALID or the shutdown failed
 */
bool ThreadPool::Destroy()
{
	if(state_ != POOL_VALID)
	{
		return false;
	}

	bool bDestroy = true;
	int nAlive = 0;

	{
		MutexApply ma(*pPoolMutex_);
		state_ = POOL_EXIT;
		nAlive = alive_;
	}

	if(nAlive > 0 && !pCond_->broadcast())
	{
		bDestroy = false;
	}

	while(bDestroy)
	{
		// Sample the counter under the same lock the workers use to
		// modify it.
		{
			MutexApply ma(*pPoolMutex_);
			nAlive = alive_;
		}
		if(nAlive <= 0)
		{
			break;
		}
		if(!pCond_->wait())
		{
			bDestroy = false;
		}
	}

	if(!bDestroy)
	{
		// Workers may still be alive: keep the objects they reference.
		return false;
	}

	// The condition was built on *pCondMutex_, so release it first.
	delete pCond_;
	delete pCondMutex_;
	delete pPoolMutex_;

	pPoolMutex_ = NULL;
	pCondMutex_ = NULL;
	pCond_      = NULL;
	return true;
}


/**
 * Queues a job for execution by a worker thread.
 *
 * Under the pool mutex: if no worker is idle and fewer than max_ workers are
 * alive, a new detached worker thread is started (non-WIN32 builds only);
 * then the job is appended to the queue. Finally one waiter on the pool
 * condition is signalled.
 *
 * The thread is created before the job is enqueued so that a creation
 * failure with no worker alive can be reported without leaving an orphaned
 * job in the queue. If creation fails while other workers are alive, the job
 * is still queued, since one of them will pick it up.
 *
 * @param job     callback to run on a worker thread
 * @param pParam  opaque argument passed to the callback
 * @return false if the pool is not POOL_VALID, or if a worker was needed,
 *         could not be created and none is alive (the job is then not
 *         queued); otherwise the result of signalling the pool condition
 */
bool ThreadPool::Dispatch(JOB job,void* pParam)
{
	if(state_ != POOL_VALID)
	{
		return false;
	}

	{
		MutexApply ma(*pPoolMutex_);

		if ((idle_ == 0) && (alive_ < max_))
		{
#ifndef WIN32
			pthread_t thr_id;
			if (pthread_create(&thr_id, NULL, Worker, this) == 0)
			{
				alive_++;
				pthread_detach(thr_id);
			}
			else if (alive_ == 0)
			{
				// Nobody could ever run the job: fail without queuing it.
				return false;
			}
#endif
		}

		// The new worker, if any, blocks on the pool mutex until this scope
		// ends, so it cannot observe the queue before the push.
		JobQueue_.push(new JobItem(job,pParam));
	}

	return pCond_->signal();
}
/**
 * Queues an object-style job: pJob->DoJob(pParam) is invoked on a worker
 * thread through the CallProc adapter.
 *
 * The job/argument pair is wrapped in a heap-allocated CallProcPara, which
 * CallProc deletes after the job has run. The wrapper is only allocated once
 * the cheap rejections have passed, so a call on an unusable pool no longer
 * leaks it.
 *
 * NOTE(review): when the inner Dispatch() returns false the wrapper may
 * already be in the queue, so it must not be freed here.
 *
 * @param pJob    job object whose DoJob() is to be called; must not be NULL
 * @param pParam  opaque argument forwarded to DoJob()
 * @return false if pJob is NULL or the pool is not POOL_VALID; otherwise the
 *         result of Dispatch(JOB, void*)
 */
bool ThreadPool::Dispatch(ThreadJob* pJob,void* pParam)
{
	if(pJob == NULL || state_ != POOL_VALID)
	{
		return false;
	}

	return Dispatch(CallProc, new CallProcPara(pJob, pParam));
}


int& ThreadPool::getMax() 
{
	return max_;
}
int& ThreadPool::getAlive() 
{
	return alive_;	
}
int& ThreadPool::getIdle() 
{
	return idle_;
}
int& ThreadPool::getIdletimeout() 
{
	return idle_timeout_;
}
/** @return a mutable reference to JobQueue_, the queue of pending JobItem pointers. */
JobQueue& ThreadPool::getJobQueue()
{
	return this->JobQueue_;
}
/**
 * @return a mutable reference to state_ (POOL_VALID after Init(), POOL_EXIT
 *         once Destroy() has started).
 */
pool_state_t& ThreadPool::getState()
{
	return this->state_;
}
/**
 * @return the mutex that guards the queue and the counters; NULL before
 *         Init() and after a completed Destroy().
 */
Mutex* ThreadPool::getPoolMutex()
{
	return this->pPoolMutex_;
}
/**
 * @return the condition used to wake workers and to report the last worker's
 *         exit; NULL before Init() and after a completed Destroy().
 */
Condition* ThreadPool::getCond()
{
	return this->pCond_;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -