⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 messageport.cpp

📁 魔域的源代码
💻 CPP
字号:
// 接口类。接口:基于消息通讯机制的线程间通讯接口。
// 仙剑修,2002.8.28

#include "MessagePort.h"
#include "BaseFunc.h"
#include <stdlib.h>

using namespace message_port;


long	g_nMessagePortErrorCount;
///////////////////////////////////////////////////////////////////////////////////////
// interface
///////////////////////////////////////////////////////////////////////////////////////
// 初始化,设置接口ID号,开始接收消息。可重复调用(PORT_ID不能改变)。
bool CMessagePort::Open	()
{
	LOCKOBJ;

	m_nState = STATE_OK;
	for(MSG_SET::iterator iter = m_setMsg.begin(); iter != m_setMsg.end(); iter++)
		m_setRecycle.push(*iter);
	m_setMsg.clear();
	::ResetEvent(m_hHaveMsg);

	return true;
}

///////////////////////////////////////////////////////////////////////////////////////
// 发送消息到指定接口。包含消息ID、数据类型、数据。
bool CMessagePort::Send	(int nPort, int nPacket, VAR_TYPE nVarType, const void* buf)
{
	LOCKOBJ;		//VVVVV

	ASSERT(buf);

	if(!IsOpen())
		return false;

	if(nPort == INVALID_PORT)
		return true;

	if(nPort < 0 || nPort >= m_setPort.size())
	{
		ASSERT(!"nPort");
		return false;
	}

	UNLOCKOBJ;		//AAAAA ★调用外部接口,释放互斥。必须释放,否则死锁★

	return m_setPort[nPort]->PushMsg(m_id, nPacket, nVarType, buf);
}

///////////////////////////////////////////////////////////////////////////////////////
// 接收指定接口(或所有接口)发来的消息。可指定消息ID,也可不指定。
bool CMessagePort::Recv	(int nPort, int nPacket, VAR_TYPE nVarType, void* buf, CMessageStatus* pStatus /*= NULL*/)	// return false: 没数据
{
	LOCKOBJ;

	ASSERT(buf);

	if(m_nState == STATE_CLOSED)
	{
		if(pStatus)
			pStatus->m_nError	= STATUS_FLAG_CLOSE;
		return false;
	}

	int	nRcvPort = nPort, nRcvPacket = nPacket;
	VAR_TYPE	nRcvVarType;
	if(PopMsg(&nRcvPort, &nRcvPacket, &nRcvVarType, buf))		// 内部函数,不用打开互斥锁
	{
		// 检查类型
		if(SIZE_OF_TYPE(nRcvVarType) > SIZE_OF_TYPE(nVarType))
		{
			ASSERT(!"VarType");
			if(pStatus)
				pStatus->m_nError		= STATUS_FLAG_ERROR;			//? 以后可支持自动转换类型
			return false;
		}

		if(pStatus)
		{
			pStatus->m_nPortFrom	= nRcvPort;
			pStatus->m_nPacket		= nRcvPacket;
			pStatus->m_nVarType		= nRcvVarType;
			pStatus->m_nError		= STATUS_FLAG_OK;
		}
		return true;
	}

	if(pStatus)
		pStatus->m_nError	= STATUS_FLAG_OK;
	return false;
}

///////////////////////////////////////////////////////////////////////////////////////
// 接收指定接口(或所有接口)发来的消息。可指定消息ID,也可不指定。return false: 超时或错误。
bool CMessagePort::WaitMsg(int nMilliSec)
{
	LOCKOBJ;

	if(m_nState == STATE_CLOSED)
		return false;

	// 原子操作
	if(m_setMsg.size())
		return true;
	::ResetEvent(m_hHaveMsg);		// 因为PushMsg()的SetEvent是在LOCK状态调用的,size()和Event有原子性,无共享冲突。

	UNLOCKOBJ;		//AAAAA ★等待其它线程的SEND()调用,释放互斥。必须释放,否则死锁★

	DWORD	ret = ::WaitForSingleObject(m_hHaveMsg, nMilliSec);
	if(ret == WAIT_OBJECT_0)
	{
		return true;
	}
	if(ret == WAIT_ABANDONED)
	{
		LOCKOBJ;		//? 内部调用,锁定
		Close();		// 本地函数,不用打开互斥锁
		return false;
	}
	else // if(ret == WAIT_TIMEOUT)
	{
		return false;
	}
}

///////////////////////////////////////////////////////////////////////////////////////
// 关闭接口,不再接收消息。可重复调用。
bool CMessagePort::Close()
{
	LOCKOBJ;

	m_nState = STATE_CLOSED;

	return true;
}

///////////////////////////////////////////////////////////////////////////////////////
// CMessagePort
///////////////////////////////////////////////////////////////////////////////////////
int CMessagePort::SIZE_OF_TYPE		(int type)
{
	const int	fixlen_type_size = 10;
	const int	size_of[fixlen_type_size] = {1,1,2,2, 4,4,4,4, 4,8};
	if(type < VARTYPE_NONE)
		return type;
	else if(type == VARTYPE_NONE)
	{
		ASSERT(!"SIZE_OF_TYPE");
		return 0;
	}
	else if(type - (VARTYPE_NONE+1) < fixlen_type_size)
		return size_of[type - (VARTYPE_NONE+1)];

	ASSERT(!"SIZE_OF_TYPE.");
	return 0;
}

///////////////////////////////////////////////////////////////////////////////////////
bool CMessagePort::PushMsg(int nPort, int nPacket, VAR_TYPE nVarType, const void* buf)	// nData中的串和结构都会被COPY
{
	LOCKOBJ;

	ASSERT(buf);

	if(m_nState != STATE_OK)		// Close()后,不接收消息
		return false;

	if(nPort < 0 || nPort >= m_setPort.size())
	{
		ASSERT(!"PushMsg(nPort)");
		return false;
	}

	int	nSize = SIZE_OF_TYPE(nVarType);
	if(nSize == 0 || nSize > MAX_MSGPACKSIZE)
	{
		ASSERT(!"PushMsg(nSize)");
		return false;
	}

	CMessagePacket*	pMsg = m_setRecycle.pop();
	if(!pMsg)
		pMsg = new CMessagePacket;		// VVVVVVVVVVVVVVVVVVV
	if(!pMsg)
	{
		ASSERT(!"new");
		return false;
	}
	pMsg->m_nPortFrom		= nPort;
	pMsg->m_nPacket			= nPacket;
	pMsg->m_nVarType		= nVarType;
	memcpy(pMsg->m_bufData, buf, nSize);

	// 原子操作
	m_setMsg.push_back(pMsg);		// AAAAAAAAAAAAAAAAAAAAAAAA
	::SetEvent(m_hHaveMsg);					// 有消息了

	// 检查溢出
	int nSetSize = m_setMsg.size();
	if(nSetSize >= 5000 && (nSetSize%1000) == 0 && nSetSize != m_debugPackets)		//? 100报警
	{
		m_debugPackets = nSetSize;
		LOGERROR("CMessagePacket[%d]节点的数量达到[%u]。[%d][%d][%s]", m_id, nSetSize, nPort, nPacket, (LPCTSTR)DumpBuffer((char*)buf, __min(16,nSize)));
		InterlockedExchange(&g_nMessagePortErrorCount, nSetSize);
	}

	// 随机取此全局变量
	if (::RandGet(300) == 123)
		InterlockedExchange(&g_nMessagePortErrorCount, nSetSize);

	if(m_setMsg.size() > 100000)
	{
		LOGERROR("ERROR: 消息堆溢出,MessagePort[%d]关闭!", m_id);
		Close();
	}

	return m_setMsg.size() <= 1000;		//? 1000出错
}

///////////////////////////////////////////////////////////////////////////////////////
bool CMessagePort::PopMsg(int *pPort, int *pPacket, VAR_TYPE *pVarType, void* buf)
{
// 内部函数	LOCKOBJ;

	ASSERT(pPort && pPacket && pVarType && buf);

	if(m_nState != STATE_OK)		// Close()后,不处理消息
		return false;

	if(m_setMsg.size() == 0)
		return false;

	for(MSG_SET::iterator iter = m_setMsg.begin(); iter != m_setMsg.end(); iter++)
	{
		CMessagePacket* pMsg = *iter;
		if((*pPort == PORT_ANY || *pPort == pMsg->m_nPortFrom) && (*pPacket == PACKET_ANY || *pPacket == pMsg->m_nPacket))
		{
			*pPort		= pMsg->m_nPortFrom;
			*pPacket	= pMsg->m_nPacket;
			*pVarType	= (VAR_TYPE)pMsg->m_nVarType;

			int	nSize = SIZE_OF_TYPE(*pVarType);
			if(nSize)
			{
				memcpy(buf, pMsg->m_bufData, nSize);
			}

			m_setRecycle.push(pMsg);
			m_setMsg.erase(iter);
			return true;
		}
	}

	return false;
}

///////////////////////////////////////////////////////////////////////////////////////
bool CMessagePort::Init()
{
	if((m_hHaveMsg	= ::CreateEvent(NULL, false, false, NULL)) == NULL)
	{
		LOGERROR("无法创建CMessagePort句柄!");
		return false;
	}
	return true;
}

///////////////////////////////////////////////////////////////////////////////////////
void CMessagePort::Clear()
{
	for(MSG_SET::iterator iter = m_setMsg.begin(); iter != m_setMsg.end(); iter++)
		delete(*iter);
	m_setMsg.clear();

	while(m_setRecycle.size())
		delete(m_setRecycle.pop());

	if(m_hHaveMsg != NULL)
		::CloseHandle(m_hHaveMsg);
	m_hHaveMsg = NULL;
}

///////////////////////////////////////////////////////////////////////////////////////
// static
///////////////////////////////////////////////////////////////////////////////////////
bool CMessagePort::InitPortSet(int nPortNum)
{
	ASSERT(nPortNum >= 1);

	for(int i = 0; i < nPortNum; i++)
	{
		CMessagePort*	pPort = new CMessagePort(i);		// VVVVVVVVVVVVVVVV
		if(!pPort)
			return false;
		m_setPort.push_back(pPort);			// AAAAAAAAAAAAAAAAAAAAAAAAAA
		if(!pPort->Init())
			return false;
	}

	return true;
}

///////////////////////////////////////////////////////////////////////////////////////
void CMessagePort::ClearPortSet()
{
	for(int i = 0; i < m_setPort.size(); i++)
	{
		//m_setPort[i]->Clear();
		delete(m_setPort[i]);
	}

	m_setPort.clear();
}

///////////////////////////////////////////////////////////////////////////////////////
CMessagePort::PORT_SET CMessagePort::m_setPort;
CRITSECT CMessagePort::m_xSetCtrl;			// 用于m_setPort互斥












⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -