⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 comm.cpp

📁 通讯中间件
💻 CPP
📖 第 1 页 / 共 3 页
字号:
		{
			now = time(NULL);
			if ((now - pLink->m_SendTime) >= 60)
			{
				pLink->m_SendTime = now;

				memset(LoginBuf, 0, sizeof(LoginBuf));
				*(unsigned long *)&LoginBuf[0] = htonl(16);
				*(unsigned long *)&LoginBuf[4] = htonl(COMM_AUTH);
				*(unsigned long *)&LoginBuf[8] = htonl(comm->GetModuleID());
				*(unsigned long *)&LoginBuf[12] = htonl(0);

				if (comm->Send(pLink->m_SocketID, LoginBuf, 16, 0) < 0)
				{
					g_CommLog.Print("[提示信息] 服务端连接线程(%s:%u -> %s:%u):发送握手消息失败", szForeignIP, uForeignPort, szLocalIP, uLocalPort);
					break;
				}
			}
		}

		// 未连接上则退出
		if (pLink->m_SocketID == INVALID_SOCKET)
		{
			g_CommLog.Print("[提示信息] 服务端连接线程(%s:%u -> %s:%u):描述字无效", szForeignIP, uForeignPort, szLocalIP, uLocalPort);
			break;
		}

		// 事件集合
		rset = newrset;

		// 等待数据到来
		memset(&timeout, 0, sizeof(timeout));
		timeout.tv_sec = 5;
		timeout.tv_usec = 0;

		nready = select(pLink->m_SocketID + 1, &rset, NULL, NULL, &timeout);
		if (nready > 0)
		{
			if (pLink->m_SocketID != INVALID_SOCKET && FD_ISSET(pLink->m_SocketID, &rset))
			{
				if (nrecv < 0 || nrecv >= COMM_PDU_RECV)
				{
					g_CommLog.Print("[提示信息] 服务端连接线程(%s:%u -> %s:%u):残留数据总量异常:%d", szForeignIP, uForeignPort, szLocalIP, uLocalPort, nrecv);

					nrecv = 0;
				}
				nread = recv(pLink->m_SocketID, buf + nrecv, COMM_PDU_RECV - nrecv, 0);
				if (nread > 0)
				{
					now = time(NULL);
					pLink->m_RecvTime = now;

					nrecv += nread;
					if (nrecv <= 0 || nrecv > COMM_PDU_RECV)
					{
						g_CommLog.Print("[提示信息] 服务端连接线程(%s:%u -> %s:%u):接收数据后总量异常:%d", szForeignIP, uForeignPort, szLocalIP, uLocalPort, nrecv);

						nrecv = 0;
					}
					else if (nrecv >= 16)
					{
						npos = 0;
						while ((npos + 16) <= nrecv)
						{
							nlen = (int)ntohl(*(unsigned long *)&buf[npos]);
							if (nlen < 16 || nlen > COMM_PDU_SIZE)
							{
								g_CommLog.Print("[提示信息] 服务端连接线程(%s:%u -> %s:%u):消息包长度异常:%d", szForeignIP, uForeignPort, szLocalIP, uLocalPort, nlen);

								npos = 0;
								nrecv = 0;
								break;
							}
							else if ((npos + nlen) > nrecv)
							{
								break;
							}

							msglen = (unsigned long)ntohl(*(unsigned long *)&buf[npos + 0]);
							msgtype = (unsigned long)ntohl(*(unsigned long *)&buf[npos + 4]);
							sender = (unsigned long)ntohl(*(unsigned long *)&buf[npos + 8]);
							receiver = (unsigned long)ntohl(*(unsigned long *)&buf[npos + 12]);
							memcpy(bodybuf, &buf[npos + 16], msglen - 16);
							
							if (sender != 0)
							{
								if (msgtype == COMM_AUTH)
								{	
									if (pLink->m_ModuleID != sender)
									{
										if (comm->ExistID(sender) == COMM_ERR_NONE)
										{
											if (pLink->m_ModuleID != 0)
											{
												memset(&packet, 0, sizeof(packet));
												packet.m_ModuleID = pLink->m_ModuleID;
												packet.m_BodyLen = 0;

												comm->OnClose(&packet);
											}
											comm->Lock();
											pLink->Close();
											pLink->Release();
											comm->UnLock();

											g_CommLog.Print("[提示信息] 服务端连接线程(%s:%u -> %s:%u):模块冲突:%u", szForeignIP, uForeignPort, szLocalIP, uLocalPort, sender);
											comm->DelThreadCount();

											return 0;
										}
										else
										{
											if (pLink->m_ModuleID != 0)
											{
												memset(&packet, 0, sizeof(packet));
												packet.m_ModuleID = pLink->m_ModuleID;
												packet.m_BodyLen = 0;

												comm->OnClose(&packet);
											}
											pLink->m_ModuleID = sender;

											memset(&packet, 0, sizeof(packet));
											packet.m_ModuleID = pLink->m_ModuleID;
											packet.m_BodyLen = 0;

											comm->OnConnect(&packet);
										}
									}
								}
								else if (msgtype == COMM_DATA)
								{
									if (comm->GetModuleID() == receiver)
									{
										if (pLink->m_ModuleID == sender)
										{
											memset(&packet, 0, sizeof(packet));
											packet.m_ModuleID = pLink->m_ModuleID;
											packet.m_BodyLen = msglen - 16;
											memcpy(packet.m_BodyBuf, bodybuf, msglen - 16);

											comm->OnReceive(&packet);
										}
										else
										{
											g_CommLog.Print("[提示信息] 服务端连接线程(%s:%u -> %s:%u):发送模块错误:%u,%u", szForeignIP, uForeignPort, szLocalIP, uLocalPort, pLink->m_ModuleID, sender);
										}
									}
									else
									{
										g_CommLog.Print("[提示信息] 服务端连接线程(%s:%u -> %s:%u):接收模块错误:%u,%u", szForeignIP, uForeignPort, szLocalIP, uLocalPort, comm->GetModuleID(), receiver);
									}
								}
								else
								{
									g_CommLog.Print("[提示信息] 服务端连接线程(%s:%u -> %s:%u):未知消息类型(HEX: %08x)", szForeignIP, uForeignPort, szLocalIP, uLocalPort, msgtype);
								}
							}
							else
							{
								g_CommLog.Print("[提示信息] 服务端连接线程(%s:%u -> %s:%u):发送者模块标识为零", szForeignIP, uForeignPort, szLocalIP, uLocalPort);
							}

							npos += nlen;
						}
						
						for (i = npos; i < nrecv; i++)
						{
							buf[i - npos] = buf[i];
						}
						nrecv -= npos;
					}
				}
				else if (nread == 0)
				{
					g_CommLog.Print("[提示信息] 服务端连接线程(%s:%u -> %s:%u):连接关闭", szForeignIP, uForeignPort, szLocalIP, uLocalPort);
					break;
				}
				else
				{
					g_CommLog.Print("[提示信息] 服务端连接线程(%s:%u -> %s:%u):读数据异常", szForeignIP, uForeignPort, szLocalIP, uLocalPort);
					break;
				}
			}
			else
			{
				g_CommLog.Print("[提示信息] 服务端连接线程(%s:%u -> %s:%u):等待正常但无信号", szForeignIP, uForeignPort, szLocalIP, uLocalPort);
			}
		}
		else if (nready == 0)
		{
			;
		}
		else
		{
			g_CommLog.Print("[提示信息] 服务端连接线程(%s:%u -> %s:%u):等待数据错误", szForeignIP, uForeignPort, szLocalIP, uLocalPort);
			break;
		}
	}

	if (pLink->m_ModuleID != 0)
	{
		memset(&packet, 0, sizeof(packet));
		packet.m_ModuleID = pLink->m_ModuleID;
		packet.m_BodyLen = 0;

		comm->OnClose(&packet);
	}
	comm->Lock();
	pLink->Close();
	pLink->Release();
	comm->UnLock();

	g_CommLog.Print("[提示信息] 服务端连接线程(%s:%u -> %s:%u):正常退出", szForeignIP, uForeignPort, szLocalIP, uLocalPort);
	comm->DelThreadCount();

	return 0;
}

/**
 * Server-side listen thread entry point.
 *
 * lpParam must point to an 80-byte buffer allocated with new char[80] whose
 * leading bytes hold the owning TComm pointer stored as an unsigned long.
 * The buffer is copied into a local array and released here.
 *
 * Until comm->m_Stoped becomes non-zero the thread repeatedly:
 *   1. fills idle resend slots from comm->GetPacket() and retries occupied
 *      slots through comm->SendMessage() every 60 seconds; a slot is freed
 *      when the send succeeds or once its counter reaches 3,
 *   2. re-opens configured listen ports whose socket is invalid (at most
 *      once every 60 seconds per port),
 *   3. waits up to 5 seconds in select() for incoming connections and starts
 *      one ServerConnectThread for every accepted connection for which
 *      comm->AllowIP() returns 0.
 *
 * On exit all listen sockets are closed and the thread counter of the TComm
 * object is decremented. Always returns 0.
 *
 * NOTE(review): the TComm pointer and the socket are passed through
 * unsigned long fields at byte offsets 0 and 4. Where unsigned long is
 * 8 bytes the two fields overlap, and where it is narrower than a pointer
 * the pointer is truncated. The layout is shared with ServerConnectThread,
 * so it is deliberately left unchanged here - confirm the supported targets.
 */
#ifdef _WIN32
DWORD ServerListenThread(LPVOID lpParam)
#else
void *ServerListenThread(void *lpParam)
#endif
{
#ifndef _WIN32
	pthread_detach(pthread_self());
	signal(SIGPIPE, SIG_IGN);
#endif

	char param[80];
	unsigned long nParam;
	TComm *comm;

	SOCKET consock, maxsock;
	struct sockaddr_in cliaddr;
	int addrlen;
	fd_set rset, newrset;
	int nready;
	struct timeval timeout;
	DWORD dwThreadID;
	char *cliparam;

	HANDLE hRet;

	int i;
	time_t now;
	TPacket packet;

	// Local resend slots: flag (in use), number of attempts, last attempt time, payload.
	int ret;
	int WaitSendFlag[COMM_WAITSEND_NUM];
	int WaitSendCount[COMM_WAITSEND_NUM];
	time_t WaitSendTime[COMM_WAITSEND_NUM];
	TPacket WaitSendPacket[COMM_WAITSEND_NUM];

	for (i = 0; i < COMM_WAITSEND_NUM; i++)
	{
		WaitSendFlag[i] = 0;
		WaitSendCount[i] = 0;
		WaitSendTime[i] = 0;
	}

	if (lpParam == NULL)
	{
		g_CommLog.Print("[系统错误] 服务端监听线程参数为空");
		return 0;
	}

	// Take a private copy of the start-up parameter and free the caller's buffer.
	memset(param, 0, 80);
	memcpy(param, lpParam, 80);
	delete [] (char *)(lpParam);
	nParam = *(unsigned long *)&param[0];
	comm = (TComm *)nParam;

	if (comm == NULL)
	{
		g_CommLog.Print("[系统错误] 服务端监听线程参数中包含的通信类指针为空");
		return 0;
	}

	// newrset is the persistent set of live listen sockets; rset is the per-select copy.
	FD_ZERO(&newrset);

	comm->AddThreadCount();
	g_CommLog.Print("[提示信息] 服务端监听线程,正常启动");

	while (comm->m_Stoped == 0)
	{
		// Fetch packets waiting for a resend and retry the ones already held.
		for (i = 0; i < COMM_WAITSEND_NUM; i++)
		{
			now = time(NULL);

			if (WaitSendFlag[i] <= 0)
			{
				// Idle slot: try to pull the next packet from the resend queue.
				ret = comm->GetPacket(&packet);
				if (ret == COMM_ERR_NONE)
				{
					WaitSendFlag[i] = 1;
					WaitSendCount[i] = 1;
					WaitSendTime[i] = now;
					WaitSendPacket[i] = packet;
				}
			}
			else
			{
				// Occupied slot: retry once 60 seconds have passed since the last attempt.
				if ((now - WaitSendTime[i]) >= 60)
				{
					WaitSendCount[i]++;
					WaitSendTime[i] = now;

					packet = WaitSendPacket[i];
					ret = comm->SendMessage(&packet);
					if (ret == COMM_ERR_NONE)
					{
						WaitSendFlag[i] = 0;
					}
					else
					{
						// Give up once the attempt counter reaches 3.
						if (WaitSendCount[i] >= 3)
						{
							WaitSendFlag[i] = 0;
						}
					}
				}
			}
		}

		// Re-open configured ports that currently have no listen socket.
		for (i = 0; i < COMM_LISTEN_MAX; i++)
		{
			if (comm->m_Listen[i].m_ListenPort != 0)
			{
				if (comm->m_Listen[i].m_ListenSocket == INVALID_SOCKET)
				{
					now = time(NULL);

					if ((now - comm->m_Listen[i].m_ListenTime) >= 60)
					{
						comm->m_Listen[i].m_ListenTime = now;
						comm->m_Listen[i].m_ListenSocket = comm->Listen(comm->m_Listen[i].m_ListenPort);
						if (comm->m_Listen[i].m_ListenSocket != INVALID_SOCKET)
						{
							FD_SET(comm->m_Listen[i].m_ListenSocket, &newrset);

							g_CommLog.Print("[提示信息] 服务端监听线程:监听成功(%u)", comm->m_Listen[i].m_ListenPort);
						}
						else
						{
							g_CommLog.Print("[提示信息] 服务端监听线程:监听失败(%u)", comm->m_Listen[i].m_ListenPort);
						}
					}
				}
			}
		}

		// Determine the highest descriptor; consock doubles as "any listener alive" marker.
		consock = INVALID_SOCKET;
		maxsock = INVALID_SOCKET;
		for (i = 0; i < COMM_LISTEN_MAX; i++)
		{
			if (comm->m_Listen[i].m_ListenPort != 0)
			{
				if (comm->m_Listen[i].m_ListenSocket != INVALID_SOCKET)
				{
					consock = comm->m_Listen[i].m_ListenSocket;

					if (comm->m_Listen[i].m_ListenSocket > maxsock)
					{
						maxsock = comm->m_Listen[i].m_ListenSocket;
					}
				}
			}
		}

		// Nothing is listening: back off for 5 seconds and start over.
		if (consock == INVALID_SOCKET)
		{
#ifdef _WIN32
			Sleep(5000);
#else
			sleep(5);
#endif
			FD_ZERO(&newrset);
			continue;
		}


		// Wait for incoming connections (5 second timeout keeps the loop responsive).
		memset(&timeout, 0, sizeof(timeout));
		timeout.tv_sec = 5;
		timeout.tv_usec = 0;

		rset = newrset;

		nready = select(maxsock + 1, &rset, NULL, NULL, &timeout);
		if (nready > 0)
		{
			for (i = 0; i < COMM_LISTEN_MAX; i++)
			{
				// Listeners without pending connections are simply skipped.
				if (comm->m_Listen[i].m_ListenSocket == INVALID_SOCKET || !FD_ISSET(comm->m_Listen[i].m_ListenSocket, &rset))
				{
					continue;
				}

				addrlen = sizeof(struct sockaddr);
				consock = accept(comm->m_Listen[i].m_ListenSocket, (struct sockaddr *)&cliaddr, (socklen_t *)&addrlen);
				if (consock == INVALID_SOCKET)
				{
					// accept() failed: drop this listener (and forget it in the
					// select set) so the re-listen step above can re-create it.
					FD_CLR(comm->m_Listen[i].m_ListenSocket, &newrset);
					closesocket(comm->m_Listen[i].m_ListenSocket);
					comm->m_Listen[i].m_ListenSocket = INVALID_SOCKET;

					g_CommLog.Print("[提示信息] 服务端监听线程(%u),接收连接失败", comm->m_Listen[i].m_ListenPort);
					continue;
				}

				g_CommLog.Print("[提示信息] 服务端监听线程(%u),接收连接:%s:%u", comm->m_Listen[i].m_ListenPort, inet_ntoa(cliaddr.sin_addr), ntohs(cliaddr.sin_port));

				if (comm->AllowIP(inet_ntoa(cliaddr.sin_addr)) != 0)
				{
					// Peer address rejected by AllowIP(): close the connection immediately.
					closesocket(consock);
					consock = INVALID_SOCKET;

					g_CommLog.Print("[提示信息] 服务端监听线程(%u),断开非信任连接:%s:%u", comm->m_Listen[i].m_ListenPort, inet_ntoa(cliaddr.sin_addr), ntohs(cliaddr.sin_port));
					continue;
				}

				cliparam = NULL;
				cliparam = (char *)new char[80];
				if (cliparam == NULL)
				{
					closesocket(consock);
					consock = INVALID_SOCKET;

					g_CommLog.Print("[提示信息] 服务端监听线程(%u),内存分配失败", comm->m_Listen[i].m_ListenPort);
					continue;
				}

				// Parameter block for the connection thread: TComm pointer at
				// offset 0, accepted socket at offset 4 (see NOTE(review) above).
				memset(cliparam, 0, 80);
				*(unsigned long *)&cliparam[0] = *(unsigned long *)&param[0];
				*(unsigned long *)&cliparam[4] = (unsigned long)consock;

				hRet = (HANDLE)NULL;

#ifdef _WIN32
				hRet = CreateThread((LPSECURITY_ATTRIBUTES )NULL,
						0,
						(LPTHREAD_START_ROUTINE)ServerConnectThread,
						(LPVOID)cliparam,
						0,
						&dwThreadID);

				if (hRet == (HANDLE)NULL)
				{
					closesocket(consock);
					consock = INVALID_SOCKET;
					// Allocated with new[]; must be released with delete[].
					delete [] cliparam;
					cliparam = NULL;

					g_CommLog.Print("[提示信息] 服务端监听线程(%u),创建连接线程失败", comm->m_Listen[i].m_ListenPort);
				}
				else
				{
					// The handle is never waited on; close it so it is not
					// leaked for every accepted connection. The thread keeps running.
					CloseHandle(hRet);
				}
#else
				if (pthread_create(&hRet, NULL, ServerConnectThread, cliparam) != 0)
				{
					closesocket(consock);
					consock = INVALID_SOCKET;
					// Allocated with new[]; must be released with delete[].
					delete [] cliparam;
					cliparam = NULL;

					g_CommLog.Print("[提示信息] 服务端监听线程(%u),创建连接线程失败", comm->m_Listen[i].m_ListenPort);
				}
#endif
			}
		}
		else if (nready == 0)
		{
			// Timeout: nothing to accept, just run the housekeeping again.
			;
		}
		else
		{
			g_CommLog.Print("[提示信息] 服务端监听线程等待错误");
		}
	}

	// Shutdown: close every listener and reset its slot.
	for (i = 0; i < COMM_LISTEN_MAX; i++)
	{
		if (comm->m_Listen[i].m_ListenSocket != INVALID_SOCKET)
		{
			closesocket(comm->m_Listen[i].m_ListenSocket);
			comm->m_Listen[i].m_ListenSocket = INVALID_SOCKET;
			comm->m_Listen[i].m_ListenPort = 0;
			comm->m_Listen[i].m_ListenTime = 0;
		}
	}
	FD_ZERO(&newrset);

	g_CommLog.Print("[提示信息] 服务端监听线程,正常退出");
	comm->DelThreadCount();

	return 0;
}

/**
 * Constructs the communication object and starts the server listen thread.
 *
 * Resets the module id, trusted-IP table, resend-queue positions, stop flag
 * and thread counter, initialises Winsock on Windows (or ignores SIGPIPE
 * elsewhere), and then launches ServerListenThread with an 80-byte heap
 * buffer that carries the `this` pointer. ServerListenThread releases that
 * buffer itself; it is released here only if the thread could not be started.
 *
 * Failures are reported through g_CommLog only; the constructor does not
 * signal them to the caller.
 *
 * NOTE(review): `this` is stored through an unsigned long, which truncates
 * the pointer on targets where unsigned long is narrower than a pointer.
 * ServerListenThread reads the same layout, so it is left unchanged here -
 * confirm the supported targets.
 */
TComm::TComm()
{
	int i;
	m_ModuleID = 0;

#ifdef _WIN32
	WSADATA wsaData;
	WSAStartup(MAKEWORD(2, 0),&wsaData);
#else
	signal(SIGPIPE, SIG_IGN);
#endif

	for (i = 0; i < COMM_TRUSTIP_MAX; i++)
	{
		m_TrustIP[i] = 0;
	}

	m_WaitSendGetPos = 0;
	m_WaitSendPutPos = 0;

	m_Stoped = 0;
	m_ThreadCount = 0;

	char *param;

	param = NULL;
	param = (char *)new char[80];
	if (param == NULL)
	{
		g_CommLog.Print("[系统错误] 创建监听线程,内存分配失败");
	}
	else
	{
		// Thread parameter block: the TComm pointer lives at offset 0.
		memset(param, 0, 80);
		*(unsigned long *)&param[0] = (unsigned long)this;

		DWORD dwThreadID;
		HANDLE hRet;

		hRet = (HANDLE)NULL;

#ifdef _WIN32
		hRet = CreateThread((LPSECURITY_ATTRIBUTES )NULL,
						0,
						(LPTHREAD_START_ROUTINE)ServerListenThread,
						(LPVOID)param,
						0,
						&dwThreadID);

		if (hRet == (HANDLE)NULL)
		{
			// Allocated with new[]; must be released with delete[].
			delete [] param;
			param = NULL;

			g_CommLog.Print("[系统错误] 创建监听线程失败");
		}
		else
		{
			// The handle is never used again; close it so it is not leaked.
			// Closing the handle does not stop the thread.
			CloseHandle(hRet);
		}
#else
		// ServerListenThread detaches itself, so the pthread_t is not kept.
		if (pthread_create(&hRet, NULL, ServerListenThread, param) != 0)
		{
			// Allocated with new[]; must be released with delete[].
			delete [] param;
			param = NULL;

			g_CommLog.Print("[系统错误] 创建监听线程失败");

		}
#endif
	}

}

/**
 * Stops the worker threads and tears down the socket layer.
 *
 * Raises m_Stoped so the thread loops terminate, then polls m_ThreadCount up
 * to five times, sleeping two seconds between polls, to give the threads a
 * chance to exit before the object goes away. On Windows Winsock is released
 * afterwards.
 *
 * The wait is best effort: if threads are still running after the last poll
 * the destructor continues anyway.
 */
TComm::~TComm(void)
{
	int i;

	m_Stoped = 1;

	for (i = 0; i < 5; i++)
	{
		if (m_ThreadCount <= 0)
			break;

		// Sleep() takes milliseconds while sleep() takes seconds; both
		// branches must wait two seconds, because the worker threads only
		// re-check the stop flag after their 5 second select()/sleep period.
#ifdef _WIN32
		Sleep(2000);
#else
		sleep(2);
#endif
	}


#ifdef _WIN32
	WSACleanup();
#endif
}

int TComm::SendMessage(const TPacket *lpPacket)
{
	TPacket packet;
	char buf[COMM_PDU_SIZE];
	int i, nFoundIndex, nSendSuccess;
	int ret;

	if (lpPacket == NULL)
	{
		g_CommLog.Print("[应用错误] 发送消息包函数的参数为空");

		return COMM_ERR_PARAM;
	}

	packet = *lpPacket;

	if (packet.m_BodyLen > COMM_BODY_SIZE)
	{
		g_CommLog.Print("[应用错误] 发送消息包时长度异常,长度=%u", packet.m_BodyLen);

		return COMM_ERR_PARAM;
	}

	if (packet.m_ModuleID == 0)
	{
		g_CommLog.Print("[应用错误] 发送消息包时接收模块标识为零");

		return COMM_ERR_PARAM;
	}

	if (packet.m_ModuleID == m_ModuleID)
	{

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -