⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 simplethreadpool.cpp

📁 sspi_workbench
💻 CPP
字号:
#include "precomp.h"
#include "SimpleThreadPool.h"

// Sets up the pool's bookkeeping: stores the error callback and the worker
// routine, sizes the pool with _calcThreadCount(), creates an I/O completion
// port that is not associated with any file handle, and allocates the array
// that will hold the worker thread handles. No threads are started here;
// init() creates them.
//
// NOTE(review): the handle array is sized from m_cThreads inside the
// initializer list, which is only correct if m_cThreads is declared before
// m_prgThreads in the class definition -- the header is not visible here,
// please confirm.
SimpleThreadPool::SimpleThreadPool(ERR_FCN errFcn, WORKER_THREAD_PROC worker)
: m_errFcn(errFcn),
  m_cThreads(_calcThreadCount()),
  m_iocp(CreateIoCompletionPort(INVALID_HANDLE_VALUE,0,0,0)),
  m_worker(worker),
  // The trailing () value-initializes every slot to a null handle, so that
  // close() never hands uninitialized garbage to WaitForMultipleObjects,
  // TerminateThread or CloseHandle when init() was not called or did not
  // fill every slot.
  m_prgThreads(new HANDLE[m_cThreads]()),
  m_bClosed(false)
{
    // CreateIoCompletionPort reports failure by returning NULL.
    if (!m_iocp)
        _err(L"CreateIoCompletionPort");
}

// Chooses how many worker threads the pool will run: two per processor
// reported by GetSystemInfo, clamped to MAXIMUM_WAIT_OBJECTS.
//
// The clamp exists because close() waits for every worker with a single
// WaitForMultipleObjects call, and that API rejects handle counts above
// MAXIMUM_WAIT_OBJECTS. Without it, a machine reporting many processors
// would make that wait fail outright.
DWORD SimpleThreadPool::_calcThreadCount() {
    SYSTEM_INFO si;
    GetSystemInfo(&si);

    DWORD cThreads = si.dwNumberOfProcessors * 2;
    if (cThreads > MAXIMUM_WAIT_OBJECTS)
        cThreads = MAXIMUM_WAIT_OBJECTS;
    return cThreads;
}

void SimpleThreadPool::init() {
    _ASSERT(!m_bClosed); // not specifically designed for reuse; get another instance
    for (DWORD i = 0; i < m_cThreads; ++i) {
        DWORD tid;
        HANDLE h = CreateThread(0, 0, _threadProc, this, 0, &tid);
        if (!h)
            _err(L"CreateThread");
        m_prgThreads[i] = h;
    }
}

void SimpleThreadPool::postJob(void* pCtx, int nType, void* pv, HANDLE hEvent) {
    _ASSERT(!m_bClosed);
    Job* pJob = new Job(pCtx, nType, pv, hEvent);
    if (!PostQueuedCompletionStatus(m_iocp, 0, reinterpret_cast<ULONG_PTR>(pJob), 0))
        _err(L"PostQueuedCompletionStatus");
}

bool SimpleThreadPool::executeJob(void* pCtx, int nType, void* pv, WM_DISPATCH_FCN wmDispatchFcn) {
    _ASSERT(!m_bClosed);
    HANDLE hEvent = CreateEvent(0, TRUE, FALSE, 0);
    if (!hEvent)
        _err(L"CreateEvent", false, GetLastError());
    postJob(pCtx, nType, pv, hEvent);
    return _waitAndPumpMessages(1, &hEvent, wmDispatchFcn);
}

// The destructor releases nothing itself: the completion port, the thread
// handles and the handle array are all released in close(). It only asserts
// that close() has already run.
SimpleThreadPool::~SimpleThreadPool() {
    _ASSERT(m_bClosed); // don't forget to call close
}

// Shuts the pool down and releases its resources.
//
// One shutdown job (nType == -1) is posted per worker thread, then all thread
// handles are waited on for up to nTimeout milliseconds. Afterwards the
// completion port, every thread handle and the handle array are released and
// the pool is marked closed. Calling close() again is a no-op that returns
// false.
//
// Returns true only when the wait finished without timing out or failing,
// i.e. the threads are known to have exited on their own. Otherwise the
// threads are forcibly terminated and false is returned.
bool SimpleThreadPool::close(DWORD nTimeout)
{
    bool bResult = false;
    if (!m_bClosed) {
        // Declared once for both loops: a for-init declaration is scoped to
        // its own loop in standard C++, so the second loop cannot reuse it.
        DWORD i;
        for (i = 0; i < m_cThreads; ++i) {
            postJob(0, -1);
        }

        // WAIT_FAILED means the wait never happened, so it cannot count as a
        // clean shutdown any more than a timeout can.
        const DWORD nWait = WaitForMultipleObjects(m_cThreads, m_prgThreads, TRUE, nTimeout);
        bResult = (WAIT_TIMEOUT != nWait) && (WAIT_FAILED != nWait);

        CloseHandle(m_iocp);
        for (i = 0; i < m_cThreads; ++i) {
            // this isn't terribly clean (some of these threads may already be dead)
            if (!bResult)
                TerminateThread(m_prgThreads[i], 1);
            CloseHandle(m_prgThreads[i]);
        }
        delete [] m_prgThreads;
        m_bClosed = true;
    }
    return bResult; // return value tells you whether or not the threads were cleaned up nicely
}

// warning C4702: unreachable code
#pragma warning(disable:4702)
// Entry point of every worker thread; pv is the owning SimpleThreadPool.
//
// The thread blocks on the pool's completion port and treats each completion
// key as a Job* posted by postJob:
//   - a Job with nType == -1 is a shutdown request: the thread exits with 0;
//   - any other Job is handed to the pool's worker routine, after which the
//     Job's event (if any) is signalled and closed by this thread.
// Every dequeued Job is deleted here, including the shutdown request.
//
// A failed dequeue is reported through _err and the loop continues.
// NOTE(review): if _err returns instead of throwing, a persistent failure
// (for instance the port having been closed) would make this loop spin --
// confirm against _err's definition.
DWORD WINAPI SimpleThreadPool::_threadProc(void* pv) {
    SimpleThreadPool& tp = *static_cast<SimpleThreadPool*>(pv);
    for (;;) {
        DWORD       cb;
        OVERLAPPED* pov;
        ULONG_PTR   key;
        if (GetQueuedCompletionStatus(tp.m_iocp, &cb, &key, &pov, INFINITE)) {
            _ASSERT(!pov); // we don't use this structure
            Job* pJob = reinterpret_cast<Job*>(key);

            if (-1 == pJob->nType) { // see if this is a request to shut down
                // The shutdown request was heap-allocated by postJob like any
                // other job; release it before leaving.
                delete pJob;
                return 0;
            }

            // do the work
            tp.m_worker(pJob->pCtx, pJob->nType, pJob->pv);

            if (pJob->hEvent) {
                SetEvent(pJob->hEvent);  // signal work is complete
                CloseHandle(pJob->hEvent);
            }
            delete pJob;
        }
        else tp._err(L"GetQueuedCompletionStatus");
    }
    return 0;
}
#pragma warning(default:4702)

// Waits on a set of handles while keeping the calling thread's message queue
// serviced.
//
//   cHandles, prgHandles - handles passed to MsgWaitForMultipleObjects; the
//                          wait ends as soon as any one of them is signalled.
//   wmDispatchFcn        - receives each queued message via
//                          _drainMessageQueue; returning false cancels.
//
// Returns false if the dispatcher canceled, true for every other way the wait
// can end (a handle was signalled, or the wait itself failed).
//
// The queue is drained before every wait, and the wait is capped at one
// second so the queue is re-checked periodically even if no wake-up for new
// input arrives. Cancellation returns immediately: it neither blocks in
// another wait nor dispatches further messages.
bool SimpleThreadPool::_waitAndPumpMessages(DWORD cHandles, HANDLE* prgHandles, WM_DISPATCH_FCN wmDispatchFcn) {
    for (;;) {
        if (!_drainMessageQueue(wmDispatchFcn))
            return false; // canceled by the dispatcher

        const DWORD nResult = MsgWaitForMultipleObjects(cHandles, prgHandles, FALSE, 1000, QS_ALLEVENTS);

        // WAIT_OBJECT_0 + cHandles means input arrived and WAIT_TIMEOUT means
        // the periodic re-check is due; both loop back to the drain above.
        // Anything else ends the wait.
        if (WAIT_OBJECT_0 + cHandles != nResult && WAIT_TIMEOUT != nResult)
            return true;
    }
}

// Removes messages from the calling thread's queue one at a time and passes
// each to wmDispatchFcn. Stops and returns false the first time the
// dispatcher returns false; returns true once the queue is empty.
bool SimpleThreadPool::_drainMessageQueue(WM_DISPATCH_FCN wmDispatchFcn) {
    for (;;) {
        MSG pending;
        if (!PeekMessage(&pending, 0, 0, 0, PM_REMOVE))
            return true;  // nothing left to dispatch
        if (!wmDispatchFcn(pending))
            return false; // dispatcher asked to stop
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -