// File: cgrowablethreadpool.cpp
// (Code-viewer page residue: "Font size:" control.)
CMclWaitableCollection WaitSet;
CDispatchRecord Job;
long lJobsHandled = 0;
BOOL fThreadAlive = TRUE;
enum { EXIT_EVENT_INDEX = 0, JOBS_PENDING_INDEX, NUM_OBJECTS };
WaitSet.AddObject(m_ExitEvent); // Index 0.
WaitSet.AddObject(m_JobsPending); // Index 1.
while( fThreadAlive )
{
// Wait for something to do, a timeout to occur, or being told
// to exit.
//
DWORD dwWaitResult = WaitSet.Wait(FALSE, m_dwThreadLifetime);
if( CMclWaitSucceeded(dwWaitResult, NUM_OBJECTS) )
{
if( CMclWaitSucceededIndex(dwWaitResult) == JOBS_PENDING_INDEX )
{
printf(
"[0x%08lx] CGrowableThreadPool::ThreadHandlerProc have a job to do\n",
GetCurrentThreadId()
);
// m_JobsPending was signaled.
//
if( m_DispatchQ.Get(Job) )
{
// Do it!
//
Job.Execute();
lJobsHandled++;
}
m_ThreadsFree.Release(1);
}
else
{
printf(
"[0x%08lx] CGrowableThreadPool::ThreadHandlerProc exit signaled\n",
GetCurrentThreadId()
);
// m_ExitEvent was signaled. We won't clean up anything
// because the destructor has already recorded that we're
// alive and will be waiting for our thread handle to
// become signaled.
//
fThreadAlive = FALSE;
}
}
else if( CMclWaitTimeout(dwWaitResult) )
{
// This thread has been asleep a long time and has timed
// out waiting for something to do. See if it should die.
//
printf(
"[0x%08lx] CGrowableThreadPool::ThreadHandlerProc timed out\n",
GetCurrentThreadId()
);
CMclAutoLock Lock(m_csPool);
if( m_lCurThreads > m_lMinThreads )
{
// The current thread count in the pool is above the minimum
// we need to maintain, so cleanup after ourselves and exit
// this thread.
//
m_lCurThreads--;
fThreadAlive = FALSE;
CMclThread *pThread = m_ThreadIdMap.GetThreadFromId(GetCurrentThreadId());
if( pThread )
{
m_ThreadIdMap.FreeSlot(GetCurrentThreadId());
if( m_ThreadsInPool.Remove(pThread) )
{
printf(
"[0x%08lx] CGrowableThreadPool::ThreadHandlerProc exiting\n",
GetCurrentThreadId()
);
delete pThread;
}
}
}
else
{
// The minimum number of threads has already been reached, so
// continue to stay alive and wait for jobs to be queued.
//
printf(
"[0x%08lx] CGrowableThreadPool::ThreadHandlerProc min threads hit\n",
GetCurrentThreadId()
);
}
}
else
{
// The wait operation failed in general. Cleanup as if we
// were terminating ourselves.
//
printf(
"[0x%08lx] CGrowableThreadPool::ThreadHandlerProc other failure\n",
GetCurrentThreadId()
);
// Other failure.
//
CMclAutoLock Lock(m_csPool);
m_lCurThreads--;
fThreadAlive = FALSE;
CMclThread *pThread;
pThread = m_ThreadIdMap.GetThreadFromId(GetCurrentThreadId());
if( pThread )
{
m_ThreadIdMap.FreeSlot(GetCurrentThreadId());
if( m_ThreadsInPool.Remove(pThread) )
{
delete pThread;
}
}
}
}
printf(
"[0x%08lx] CGrowableThreadPool::ThreadHandlerProc done, %d jobs handled\n",
GetCurrentThreadId(),
lJobsHandled
);
return(0);
}
// CDispatchRecord class implementation.
//
// Default constructor: builds a dispatch record with no user thread handler
// attached (the handler pointer starts out null).
//
CGrowableThreadPool::CDispatchRecord::CDispatchRecord()
    : m_pUserThreadHandler( 0 )
{
    // Nothing else to set up.
}
// Builds a dispatch record that refers to the supplied thread handler.
//
// pThreadHandler - handler pointer stored in the record; it is only stored
//                  here, not dereferenced.
//
CGrowableThreadPool::CDispatchRecord::CDispatchRecord( CMclThreadHandler *pThreadHandler )
    : m_pUserThreadHandler( pThreadHandler )
{
    // Nothing else to set up.
}
// Runs the job described by this record.
//
// Returns the value produced by the stored handler's ThreadHandlerProc(),
// or 0 when the record has no handler attached.
//
unsigned CGrowableThreadPool::CDispatchRecord::Execute( void )
{
    // A record without a handler has nothing to run.
    if( !m_pUserThreadHandler )
    {
        return(0);
    }

    const unsigned wHandlerResult = m_pUserThreadHandler->ThreadHandlerProc();
    return(wHandlerResult);
}
// Equality comparison: two dispatch records are equal when they store the
// same handler pointer.
//
// Returns 1 when the pointers match and 0 otherwise.
//
int CGrowableThreadPool::CDispatchRecord::operator ==( const CDispatchRecord& rhs )
{
    if( m_pUserThreadHandler == rhs.m_pUserThreadHandler )
    {
        return 1;
    }
    return 0;
}
// CThreadIdToPtrMap class implementation.
//
// Allocates the thread-id lookup table with lMaxThreads entries and zero-fills
// it, so that every slot initially reads as unused.
//
// lMaxThreads - number of slots in the table; also remembered in
//               m_lMaxThreads as the bound for the slot scans.
//
CGrowableThreadPool::CThreadIdToPtrMap::CThreadIdToPtrMap( long lMaxThreads )
    : m_lMaxThreads( lMaxThreads )
{
    // Total size of the table in bytes.
    const size_t cbTable = sizeof(THREADINFO) * lMaxThreads;

    m_pThreadIdMap = new THREADINFO[lMaxThreads];
    memset(m_pThreadIdMap, 0, cbTable);
}
// Releases the slot table. Only the array itself is freed; the CMclThread
// pointers recorded in the slots are not deleted here.
//
CGrowableThreadPool::CThreadIdToPtrMap::~CThreadIdToPtrMap()
{
    delete [] m_pThreadIdMap;
}
// Records a (thread id, thread object) pair in the first unused slot of the
// table.
//
// dwThreadId - id to store in the slot.
// pThread    - thread object pointer to store alongside the id.
//
// If every slot is already in use the call returns without recording
// anything; no error is reported to the caller.
//
void CGrowableThreadPool::CThreadIdToPtrMap::AddThread( DWORD dwThreadId, CMclThread *pThread )
{
    // Skip over occupied slots until a free one (or the end) is reached.
    long lFreeSlot = 0;
    while( (lFreeSlot < m_lMaxThreads) && m_pThreadIdMap[lFreeSlot].fSlotUsed )
    {
        ++lFreeSlot;
    }

    // Table is full: nothing to do.
    if( lFreeSlot >= m_lMaxThreads )
    {
        return;
    }

    // Claim the slot and fill it in.
    m_pThreadIdMap[lFreeSlot].fSlotUsed = TRUE;
    m_pThreadIdMap[lFreeSlot].dwThreadId = dwThreadId;
    m_pThreadIdMap[lFreeSlot].pThread = pThread;
}
// Looks up the thread object recorded for a thread id.
//
// dwThreadId - id to search for.
//
// Returns the pThread stored in the first in-use slot whose id equals
// dwThreadId, or 0 when no in-use slot carries that id.
//
CMclThread *CGrowableThreadPool::CThreadIdToPtrMap::GetThreadFromId( DWORD dwThreadId )
{
    for( long lSlot = 0; lSlot < m_lMaxThreads; ++lSlot )
    {
        // Unused slots may hold stale data; ignore them.
        if( !m_pThreadIdMap[lSlot].fSlotUsed )
        {
            continue;
        }

        if( m_pThreadIdMap[lSlot].dwThreadId != dwThreadId )
        {
            continue;
        }

        // First match wins.
        return(m_pThreadIdMap[lSlot].pThread);
    }

    // No in-use slot carries this id.
    return(0);
}
// Marks as unused every in-use slot whose stored thread id equals dwThreadId.
//
// dwThreadId - id identifying the slot(s) to release.
//
// The whole table is scanned (there is no early exit), and only the in-use
// flag is cleared; the id and pointer stored in the slot are left as they are.
//
void CGrowableThreadPool::CThreadIdToPtrMap::FreeSlot( DWORD dwThreadId )
{
    long lSlot = 0;
    while( lSlot < m_lMaxThreads )
    {
        const bool fMatches =
            m_pThreadIdMap[lSlot].fSlotUsed &&
            (m_pThreadIdMap[lSlot].dwThreadId == dwThreadId);

        if( fMatches )
        {
            m_pThreadIdMap[lSlot].fSlotUsed = FALSE;
        }

        ++lSlot;
    }
}
// Marks as unused every in-use slot whose stored thread pointer equals pThread.
//
// pThread - thread object pointer identifying the slot(s) to release.
//
// The whole table is scanned (there is no early exit), and only the in-use
// flag is cleared; the id and pointer stored in the slot are left as they are.
//
void CGrowableThreadPool::CThreadIdToPtrMap::FreeSlot( CMclThread *pThread )
{
    long lSlot = 0;
    while( lSlot < m_lMaxThreads )
    {
        const bool fMatches =
            m_pThreadIdMap[lSlot].fSlotUsed &&
            (m_pThreadIdMap[lSlot].pThread == pThread);

        if( fMatches )
        {
            m_pThreadIdMap[lSlot].fSlotUsed = FALSE;
        }

        ++lSlot;
    }
}
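// --- Code-viewer page residue (not part of the original source file) ---
// Keyboard shortcuts:
//   Copy code:            Ctrl + C
//   Search code:          Ctrl + F
//   Full-screen mode:     F11
//   Toggle theme:         Ctrl + Shift + D
//   Show shortcuts:       ?
//   Increase font size:   Ctrl + =
//   Decrease font size:   Ctrl + -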