📄 workthreadpool.cpp
字号:
#include "WorkThreadPool.h"
#include "TaskQueue.h"
#include "LogHelper.h"
#include <string>
using namespace std;
extern CTaskQueue theQueue;
extern CLogHelper theLogger;
CWorkThreadPool thePool(1);
LockType g_ThreadPoolLock;
//线程队列和任务队列调度线程
//主要的工作是,清除线程组中的无用线程,然后从
//任务队列中取出来一个任务对象,接着尝试从
//线程组中找出一个空闲的线程,如果不能找到的话
//就创建一个新的线程,并把新线程加入到线程组中
#ifdef __LINUX__
void*
#else
DWORD WINAPI
#endif
TaskScheduleThread(
#ifdef _WIN32
LPVOID
#else
void*
#endif
data);
CWorkThreadPool::CWorkThreadPool(unsigned int nSize)
{
InitLock(&g_ThreadPoolLock);
m_nThread = 0;
m_nThread = nSize;
for(int i = 0; i < nSize; i++)
{
StartWorkThread(i + 1);
}
//再启动调度线程,负责消费任务对列中的任务,
//将之分配给线程池中的工作线程进行执行
if(!StartScheduleThread())
{
string str = "调度线程启动失败,系统退出[CWorkThreadPool::CWorkThreadPool(const unsigned int nSize)]";
theLogger.LogMessage(str, true);
exit(0);
}
}
CWorkThreadPool::~CWorkThreadPool()
{
}
#ifdef __LINUX__
void*
#else
DWORD WINAPI
#endif
TaskScheduleThread(
#ifdef _WIN32
LPVOID
#else
void*
#endif
data)
{
//得到当前线程池指针
CWorkThreadPool* wp = (CWorkThreadPool*)data;
//判断是否有效
if(wp == NULL)
{
string str = "无效的线程池参数[TaskScheduleThread]";
theLogger.LogMessage(str, true);
exit(0);
}
//循环查找合适的任务和合适的线程
while(1)
{
//任务队列空闲时,工作线程不需要运行
if(theQueue.IsEmpty())
{
SLEEP(2, 50);
continue;
}
//当任务队列不空时,取出一个任务进行执行
CTask* task = theQueue.FetchTask();
if(task != NULL)
{
CWorkThread* wth = wp->FetchFreeThread();
while(wth == NULL)
{
string str = "调度线程: 无空闲线程[TaskScheduleThread]";
theLogger.LogMessage(str, true);
int nID = wp->ThreadSize() + 1;
wp->StartWorkThread(nID);
SLEEP(0, 100);
wth = wp->FetchFreeThread();
}
string str = string(wth->GetName()) + "分配到新的任务[TaskScheduleThread]";
theLogger.LogMessage(str, true);
wth->AssigTask(task);
}
else
{
continue;
}
}
return 0;
}
int CWorkThreadPool::StartScheduleThread()
{
#ifdef _WIN32
DWORD dwThreadID;
if(CreateThread(NULL, 0, TaskScheduleThread, this, 0, &dwThreadID) == NULL)
{
ThreadSafePrint("调度线程启动失败");
return 0;
}
#else
pthread_t thread;
if(pthread_create(&thread, NULL, TaskScheduleThread, this))
{
ThreadSafePrint("调度线程启动失败");
return 0;
}
if(pthread_detach(thread))
{
ThreadSafePrint("调度线程分离失败");
return 0;
}
#endif
return 1;
}
void CWorkThreadPool::AddThread(CWorkThread *th)
{
if(th != NULL)
{
LockOn(&g_ThreadPoolLock);
m_pThreadQueue.push(th);
LockOff(&g_ThreadPoolLock);
}
}
CWorkThread* CWorkThreadPool::FetchFreeThread( )
{
CWorkThread* th = NULL;
LockOn(&g_ThreadPoolLock);
m_nThread = m_pThreadQueue.size();
LockOff(&g_ThreadPoolLock);
if(m_nThread <= 0)
{
return NULL;
}
//循环检测队列中是否有空闲线程存在
LockOn(&g_ThreadPoolLock);
int i = m_nThread;
while(i-- > 0)
{
//取出头元素
th = m_pThreadQueue.front();
//将头元素吐出来
m_pThreadQueue.pop();
//然后再加到队尾
m_pThreadQueue.push(th);
//如果有效的话则改变他的状态
if(th && th->IsFree())
{
string str = "取到一个空闲线程:" + string(th->GetName()) + "[CWorkThread* CWorkThreadPool::FetchFreeThread( )]";
theLogger.LogMessage(str, true);
th->ChangeStatus(false);
break;
}
th = NULL;
}
LockOff(&g_ThreadPoolLock);
return th;
}
int CWorkThreadPool::ThreadSize()
{
int nRet = 0;
LockOn(&g_ThreadPoolLock);
nRet = m_pThreadQueue.size();
LockOff(&g_ThreadPoolLock);
return nRet;
}
int CWorkThreadPool::FreeThreads()
{
int nRet = 0;
LockOn(&g_ThreadPoolLock);
int i = m_pThreadQueue.size();
while(i-- >0)
{
CWorkThread* wth = m_pThreadQueue.front();
m_pThreadQueue.pop();
m_pThreadQueue.push(wth);
if(wth && wth->IsFree())
{
nRet++;
}
}
LockOff(&g_ThreadPoolLock);
return nRet;
}
int CWorkThreadPool::StartWorkThread(int nID)
{
int nRet = 0;
//创建一个新的线程对象
CWorkThread* wl = new CWorkThread;
//设置线程的ID
wl->SetID(nID);
//启动线程
nRet = wl->Start(wl);
//并将线程添加到线程队列中
AddThread(wl);
m_nThread += nRet;
return nRet;
}
int CWorkThreadPool::CleanDeadThreads()
{
int nRet = 0;
LockOn(&g_ThreadPoolLock);
int i = m_pThreadQueue.size();
while(i-- >0)
{
CWorkThread* wth = m_pThreadQueue.front();
m_pThreadQueue.pop();
if(wth->ShouldExit())
{
nRet++;
}else
{
m_pThreadQueue.push(wth);
}
}
LockOff(&g_ThreadPoolLock);
return nRet;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -