/*
Module : DirectedThreadPoolQueue.cpp
Purpose: Implementation for an MFC class which implements a Pseudo IOCP like class which also 
         supports a directed post to the queue. i.e. the request is for a specific thread in the 
         thread pool (which is the normal use of a Directed IOCP). This can be used by a thread pool
         server (see the author's CThreadPoolServer class) to implement a thread pool which implements
         recycling.
Created: PJN / 16-04-2002
History: PJN / 11-11-2004 1. Provided a GetRequestArray() method which allows access to the internal
                          array used to hold the thread pool requests. Can prove handy to have access
                          to this in certain circumstances.
                          2. Updated to compile cleanly when Detect 64 bit issues and Force conformance in for loop
                          options are enabled in Visual Studio .Net
         PJN / 30-11-2004 1. Updated the macro which detects if arrays use INT_PTR for index rather than int.
                          2. Also removed some ASSERTS which were overly restrictive in light of the queue now 
                          being externally modifiable via CThreadPoolServer::GetQueue

Copyright (c) 2002 - 2005 by PJ Naughter.  (Web: www.naughter.com, Email: pjna@naughter.com)

All rights reserved.

Copyright / Usage Details:

You are allowed to include the source code in any product (commercial, shareware, freeware or otherwise) 
when your product is released in binary form. You are allowed to modify the source code in any way you want 
except you cannot modify the copyright details at the top of each module. If you want to distribute source 
code with your application, then you are only allowed to distribute versions released by the author. This is 
to maintain a single distribution point for the source code. 

*/



//////////////////// Includes /////////////////////////////////////////////////

#include "stdafx.h"
#include "DirectedThreadPoolQueue.h"



/////////////////// Macros / Defines //////////////////////////////////////////

#ifdef _DEBUG
#define new DEBUG_NEW
#undef THIS_FILE
static char THIS_FILE[] = __FILE__;
#endif



///////////////// Implementation //////////////////////////////////////////////

IMPLEMENT_DYNCREATE(CDirectedThreadPoolQueue, CThreadPoolQueue)

CDirectedThreadPoolQueue::CDirectedThreadPoolQueue()
{
  //Initialize our member variables
  m_hPostRequestSemaphore = NULL;
  m_hGetRequestSemaphore = NULL;
}

CDirectedThreadPoolQueue::~CDirectedThreadPoolQueue()
{
  Close();
}

BOOL CDirectedThreadPoolQueue::Create(DWORD dwMaxQSize)
{
  //Close if already created (this will empty out the request Q for us)
  Close();

  //Create the PostRequest semaphores
  m_hPostRequestSemaphore = CreateSemaphore(NULL, dwMaxQSize, dwMaxQSize, NULL);
  if (m_hPostRequestSemaphore == NULL)
  {
    TRACE(_T("CDirectedThreadPoolQueue::Create, Failed to create a PostRequest semaphore, Error:%d\n"), GetLastError());
    Close();
    return FALSE;
  }
  m_PostRequestSemaphores.SetSize(0, dwMaxQSize);
  DWORD i;
  for (i=0; i<dwMaxQSize; i++)
  {
    HANDLE hSemaphore = CreateSemaphore(NULL, dwMaxQSize, dwMaxQSize, NULL);
    if (hSemaphore == NULL)
    {
      TRACE(_T("CDirectedThreadPoolQueue::Create, Failed to create a PostRequest semaphore, Error:%d\n"), GetLastError());
      Close();
      return FALSE;
    }

    m_PostRequestSemaphores.Add(hSemaphore);
  }

  //Create the GetRequest semaphores
  m_hGetRequestSemaphore = CreateSemaphore(NULL, 0, dwMaxQSize, NULL);
  if (m_hGetRequestSemaphore == NULL)
  {
    TRACE(_T("CDirectedThreadPoolQueue::Create, Failed to create a GetRequest semaphore, Error:%d\n"), GetLastError());
    Close();
    return FALSE;
  }
  m_GetRequestSemaphores.SetSize(0, dwMaxQSize);
  for (i=0; i<dwMaxQSize; i++)
  {
    HANDLE hSemaphore = CreateSemaphore(NULL, 0, dwMaxQSize, NULL);
    if (hSemaphore == NULL)
    {
      TRACE(_T("CDirectedThreadPoolQueue::Create, Failed to create a GetRequest semaphore, Error:%d\n"), GetLastError());
      Close();
      return FALSE;
    }

    m_GetRequestSemaphores.Add(hSemaphore);
  }

  return TRUE;
}
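
/*
Illustrative sketch, not part of the original module. Create() above pairs a "post" semaphore
that starts full (dwMaxQSize, dwMaxQSize) with a "get" semaphore that starts empty (0, dwMaxQSize).
The portable code below shows that two-counter pattern in isolation. CountingGate and
BoundedQueueSketch are hypothetical names, std::mutex stands in for the Win32 semaphores, and
only the zero-timeout (non-blocking) case is modelled.

```cpp
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical stand-in for a Win32 semaphore that is polled with a zero timeout.
class CountingGate
{
public:
  CountingGate(std::size_t initial, std::size_t maximum) : m_count(initial), m_max(maximum) {}

  // Comparable to WaitForSingleObject(h, 0): take one unit if any is available.
  bool TryAcquire()
  {
    std::lock_guard<std::mutex> lock(m_mutex);
    if (m_count == 0)
      return false;
    --m_count;
    return true;
  }

  // Comparable to ReleaseSemaphore(h, 1, NULL): fails if the maximum would be exceeded.
  bool Release()
  {
    std::lock_guard<std::mutex> lock(m_mutex);
    if (m_count == m_max)
      return false;
    ++m_count;
    return true;
  }

private:
  std::mutex m_mutex;
  std::size_t m_count;
  std::size_t m_max;
};

// Bounded queue: m_freeSlots starts full and m_readyItems starts empty.
class BoundedQueueSketch
{
public:
  explicit BoundedQueueSketch(std::size_t maxSize)
    : m_freeSlots(maxSize, maxSize), m_readyItems(0, maxSize) {}

  // Consumes a free slot, stores the value, then signals that an item is ready.
  bool TryPost(int value)
  {
    if (!m_freeSlots.TryAcquire())
      return false; // queue is full
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_items.push_back(value);
    }
    m_readyItems.Release();
    return true;
  }

  // Consumes a ready item, removes the oldest value, then hands the slot back.
  bool TryGet(int& value)
  {
    if (!m_readyItems.TryAcquire())
      return false; // nothing queued
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      value = m_items.front();
      m_items.pop_front();
    }
    m_freeSlots.Release();
    return true;
  }

private:
  CountingGate m_freeSlots;
  CountingGate m_readyItems;
  std::mutex m_mutex;
  std::deque<int> m_items;
};
```

With a BoundedQueueSketch of size 2, a third TryPost fails until a TryGet hands a slot back,
which is the same back-pressure that PostRequest gets from waiting on the "post" semaphore.
*/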

BOOL CDirectedThreadPoolQueue::PostRequestWithoutLimitCheck(const CThreadPoolRequest& request)
{
  ASSERT(IsCreated()); //Must have been created

  //By default assume the best
  BOOL bSuccess = TRUE;

  //Add the request to the request queue
  CSingleLock sl(&m_csRequests, TRUE);
  CThreadPoolRequest CopyOfRequest(request);
#if (_MFC_VER >= 0x700)
  INT_PTR nRequestIndex = m_Requests.Add(CopyOfRequest);
#else
  int nRequestIndex = m_Requests.Add(CopyOfRequest);
#endif

  //Release the semaphore while the lock is still held. ReleaseSemaphore does not block, and keeping
  //the lock means nRequestIndex cannot be invalidated by another thread removing items from the Q
  BOOL bReleased;
  if (request.m_bDirectedRequest)
    bReleased = ReleaseSemaphore(m_GetRequestSemaphores.GetAt(request.m_nDirectedRequestIndex), 1, NULL);
  else
    bReleased = ReleaseSemaphore(m_hGetRequestSemaphore, 1, NULL);
  if (!bReleased)
  {
    bSuccess = FALSE;

    TRACE(_T("CDirectedThreadPoolQueue::PostRequestWithoutLimitCheck, Failed to release a semaphore, Error:%d\n"), GetLastError());

    //Remove the item from the Q since we could not update the "Get" semaphores
    m_Requests.RemoveAt(nRequestIndex);
  }

  return bSuccess;
}

BOOL CDirectedThreadPoolQueue::PostRequest(const CThreadPoolRequest& request, DWORD dwMilliseconds)
{
  ASSERT(IsCreated()); //Must have been created

  //Wait for the post request semaphore
  DWORD dwWait;
  if (request.m_bDirectedRequest)
    dwWait = WaitForSingleObject(m_PostRequestSemaphores.GetAt(request.m_nDirectedRequestIndex), dwMilliseconds);
  else
    dwWait = WaitForSingleObject(m_hPostRequestSemaphore, dwMilliseconds);
  if (dwWait != WAIT_OBJECT_0)
  {
    TRACE(_T("CDirectedThreadPoolQueue::PostRequest, Failed while waiting for the Q to free up, Error:%d\n"), GetLastError());
    return FALSE;
  }

  //Pass the buck to the other PostRequest method
  return PostRequestWithoutLimitCheck(request);
}

int CDirectedThreadPoolQueue::GetNonDirectedRequestIndexToRemove()
{
  //Work out the item to remove from the Q, by default we pick the first
  //non directed request found when scanning from index 0 (the oldest entry)
  int nIndexToRemoveAt = -1;
#if (_MFC_VER >= 0x700)
  INT_PTR nRequestSize = m_Requests.GetSize();
#else
  int nRequestSize = m_Requests.GetSize();
#endif
  for (int i=0; (i<nRequestSize) && (nIndexToRemoveAt == -1); i++)
  {
    CThreadPoolRequest& tempRequest = m_Requests.ElementAt(i);
    if (!tempRequest.m_bDirectedRequest)
      nIndexToRemoveAt = i;
  }

  return nIndexToRemoveAt;
}

#if (_MFC_VER >= 0x700)
INT_PTR CDirectedThreadPoolQueue::GetDirectedRequestIndexToRemove(int nThreadIndexForDirectedRequest)
#else
int CDirectedThreadPoolQueue::GetDirectedRequestIndexToRemove(int nThreadIndexForDirectedRequest)
#endif
{
  //Work out the item to remove from the Q, by default we pick the first directed
  //request for this thread found when scanning from index 0 (the oldest entry)
#if (_MFC_VER >= 0x700)
  INT_PTR nIndexToRemoveAt = -1;
  INT_PTR nRequestSize = m_Requests.GetSize();
  INT_PTR i;
#else
  int nIndexToRemoveAt = -1;
  int nRequestSize = m_Requests.GetSize();
  int i;
#endif
  ASSERT(nRequestSize);
  for (i=0; (i<nRequestSize) && (nIndexToRemoveAt == -1); i++)
  {
    CThreadPoolRequest& tempRequest = m_Requests.ElementAt(i);
    if (tempRequest.m_bDirectedRequest && nThreadIndexForDirectedRequest == tempRequest.m_nDirectedRequestIndex)
      nIndexToRemoveAt = i;
  }

  return nIndexToRemoveAt;
}

BOOL CDirectedThreadPoolQueue::GetRequest(CThreadPoolRequest& request, int nThreadIndexForDirectedRequest, DWORD dwMilliseconds)
{
  ASSERT(IsCreated()); //Must have been created

  //Wait for either a non directed request or a directed request for this thread to become available on the Q
  HANDLE hWaitHandles[2];
  hWaitHandles[0] = m_GetRequestSemaphores.GetAt(nThreadIndexForDirectedRequest);
  hWaitHandles[1] = m_hGetRequestSemaphore;
  DWORD dwWait = WaitForMultipleObjects(2, hWaitHandles, FALSE, dwMilliseconds);
  int nSignaledHandle = dwWait - WAIT_OBJECT_0;

  //Work out what the return value from WFMO means!
  BOOL bRemoveDirected = FALSE;
  if (nSignaledHandle == 0)
    bRemoveDirected = TRUE;
  else if (nSignaledHandle != 1)
  {
    TRACE(_T("CDirectedThreadPoolQueue::GetRequest, Failed while waiting for the item on the Q, Error:%d\n"), GetLastError());
    return FALSE;
  }

  //Lock down access to the Q
  CSingleLock sl(&m_csRequests, TRUE);

  //Remove some item from the request Q  
  if (bRemoveDirected)
  {
    //Work out the item to remove from the Q
#if (_MFC_VER >= 0x700)
    INT_PTR nIndexToRemoveAt = GetDirectedRequestIndexToRemove(nThreadIndexForDirectedRequest);
#else
    int nIndexToRemoveAt = GetDirectedRequestIndexToRemove(nThreadIndexForDirectedRequest);
#endif
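    if (nIndexToRemoveAt == -1)
    {
      //The Q can be modified externally (see CThreadPoolServer::GetQueue), so fail
      //gracefully here rather than index the array with -1 in a release build
      TRACE(_T("CDirectedThreadPoolQueue::GetRequest, Could not find a directed request to remove from the Q\n"));
      return FALSE;
    }
    request = m_Requests.GetAt(nIndexToRemoveAt);
    ASSERT(request.m_bDirectedRequest); //the GetDirectedRequestIndexToRemove call above has returned an incorrect index
    m_Requests.RemoveAt(nIndexToRemoveAt);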

    //Release the PostRequest semaphore
    ReleaseSemaphore(m_PostRequestSemaphores.GetAt(nThreadIndexForDirectedRequest), 1, NULL);
  }
  else
  {
    //Work out the item to remove from the Q
    int nIndexToRemoveAt = GetNonDirectedRequestIndexToRemove();
    if (nIndexToRemoveAt != -1)
    {
      request = m_Requests.GetAt(nIndexToRemoveAt);
      ASSERT(!request.m_bDirectedRequest); //the GetNonDirectedRequestIndexToRemove call above has returned an incorrect index
      m_Requests.RemoveAt(nIndexToRemoveAt);

      //Release the PostRequest semaphore
      ReleaseSemaphore(m_hPostRequestSemaphore, 1, NULL);
    }
    else
      return FALSE;
  }
  
  return TRUE;
}
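
/*
Illustrative sketch, not part of the original module. GetRequest() above puts the per-thread
semaphore at index 0 of the wait array, and WaitForMultipleObjects reports the lowest signaled
index, so a directed request for the calling thread is taken before a non-directed one. The
portable code below models only that selection order on a plain vector. RequestSketch,
FindDirected, FindNonDirected and TakeFor are hypothetical names, and no locking or waiting is shown.

```cpp
#include <cstddef>
#include <vector>

struct RequestSketch
{
  bool directed;   // true when the request targets one specific worker thread
  int threadIndex; // meaningful only when directed is true
  int payload;     // placeholder for the real request data
};

// Returns the index of the oldest request directed at threadIndex, or -1 if there is none.
int FindDirected(const std::vector<RequestSketch>& queue, int threadIndex)
{
  for (std::size_t i = 0; i < queue.size(); ++i)
  {
    if (queue[i].directed && queue[i].threadIndex == threadIndex)
      return static_cast<int>(i);
  }
  return -1;
}

// Returns the index of the oldest non-directed request, or -1 if there is none.
int FindNonDirected(const std::vector<RequestSketch>& queue)
{
  for (std::size_t i = 0; i < queue.size(); ++i)
  {
    if (!queue[i].directed)
      return static_cast<int>(i);
  }
  return -1;
}

// Removes one request on behalf of the worker identified by threadIndex.
// A request directed at this worker wins; otherwise the oldest non-directed one is taken.
// Requests directed at other workers are never returned.
bool TakeFor(std::vector<RequestSketch>& queue, int threadIndex, RequestSketch& out)
{
  int index = FindDirected(queue, threadIndex);
  if (index == -1)
    index = FindNonDirected(queue);
  if (index == -1)
    return false;

  out = queue[index];
  queue.erase(queue.begin() + index);
  return true;
}
```

In this sketch a worker that finds nothing suitable simply gets false back, whereas the real
class blocks on the semaphores for up to dwMilliseconds before giving up.
*/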

BOOL CDirectedThreadPoolQueue::Close()
{
  //Empty out the request queue
  CSingleLock sl(&m_csRequests, TRUE);
  m_Requests.RemoveAll();
  sl.Unlock();
  
  //Free up the PostRequest semaphores
  if (m_hPostRequestSemaphore)
  {
    CloseHandle(m_hPostRequestSemaphore);
    m_hPostRequestSemaphore = NULL;
  }
  int i;
  for (i=0; i<m_PostRequestSemaphores.GetSize(); i++)
  {
    HANDLE hSemaphore = m_PostRequestSemaphores.GetAt(i);
    CloseHandle(hSemaphore);
  }
  m_PostRequestSemaphores.SetSize(0);

  //Free up the GetRequest semaphores
  if (m_hGetRequestSemaphore)
  {
    CloseHandle(m_hGetRequestSemaphore);
    m_hGetRequestSemaphore = NULL;
  }
  for (i=0; i<m_GetRequestSemaphores.GetSize(); i++)
  {
    HANDLE hSemaphore = m_GetRequestSemaphores.GetAt(i);
    CloseHandle(hSemaphore);
  }
  m_GetRequestSemaphores.SetSize(0);

  return TRUE;
}

BOOL CDirectedThreadPoolQueue::IsCreated() const
{
  return (m_hPostRequestSemaphore != NULL);
}
