⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cgrowablethreadpool.cpp

📁 Windows 下的多线程编程参考书，值得一读。
💻 CPP
📖 第 1 页 / 共 2 页
字号:

    CMclWaitableCollection  WaitSet;
    CDispatchRecord         Job;
    long                    lJobsHandled = 0;
    BOOL                    fThreadAlive = TRUE;

    enum { EXIT_EVENT_INDEX = 0, JOBS_PENDING_INDEX, NUM_OBJECTS };

    WaitSet.AddObject(m_ExitEvent);     // Index 0.
    WaitSet.AddObject(m_JobsPending);   // Index 1.

    while( fThreadAlive )
    {
        // Wait for something to do, a timeout to occur, or being told
        // to exit.
        //
        DWORD dwWaitResult = WaitSet.Wait(FALSE, m_dwThreadLifetime);

        if( CMclWaitSucceeded(dwWaitResult, NUM_OBJECTS) )
        {
            if( CMclWaitSucceededIndex(dwWaitResult) == JOBS_PENDING_INDEX )
            {
                printf(
                    "[0x%08lx] CGrowableThreadPool::ThreadHandlerProc have a job to do\n",
                    GetCurrentThreadId()
                );

                // m_JobsPending was signaled.
                //
                if( m_DispatchQ.Get(Job) )
                {
                    // Do it!
                    //
                    Job.Execute();
                    lJobsHandled++;
                }

                m_ThreadsFree.Release(1);
            }
            else
            {
                printf(
                    "[0x%08lx] CGrowableThreadPool::ThreadHandlerProc exit signaled\n",
                    GetCurrentThreadId()
                );

                // m_ExitEvent was signaled.  We won't clean up anything
                // because the destructor has already recorded that we're
                // alive and will be waiting for our thread handle to
                // become signaled.
                //
                fThreadAlive = FALSE;
            }
        }
        else if( CMclWaitTimeout(dwWaitResult) )
        {
            // This thread has been asleep a long time and has timed
            // out waiting for something to do.  See if it should die.
            //
            printf(
                "[0x%08lx] CGrowableThreadPool::ThreadHandlerProc timed out\n",
                GetCurrentThreadId()
            );

            CMclAutoLock Lock(m_csPool);

            if( m_lCurThreads > m_lMinThreads )
            {
                // The current thread count in the pool is above the minimum
                // we need to maintain, so cleanup after ourselves and exit
                // this thread.
                //
                m_lCurThreads--;
                
                fThreadAlive = FALSE;

                CMclThread *pThread = m_ThreadIdMap.GetThreadFromId(GetCurrentThreadId());
                
                if( pThread )
                {
                    m_ThreadIdMap.FreeSlot(GetCurrentThreadId());

                    if( m_ThreadsInPool.Remove(pThread) )
                    {
                        printf(
                            "[0x%08lx] CGrowableThreadPool::ThreadHandlerProc exiting\n",
                            GetCurrentThreadId()
                        );

                        delete pThread;
                    }
                }
            }
            else
            {
                // The minimum number of threads has already been reached, so 
                // continue to stay alive and wait for jobs to be queued.
                //
                printf(
                    "[0x%08lx] CGrowableThreadPool::ThreadHandlerProc min threads hit\n",
                    GetCurrentThreadId()
                );
            }
        }
        else
        {
            // The wait operation failed in general.  Cleanup as if we
            // were terminating ourselves.
            //
            printf(
                "[0x%08lx] CGrowableThreadPool::ThreadHandlerProc other failure\n",
                GetCurrentThreadId()
            );

            // Other failure.
            //
            CMclAutoLock Lock(m_csPool);
            
            m_lCurThreads--;
            
            fThreadAlive = FALSE;

            CMclThread *pThread;
            
            pThread = m_ThreadIdMap.GetThreadFromId(GetCurrentThreadId());
            
            if( pThread )
            {
                m_ThreadIdMap.FreeSlot(GetCurrentThreadId());

                if( m_ThreadsInPool.Remove(pThread) )
                {
                    delete pThread;
                }
            }
        }
    }
        
    printf(
        "[0x%08lx] CGrowableThreadPool::ThreadHandlerProc done, %d jobs handled\n",
        GetCurrentThreadId(),
        lJobsHandled
    );

    return(0);
}

// CDispatchRecord class implementation.
//
// Default constructor: creates a record with no user thread handler
// attached (the handler pointer is null).
//
CGrowableThreadPool::CDispatchRecord::CDispatchRecord() : m_pUserThreadHandler(NULL)
{
    // Nothing else to set up.
}

// Creates a record that refers to the supplied handler.  Only the
// pointer is stored here; nothing is called on it at construction time.
//
CGrowableThreadPool::CDispatchRecord::CDispatchRecord( CMclThreadHandler *pThreadHandler )
    : m_pUserThreadHandler(pThreadHandler)
{
    // Nothing else to set up.
}

// Runs the job held by this record.
//
// Returns whatever the stored handler's ThreadHandlerProc() returns,
// or 0 when no handler is attached to the record.
//
unsigned CGrowableThreadPool::CDispatchRecord::Execute( void )
{
    // An empty record has nothing to run.
    //
    if( !m_pUserThreadHandler )
    {
        return(0);
    }

    return(m_pUserThreadHandler->ThreadHandlerProc());
}

// Two records compare equal when they refer to the very same handler
// object (pointer identity).  Yields 1 for equal, 0 otherwise.
//
int CGrowableThreadPool::CDispatchRecord::operator ==
(
    const CDispatchRecord& rhs
)
{
    if( m_pUserThreadHandler == rhs.m_pUserThreadHandler )
    {
        return 1;
    }

    return 0;
}

// CThreadIdToPtrMap class implementation.
//
// Builds a lookup table with room for lMaxThreads entries.
//
// The freshly allocated array is zero-filled, so every entry's
// fSlotUsed flag starts out as zero and the slot-scanning members
// treat each entry as free.
//
CGrowableThreadPool::CThreadIdToPtrMap::CThreadIdToPtrMap( long lMaxThreads )
    : m_lMaxThreads(lMaxThreads)
{
    // Size of the whole table in bytes.
    //
    const size_t cbTable = sizeof(THREADINFO) * lMaxThreads;

    m_pThreadIdMap = new THREADINFO[lMaxThreads];
    memset(m_pThreadIdMap, 0, cbTable);
}

// Releases the entry array allocated by the constructor.  The
// CMclThread objects referenced by the entries are not touched here.
//
CGrowableThreadPool::CThreadIdToPtrMap::~CThreadIdToPtrMap()
{
    delete[] m_pThreadIdMap;
}

// Records the (thread id, thread object) pair in the first free slot
// of the table.
//
// If every slot is already in use the call returns without storing
// anything; no error is reported to the caller.
//
void CGrowableThreadPool::CThreadIdToPtrMap::AddThread
(
    DWORD       dwThreadId,
    CMclThread *pThread
)
{
    // Skip over occupied slots until a free one (or the end of the
    // table) is reached.
    //
    long lSlot = 0;

    while( (lSlot < m_lMaxThreads) && m_pThreadIdMap[lSlot].fSlotUsed )
    {
        lSlot++;
    }

    if( lSlot >= m_lMaxThreads )
    {
        // Table is full.
        //
        return;
    }

    THREADINFO &Entry = m_pThreadIdMap[lSlot];

    Entry.fSlotUsed  = TRUE;
    Entry.dwThreadId = dwThreadId;
    Entry.pThread    = pThread;
}

// Looks up the thread object registered under dwThreadId.
//
// Scans the table from the start and returns the pThread stored in
// the first in-use slot whose id matches.  Returns a null pointer
// when no in-use slot carries that id.
//
CMclThread *CGrowableThreadPool::CThreadIdToPtrMap::GetThreadFromId
(
    DWORD   dwThreadId 
)
{
    for( long lSlot = 0; lSlot < m_lMaxThreads; lSlot++ )
    {
        const THREADINFO &Entry = m_pThreadIdMap[lSlot];

        // Free slots may hold stale ids, so they never match.
        //
        if( !Entry.fSlotUsed )
        {
            continue;
        }

        if( Entry.dwThreadId != dwThreadId )
        {
            continue;
        }

        return(Entry.pThread);
    }

    return(NULL);
}

// Marks every in-use slot registered under dwThreadId as free.
//
// The whole table is always scanned, so duplicate registrations of
// the same id are all released.  Only the fSlotUsed flag is cleared;
// the stored id and thread pointer are left as they were.
//
void CGrowableThreadPool::CThreadIdToPtrMap::FreeSlot( DWORD dwThreadId )
{
    for( long lSlot = 0; lSlot < m_lMaxThreads; lSlot++ )
    {
        THREADINFO &Entry = m_pThreadIdMap[lSlot];

        if( !Entry.fSlotUsed )
        {
            continue;
        }

        if( Entry.dwThreadId == dwThreadId )
        {
            Entry.fSlotUsed = FALSE;
        }
    }
}

// Marks every in-use slot that refers to pThread as free.
//
// The whole table is always scanned, so duplicate registrations of
// the same thread object are all released.  Only the fSlotUsed flag
// is cleared; the thread object itself is not deleted here.
//
void CGrowableThreadPool::CThreadIdToPtrMap::FreeSlot( CMclThread *pThread )
{
    long lSlot = 0;

    while( lSlot < m_lMaxThreads )
    {
        THREADINFO &Entry = m_pThreadIdMap[lSlot];

        if( Entry.fSlotUsed )
        {
            if( Entry.pThread == pThread )
            {
                Entry.fSlotUsed = FALSE;
            }
        }

        lSlot++;
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -