⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 internetport.cpp

📁 魔域的服务端源代码。Visual C++编译的版本
💻 CPP
📖 第 1 页 / 共 2 页
字号:
			{
				BufferToList(0);
			}
			else
			{
				Close();
				Open();			// 尽快重新打开
			}
		}
	} // select

	return true;
}

///////////////////////////////////////////////////////////////////////////////////////
bool CInternetPort::RecvToBuf(CRecvBuffer* pRcvBuf, SOCKET sock)		// return false: error
{
	char*	pBuf = pRcvBuf->GetTail();
	int		nLen = message_port::RECV_BUFFER_SIZE - pRcvBuf->GetLen();
	int	ret = recv(sock, pBuf, nLen, 0);		// 接收数据
	if(ret > 0)
	{
		pRcvBuf->AddLen(ret);
		return true;
	}

	if(ret == 0)
	{
		LOGERROR("message: socket close.");
	}
	else if(ret < 0)
	{
		int err = WSAGetLastError();
		if(err == WSAEWOULDBLOCK)
			return true;
		LOGERROR("message: socket error[%d], recv() failed.", err);
	}

	return false;
}

///////////////////////////////////////////////////////////////////////////////////////
bool CInternetPort::BufferToList(int idx)		// return false: 无消息
{
	if(m_id == 0)
	{
		while(true)
		{
			// 处理消息
			if(m_setRecvBuf[idx].GetLen() < MSG_HEAD_SIZE)
				break;
			CMessageHead*	pHead = m_setRecvBuf[idx].GetHead();
			int nLen = SIZE_OF_TYPE(pHead->m_nVarType);
			if(m_setRecvBuf[idx].GetLen() < MSG_HEAD_SIZE + nLen)
				break;

			if(pHead->m_nPacket >= SYS_PACKET_BASE)
			{
				if(pHead->m_nPacket == SYS_PACKET_LOGINKEY)			// 处理节点验证
				{
					LPCTSTR	pszKey = (LPCTSTR)pHead->m_bufData;
					IF_NOT(strlen(m_szKey) && strcmp(pszKey, m_szKey) == 0)
					{
						closesocket(m_setSocket[idx]);

						//??? 未强制检查KEY
					}
				}
				else if(pHead->m_nPacket == SYS_PACKET_PORTID)			// 处理节点登录
				{
					IF_OK(pHead->m_nVarType == VARTYPE_INT)
					{
						int	nLoginPort = *((int*)pHead->m_bufData);
						IF_OK(nLoginPort > MASTER_PORT_ID && nLoginPort < m_nPortSize		// >MASTER_PORT_ID : 不能为主节点号
								&& idx >= m_nPortSize)
						{
							if(m_setSocket[nLoginPort] != INVALID_SOCKET)
							{
								// 双重登录,关闭旧SOCKET
								closesocket(m_setSocket[nLoginPort]);
							}
							m_setSocket[nLoginPort]		= m_setSocket[idx];
							m_setRecvBuf[nLoginPort]	= m_setRecvBuf[idx];
							m_setSocket.erase(m_setSocket.begin() + idx);		// 要删除,倒排
							m_setRecvBuf.erase(m_setRecvBuf.begin() + idx);

							// 移动idx
							idx		= nLoginPort;
						}
					}
					SendFrom(m_id, idx, SYS_PACKET_PORTSIZE, VARTYPE_INT, &m_nPortSize);
				}
				else 
				{
					ASSERT(!"Unknown system packet!");
				}
			}
			else // not system packet
			{
				if(pHead->m_nPort != MASTER_PORT_ID)		//? m_nPort: 子节点上传到主节点时,表示TARGET_PORT
				{
					// 处理转发消息
					IF_OK(idx < m_nPortSize)
						SendFrom(idx, pHead->m_nPort, pHead->m_nPacket, (VAR_TYPE)pHead->m_nVarType, pHead->m_bufData);		// idx == sub_port_id
				}
				else
				{
					// 存消息
					IF_OK(idx < m_nPortSize)
						PushMsg(idx, pHead->m_nPacket, (VAR_TYPE)pHead->m_nVarType, pHead->m_bufData);		// idx == sub_port_id
				}
			}

			// 清除BUFFER中的消息
			m_setRecvBuf[idx].DelMsg(MSG_HEAD_SIZE + nLen);

		} // while
	}
	else // 子节点
	{
		while(true)
		{
			// 处理消息
			if(m_setRecvBuf[0].GetLen() < MSG_HEAD_SIZE)
				break;
			CMessageHead*	pHead = m_setRecvBuf[0].GetHead();
			int nLen = SIZE_OF_TYPE(pHead->m_nVarType);
			if(m_setRecvBuf[0].GetLen() < MSG_HEAD_SIZE + nLen)
				break;

			if(pHead->m_nPacket == SYS_PACKET_PORTSIZE)	// 处理节点尺寸
			{
				IF_OK(pHead->m_nVarType == VARTYPE_INT)
				{
					int	nPortSize = *((int*)pHead->m_bufData);
					IF_OK(idx == MASTER_PORT_ID)
					{
						if(nPortSize != m_nPortSize)
						{
							m_nPortSize = nPortSize;		//? reset size!!!
							m_setSocket.resize(m_nPortSize, m_sockMain);
						}
					}
				}
			}
			else
			{
				// 存消息
				PushMsg(pHead->m_nPort, pHead->m_nPacket, (VAR_TYPE)pHead->m_nVarType, pHead->m_bufData);
			}

			// 清除BUFFER中的消息
			m_setRecvBuf[0].DelMsg(MSG_HEAD_SIZE + nLen);
		}
	}

	return (m_setMsg.size() != 0);
}

///////////////////////////////////////////////////////////////////////////////////////
bool CInternetPort::WaitMsg(int nMilliSec)
{
	if(!IsOpen())
		return false;

	if(m_setMsg.size())
		return true;

	RecvMsg(nMilliSec);

	return	(m_setMsg.size() != 0);
}

///////////////////////////////////////////////////////////////////////////////////////
// 关闭接口,不再接收消息。可重复调用。
bool CInternetPort::Close()
{
	if(m_nState == STATE_CLOSED)
		return true;

	m_nState = STATE_CLOSED;
	if(m_id == 0)		// 第一号接口为服务端,其它接口为客户端
	{
		// 清空消息串
		for(MSG_SET::iterator iter = m_setMsg.begin(); iter != m_setMsg.end(); iter++)
			delete(*iter);
		m_setMsg.clear();
		// 清空BUFFER
		m_setRecvBuf.resize(m_nPortSize);
		for(int i = 0; i < m_setRecvBuf.size(); i++)
			m_setRecvBuf[i].Reset();

		// 关闭主SOCKET
		if(m_sockMain != INVALID_SOCKET)
		{
			closesocket(m_sockMain);
			m_sockMain	= INVALID_SOCKET;
		}
		// 关闭子SOCKET
		m_setSocket.resize(m_nPortSize);
		for( i = 0; i < m_setSocket.size(); i++)
		{
			if(m_setSocket[i] != INVALID_SOCKET)
			{
				closesocket(m_setSocket[i]);
				m_setSocket[i]	= INVALID_SOCKET;
			}
		}
	}
	else
	{
		// 清空消息串
		for(MSG_SET::iterator iter = m_setMsg.begin(); iter != m_setMsg.end(); iter++)
			delete(*iter);
		m_setMsg.clear();
		// 清空BUFFER
		m_setRecvBuf[0].Reset();

		// 关闭主SOCKET
		if(m_sockMain != INVALID_SOCKET)
		{
			closesocket(m_sockMain);
			m_sockMain	= INVALID_SOCKET;
		}
		// 清空SOCKET表
		for(int i = 0; i < m_setSocket.size(); i++)
			m_setSocket[i] = INVALID_SOCKET;
	}

	return true;
}

///////////////////////////////////////////////////////////////////////////////////////
// CInternetPort
///////////////////////////////////////////////////////////////////////////////////////
int CInternetPort::SIZE_OF_TYPE		(int type)
{
	const int	fixlen_type_size = 10;
	const int	size_of[fixlen_type_size] = {1,1,2,2, 4,4,4,4, 4,8};
	if(type < VARTYPE_NONE)
		return type;
	else if(type == VARTYPE_NONE)
	{
		ASSERT(!"SIZE_OF_TYPE");
		return 0;
	}
	else if(type - (VARTYPE_NONE+1) < fixlen_type_size)
		return size_of[type - (VARTYPE_NONE+1)];

	ASSERT(!"SIZE_OF_TYPE.");
	return 0;
}

///////////////////////////////////////////////////////////////////////////////////////
bool CInternetPort::PushMsg(int nPort, int nPacket, VAR_TYPE nVarType, const void* buf)	// nData中的串和结构都会被COPY
{
	CHECKF(buf);

	if(m_nState != STATE_OK)		// Close()后,不接收消息
		return false;

	if(nPort < 0 || nPort >= m_nPortSize)
	{
		ASSERT(!"PushMsg(nPort)");
		return false;
	}

	int	nSize = SIZE_OF_TYPE(nVarType);
	if(nSize == 0 || nSize > MAX_MSGPACKSIZE)
	{
		ASSERT(!"PushMsg(nSize)");
		return false;
	}

	CMessagePacket*	pMsg = new CMessagePacket;		// VVVVVVVVVVVVVVVVVVV
	if(!pMsg)
	{
		ASSERT(!"new");
		return false;
	}
	pMsg->m_nPortFrom		= nPort;
	pMsg->m_nPacket			= nPacket;
	pMsg->m_nVarType		= nVarType;
	memcpy(pMsg->m_bufData, buf, nSize);

	m_setMsg.push_back(pMsg);		// AAAAAAAAAAAAAAAAAAAAAAAA

	// 检查溢出
	if((m_setMsg.size()%100) == 0)		//? 100报警
		LOGERROR("CInternetPacket[%d]接口的数量达到[%u]", m_id, m_setMsg.size());

	if(m_setMsg.size() > 10000)
	{
		LOGERROR("ERROR: 消息堆溢出,InternetPort[%d]关闭!", m_id);
		Close();
	}

	return m_setMsg.size() <= 1000;		//? 1000出错
}

///////////////////////////////////////////////////////////////////////////////////////
bool CInternetPort::PopMsg(int *pPort, int *pPacket, VAR_TYPE *pVarType, void* buf)
{
	CHECKF(pPort && pPacket && pVarType && buf);

	if(m_nState != STATE_OK)		// Close()后,不处理消息
		return false;

	if(m_setMsg.size() == 0)
		return false;

	for(MSG_SET::iterator iter = m_setMsg.begin(); iter != m_setMsg.end(); iter++)
	{
		CMessagePacket* pMsg = *iter;
		if((*pPort == PORT_ANY || *pPort == pMsg->m_nPortFrom) && (*pPacket == PACKET_ANY || *pPacket == pMsg->m_nPacket))
		{
			*pPort		= pMsg->m_nPortFrom;
			*pPacket	= pMsg->m_nPacket;
			*pVarType	= (VAR_TYPE)pMsg->m_nVarType;

			int	nSize = SIZE_OF_TYPE(*pVarType);
			if(nSize)
			{
				memcpy(buf, pMsg->m_bufData, nSize);
			}

			delete(pMsg);
			m_setMsg.erase(iter);
			return true;
		}
	}

	return false;
}

///////////////////////////////////////////////////////////////////////////////////////
bool CInternetPort::Init()
{
	m_setSocket.resize(m_nPortSize, INVALID_SOCKET);

	if(m_id == MASTER_PORT_ID)
	{
		for(int i = 0; i < m_nPortSize; i++)
			m_setRecvBuf.push_back(CRecvBuffer());
	}
	else
		m_setRecvBuf.push_back(CRecvBuffer());

	return true;
}

///////////////////////////////////////////////////////////////////////////////////////
void CInternetPort::Clear()
{
	Close();

	for(MSG_SET::iterator iter = m_setMsg.begin(); iter != m_setMsg.end(); iter++)
		delete(*iter);
	m_setMsg.clear();
}

///////////////////////////////////////////////////////////////////////////////////////
// static
///////////////////////////////////////////////////////////////////////////////////////
bool CInternetPort::InitPortSet(int nPortSize, LPCTSTR pszMasterIP, int nMasterPort)
{
	if(!pszMasterIP || pszMasterIP[0] == 0 || strlen(pszMasterIP) >= IPSTR_SIZE)
		return false;

	if(nPortSize <= m_id)
		nPortSize = m_id + 1;		// 容错

	if(nPortSize < 1 || nPortSize > MAX_PORT_SIZE)
	{
		ASSERT(!"网络节点数错误! 请修改FD_SETSIZE值并重新编译程序。");
		return false;
	}

	m_nPortSize	= nPortSize;
	strcpy(m_szMasterIP, pszMasterIP);
	m_nMasterPort	= nMasterPort;


	// 初始化网络
	WSADATA		wsaData;
	int	err;
	if((err = WSAStartup(SOCKET_VERSION, &wsaData)) != 0)
	{
		LOGERROR("WSAStartup failed[%d].", err);
		return false;
	}
	m_bWinStartup = true;

	// 检查版本
	if(wsaData.wVersion != 0x0002)
	{
		WSACleanup();
		LOGERROR("wsaData version unmatch[0x%04X].", wsaData.wVersion);
		return false;
	}

	Init();
	return true;
}

///////////////////////////////////////////////////////////////////////////////////////
void CInternetPort::ClearPortSet()
{
	Clear();

	if(m_bWinStartup)
		WSACleanup();
}

///////////////////////////////////////////////////////////////////////////////////////
IMessagePort* CInternetPort::CreateNew(int nPortID, int nPortSize, LPCTSTR szMasterIP, int nMasterPort, LPCTSTR szKey)
{
	CHECKF(szMasterIP && szKey);
	CHECKF(nPortID >= 0 && nPortSize >=0 && nMasterPort >= 0);

	if(nPortID == 0 && nPortSize == 0 || nPortID != 0 && strlen(szMasterIP) == 0)
		return NULL;

	CInternetPort* pPort = new CInternetPort(nPortID);
	if(pPort)
	{
		if(pPort->InitPortSet(nPortSize, szMasterIP, nMasterPort))
		{
			SafeCopy(pPort->m_szKey, szKey, MAX_KEYSIZE);
			return (IMessagePort*)pPort;
		}

		delete pPort;
	}

	return NULL;
}














⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -