📄 thrdpool.cpp
字号:
{
//Now that everything is setup we can resume the threads in the thread pool (if need be)
if (!bSuspended)
{
for (int i=0; i<m_Threads.GetSize(); i++)
{
CThreadPoolClient* pClient = m_Threads.GetAt(i);
ASSERT(pClient);
pClient->m_pWorkerThread->ResumeThread();
}
}
}
return bSuccess;
}
//Shuts down the thread pool: stops the lifetime monitoring thread, asks every
//worker thread to exit, waits for all of them to terminate and then frees the
//per-thread objects and the request queue. Does nothing to the threads array
//or the queue if the pool currently contains no threads.
void CThreadPoolServer::Stop()
{
  //Bring down the monitoring thread (if any) BEFORE taking the m_csThreads lock.
  //The monitoring thread acquires that same lock in RecycleThread, so waiting for
  //it to exit while this function holds the lock could deadlock. Stopping it first
  //also means it can never look at the threads array after it has been emptied below
  SetMaxThreadClientLifetime(FALSE, 0);

  //Serialize access to the threads array
  CSingleLock sl(&m_csThreads, TRUE);

  int nThreads = (int) m_Threads.GetSize();
  if (nThreads)
  {
    //Validate the queue before it is dereferenced for the first time
    ASSERT(m_pQueue);

    //Set the m_bRequestToStop in each thread to ask them to exit if they are
    //currently processing a request in CThreadPoolClient::Run
    int i;
    for (i=0; i<nThreads; i++)
    {
      CThreadPoolClient* pClient = m_Threads.GetAt(i);
      ASSERT(pClient);
      pClient->SetRequestToStop();
    }

    //Now post enough requests to get each thread in the thread pool to stop via
    //a "special" request with an ID of THREADPOOL_SHUTDOWN_REQUEST
    for (i=0; i<nThreads; i++)
    {
      CThreadPoolRequest killRequest;
      killRequest.m_dwID = THREADPOOL_SHUTDOWN_REQUEST;

      //Only use directed requests to shut down the thread pool if the pool supports
      //directed requests
      if (m_pQueue->SupportsDirectedRequests())
      {
        killRequest.m_bDirectedRequest = TRUE;
        killRequest.m_nDirectedRequestIndex = i;
      }

      m_pQueue->PostRequest(killRequest);
    }

    //Wait for all the threads to exit in the thread pool
    BOOL bMoreThreads = TRUE;
    int nCurrentThreadIndex = 0;
    while (bMoreThreads)
    {
      //Wait for as many threads at once as possible
      int nCurrentThreadsToStop = min(MAXIMUM_WAIT_OBJECTS, nThreads - nCurrentThreadIndex);

      //Setup the array of threads to wait on to exit. A client can have a NULL
      //m_pWorkerThread if RecycleThread failed to recreate its thread, so only
      //collect the handles of threads which actually exist
      HANDLE hThreads[MAXIMUM_WAIT_OBJECTS];
      int nHandles = 0;
      for (int j=0; j<nCurrentThreadsToStop; j++)
      {
        CThreadPoolClient* pClient = m_Threads.GetAt(j + nCurrentThreadIndex);
        ASSERT(pClient);
        if (pClient->m_pWorkerThread)
        {
          hThreads[nHandles] = pClient->m_pWorkerThread->m_hThread;
          ++nHandles;
        }
      }

      //Wait for the threads to exit (skipped if this batch has no live threads)
      if (nHandles)
        WaitForMultipleObjects(nHandles, hThreads, TRUE, INFINITE);

      //Get ready for the next time around
      nCurrentThreadIndex += nCurrentThreadsToStop;
      bMoreThreads = (nCurrentThreadIndex < nThreads);
    }

    //Now free up all the memory associated with each thread
    for (i=0; i<nThreads; i++)
    {
      CThreadPoolClient* pClient = m_Threads.GetAt(i);
      ASSERT(pClient);
      delete pClient->m_pWorkerThread;
      pClient->m_pWorkerThread = NULL;
      delete pClient;
    }
    m_Threads.RemoveAll();

    //Close our queue object
    delete m_pQueue;
    m_pQueue = NULL;
  }
}
//Returns the client stored at position nIndex of the m_Threads array.
//This accessor performs no locking or range checking of its own; it simply
//forwards to m_Threads.GetAt.
CThreadPoolClient* CThreadPoolServer::GetAtClient(int nIndex)
{
  CThreadPoolClient* pRequestedClient = m_Threads.GetAt(nIndex);
  return pRequestedClient;
}
//Waits on the m_evtInitCompleted event of every client in the pool and then
//combines their m_bInitOK flags.
//Returns TRUE only when the pool contains at least one thread and every
//client's m_bInitOK is non-zero. Returns FALSE if the pool is empty or if any
//client reports failure.
BOOL CThreadPoolServer::WaitForThreadsInitInstance()
{
  //Assume the best, this is cleared as soon as any thread reports a failure
  BOOL bInitOK = TRUE;

  int nThreads = (int) m_Threads.GetSize();
  if (nThreads)
  {
    BOOL bMoreThreads = TRUE;
    int nCurrentThreadIndex = 0;
    while (bMoreThreads)
    {
      //Wait for as many threads at once as possible
      int nEventsToWaitOn = min(MAXIMUM_WAIT_OBJECTS, nThreads - nCurrentThreadIndex);

      //Setup the array of "init completed" events to wait on
      HANDLE hEvents[MAXIMUM_WAIT_OBJECTS];
      int j;
      for (j=0; j<nEventsToWaitOn; j++)
      {
        CThreadPoolClient* pClient = m_Threads.GetAt(j + nCurrentThreadIndex);
        ASSERT(pClient);
        hEvents[j] = pClient->m_evtInitCompleted;
      }

      //Wait for the threads to complete their InitInstance code
      WaitForMultipleObjects(nEventsToWaitOn, hEvents, TRUE, INFINITE);

      //Update the Init completed which is the logical "And" of all the InitInstances.
      //The loop stops at the first failure, so bInitOK is always TRUE on entry to the body
      for (j=0; j<nEventsToWaitOn && bInitOK; j++)
      {
        CThreadPoolClient* pClient = m_Threads.GetAt(j + nCurrentThreadIndex);
        ASSERT(pClient);

        //Use a logical test rather than a bitwise "&=" so that any non-zero
        //m_bInitOK value (not just 1) counts as success
        bInitOK = (pClient->m_bInitOK != FALSE);
      }

      //Get ready for the next time around. Note that the remaining batches are still
      //waited on even after a failure has been seen
      nCurrentThreadIndex += nEventsToWaitOn;
      bMoreThreads = (nCurrentThreadIndex < nThreads);
    }
  }
  else
    bInitOK = FALSE; //An empty pool is reported as a failure

  return bInitOK;
}
//Enables or disables the periodic recycling of threads in the thread pool.
//  bEnableThreadLifetime - TRUE to start a monitoring thread, FALSE to just
//                          stop any monitoring thread which is running
//  dwMinutes             - the maximum lifetime value, stored in m_dwMaxLifetime
//Any existing monitoring thread is always stopped and destroyed first.
//Returns TRUE on success (including the "disable" case). Returns FALSE if
//recycling was requested but there is no queue, the queue does not support
//directed requests or the monitoring thread could not be created; in all of
//these failure cases the lifetime settings are reset to "disabled".
BOOL CThreadPoolServer::SetMaxThreadClientLifetime(BOOL bEnableThreadLifetime, DWORD dwMinutes)
{
  //Kill the monitoring thread if currently active
  if (m_pLifetimeMonitorThread)
  {
    m_evtRequestLifetimeThread.SetEvent();
    WaitForSingleObject(m_pLifetimeMonitorThread->m_hThread, INFINITE);
    delete m_pLifetimeMonitorThread;
    m_pLifetimeMonitorThread = NULL;
  }

  //Hive away the member variables
  m_bMaxLifetime = bEnableThreadLifetime;
  m_dwMaxLifetime = dwMinutes;

  //Recreate the monitoring thread if required
  if (m_bMaxLifetime)
  {
    //The queue is destroyed by Stop, so it may legitimately be NULL here
    if (m_pQueue == NULL)
    {
      TRACE(_T("CThreadPoolServer::SetMaxThreadClientLifetime, Recycling of threads is not possible because there is no queue\n"));
      m_bMaxLifetime = FALSE;
      m_dwMaxLifetime = 0;
      return FALSE;
    }

    if (!m_pQueue->SupportsDirectedRequests())
    {
      TRACE(_T("Recyclying of threads in the thread pool is not supported because the Queue does not support directed requests\n"));

      //Leave the settings in the same "disabled" state as the thread creation failure path below
      m_bMaxLifetime = FALSE;
      m_dwMaxLifetime = 0;
      return FALSE;
    }

    ASSERT(m_pLifetimeMonitorThread == NULL);
    m_pLifetimeMonitorThread = AfxBeginThread(CThreadPoolServer::_Monitor, this, THREAD_PRIORITY_IDLE, 0, CREATE_SUSPENDED);
    if (m_pLifetimeMonitorThread == NULL)
    {
      TRACE(_T("CThreadPoolServer::SetMaxThreadClientLifetime, Failed to create worker thread for monitoring thread in thread pool\n"));
      m_bMaxLifetime = FALSE;
      m_dwMaxLifetime = 0;
      return FALSE;
    }
    else
    {
      m_pLifetimeMonitorThread->m_bAutoDelete = FALSE; //We are in charge of closing the thread
      m_pLifetimeMonitorThread->ResumeThread();
    }
  }

  return TRUE;
}
//Thread entry point for the lifetime monitoring thread. pParam carries the
//CThreadPoolServer instance whose Monitor method does the real work; the
//value returned by Monitor becomes the return value of this function.
UINT CThreadPoolServer::_Monitor(LPVOID pParam)
{
  //A NULL parameter would mean there is no server object to run against
  ASSERT(pParam);

  //Recover the C++ object from the untyped thread parameter
  CThreadPoolServer* pThis = static_cast<CThreadPoolServer*>(pParam);
  ASSERT(pThis);

  //Hand over to the member function
  UINT nExitCode = pThis->Monitor();
  return nExitCode;
}
//Body of the lifetime monitoring thread. Works out an interval of
//(m_dwMaxLifetime minutes / number of threads) and calls RecycleThread each
//time that interval elapses, until m_evtRequestLifetimeThread is signalled.
//A waitable timer is used when the m_lpfnCreateWaitableTimer and
//m_lpfnSetWaitableTimer function pointers are available and the timer can be
//set, otherwise a one second Sleep polling loop is used.
//Always returns 0.
UINT CThreadPoolServer::Monitor()
{
  //Take a snapshot of the values needed to work out the recycle interval
  CSingleLock sl(&m_csThreads, TRUE);
  int nThreads = (int) m_Threads.GetSize();
  DWORD dwMaxLifetime = m_dwMaxLifetime;
  sl.Unlock();

  //With no threads there is nothing to recycle and the interval calculation
  //below would divide by zero
  if (nThreads <= 0)
  {
    TRACE(_T("CThreadPoolServer::Monitor, There are no threads in the thread pool to monitor\n"));
    return 0L;
  }

  //Work out the time interval (in ms) at which threads in the thread pool need to be recycled.
  //The arithmetic is done in 64 bits because "minutes * 60000" overflows a DWORD for large
  //lifetimes. The result is clamped to the largest positive 32 bit signed value since it is
  //also passed as the timer period (presumably a signed LONG as in the Win32 SetWaitableTimer
  //API - TODO confirm against the m_lpfnSetWaitableTimer declaration)
  unsigned __int64 nInterval = (((unsigned __int64) dwMaxLifetime) * 60000) / ((unsigned __int64) nThreads);
  if (nInterval > 0x7FFFFFFF)
    nInterval = 0x7FFFFFFF;
  DWORD dwWaitInterval = (DWORD) nInterval;

  //Try to use a waitable timer in preference to a Sleep busy loop
  HANDLE hTimer = NULL;
  if (m_lpfnCreateWaitableTimer && m_lpfnSetWaitableTimer)
    hTimer = m_lpfnCreateWaitableTimer(NULL, FALSE, NULL);

  BOOL bTimerSet = FALSE;
  if (hTimer)
  {
    //A negative due time is a relative time expressed in 100 nanosecond units
    __int64 nFileTimes = ((__int64)-10000) * ((__int64)dwWaitInterval);
    LARGE_INTEGER li;
    li.LowPart = (DWORD) (nFileTimes & 0xFFFFFFFF);
    li.HighPart = (LONG) (nFileTimes >> 32);
    bTimerSet = m_lpfnSetWaitableTimer(hTimer, &li, dwWaitInterval, NULL, NULL, TRUE) ? TRUE : FALSE;

    //Do not leak the timer handle if it could be created but not set
    if (!bTimerSet)
    {
      CloseHandle(hTimer);
      hTimer = NULL;
    }
  }

  if (bTimerSet)
  {
    //Set up the handle array to wait on
    HANDLE HandlesToWaitOn[2];
    HandlesToWaitOn[0] = hTimer;
    HandlesToWaitOn[1] = m_evtRequestLifetimeThread;

    //Enter into the tight loop
    BOOL bWantStop = FALSE;
    while (!bWantStop)
    {
      //wait for either of the handles to become signalled
      DWORD dwWait = WaitForMultipleObjects(2, HandlesToWaitOn, FALSE, INFINITE);
      int nSignaledHandle = dwWait - WAIT_OBJECT_0;
      if (nSignaledHandle == 1) //It was the stop request
        bWantStop = TRUE;
      else if (nSignaledHandle == 0) //It was the waitable timer
        RecycleThread();
      else
      {
        //The wait itself failed. Bail out rather than spinning forever on a
        //wait which returns immediately
        TRACE(_T("CThreadPoolServer::Monitor, Failed to wait on the timer and stop event, exiting the monitoring thread\n"));
        bWantStop = TRUE;
      }
    }

    //Free up the waitable timer now that we are finished with it
    CloseHandle(hTimer);
  }
  else
  {
    TRACE(_T("CThreadPoolServer::Monitor, Waitable timer could not be created and set, falling back to using a sleep busy loop\n"));

    //Must use a sleep busy loop since waitable timers are not available
    DWORD dwStartTick = GetTickCount();
    BOOL bWantStop = FALSE;
    while (!bWantStop)
    {
      Sleep(1000);

      //Check to see if the m_evtRequestLifetimeThread event is signaled
      DWORD dwWait = WaitForSingleObject(m_evtRequestLifetimeThread, 0);
      if (dwWait == WAIT_OBJECT_0)
        bWantStop = TRUE;
      else
      {
        //Check to see if it is time to recycle a thread. The unsigned subtraction
        //gives the correct elapsed time even when the tick count wraps around
        DWORD dwNowTick = GetTickCount();
        if ((dwNowTick - dwStartTick) > dwWaitInterval)
        {
          RecycleThread();
          dwStartTick = dwNowTick;
        }
      }
    }
  }

  return 0L;
}
//Recycles one thread in the thread pool: the thread at m_nLifetimeThreadIndex
//is asked to exit, waited upon, destroyed and then replaced by a newly created
//thread for the same CThreadPoolClient. m_nLifetimeThreadIndex is then advanced
//(wrapping around) so that the next call recycles the next thread.
//Holds the m_csThreads lock for the duration of the call.
//Returns TRUE if the replacement thread was created, FALSE if the pool is empty
//or the replacement thread could not be created (in which case the client is
//left with a NULL m_pWorkerThread).
BOOL CThreadPoolServer::RecycleThread()
{
  //Serialize access to the threads array
  CSingleLock sl(&m_csThreads, TRUE);

  //Nothing can be recycled in an empty pool (this also protects the GetAtClient
  //call and the modulo operation below)
  int nThreads = (int) m_Threads.GetSize();
  if (nThreads <= 0)
  {
    TRACE(_T("CThreadPoolServer::RecycleThread, There are no threads in the thread pool to recycle\n"));
    return FALSE;
  }

  //The index persists between calls, so bring it back into range in case the
  //number of threads has changed since it was last updated
  if ((m_nLifetimeThreadIndex < 0) || (m_nLifetimeThreadIndex >= nThreads))
    m_nLifetimeThreadIndex = 0;

  //Assume the best
  BOOL bSuccess = TRUE;

  //Get the specified thread to kill
  CThreadPoolClient* pClient = GetAtClient(m_nLifetimeThreadIndex);
  ASSERT(pClient);

  //The worker thread can be NULL if a previous call failed to recreate it. In that
  //case there is nothing to stop and we go straight to creating a new thread
  if (pClient->m_pWorkerThread)
  {
    //Set the m_bRequestToStop to ask it to exit if they are
    //currently processing a request in CThreadPoolClient::Run
    pClient->SetRequestToStop();

    //Check to see if we need to post a directed request to the thread pool to get it to exit
    BOOL bAlreadyExited = (WaitForSingleObject(pClient->m_pWorkerThread->m_hThread, 0) == WAIT_OBJECT_0);
    if (!bAlreadyExited)
    {
      //Also post a directed request to the thread pool directly
      TRACE(_T("CThreadPoolServer::RecycleThread, Killing thread at index %d\n"), m_nLifetimeThreadIndex);
      CThreadPoolRequest killRequest;
      killRequest.m_dwID = THREADPOOL_SHUTDOWN_REQUEST;
      killRequest.m_bDirectedRequest = TRUE;
      killRequest.m_nDirectedRequestIndex = m_nLifetimeThreadIndex;
      ASSERT(m_pQueue);
      BOOL bPostedOK = m_pQueue->PostRequest(killRequest);
      bPostedOK; //Avoid an unreferenced variable warning in builds where ASSERT compiles away
      ASSERT(bPostedOK); //If this call fails then you may be using a CThreadPoolQueue derived
                         //class which does not support directed requests, e.g. CIOCPThreadPoolQueue
    }

    //Wait for the thread to exit
    WaitForSingleObject(pClient->m_pWorkerThread->m_hThread, INFINITE);
    delete pClient->m_pWorkerThread;
    pClient->m_pWorkerThread = NULL;
  }

  //Now recreate the thread
  //Spin off a worker thread for it (initially suspended so that we can set it up correctly!)
  pClient->m_pWorkerThread = AfxBeginThread(CThreadPoolClient::_Run, pClient, pClient->m_nStartupThreadPriority, pClient->m_nStackSize, CREATE_SUSPENDED);
  if (pClient->m_pWorkerThread == NULL)
  {
    TRACE(_T("CThreadPoolServer::RecycleThread, Failed to create worker thread for thread pool at index %d\n"), m_nLifetimeThreadIndex);
    bSuccess = FALSE;
  }
  else
  {
    pClient->m_pWorkerThread->m_bAutoDelete = FALSE; //We are in charge of closing the thread
    pClient->m_pWorkerThread->ResumeThread(); //Resume the thread now that we have set it up correctly
  }

  //increment the thread index, ready for the next call into RecycleThread at a later date
  m_nLifetimeThreadIndex = (m_nLifetimeThreadIndex + 1) % nThreads;

  return bSuccess;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -