sltaskex.h
来自「Socketlib: 一个轻量级的C++ 封装Socket C API 网络编程」· C头文件 代码 · 共 173 行
H
173 行
//**********************************************************************
//
// Copyright (C) 2005-2007 Zhang bao yuan(bolidezhang@gmail.com).
// All rights reserved.
//
// This copy of Socketlib is licensed to you under the terms described
// in the LICENSE.txt file included in this distribution.
//
//**********************************************************************
#ifndef SLTASKEX_H
#define SLTASKEX_H
//基于完成端口实现消息队列通知
#pragma once
#include "config.h"
#include "slmessagequeue.h"
#include "sync/threadgroup.h"
namespace SL
{
template <class TMutex>
class CSLTaskEx
{
public:
CSLTaskEx(void)
: m_nMaxQueueSize(0)
, m_hCompletionPort(NULL)
{
};
virtual ~CSLTaskEx(void)
{
Close();
};
void SetMaxQueueSize(uint32 nMaxSize)
{
m_nMaxQueueSize = nMaxSize;
}
uint32 GetMsgQueueSize()
{
return m_msgQueue.GetSize();
};
int GetMessage(CSLMessageQueueNode& oNode)
{
return m_msgQueue.Dequeue(oNode);
};
int PostMessage(const char *szMsg, int nLen, int nAttachID, void *pAttachInfo)
{
if (m_msgQueue.Enqueue(szMsg, nLen, nAttachID, pAttachInfo) >= 0 )
{
if ( PostQueuedCompletionStatus(m_hCompletionPort,nLen,1,NULL) )
return 1;
else
return -1;
}
return -1;
};
int Pop(CSLMessageQueueNode& oNode)
{
return m_msgQueue.Dequeue(oNode);
};
int Push(const char *szMsg, int nLen, int nAttachID, void *pAttachInfo)
{
if (m_msgQueue.Enqueue(szMsg, nLen, nAttachID, pAttachInfo) >= 0 )
{
//if ( PostQueuedCompletionStatus(m_hCompletionPort,nLen,1,NULL) )
// return 1;
//else
// return -1;
return 1;
}
return -1;
};
virtual bool Open(void *p)
{
return true;
};
virtual bool Close()
{
if (NULL == m_hCompletionPort)
return false;
//使工作线程退出
for (int i=0; i<m_threadGroup.GetThreadCount(); i++)
{
//发出退出消息
::PostQueuedCompletionStatus(m_hCompletionPort,-1,-1,NULL);
}
m_threadGroup.Stop(500);
m_threadGroup.Kill();
m_msgQueue.Clear();
CloseHandle(m_hCompletionPort);
m_hCompletionPort = NULL;
return true;
};
int Activate(int nThreadCount)
{
//创建完成端口句柄
m_hCompletionPort = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,nThreadCount);
if (INVALID_HANDLE_VALUE == m_hCompletionPort)
{
return -1;
}
return m_threadGroup.Start(CSLTaskEx<TMutex>::Run, this, nThreadCount);
};
bool Resume()
{
return m_threadGroup.Resume();
};
bool Suspend()
{
return m_threadGroup.Suspend();
};
int GetThreadCount() const
{
return m_threadGroup.GetThreadCount();
};
virtual int ProcessMessage(CSLMessageQueueNode& oNode)
{
return 0;
};
private:
// Routine that runs the task routine as a daemon thread.
static unsigned int WINAPI Run(void *p)
{
CSLTaskEx<TMutex> *pTaskEx =(CSLTaskEx<TMutex>*)p;
CSLMessageQueueNode oNode;
DWORD nByteTransferred;
DWORD dwCompletionKey;
OVERLAPPED *pOverlapped;
while (1)
{
if (pTaskEx->m_threadGroup.IsStop())
return -1;
nByteTransferred = 0;
dwCompletionKey = 0;
pOverlapped = NULL;
BOOL bSuccess = ::GetQueuedCompletionStatus(pTaskEx->m_hCompletionPort,
&nByteTransferred,
&dwCompletionKey,
(LPOVERLAPPED*)&pOverlapped,
INFINITE);
//退出信号到达,退出线程
if ( -1==nByteTransferred )
return -1;
if (pTaskEx->GetMessage(oNode) >= 0)
{//取到处理消息时
pTaskEx->ProcessMessage(oNode);
}
};
return 0;
};
protected:
CSLMessageQueue<TMutex> m_msgQueue;
SYNC::CThreadGroup m_threadGroup;
HANDLE m_hCompletionPort;
uint32 m_nMaxQueueSize;
};
};
#endif
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?