📄 workqueue.cpp
字号:
#include "stdafx.h"
#include "WorkQueue.h"
#include <assert.h>
typedef struct _THREAD_CONTEXT
{
CWorkQueue* pWorkQueue;
void* pThreadData;
} THREAD_CONTEXT,*PTHREAD_CONTEXT;
/*------------------------------------------------------------------------
Create
初始化工作队列
const unsigned int nNumberOfThreads - 要创建的工作队列中线程的数目
void* *ThreadData - 送给线程的需要执行的工作项目
------------------------------------------------------------------------*/
bool CWorkQueue::Create(const unsigned int nNumberOfThreads,
void* *ThreadData /*=NULL*/)
{
//初始化工作队列
m_pWorkItemQueue = new WorkItemQueue();
if(NULL == m_pWorkItemQueue )
{
return false;
}
//创建Semaphore对象
m_phSincObjectsArray[SEMAPHORE_INDEX] = CreateSemaphore(NULL,0,LONG_MAX,NULL);
if(m_phSincObjectsArray[SEMAPHORE_INDEX] == NULL)
{
delete m_pWorkItemQueue;
m_pWorkItemQueue = NULL;
return false;
}
//创建event 事件对象
m_phSincObjectsArray[ABORT_EVENT_INDEX] = CreateEvent(NULL,TRUE,FALSE,NULL);
if(m_phSincObjectsArray[ABORT_EVENT_INDEX] == NULL)
{
delete m_pWorkItemQueue;
m_pWorkItemQueue = NULL;
CloseHandle(m_phSincObjectsArray[SEMAPHORE_INDEX]);
return false;
}
//创建临界区以保护工作队列
InitializeCriticalSection(&m_CriticalSection);
//分配线程句柄数组
m_phThreads = new HANDLE[nNumberOfThreads];
if(m_phThreads == NULL)
{
delete m_pWorkItemQueue;
m_pWorkItemQueue = NULL;
CloseHandle(m_phSincObjectsArray[SEMAPHORE_INDEX]);
CloseHandle(m_phSincObjectsArray[ABORT_EVENT_INDEX]);
DeleteCriticalSection(&m_CriticalSection);
return false;
}
unsigned int i;
m_nNumberOfThreads = nNumberOfThreads;
DWORD dwThreadId;
PTHREAD_CONTEXT pThreadsContext ;
//创建所有的线程
for(i = 0 ; i < nNumberOfThreads ; i++ )
{
//初始化每个线程的上下文,用于传递给线程函数
pThreadsContext = new THREAD_CONTEXT;
pThreadsContext->pWorkQueue = this;
pThreadsContext->pThreadData = ThreadData == NULL? NULL : ThreadData[i];
//创建线程
m_phThreads[i] = CreateThread(NULL,
0,
CWorkQueue::ThreadFunc,
pThreadsContext,
0,
&dwThreadId);
if(m_phThreads[i] == NULL)
{
delete pThreadsContext;
m_nNumberOfThreads = i;
Destroy();
return false;
}
}
return true;
}
/*------------------------------------------------------------------------
InsertWorkItem
插入工作任务到工作队列
------------------------------------------------------------------------*/
bool CWorkQueue::InsertWorkItem(WorkItemBase* pWorkItem)
{
assert(pWorkItem != NULL);
//锁住
EnterCriticalSection(&m_CriticalSection);
//插入队列
m_pWorkItemQueue->push(pWorkItem);
//解锁
LeaveCriticalSection(&m_CriticalSection);
//触发线程
if (!ReleaseSemaphore(m_phSincObjectsArray[SEMAPHORE_INDEX],1,NULL))
{
assert(false);
return false;
}
return true;
}
/*------------------------------------------------------------------------
RemoveWorkItem()
从工作队列中取出任务,并且移除
------------------------------------------------------------------------*/
WorkItemBase* CWorkQueue::RemoveWorkItem()
{
WorkItemBase* pWorkItem;
//锁住
EnterCriticalSection(&m_CriticalSection);
//从队列中移除任务
pWorkItem = m_pWorkItemQueue->front();
m_pWorkItemQueue->pop();
//解锁
LeaveCriticalSection(&m_CriticalSection);
assert(pWorkItem != NULL);
return pWorkItem;
}
/*------------------------------------------------------------------------
ThreadFunc
这是线程函数,用来等待工作任务事件的到来或者离开线程事件的触发
------------------------------------------------------------------------*/
unsigned long __stdcall CWorkQueue::ThreadFunc( void* pParam )
{
PTHREAD_CONTEXT pThreadContext = (PTHREAD_CONTEXT)pParam;//线程的传入参数
WorkItemBase* pWorkItem = NULL;
CWorkQueue* pWorkQueue = pThreadContext->pWorkQueue;//工作队列指针
void* pThreadData = pThreadContext->pThreadData;
DWORD dwWaitResult;
for(;;)
{
//等待两个事件
dwWaitResult = WaitForMultipleObjects(NUMBER_OF_SYNC_OBJ,pWorkQueue->m_phSincObjectsArray,FALSE,INFINITE);
switch(dwWaitResult - WAIT_OBJECT_0)
{
case ABORT_EVENT_INDEX: //离开线程事件
delete pThreadContext;
return 0;
case SEMAPHORE_INDEX://工作任务事件
//得到工作队列的第一个工作任务
pWorkItem = pWorkQueue->RemoveWorkItem();
if(pWorkItem == NULL)
{
assert(false);
break;
}
//调用相应的工作函数
pWorkItem->DoWork(pThreadData);
break;
default:
assert(false);
delete pThreadContext;
return 0;
}
}
delete pThreadContext;
return 1;
}
/*------------------------------------------------------------------------
Destroy
设置线程退出事件,等待所有线程的结束
------------------------------------------------------------------------*/
void CWorkQueue::Destroy()
{
//设置退出事件
if(!SetEvent(m_phSincObjectsArray[ABORT_EVENT_INDEX]))
{
assert(false);
return;
}
//等待所有的线程结束
WaitForMultipleObjects(m_nNumberOfThreads,m_phThreads,true,INFINITE);
//清除队列
while(!m_pWorkItemQueue->empty())
{
m_pWorkItemQueue->front()->Abort();
m_pWorkItemQueue->pop();
}
delete m_pWorkItemQueue;
m_pWorkItemQueue = NULL;
CloseHandle(m_phSincObjectsArray[SEMAPHORE_INDEX]);
CloseHandle(m_phSincObjectsArray[ABORT_EVENT_INDEX]);
DeleteCriticalSection(&m_CriticalSection);
//关闭所有的线程句柄
for(int i = 0 ; i < m_nNumberOfThreads ; i++)
CloseHandle(m_phThreads[i]);
delete[] m_phThreads;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -