📄 threadpool.cpp
字号:
#include "threadpool.h"
#include <assert.h>
#include <vector>
//#include <Wxutil.h>
#ifdef _DEBUG
#undef THIS_FILE
static char THIS_FILE[]=__FILE__;
//#define new DEBUG_NEW
#endif
/*
管理线程
*/
unsigned int CThreadPool::ManagerProc(void* p)
{
CThreadPool* pServer = (CThreadPool*)p;
HANDLE IoPort = pServer->GetMgrIoPort();
unsigned long pN1, pN2;
OVERLAPPED* pOverLapped;
LABEL_MANAGER_PROCESSING:
while(::GetQueuedCompletionStatus(IoPort, &pN1, &pN2, &pOverLapped, pServer->GetMgrWaitTime()))
{
if(pOverLapped == (OVERLAPPED*)0xFFFFFFFF)
{
return 0;
}
// else if(pOverLapped == (OVERLAPPED*)0xFFFFFFFE)
// {
// if(pN1 != 0)
// {
// DWORD rc=::WaitForSingleObject((HANDLE)pN1,INFINITE);
// if(rc=WAIT_OBJECT_0)
// {
// CloseHandle((HANDLE)pN1);//关闭该线程句柄
// }
// ReportDebug("Wait a Thread Removed!\n");
// }
// }
else
{ //zc
//ReportDebug("mgr events comes in!\n");
}
}
//time out processing
if (::GetLastError() == WAIT_TIMEOUT)
{
//ReportDebug("Time out processing!\n");
if (pServer->GetThreadPoolStatus() == CThreadPool::BUSY)
pServer->AddThreads();
if (pServer->GetThreadPoolStatus() == CThreadPool::IDLE)
pServer->RemoveThreads();
goto LABEL_MANAGER_PROCESSING;
}
return 0;
}
/*
线程池工作体
*/
unsigned int CThreadPool::WorkerProc(void* p)
{
CThreadPool* pServer=(CThreadPool*)p;
HANDLE IoPort = pServer->GetWorkerIoPort();
unsigned long pN1, pN2;
OVERLAPPED* pOverLapped;
DWORD threadId = ::GetCurrentThreadId();
//ReportDebug("worker thread id is %d.\n", threadId);
while(::GetQueuedCompletionStatus(IoPort, &pN1, &pN2,
&pOverLapped, INFINITE ))
{
if(pOverLapped == (OVERLAPPED*)0xFFFFFFFE)
{
// CThreadPool::Iterator_ThreadInfoMap it = pServer->m_threadMap.find(threadId);
// if(it!=pServer->m_threadMap.end())
// {
// ::PostQueuedCompletionStatus(pServer->m_hMgrIoPort,
// (unsigned long)it->second.m_hThread,
// 0,
// (OVERLAPPED*)0xFFFFFFFE);
pServer->RemoveThread(threadId);
// }
break;
}
else if(pOverLapped == (OVERLAPPED*)0xFFFFFFFF)
{
break;
}
else
{
//before processing, we need to change the status to busy.
pServer->ChangeStatus(threadId, true);
IWorker* pIWorker = reinterpret_cast<IWorker*>(pN1);
IJobDesc* pIJob= reinterpret_cast<IJobDesc*>(pN2);
//开始调用工作
pIWorker->ProcessJob(pIJob);
pServer->ChangeStatus(threadId, false);
}
}
return 0;
}
/*
nStatic 运行线程数
nMax 最大线程数
*/
void CThreadPool::Start(unsigned short nStatic, unsigned short nMax)
{
assert(nMax >= nStatic);
HANDLE hThread;
DWORD nThreadId;
m_nNumberOfStaticThreads = nStatic;
m_nNumberOfTotalThreads = nMax;
//自动加锁状态?
//CAutoLock AutoLock(m_arrayCs);
//CSingleLock lock(&m_arrayCs);
//lock.Lock();
m_hMgrIoPort = CreateIoCompletionPort((HANDLE)INVALID_HANDLE_VALUE, NULL, 0, 0);
m_hWorkerIoPort = CreateIoCompletionPort((HANDLE)INVALID_HANDLE_VALUE, NULL, 0, 0);
//创建管理线程的线程,
m_hMgrThread = CreateThread(
NULL, // SD
0, // initial stack size
(LPTHREAD_START_ROUTINE)ManagerProc, // thread function
(LPVOID)this, // thread argument
0,
&nThreadId ); // thread identifier
//创建工作线程,并保存在map中
for(long n = 0; n < nStatic; n++)
{
hThread = CreateThread(
NULL, // SD
0, // initial stack size
(LPTHREAD_START_ROUTINE)WorkerProc, // thread function
(LPVOID)this, // thread argument
0, // creation option
&nThreadId );
//ReportDebug("generate a worker thread handle id is %d.\n", nThreadId);
//m_threadMap.insert(m_threadMap.end(),ThreadInfoMap::value_type(nThreadId,ThreadInfo(hThread, false)));
m_threadMap.SetAt(nThreadId, ThreadInfo(hThread, false));
}
// lock.Unlock();
}
/*
关闭线程池
*/
void CThreadPool::Stop(bool bHash)
{
//CAutoLock Lock(m_arrayCs);
//CSingleLock lock(&m_arrayCs);
//lock.Lock();
::PostQueuedCompletionStatus(m_hMgrIoPort, 0, 0, (OVERLAPPED*)0xFFFFFFFF);
//在那里释放?
//lock.Unlock();
WaitForSingleObject(m_hMgrThread, INFINITE);
CloseHandle(m_hMgrThread);
CloseHandle(m_hMgrIoPort);
//shut down all the worker threads
//UINT nCount = m_threadMap.size();
UINT nCount = m_threadMap.GetCount();
HANDLE* pThread = new HANDLE[nCount];
long n = 0;
ThreadInfo info;
/*
for(Iterator_ThreadInfoMap i = m_threadMap.begin();i != m_threadMap.end(); i++)
{
::PostQueuedCompletionStatus(m_hWorkerIoPort, 0, 0, (OVERLAPPED*)0xFFFFFFFF);
info = i->second;
pThread[n++] = info.m_hThread;
}
*/
//wait for 0.5 minutes, then start to kill threads
DWORD rc = WaitForMultipleObjects(nCount, pThread, TRUE, 30000);
CloseHandle(m_hWorkerIoPort);
if(rc >= WAIT_OBJECT_0 && rc < WAIT_OBJECT_0+nCount)
{
for(unsigned int n = 0;n < nCount;n++)
{
CloseHandle(pThread[n]);
}
}
else if(rc == WAIT_TIMEOUT && bHash)
{
//some threads not terminated, we have to stop them.
DWORD exitCode;
for(unsigned int i=0; i<nCount; i++)
{
if (::GetExitCodeThread(pThread[i], &exitCode) == STILL_ACTIVE)
{
TerminateThread(pThread[i], 99);
}
CloseHandle(pThread[i]);
}
}
delete[] pThread;
}
/*
 * Queues a job on the worker completion port so that a worker thread picks
 * it up and processes it.
 *
 * pJob     job description; travels as the second packet value
 * pWorker  object whose ProcessJob is invoked by WorkerProc; travels as the
 *          first packet value
 *
 * NOTE(review): both pointers are cast to DWORD, which looks like it assumes
 * 32-bit pointers - confirm the supported targets.
 */
void CThreadPool::ProcessJob(IJobDesc* pJob, IWorker* pWorker) const
{
    const DWORD workerBits = reinterpret_cast<DWORD>(pWorker);
    const DWORD jobBits = reinterpret_cast<DWORD>(pJob);
    ::PostQueuedCompletionStatus(m_hWorkerIoPort, workerBits, jobBits, NULL);
}
/*
 * Constructs an idle pool. Handles and thread limits get defined values so
 * that a member function called before Start() never reads indeterminate
 * members; Start() assigns the real values.
 */
CThreadPool::CThreadPool()
{
    m_hMgrIoPort = NULL;
    m_hWorkerIoPort = NULL;
    m_hMgrThread = NULL;
    m_nNumberOfStaticThreads = 0;
    m_nNumberOfTotalThreads = 0;
}
/*
 * Destructor. Does no cleanup of its own: the thread and completion port
 * handles are closed in Stop(), not here.
 */
CThreadPool::~CThreadPool() {}
/*
 * Grows the pool by at most two worker threads, never exceeding
 * m_nNumberOfTotalThreads. Each thread that starts successfully is recorded
 * in m_threadMap as not busy; threads that fail to start are skipped.
 */
void CThreadPool::AddThreads()
{
    const unsigned int nCount = m_threadMap.GetCount();
    const unsigned int nLimit = m_nNumberOfTotalThreads;
    if (nCount >= nLimit)
    {
        // Already at (or above) the limit. Returning here also keeps the
        // unsigned subtraction below from wrapping around.
        return;
    }

    unsigned int nToAdd = nLimit - nCount;
    if (nToAdd > 2)
    {
        nToAdd = 2;
    }

    for (unsigned int i = 0; i < nToAdd; i++)
    {
        DWORD nThreadId = 0;
        HANDLE hThread = CreateThread(
            NULL,                                   // SD
            0,                                      // initial stack size
            (LPTHREAD_START_ROUTINE)WorkerProc,     // thread function
            (LPVOID)this,                           // thread argument
            0,                                      // creation option
            &nThreadId);                            // thread identifier
        if (hThread == NULL)
        {
            // No thread was created, so there is nothing valid to register.
            continue;
        }
        m_threadMap.SetAt(nThreadId, ThreadInfo(hThread, false));
    }
}
//void CThreadPool::RemoveThread(DWORD threadId)
//{
//CAutoLock lock(m_arrayCs);
//m_threadMap.erase(threadId);
//}
/*
 * Shrinks the pool by at most two worker threads, never going below
 * m_nNumberOfStaticThreads. For every thread to retire, one packet with the
 * OVERLAPPED value 0xFFFFFFFE is posted to the worker port; the worker that
 * dequeues it unregisters itself and exits (see WorkerProc).
 */
void CThreadPool::RemoveThreads()
{
    const unsigned int nCount = m_threadMap.GetCount();
    const unsigned int nFloor = m_nNumberOfStaticThreads;
    if (nCount <= nFloor)
    {
        // Nothing to retire. Without this check the unsigned arithmetic
        // wraps around and retire packets are posted for threads that do
        // not exist or must be kept.
        return;
    }

    unsigned int nToRemove = nCount - nFloor;
    if (nToRemove > 2)
    {
        nToRemove = 2;
    }

    for (unsigned int i = 0; i < nToRemove; i++)
    {
        ::PostQueuedCompletionStatus(m_hWorkerIoPort, 0, 0, (OVERLAPPED*)0xFFFFFFFE);
    }
}
/*
 * Reports how loaded the pool is, based on the share of registered worker
 * threads whose m_bBusyWorking flag is set.
 *
 * Returns BUSY when more than 80% of the threads are busy, IDLE when fewer
 * than 20% are busy, and NORMAL otherwise. An empty pool reports NORMAL.
 *
 * NOTE(review): iteration assumes m_threadMap is an MFC/ATL-style map keyed
 * by DWORD (GetStartPosition / GetNextAssoc) - confirm against threadpool.h.
 */
CThreadPool::ThreadPoolStatus CThreadPool::GetThreadPoolStatus()
{
    const int nTotal = static_cast<int>(m_threadMap.GetCount());
    if (nTotal <= 0)
    {
        // No threads: avoid dividing by zero.
        return NORMAL;
    }

    // Count the threads that are currently processing a job.
    int nBusy = 0;
    POSITION pos = m_threadMap.GetStartPosition();
    while (pos != NULL)
    {
        DWORD threadId = 0;
        ThreadInfo info;
        m_threadMap.GetNextAssoc(pos, threadId, info);
        if (info.m_bBusyWorking)
        {
            nBusy++;
        }
    }

    const double busyRatio = nBusy / (1.0 * nTotal);
    if (busyRatio > 0.8)
    {
        return BUSY;
    }
    if (busyRatio < 0.2)
    {
        return IDLE;
    }
    return NORMAL;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -