⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 thrdpool.cpp

📁 MiniCA V2.0版本源码。《小型CA系统V2.1含源码》发表以来
💻 CPP
📖 第 1 页 / 共 2 页
字号:
  {
    //Now that everything is setup we can resume the threads in the thread pool (if need be)
    if (!bSuspended)
    {
      for (int i=0; i<m_Threads.GetSize(); i++)
      {
        CThreadPoolClient* pClient = m_Threads.GetAt(i);
        ASSERT(pClient);
        pClient->m_pWorkerThread->ResumeThread();
      }
    }
  }

  return bSuccess;
}

//Stops the thread pool. The lifetime monitoring thread (if any) is brought down first, then
//every worker thread is asked to exit, waited upon, and finally the per-thread objects and the
//request queue are freed.
void CThreadPoolServer::Stop()
{
  //Bring down the monitoring thread (if any) BEFORE taking the m_csThreads lock. The monitoring
  //thread acquires m_csThreads itself (in Monitor and RecycleThread) and
  //SetMaxThreadClientLifetime waits without a timeout for that thread to exit, so waiting for it
  //while we own the lock could deadlock. Doing it first also means RecycleThread can never run
  //against the threads array after it has been emptied below
  SetMaxThreadClientLifetime(FALSE, 0);

  //Serialize access to the threads array
  CSingleLock sl(&m_csThreads, TRUE);

  int nThreads = (int) m_Threads.GetSize();
  if (nThreads)
  {
    //Set the m_bRequestToStop in each thread to ask them to exit if they are
    //currently processing a request in CThreadPoolClient::Run
    int i;
    for (i=0; i<nThreads; i++)
    {
      CThreadPoolClient* pClient = m_Threads.GetAt(i);
      ASSERT(pClient);
      pClient->SetRequestToStop();
    }

    //Validate the queue before it is used for the first time below
    ASSERT(m_pQueue);

    //Now post enough requests to get each thread in the thread pool to stop via
    //a "special" request with an ID of THREADPOOL_SHUTDOWN_REQUEST
    for (i=0; i<nThreads; i++)
    {
      CThreadPoolRequest killRequest;
      killRequest.m_dwID = THREADPOOL_SHUTDOWN_REQUEST;

      //Only use directed requests to shut down the thread pool if the pool supports
      //directed requests
      if (m_pQueue->SupportsDirectedRequests())
      {
        killRequest.m_bDirectedRequest = TRUE;
        killRequest.m_nDirectedRequestIndex = i;
      }
      m_pQueue->PostRequest(killRequest);
    }

    //Wait for all the threads to exit in the thread pool. WaitForMultipleObjects can only
    //wait on MAXIMUM_WAIT_OBJECTS handles at a time, so the threads are waited on in batches
    BOOL bMoreThreads = TRUE;
    int nCurrentThreadIndex = 0;
    while (bMoreThreads)
    {
      //Wait for as many threads at once as possible
      int nCurrentThreadsToStop = min(MAXIMUM_WAIT_OBJECTS, nThreads - nCurrentThreadIndex);

      //Setup the array of threads to wait on to exit
      HANDLE hThreads[MAXIMUM_WAIT_OBJECTS];
      for (int j=0; j<nCurrentThreadsToStop; j++)
      {
        CThreadPoolClient* pClient = m_Threads.GetAt(j + nCurrentThreadIndex);
        ASSERT(pClient);
        ASSERT(pClient->m_pWorkerThread);
        hThreads[j] = pClient->m_pWorkerThread->m_hThread;
      }

      //Wait for the threads to exit
      WaitForMultipleObjects(nCurrentThreadsToStop, hThreads, TRUE, INFINITE);

      //Get ready for the next time around
      nCurrentThreadIndex += nCurrentThreadsToStop;
      bMoreThreads = (nCurrentThreadIndex < nThreads);
    }

    //Now free up all the memory associated with each thread
    for (i=0; i<nThreads; i++)
    {
      CThreadPoolClient* pClient = m_Threads.GetAt(i);
      ASSERT(pClient);
      delete pClient->m_pWorkerThread;
      pClient->m_pWorkerThread = NULL;
      delete pClient;
    }
    m_Threads.RemoveAll();

    //Close our queue object
    delete m_pQueue;
    m_pQueue = NULL;
  }
}

//Returns the thread pool client stored at position nIndex of the m_Threads array.
//No locking is performed here; the lookup is delegated directly to m_Threads.GetAt
CThreadPoolClient* CThreadPoolServer::GetAtClient(int nIndex)
{
  CThreadPoolClient* pRequestedClient = m_Threads.GetAt(nIndex);
  return pRequestedClient;
}

//Blocks until every thread in the thread pool has signalled its m_evtInitCompleted event and
//then reports the combined result of their initialization.
//
//Returns TRUE only when the pool contains at least one thread, every wait succeeded and
//m_bInitOK is non-zero for every client. Returns FALSE otherwise.
BOOL CThreadPoolServer::WaitForThreadsInitInstance()
{
  //An empty thread pool is reported as a failure
  int nThreads = (int) m_Threads.GetSize();
  if (nThreads == 0)
    return FALSE;

  //Assume the best, any failure found below clears this
  BOOL bInitOK = TRUE;

  //WaitForMultipleObjects can only wait on MAXIMUM_WAIT_OBJECTS handles at a time, so
  //the events are waited on in batches
  int nCurrentThreadIndex = 0;
  while (nCurrentThreadIndex < nThreads)
  {
    //Wait for as many threads at once as possible
    int nEventsToWaitOn = min(MAXIMUM_WAIT_OBJECTS, nThreads - nCurrentThreadIndex);

    //Setup the array of events to wait on
    HANDLE hEvents[MAXIMUM_WAIT_OBJECTS];
    int j;
    for (j=0; j<nEventsToWaitOn; j++)
    {
      CThreadPoolClient* pClient = m_Threads.GetAt(j + nCurrentThreadIndex);
      ASSERT(pClient);
      hEvents[j] = pClient->m_evtInitCompleted;
    }

    //Wait for the threads to complete their InitInstance code. If the wait itself fails we
    //cannot trust the m_bInitOK values of this batch, so treat that as an initialization
    //failure (we still carry on to wait on the remaining batches as before)
    DWORD dwWait = WaitForMultipleObjects(nEventsToWaitOn, hEvents, TRUE, INFINITE);
    if (dwWait == WAIT_FAILED)
    {
      TRACE(_T("CThreadPoolServer::WaitForThreadsInitInstance, Failed to wait for the threads to initialize, Error:%u\n"), GetLastError());
      bInitOK = FALSE;
    }

    //Update the Init completed which is the logical "And" of all the InitInstances. A logical
    //test is used rather than a bitwise "&=" because a BOOL can hold non-zero values other
    //than 1, which a bitwise and with TRUE would incorrectly turn into FALSE
    for (j=0; j<nEventsToWaitOn && bInitOK; j++)
    {
      CThreadPoolClient* pClient = m_Threads.GetAt(j + nCurrentThreadIndex);
      ASSERT(pClient);
      if (!pClient->m_bInitOK)
        bInitOK = FALSE;
    }

    //Get ready for the next time around
    nCurrentThreadIndex += nEventsToWaitOn;
  }

  return bInitOK;
}

//Enables or disables the recycling of threads in the thread pool.
//
//Any monitoring thread which is currently running is always stopped first. If
//bEnableThreadLifetime is non-zero a new monitoring thread is then created which uses
//dwMinutes as the maximum lifetime value (see Monitor for how it is used).
//
//Returns TRUE if the request was carried out. Returns FALSE if recycling was requested but
//could not be enabled (no queue, the queue does not support directed requests, or the
//monitoring thread could not be created), in which case recycling is left disabled.
BOOL CThreadPoolServer::SetMaxThreadClientLifetime(BOOL bEnableThreadLifetime, DWORD dwMinutes)
{
  //Kill the monitoring thread if currently active
  if (m_pLifetimeMonitorThread)
  {
    m_evtRequestLifetimeThread.SetEvent();
    WaitForSingleObject(m_pLifetimeMonitorThread->m_hThread, INFINITE);
    delete m_pLifetimeMonitorThread;
    m_pLifetimeMonitorThread = NULL;
  }

  //Hive away the member variables
  m_bMaxLifetime = bEnableThreadLifetime;
  m_dwMaxLifetime = dwMinutes;

  //Recreate the monitoring thread if required
  if (m_bMaxLifetime)
  {
    //Recycling needs a queue to post the directed shutdown requests to. The queue pointer is
    //NULL once Stop has been called, so validate it rather than dereferencing it blindly
    if (m_pQueue == NULL)
    {
      TRACE(_T("CThreadPoolServer::SetMaxThreadClientLifetime, Recycling of threads cannot be enabled because there is no queue\n"));
      m_bMaxLifetime = FALSE;
      m_dwMaxLifetime = 0;
      return FALSE;
    }

    if (!m_pQueue->SupportsDirectedRequests())
    {
      TRACE(_T("Recyclying of threads in the thread pool is not supported because the Queue does not support directed requests\n"));

      //Keep the member variables consistent with the fact that no monitoring thread is running
      //(this matches what is done below when the thread cannot be created)
      m_bMaxLifetime = FALSE;
      m_dwMaxLifetime = 0;
      return FALSE;
    }

    ASSERT(m_pLifetimeMonitorThread == NULL);

    //Create the thread suspended so that m_bAutoDelete can be set before it starts to run
    m_pLifetimeMonitorThread = AfxBeginThread(CThreadPoolServer::_Monitor, this, THREAD_PRIORITY_IDLE, 0, CREATE_SUSPENDED);
    if (m_pLifetimeMonitorThread == NULL)
    {
      TRACE(_T("CThreadPoolServer::SetMaxThreadClientLifetime, Failed to create worker thread for monitoring thread in thread pool\n"));
      m_bMaxLifetime = FALSE;
      m_dwMaxLifetime = 0;
      return FALSE;
    }
    else
    {
      m_pLifetimeMonitorThread->m_bAutoDelete = FALSE; //We are in charge of closing the thread
      m_pLifetimeMonitorThread->ResumeThread();
    }
  }

  return TRUE;
}

//Static thread entry point for the monitoring thread. pParam is the CThreadPoolServer
//instance which was handed to AfxBeginThread; the call is simply forwarded on to its
//Monitor method and that method's return value becomes the return value of this function.
UINT CThreadPoolServer::_Monitor(LPVOID pParam)
{
  //We must have been given an instance to work with
  ASSERT(pParam);

  //Recover the C++ object from the untyped thread parameter
  CThreadPoolServer* pThis = static_cast<CThreadPoolServer*>(pParam);
  ASSERT(pThis);

  const UINT nExitCode = pThis->Monitor();
  return nExitCode;
}

//Body of the monitoring thread. Calls RecycleThread at a regular interval until the
//m_evtRequestLifetimeThread event becomes signalled. The interval is m_dwMaxLifetime (in minutes)
//divided by the number of threads in the pool. A waitable timer is used when one can be created
//and set, otherwise a loop based on Sleep and GetTickCount is used.
//
//Always returns 0.
UINT CThreadPoolServer::Monitor()
{
  //Take a snapshot of the values needed to work out the recycle interval
  CSingleLock sl(&m_csThreads, TRUE);
  int nThreads = (int) m_Threads.GetSize();
  DWORD dwMaxLifetime = m_dwMaxLifetime;
  sl.Unlock();

  //With no threads there is nothing to recycle (and the interval calculation below would divide
  //by zero), so just idle until we are asked to stop. Waiting on the event here, rather than
  //returning straight away, keeps the stop request handling the same as the normal code paths
  if (nThreads <= 0)
  {
    TRACE(_T("CThreadPoolServer::Monitor, There are no threads in the thread pool to monitor\n"));
    WaitForSingleObject(m_evtRequestLifetimeThread, INFINITE);
    return 0L;
  }

  //Work out the time interval (in ms) at which threads in the thread pool need to be recycled.
  //The calculation is done using 64 bit values so that "minutes * 60000" cannot wrap around a
  //DWORD, and the result is capped to the largest value which fits into the LONG period
  //parameter of SetWaitableTimer (roughly 24.8 days)
  const DWORD dwMaxWaitInterval = 0x7FFFFFFF;
  unsigned __int64 nWaitInterval = (((unsigned __int64) dwMaxLifetime) * 60000) / ((unsigned __int64) nThreads);
  DWORD dwWaitInterval = (nWaitInterval > dwMaxWaitInterval) ? dwMaxWaitInterval : ((DWORD) nWaitInterval);

  //Try to use a waitable timer in preference to a Sleep busy loop
  HANDLE hTimer = NULL;
  if (m_lpfnCreateWaitableTimer && m_lpfnSetWaitableTimer)
    hTimer = m_lpfnCreateWaitableTimer(NULL, FALSE, NULL);

  BOOL bTimerSet = FALSE;
  if (hTimer)
  {
    //The first due time is given as a negative (i.e. relative) value in 100 nanosecond units
    __int64 nFileTimes = ((__int64)-10000) * ((__int64)dwWaitInterval);
    LARGE_INTEGER li;
    li.LowPart = (DWORD) (nFileTimes & 0xFFFFFFFF);
    li.HighPart = (LONG) (nFileTimes >> 32);
    bTimerSet = m_lpfnSetWaitableTimer(hTimer, &li, dwWaitInterval, NULL, NULL, TRUE);

    //Do not leak the timer handle if it could not be set
    if (!bTimerSet)
    {
      CloseHandle(hTimer);
      hTimer = NULL;
    }
  }

  if (bTimerSet)
  {
    //Set up the handle array to wait on
    HANDLE HandlesToWaitOn[2];
    HandlesToWaitOn[0] = hTimer;
    HandlesToWaitOn[1] = m_evtRequestLifetimeThread;

    //Enter into the wait loop
    BOOL bWantStop = FALSE;
    while (!bWantStop)
    {
      //wait for either of the handles to become signalled
      DWORD dwWait = WaitForMultipleObjects(2, HandlesToWaitOn, FALSE, INFINITE);

      int nSignaledHandle = dwWait - WAIT_OBJECT_0;
      if (nSignaledHandle == 1) //It was the stop request
        bWantStop = TRUE;
      else if (nSignaledHandle == 0) //It was the waitable timer
        RecycleThread();
      else
      {
        //The wait did not report either handle (e.g. WAIT_FAILED). Back off for a while before
        //trying again so that a persistent failure cannot turn this loop into a CPU bound spin
        TRACE(_T("CThreadPoolServer::Monitor, Unexpected return value from WaitForMultipleObjects, Value:%u, Error:%u\n"), dwWait, GetLastError());
        Sleep(1000);
      }
    }

    //Free up the waitable timer now that we are finished with it
    CloseHandle(hTimer);
  }
  else
  {
    TRACE(_T("CThreadPoolServer::Monitor, Waitable timer could not be created and set, falling back to using a sleep busy loop\n"));

    //Must use a sleep busy loop since waitable timers are not available
    DWORD dwStartTick = GetTickCount();
    BOOL bWantStop = FALSE;
    while (!bWantStop)
    {
      Sleep(1000);

      //Check to see if the m_evtRequestLifetimeThread event is signaled
      DWORD dwWait = WaitForSingleObject(m_evtRequestLifetimeThread, 0);
      if (dwWait == WAIT_OBJECT_0)
        bWantStop = TRUE;
      else
      {
        //Check to see if it is time to recycle a thread. Unsigned subtraction is used so that
        //the elapsed time is still correct when the GetTickCount value wraps around
        DWORD dwNowTick = GetTickCount();
        if ((dwNowTick - dwStartTick) > dwWaitInterval)
        {
          RecycleThread();
          dwStartTick = dwNowTick;
        }
      }
    }
  }

  return 0L;
}

//Recycles the worker thread at index m_nLifetimeThreadIndex of the thread pool: the thread is
//asked to exit (via SetRequestToStop and, if it is still running, a directed
//THREADPOOL_SHUTDOWN_REQUEST), waited upon, deleted and then a replacement thread is created for
//the same client. m_nLifetimeThreadIndex is then moved on to the next thread (wrapping around),
//ready for the next call.
//
//Returns TRUE if the thread was recycled. Returns FALSE if the pool is empty, the shutdown
//request could not be posted or the replacement thread could not be created.
BOOL CThreadPoolServer::RecycleThread()
{
  //Serialize access to the threads array
  CSingleLock sl(&m_csThreads, TRUE);

  //Nothing to do if the pool is empty (this also protects the array access and the modulo
  //operation below)
  int nThreads = (int) m_Threads.GetSize();
  if (nThreads <= 0)
  {
    TRACE(_T("CThreadPoolServer::RecycleThread, There are no threads in the thread pool to recycle\n"));
    return FALSE;
  }

  //Make sure the index refers to a thread which actually exists
  if ((m_nLifetimeThreadIndex < 0) || (m_nLifetimeThreadIndex >= nThreads))
    m_nLifetimeThreadIndex = 0;

  //Assume the best
  BOOL bSuccess = TRUE;

  //Get the specified thread to kill
  CThreadPoolClient* pClient = GetAtClient(m_nLifetimeThreadIndex);
  ASSERT(pClient);

  //Only cleared if the old thread could not be asked to exit, in which case it is left alone
  BOOL bOldThreadGone = TRUE;

  //The worker thread pointer is NULL if a previous attempt to recreate this thread failed. In
  //that case there is nothing to stop and we go straight on to trying to create it again
  if (pClient->m_pWorkerThread)
  {
    //Set the m_bRequestToStop to ask it to exit if it is
    //currently processing a request in CThreadPoolClient::Run
    //NOTE(review): nothing in this function clears that flag again for the replacement
    //thread, presumably that is done elsewhere - confirm
    pClient->SetRequestToStop();

    //Check to see if we need to post a directed request to the thread pool to get it to exit
    BOOL bAlreadyExited = (WaitForSingleObject(pClient->m_pWorkerThread->m_hThread, 0) == WAIT_OBJECT_0);
    if (!bAlreadyExited)
    {
      //Also post a directed request to the thread pool directly
      TRACE(_T("CThreadPoolServer::RecycleThread, Killing thread at index %d\n"), m_nLifetimeThreadIndex);
      CThreadPoolRequest killRequest;
      killRequest.m_dwID = THREADPOOL_SHUTDOWN_REQUEST;
      killRequest.m_bDirectedRequest = TRUE;
      killRequest.m_nDirectedRequestIndex = m_nLifetimeThreadIndex;
      ASSERT(m_pQueue);
      BOOL bPostedOK = m_pQueue->PostRequest(killRequest);
      ASSERT(bPostedOK); //If this call fails then you may be using a CThreadPoolQueue derived
                         //class which does not support directed requests, e.g. CIOCPThreadPoolQueue
      if (!bPostedOK)
      {
        //Without the shutdown request the thread may never exit, so do not wait on it (that
        //could block forever while we own the m_csThreads lock). If the thread does exit by
        //itself it will be picked up as "already exited" the next time its turn comes around
        TRACE(_T("CThreadPoolServer::RecycleThread, Failed to post shutdown request for thread at index %d\n"), m_nLifetimeThreadIndex);
        bOldThreadGone = FALSE;
        bSuccess = FALSE;
      }
    }

    if (bOldThreadGone)
    {
      //Wait for the thread to exit
      WaitForSingleObject(pClient->m_pWorkerThread->m_hThread, INFINITE);
      delete pClient->m_pWorkerThread;
      pClient->m_pWorkerThread = NULL;
    }
  }

  //Now recreate the thread
  if (bOldThreadGone)
  {
    //Spin off a worker thread for it (initially suspended so that we can set it up correctly!)
    pClient->m_pWorkerThread = AfxBeginThread(CThreadPoolClient::_Run, pClient, pClient->m_nStartupThreadPriority, pClient->m_nStackSize, CREATE_SUSPENDED);
    if (pClient->m_pWorkerThread == NULL)
    {
      TRACE(_T("CThreadPoolServer::RecycleThread, Failed to create worker thread for thread pool at index %d\n"), m_nLifetimeThreadIndex);
      bSuccess = FALSE;
    }
    else
    {
      pClient->m_pWorkerThread->m_bAutoDelete = FALSE; //We are in charge of closing the thread
      pClient->m_pWorkerThread->ResumeThread(); //Resume the thread now that we have set it up correctly
    }
  }

  //increment the thread index, ready for the next call into RecycleThread at a later date
  m_nLifetimeThreadIndex = (m_nLifetimeThreadIndex + 1) % nThreads;

  return bSuccess;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -