// File: simplethreadpool.cpp
// (Source-viewer page residue: "Font size:" control label.)
#include "precomp.h"
#include "SimpleThreadPool.h"
// Builds an idle pool: stores the error callback and the worker routine,
// sizes the pool with _calcThreadCount(), creates an I/O completion port that
// is not associated with any file handle (used purely as a job queue), and
// allocates the thread-handle array. No threads are started here; init()
// does that.
//
// errFcn - error callback, stored in m_errFcn.
// worker - job routine, stored in m_worker; _threadProc calls it per job.
//
// NOTE(review): the m_prgThreads initializer reads m_cThreads, so this
// presumably relies on m_cThreads being declared before m_prgThreads in the
// class definition -- confirm against the header.
SimpleThreadPool::SimpleThreadPool(ERR_FCN errFcn, WORKER_THREAD_PROC worker)
    : m_errFcn(errFcn),
      m_cThreads(_calcThreadCount()),
      m_iocp(CreateIoCompletionPort(INVALID_HANDLE_VALUE,0,0,0)),
      m_worker(worker),
      m_prgThreads(new HANDLE[m_cThreads]),
      m_bClosed(false)
{
    // Start with every slot null. Without this, close() called before init()
    // (or after init() stopped part-way) would wait on and CloseHandle()
    // whatever garbage the fresh allocation happened to contain.
    for (DWORD i = 0; i < m_cThreads; ++i)
        m_prgThreads[i] = 0;

    if (!m_iocp)
        _err(L"CreateIoCompletionPort");
}
// Returns the number of worker threads the pool uses: twice the processor
// count reported by GetSystemInfo().
DWORD SimpleThreadPool::_calcThreadCount() {
    SYSTEM_INFO sysInfo;
    GetSystemInfo(&sysInfo);

    const DWORD cProcessors = sysInfo.dwNumberOfProcessors;
    return cProcessors * 2;
}
void SimpleThreadPool::init() {
_ASSERT(!m_bClosed); // not specifically designed for reuse; get another instance
for (DWORD i = 0; i < m_cThreads; ++i) {
DWORD tid;
HANDLE h = CreateThread(0, 0, _threadProc, this, 0, &tid);
if (!h)
_err(L"CreateThread");
m_prgThreads[i] = h;
}
}
void SimpleThreadPool::postJob(void* pCtx, int nType, void* pv, HANDLE hEvent) {
_ASSERT(!m_bClosed);
Job* pJob = new Job(pCtx, nType, pv, hEvent);
if (!PostQueuedCompletionStatus(m_iocp, 0, reinterpret_cast<ULONG_PTR>(pJob), 0))
_err(L"PostQueuedCompletionStatus");
}
bool SimpleThreadPool::executeJob(void* pCtx, int nType, void* pv, WM_DISPATCH_FCN wmDispatchFcn) {
_ASSERT(!m_bClosed);
HANDLE hEvent = CreateEvent(0, TRUE, FALSE, 0);
if (!hEvent)
_err(L"CreateEvent", false, GetLastError());
postJob(pCtx, nType, pv, hEvent);
return _waitAndPumpMessages(1, &hEvent, wmDispatchFcn);
}
// Destructor. Releases nothing itself: the completion port, the thread
// handles and the handle array are all torn down by close(), which must be
// called before the pool is destroyed. The only thing done here is a check
// that close() has in fact run.
// NOTE(review): _ASSERT is presumably the CRT debug-only assertion, in which
// case a missing close() goes unnoticed in release builds -- confirm.
SimpleThreadPool::~SimpleThreadPool() {
_ASSERT(m_bClosed); // close() must have been called before destruction
}
// Shuts the pool down: posts one shutdown job (nType == -1) per worker
// thread, waits up to nTimeout milliseconds in total for all of them to exit,
// then closes the completion port and the thread handles and frees the handle
// array. Threads that are still running once the wait is over are killed with
// TerminateThread.
//
// nTimeout - overall wait budget in milliseconds; INFINITE waits forever.
//
// Returns true only when every thread was confirmed to have exited on its
// own. Returns false when the pool was already closed, when a thread had to
// be terminated, or when a thread handle could not be waited on.
bool SimpleThreadPool::close(DWORD nTimeout)
{
    bool bResult = false;
    if (!m_bClosed) {
        DWORD i;   // shared by the loops below (valid under either for-scope rule)

        // One shutdown request per thread; each thread consumes exactly one.
        for (i = 0; i < m_cThreads; ++i) {
            postJob(0, -1);
        }

        // Wait for the threads one at a time against a single deadline.
        // Waiting for all of them is the same as waiting for each in turn, and
        // this avoids WaitForMultipleObjects' MAXIMUM_WAIT_OBJECTS limit, which
        // a pool of 2 x CPUs can exceed on large machines. It also keeps one
        // bad handle from making the wait for all the others fail.
        bResult = true;
        const DWORD nStart = GetTickCount();
        for (i = 0; i < m_cThreads; ++i) {
            DWORD nRemaining = nTimeout;
            if (INFINITE != nTimeout) {
                // Unsigned subtraction stays correct across tick-count wrap.
                const DWORD nElapsed = GetTickCount() - nStart;
                nRemaining = (nElapsed < nTimeout) ? (nTimeout - nElapsed) : 0;
            }
            if (WAIT_OBJECT_0 != WaitForSingleObject(m_prgThreads[i], nRemaining))
                bResult = false;   // timed out, or the handle was not waitable
        }

        CloseHandle(m_iocp);

        for (i = 0; i < m_cThreads; ++i) {
            HANDLE hThread = m_prgThreads[i];
            // Only kill threads that are demonstrably still running; ones that
            // already exited are simply closed.
            if (!bResult && WAIT_TIMEOUT == WaitForSingleObject(hThread, 0))
                TerminateThread(hThread, 1);
            CloseHandle(hThread);
        }
        delete [] m_prgThreads;
        m_bClosed = true;
    }
    return bResult; // tells the caller whether the threads were cleaned up nicely
}
// warning C4702: unreachable code
#pragma warning(disable:4702)
// Entry point of every worker thread (handed to CreateThread by init()).
//
// pv - the owning SimpleThreadPool.
//
// Repeatedly dequeues a Job from the pool's completion port (the Job pointer
// travels as the completion key), runs it through the pool's worker routine,
// signals and closes the job's event if it has one, and deletes the Job.
//
// Returns 0 after receiving a shutdown job (nType == -1), or 1 when the
// completion port can no longer be read.
DWORD WINAPI SimpleThreadPool::_threadProc(void* pv) {
    SimpleThreadPool& tp = *static_cast<SimpleThreadPool*>(pv);
    for (;;) {
        DWORD cb;
        OVERLAPPED* pov;
        ULONG_PTR key;
        if (!GetQueuedCompletionStatus(tp.m_iocp, &cb, &key, &pov, INFINITE)) {
            // With an infinite timeout and no overlapped I/O on this port, a
            // failure means the port itself is unusable (for example it was
            // closed). Retrying would only spin, so report once and leave.
            tp._err(L"GetQueuedCompletionStatus");
            return 1;
        }

        _ASSERT(!pov); // we don't use this structure
        Job* pJob = reinterpret_cast<Job*>(key);

        if (-1 == pJob->nType) { // request to shut down
            // The shutdown job was heap-allocated by postJob() like any
            // other job, so it has to be freed here as well.
            delete pJob;
            return 0;
        }

        // do the work
        tp.m_worker(pJob->pCtx, pJob->nType, pJob->pv);
        if (pJob->hEvent) {
            SetEvent(pJob->hEvent); // signal work is complete
            CloseHandle(pJob->hEvent); // this thread owns the posted handle
        }
        delete pJob;
    }
    return 0;
}
#pragma warning(default:4702)
// Waits until any of the given handles stops the wait, dispatching this
// thread's queued messages through wmDispatchFcn in the meantime.
//
// cHandles, prgHandles - the handles to wait on (any one ends the wait).
// wmDispatchFcn        - called for each message removed from the queue;
//                        returning false cancels the wait.
//
// Returns false as soon as the dispatcher cancels. Returns true when the wait
// ends for any other reason except the one-second timeout or message arrival;
// note that this includes a failed wait as well as a signalled handle.
bool SimpleThreadPool::_waitAndPumpMessages(DWORD cHandles, HANDLE* prgHandles, WM_DISPATCH_FCN wmDispatchFcn) {
    for (;;) {
        // Stop immediately on cancellation. Going back into the wait would
        // delay the caller by up to a second and could dispatch yet more
        // messages after the dispatcher has already asked to stop.
        if (!_drainMessageQueue(wmDispatchFcn))
            return false;

        // The one-second timeout makes the queue get re-checked periodically
        // even when the wait is not woken by new input.
        const DWORD nResult = MsgWaitForMultipleObjects(cHandles, prgHandles, FALSE, 1000, QS_ALLEVENTS);

        if (WAIT_OBJECT_0 + cHandles == nResult)
            continue;       // new messages: drained at the top of the loop
        if (WAIT_TIMEOUT != nResult)
            return true;    // a handle ended the wait (or the wait failed)
    }
}
// Removes every message currently in this thread's queue and hands each one
// to wmDispatchFcn. Returns false as soon as the dispatcher returns false
// (messages still queued are left there); returns true once the queue is
// empty.
bool SimpleThreadPool::_drainMessageQueue(WM_DISPATCH_FCN wmDispatchFcn) {
    for (;;) {
        MSG pending;
        if (!PeekMessage(&pending, 0, 0, 0, PM_REMOVE))
            return true;    // nothing left to dispatch
        if (!wmDispatchFcn(pending))
            return false;   // dispatcher asked to stop
    }
}
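// Source-viewer page residue (not part of the program) -- keyboard shortcuts:
//   Copy code:            Ctrl + C
//   Search code:          Ctrl + F
//   Full-screen mode:     F11
//   Toggle theme:         Ctrl + Shift + D
//   Show shortcuts:       ?
//   Increase font size:   Ctrl + =
//   Decrease font size:   Ctrl + -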