// threaddispatcher.cpp
// (code-viewer page residue: "Font size:" control)
/////////////////////////////////////////////////////////////////////
// Class Creator Version 2.0.000 Copyright (C) Zhang Zhanjun
///////////////////////////////////////////////////////////////////
// Implementation File ThreadDispatcher.cpp
// class CWizThreadDispatcher
//
// 11/10/2004
///////////////////////////////////////////////////////////////////
#include "stdafx.h"
#include "ThreadDispatcher.h"
#ifdef _DEBUG
#undef THIS_FILE
static char BASED_CODE THIS_FILE[] = __FILE__;
#endif
#define new DEBUG_NEW
// class CWizThreadDispatcher
// Constructor.
//
// Binds the dispatcher to rWorker and prepares the per-client bookkeeping:
//   - m_nMaxThreads is taken from rWorker.MaxClient;
//   - m_ahWorkersHandles gets m_nMaxThreads + 1 slots (Run() stores the
//     dispatcher thread's own handle in the extra last slot);
//   - for every client index a "has data" event and a "started treatment"
//     event are created, and the pair {shutdown event, started-treatment
//     event} is stored in m_ahStartedTreatmentEvents[i] for Run() to wait on.
//
// NOTE(review): m_HasDataEvent / m_StartedDataTreatEvent /
// m_ahStartedTreatmentEvents are indexed up to m_nMaxThreads - 1; their
// declared capacity is not visible here - confirm rWorker.MaxClient cannot
// exceed it.
CWizThreadDispatcher::CWizThreadDispatcher(CWizMultiThreadedWorker &rWorker/*, int MaxThreads*/)
    :/* m_nMaxThreads(MaxThreads),*/
    m_rWorker(rWorker),
    m_ShutDownEvent(TRUE),   // NOTE(review): presumably "manual reset" - confirm against Event's constructor
    m_hThisThread(NULL),
    m_pWnd (NULL),
    m_nMessage(WM_USER+1)
{
    m_nMaxThreads = rWorker.MaxClient;

    // closeEvent is otherwise assigned only by MsgStop(). Clear it here so
    // that Run() never signals an indeterminate handle when the dispatcher
    // is stopped through plain Stop().
    closeEvent = NULL;

    m_ahWorkersHandles = new HANDLE[m_nMaxThreads + 1];
    if (m_ahWorkersHandles == NULL)
        AfxThrowMemoryException();

    for (INDEX i = 0; i < m_nMaxThreads; i++)
    {
        m_ahWorkersHandles[i] = NULL;
        m_HasDataEvent[i] = new Event(FALSE);
        m_StartedDataTreatEvent[i] = new Event(FALSE);
        // Slot 0 = shutdown, slot 1 = "worker i started treating data";
        // Run() relies on this order when decoding the wait result.
        m_ahStartedTreatmentEvents[i][0] = m_ShutDownEvent.m_h;
        m_ahStartedTreatmentEvents[i][1] = m_StartedDataTreatEvent[i]->m_h;
    }

    // The extra slot is filled in by Run(); keep it defined until then.
    m_ahWorkersHandles[m_nMaxThreads] = NULL;
}
// Destructor.
//
// Signals shutdown first and only then releases the per-client events and
// the handle array, so the threads are told to stop before the objects they
// wait on are destroyed.
//
// NOTE(review): this does not wait for the dispatcher or worker threads to
// finish. Callers should make sure Run() has returned (for example by
// waiting on the handle passed to MsgStop()) before destroying the object.
CWizThreadDispatcher::~CWizThreadDispatcher()
{
    Stop();

    for (INDEX i = 0; i < m_nMaxThreads; i++)
    {
        delete m_HasDataEvent[i];
        delete m_StartedDataTreatEvent[i];
    }
    delete [] m_ahWorkersHandles;
}
// Launches the dispatcher thread, which executes CWizThreadDispatcherFun
// with this object as its argument.
// Raises an MFC memory exception when the thread cannot be created.
void CWizThreadDispatcher::Start()
{
    // AfxBeginThread reports failure by returning NULL.
    if (AfxBeginThread(CWizThreadDispatcherFun, this) == NULL)
    {
        AfxThrowMemoryException();
    }
}
void CWizThreadDispatcher::Stop()
{
m_ShutDownEvent.Set();
}
// Requests shutdown and remembers the handle that Run() passes to SetEvent
// just before it returns.
//   close - event handle to be signalled by Run() on exit.
// (An earlier signature took "HWND pWnd, UINT Message" instead.)
void CWizThreadDispatcher::MsgStop (HANDLE close)
{
    // Store the handle before signalling shutdown.
    closeEvent = close;
    this->Stop();
}
// Thread entry point handed to AfxBeginThread by Start().
//   pParam - the CWizThreadDispatcher whose Run() is to be executed.
// Returns whatever Run() returns.
UINT AFX_CDECL CWizThreadDispatcherFun(LPVOID pParam)
{
    ASSERT(pParam != NULL);
    CWizThreadDispatcher* const pDispatcher =
        static_cast<CWizThreadDispatcher*>(pParam);
    return pDispatcher->Run();
}
// Body of the dispatcher thread.
//
// 1. Records this thread's handle and starts one worker thread per client
//    slot (m_nMaxThreads of them).
// 2. Loops: asks the worker object for the index of a slot that has data
//    (WaitForData), sets that slot's "has data" event, then waits until
//    either the shut-down event or that slot's "started treatment" event
//    is signalled.
// 3. On shutdown: calls ShutDown(i) for every slot, waits for every worker
//    thread handle, calls Close(i), and finally signals closeEvent if one
//    was supplied.
//
// Returns 0. Throws XWaitFailed when the wait in step 2 fails; raises an
// MFC memory exception when a worker object or thread cannot be created.
UINT CWizThreadDispatcher::Run()
{
    CWizMultiThreadedWorker::Stack stack(m_rWorker); // original author's note: "listen"

    m_hThisThread = AfxGetThread()->m_hThread;
    m_ahWorkersHandles[m_nMaxThreads] = m_hThisThread;

    // Declared once at function scope so the three loops below compile under
    // both the standard for-scope rule and the legacy one.
    INDEX i;

    //---------------------------------------------------------
    // Start all working threads
    for (i = 0; i < m_nMaxThreads; i++)
    {
        WorkerThread* pWorker = new WorkerThread(i, m_rWorker,
                                                 m_HasDataEvent[i]->m_h,
                                                 m_StartedDataTreatEvent[i]->m_h,
                                                 m_ShutDownEvent.m_h);
        if (pWorker == NULL)
            AfxThrowMemoryException();

        // On success the thread function deletes pWorker when it finishes.
        CWinThread* pThr = AfxBeginThread(CWizThreadDispatcher_WorkerThread_Fun, pWorker);
        if (pThr == NULL)
        {
            // No thread took ownership of the object, so release it here.
            delete pWorker;
            AfxThrowMemoryException();
        }
        m_ahWorkersHandles[i] = pThr->m_hThread;
    }

    //---------------------------------------------------------
    // Dispatch loop - this is the only thread that calls WaitForData.
    BOOL shutdownEvent = FALSE;
    while (!shutdownEvent)
    {
        int index = -1;
        if (!m_rWorker.WaitForData(index, m_ShutDownEvent.m_h)) // original author's note: "Accept"
        {
            // Make sure the workers are told to stop as well.
            m_ShutDownEvent.Set();
            shutdownEvent = TRUE;
            break;
        }

        // The per-slot events exist only for indices 0 .. m_nMaxThreads - 1.
        ASSERT(index >= 0 && index < m_nMaxThreads);

        m_HasDataEvent[index]->Set();

        const DWORD res = ::WaitForMultipleObjects(StartedTreatmentEventsCount,
                                                   &m_ahStartedTreatmentEvents[index][0],
                                                   FALSE, INFINITE);
        switch (res)
        {
        case WAIT_OBJECT_0:         // Shut down!
            shutdownEvent = TRUE;
            break;
        case (WAIT_OBJECT_0 + 1):   // Worker thread started to treat data
            break;
        case WAIT_FAILED:           // something wrong!
            throw XWaitFailed();
        default:
            ASSERT(0);
        }
    }

    //---------------------------------------------------------
    // Shut down: notify every slot first, then wait for each worker thread.
    for (i = 0; i < m_nMaxThreads; i++)
    {
        m_rWorker.ShutDown(i);
    }
    for (i = 0; i < m_nMaxThreads; i++)
    {
        ::WaitForSingleObject(m_ahWorkersHandles[i], INFINITE);
        m_rWorker.Close(i);
        m_ahWorkersHandles[i] = NULL;
    }

    // Tell whoever called MsgStop() that the dispatcher has finished.
    if (closeEvent != NULL)
        ::SetEvent(closeEvent);
    return 0;
}
///////////////////////////////////////////////////////////////////
// Builds the per-thread state used by WorkerThread::Run().
//   i                  - slot index passed on to TreatData().
//   rWorker            - worker object whose TreatData() is called.
//   hDataReadyEvent    - handle stored in wait slot 1 ("has data").
//   hStartedTreatEvent - handle forwarded to TreatData().
//   hShutDownEvent     - handle stored in wait slot 0 ("shut down").
CWizThreadDispatcher::WorkerThread::WorkerThread(int i, CWizMultiThreadedWorker &rWorker,
                                                 HANDLE hDataReadyEvent,
                                                 HANDLE hStartedTreatEvent,
                                                 HANDLE hShutDownEvent)
    : m_rWorker(rWorker)
    , m_hStartedTreatEvent(hStartedTreatEvent)
{
    // Run() decodes the wait result by position: 0 = shut down, 1 = data.
    m_hDataWait[1] = hDataReadyEvent;
    m_hDataWait[0] = hShutDownEvent;
    index = i;
}
// Worker thread loop.
//
// Repeatedly waits on the two handles in m_hDataWait. Returns 0 when the
// shut-down handle (slot 0) is signalled or when TreatData() reports
// failure; calls TreatData() when the data handle (slot 1) is signalled.
// Throws CWizThreadDispatcher::XWaitFailed when the wait itself fails.
UINT CWizThreadDispatcher::WorkerThread::Run()
{
    for (;;)
    {
        const DWORD waitResult =
            ::WaitForMultipleObjects(DataWaitHCount, m_hDataWait, FALSE, INFINITE);

        if (waitResult == WAIT_FAILED)
        {
            // something wrong!
            throw CWizThreadDispatcher::XWaitFailed();
        }
        else if (waitResult == WAIT_OBJECT_0)
        {
            // Shut down requested.
            return 0;
        }
        else if (waitResult == (WAIT_OBJECT_0 + 1))
        {
            // There is data to treat for this slot.
            if (!m_rWorker.TreatData(index, m_hDataWait[0], m_hStartedTreatEvent))
                return 0;
        }
        else
        {
            // Any other wait result is unexpected.
            ASSERT(0);
        }
    }
    return 0; // not reached
}
// Thread entry point for a worker thread (started from
// CWizThreadDispatcher::Run()).
//   pParam - heap-allocated CWizThreadDispatcher::WorkerThread; this
//            function takes ownership and deletes it before leaving,
//            whether Run() returns normally or throws.
// Returns the value produced by WorkerThread::Run(). Any exception thrown
// by Run() is re-thrown after the worker object has been deleted.
UINT AFX_CDECL CWizThreadDispatcher_WorkerThread_Fun(LPVOID pParam)
{
    CWizThreadDispatcher::WorkerThread* pWorker =
        static_cast<CWizThreadDispatcher::WorkerThread*>(pParam);
    ASSERT(pWorker != NULL);

    UINT res = 0;
    try
    {
        // Assign to the outer variable; declaring a new local here would
        // shadow it and the function would always return 0.
        res = pWorker->Run();
    }
    catch(...)
    {
        delete pWorker;
        throw;
    }
    delete pWorker;
    return res;
}
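// --- Code-viewer page residue (not part of the original source file) ---
// Keyboard shortcuts:
//   Copy code:          Ctrl + C
//   Search code:        Ctrl + F
//   Full-screen mode:   F11
//   Toggle theme:       Ctrl + Shift + D
//   Show shortcuts:     ?
//   Increase font size: Ctrl + =
//   Decrease font size: Ctrl + -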