⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 comm.cpp

📁 通讯中间件
💻 CPP
📖 第 1 页 / 共 3 页
字号:

#include "comm.h"

// Log sink shared by the connection threads in this file; they report
// status and error messages through g_CommLog.Print().
TLogFile g_CommLog;

// 客户端连接线程
#ifdef _WIN32
DWORD ClientConnectThread(LPVOID lpParam)
#else
void *ClientConnectThread(void *lpParam)
#endif
{
#ifndef _WIN32
	pthread_detach(pthread_self());
	signal(SIGPIPE, SIG_IGN);
#endif

	char param[80];
	unsigned long nParam;
	TComm *comm;
	char szHost[64];
	unsigned short nPort;
	
	struct sockaddr_in cliaddr;
	int addrlen;
	fd_set rset, newrset;
	struct timeval timeout;
	int nready, nread, i;
	int nrecv, nlen, npos;
	char buf[COMM_PDU_RECV];
	time_t now;

	unsigned long msglen;
	unsigned long msgtype;
	unsigned long sender;
	unsigned long receiver;
	char bodybuf[COMM_PDU_SIZE - 16];
	TPacket packet;

	char LoginBuf[16];
	SOCKET consock;
	
	if (lpParam == NULL)
	{
		g_CommLog.Print("[系统错误] 客户端连接线程参数为空");
		return 0;
	}

	memset(param, 0, 80);
	memcpy(param, lpParam, 80);
	delete [] (char *)lpParam;
	nParam = *(unsigned long *)&param[0];
	comm = (TComm *)nParam;
	nParam = *(unsigned long *)&param[4];
	nPort = (unsigned short)nParam;
	memset(szHost, 0, sizeof(szHost));
	strncpy(szHost, (char *)&param[8], sizeof(szHost) - 1);

	if (comm == NULL)
	{
		g_CommLog.Print("[系统错误] 客户端连接线程参数中包含的通信类指针为空");
		return 0;
	}

	if (strlen(szHost) <= 0)
	{
		g_CommLog.Print("[系统错误] 客户端连接线程参数中包含的目的地址为空");
		return 0;
	}

	if (nPort == 0)
	{
		g_CommLog.Print("[系统错误] 客户端连接线程参数中包含的目的端口为零");
		return 0;
	}

	TLink *pLink = NULL;

	pLink = comm->AssignLink();
	if (pLink == NULL)
	{
		g_CommLog.Print("[应用错误] 客户端连接线程未能分配到连接资源");
		return 0;
	}

	comm->AddThreadCount();
	g_CommLog.Print("[提示信息] 客户端连接线程(%s:%u):正常启动", szHost, nPort);

	pLink->m_SocketID = INVALID_SOCKET;
	pLink->m_ConnectType = 1;
	
	FD_ZERO(&newrset);
	nrecv = 0;

	while (comm->m_Stoped == 0)
	{
		// 断开长时间无数据的连接
		if (pLink->m_SocketID != INVALID_SOCKET)
		{
			now = time(NULL);
			if ((now - pLink->m_RecvTime) >= 300)
			{
				if (pLink->m_ModuleID != 0)
				{
					memset(&packet, 0, sizeof(packet));
					packet.m_ModuleID = pLink->m_ModuleID;
					packet.m_BodyLen = 0;

					comm->OnClose(&packet);
				}
				comm->Lock();
				pLink->Close();
				comm->UnLock();
				
				FD_ZERO(&newrset);
				nrecv = 0;

				g_CommLog.Print("[提示信息] 客户端连接线程(%s:%u):断开长时间无接收数据的连接", szHost, nPort);
			}
		}

		// 断开长时间不注册的连接
		if (pLink->m_SocketID != INVALID_SOCKET)
		{
			if (pLink->m_ModuleID == 0)
			{
				now = time(NULL);
				if ((now - pLink->m_ConnectTime) >= 60)
				{
					if (pLink->m_ModuleID != 0)
					{
						memset(&packet, 0, sizeof(packet));
						packet.m_ModuleID = pLink->m_ModuleID;
						packet.m_BodyLen = 0;

						comm->OnClose(&packet);
					}
					comm->Lock();
					pLink->Close();
					comm->UnLock();
				
					FD_ZERO(&newrset);
					nrecv = 0;

					g_CommLog.Print("[提示信息] 客户端连接线程(%s:%u):断开长时间无注册的连接", szHost, nPort);
				}
			}
		}

		// 连接不成功时或连接断开时重连
		if (pLink->m_SocketID == INVALID_SOCKET)
		{
			FD_ZERO(&newrset);
			nrecv = 0;

			now = time(NULL);
			if ((now - pLink->m_ConnectTime) >= 15)
			{
				pLink->m_ConnectTime = now;
				pLink->m_RecvTime = now;
				pLink->m_SendTime = 0;

				consock = comm->Connect(szHost, nPort);
				comm->Lock();
				pLink->m_SocketID = consock;
				comm->UnLock();
				if (pLink->m_SocketID != INVALID_SOCKET)
				{
					g_CommLog.Print("[提示信息] 客户端连接线程(%s:%u):连接成功", szHost, nPort);

					FD_ZERO(&newrset);
					FD_SET(pLink->m_SocketID, &newrset);
					nrecv = 0;

					addrlen = sizeof(struct sockaddr);
					getpeername(pLink->m_SocketID, (struct sockaddr *)&cliaddr, (socklen_t *)&addrlen);
					
					memset(pLink->m_ForeignIP, 0, sizeof(pLink->m_ForeignIP));
					strncpy(pLink->m_ForeignIP, inet_ntoa(cliaddr.sin_addr), sizeof(pLink->m_ForeignIP) - 1);
					pLink->m_ForeignPort = ntohs(cliaddr.sin_port);

					addrlen = sizeof(struct sockaddr);
					getsockname(pLink->m_SocketID, (struct sockaddr *)&cliaddr, (socklen_t *)&addrlen);
					
					memset(pLink->m_LocalIP, 0, sizeof(pLink->m_LocalIP));
					strncpy(pLink->m_LocalIP, inet_ntoa(cliaddr.sin_addr), sizeof(pLink->m_LocalIP) - 1);
					pLink->m_LocalPort = ntohs(cliaddr.sin_port);
				}
				else
				{
					g_CommLog.Print("[提示信息] 客户端连接线程(%s:%u):连接失败", szHost, nPort);
				}
			}
		} 
		
		// 发送链路握手消息
		if (pLink->m_SocketID != INVALID_SOCKET)
		{
			now = time(NULL);
			if ((now - pLink->m_SendTime) >= 60)
			{
				pLink->m_SendTime = now;

				memset(LoginBuf, 0, sizeof(LoginBuf));
				*(unsigned long *)&LoginBuf[0] = htonl(16);
				*(unsigned long *)&LoginBuf[4] = htonl(COMM_AUTH);
				*(unsigned long *)&LoginBuf[8] = htonl(comm->GetModuleID());
				*(unsigned long *)&LoginBuf[12] = htonl(0);

				if (comm->Send(pLink->m_SocketID, LoginBuf, 16, 0) < 0)
				{
					if (pLink->m_ModuleID != 0)
					{
						memset(&packet, 0, sizeof(packet));
						packet.m_ModuleID = pLink->m_ModuleID;
						packet.m_BodyLen = 0;

						comm->OnClose(&packet);
					}
					comm->Lock();
					pLink->Close();
					comm->UnLock();

					FD_ZERO(&newrset);
					nrecv = 0;

					g_CommLog.Print("[提示信息] 客户端连接线程(%s:%u):断开连接,发送握手消息失败", szHost, nPort);
				}
			}
		}


		// 未连接上则睡眠后准备重连
		if (pLink->m_SocketID == INVALID_SOCKET)
		{
#ifdef _WIN32
			Sleep(1000);
#else
			sleep(1);
#endif
			continue;
		}
		
		// 事件集合
		rset = newrset;

		// 等待数据到来
		memset(&timeout, 0, sizeof(timeout));
		timeout.tv_sec = 5;
		timeout.tv_usec = 0;
		
		nready = select(pLink->m_SocketID + 1, &rset, NULL, NULL, &timeout);
		if (nready > 0) 
		{
			if (pLink->m_SocketID != INVALID_SOCKET && FD_ISSET(pLink->m_SocketID, &rset))
			{
				if (nrecv < 0 || nrecv >= COMM_PDU_RECV)
				{
					nrecv = 0;

					g_CommLog.Print("[提示信息] 客户端连接线程(%s:%u):当前残留数据总量异常:%d", szHost, nPort, nrecv);
				}
				nread = recv(pLink->m_SocketID, buf + nrecv, COMM_PDU_RECV - nrecv, 0);
				if (nread > 0)
				{
					now = time(NULL);
					pLink->m_RecvTime = now;

					nrecv += nread;
					if (nrecv <= 0 || nrecv > COMM_PDU_RECV)
					{
						g_CommLog.Print("[提示信息] 客户端连接线程(%s:%u):当前接收后数据总量异常:%d", szHost, nPort, nrecv);

						nrecv = 0;
					}
					else if (nrecv >= 16)
					{
						npos = 0;
						while ((npos + 16) <= nrecv)
						{
							nlen = (int)ntohl(*(unsigned long *)&buf[npos]);
							if (nlen < 16 || nlen > COMM_PDU_SIZE)
							{
								g_CommLog.Print("[提示信息] 客户端连接线程(%s:%u):消息包的长度异常:%d", szHost, nPort, nlen);

								npos = 0;
								nrecv = 0;
								break;
							}
							else if ((npos + nlen) > nrecv)
							{
								break;
							}
							
							msglen = (unsigned long)ntohl(*(unsigned long *)&buf[npos + 0]);
							msgtype = (unsigned long)ntohl(*(unsigned long *)&buf[npos + 4]);
							sender = (unsigned long)ntohl(*(unsigned long *)&buf[npos + 8]);
							receiver = (unsigned long)ntohl(*(unsigned long *)&buf[npos + 12]);
							memcpy(bodybuf, &buf[npos + 16], msglen - 16);

							if (sender != 0)
							{
								if (msgtype == COMM_AUTH)
								{	
									if (pLink->m_ModuleID != sender)
									{
										if (comm->ExistID(sender) == COMM_ERR_NONE)
										{
											if (pLink->m_ModuleID != 0)
											{
												memset(&packet, 0, sizeof(packet));
												packet.m_ModuleID = pLink->m_ModuleID;
												packet.m_BodyLen = 0;

												comm->OnClose(&packet);
											}

											comm->Lock();
											pLink->Close();
											comm->UnLock();

											FD_ZERO(&newrset);
											nrecv = 0;

											g_CommLog.Print("[提示信息] 客户端连接线程(%s:%u):模块冲突:%u", szHost, nPort, sender);
										}
										else
										{
											if (pLink->m_ModuleID != 0)
											{
												memset(&packet, 0, sizeof(packet));
												packet.m_ModuleID = pLink->m_ModuleID;
												packet.m_BodyLen = 0;

												comm->OnClose(&packet);
											}
											pLink->m_ModuleID = sender;
											
											memset(&packet, 0, sizeof(packet));
											packet.m_ModuleID = pLink->m_ModuleID;
											packet.m_BodyLen = 0;
										
											comm->OnConnect(&packet);
										}
									}
								}
								else if (msgtype == COMM_DATA)
								{
									if (comm->GetModuleID() == receiver)
									{
										if (pLink->m_ModuleID == sender)
										{
											memset(&packet, 0, sizeof(packet));
											packet.m_ModuleID = pLink->m_ModuleID;
											packet.m_BodyLen = msglen - 16;
											memcpy(packet.m_BodyBuf, bodybuf, msglen - 16);

											comm->OnReceive(&packet);
										}
										else
										{
											g_CommLog.Print("[提示信息] 客户端连接线程(%s:%u):发送模块有误:%u,%u", szHost, nPort, pLink->m_ModuleID, sender);
										}
									}
									else
									{
										g_CommLog.Print("[提示信息] 客户端连接线程(%s:%u):接收模块有误:%u,%u", szHost, nPort, comm->GetModuleID(), receiver);
									}
								}
								else
								{
									g_CommLog.Print("[提示信息] 客户端连接线程(%s:%u):未知消息类型(HEX: %08x)", szHost, nPort, msgtype);
								}
							}
							else
							{
								g_CommLog.Print("[提示信息] 客户端连接线程(%s:%u):发送者模块标识为零", szHost, nPort);
							}

							npos += nlen;
						}
						
						for (i = npos; i < nrecv; i++)
						{
							buf[i - npos] = buf[i];
						}
						nrecv -= npos;
					}
				}
				else if (nread == 0)
				{
					if (pLink->m_ModuleID != 0)
					{
						memset(&packet, 0, sizeof(packet));
						packet.m_ModuleID = pLink->m_ModuleID;
						packet.m_BodyLen = 0;

						comm->OnClose(&packet);
					}

					comm->Lock();
					pLink->Close();
					comm->UnLock();

					FD_ZERO(&newrset);
					nrecv = 0;

					g_CommLog.Print("[提示信息] 客户端连接线程(%s:%u):连接关闭", szHost, nPort);
				}
				else
				{
					if (pLink->m_ModuleID != 0)
					{
						memset(&packet, 0, sizeof(packet));
						packet.m_ModuleID = pLink->m_ModuleID;
						packet.m_BodyLen = 0;

						comm->OnClose(&packet);
					}

					comm->Lock();
					pLink->Close();
					comm->UnLock();

					FD_ZERO(&newrset);
					nrecv = 0;

					g_CommLog.Print("[提示信息] 客户端连接线程(%s:%u):读数据异常", szHost, nPort);
				}
			}
			else
			{
				if (pLink->m_ModuleID != 0)
				{
					memset(&packet, 0, sizeof(packet));
					packet.m_ModuleID = pLink->m_ModuleID;
					packet.m_BodyLen = 0;

					comm->OnClose(&packet);
				}

				comm->Lock();
				pLink->Close();
				comm->UnLock();

				FD_ZERO(&newrset);
				nrecv = 0;

				g_CommLog.Print("[提示信息] 客户端连接线程(%s:%u):等待正常但无信号", szHost, nPort);
			}
		}
		else if (nready == 0)
		{
			;
		}
		else
		{
			if (pLink->m_ModuleID != 0)
			{
				memset(&packet, 0, sizeof(packet));
				packet.m_ModuleID = pLink->m_ModuleID;
				packet.m_BodyLen = 0;

				comm->OnClose(&packet);
			}

			comm->Lock();
			pLink->Close();
			comm->UnLock();

			FD_ZERO(&newrset);
			nrecv = 0;

			g_CommLog.Print("[提示信息] 客户端连接线程(%s:%u):等待数据错误", szHost, nPort);
		}
	}

	comm->Lock();
	pLink->Release();
	comm->UnLock();

	g_CommLog.Print("[提示信息] 客户端连接线程(%s:%u):正常退出", szHost, nPort);
	comm->DelThreadCount();

	return 0;
}

// 服务器端连接线程
#ifdef _WIN32
DWORD ServerConnectThread(LPVOID lpParam)
#else
void *ServerConnectThread(void *lpParam)
#endif
{
#ifndef _WIN32
	pthread_detach(pthread_self());
	signal(SIGPIPE, SIG_IGN);
#endif

	char param[80];
	unsigned long nParam;
	TComm *comm;
	SOCKET consock;

	struct sockaddr_in cliaddr;
	int addrlen;
	fd_set rset, newrset;
	int nready, nread;
	struct timeval timeout;
	char buf[COMM_PDU_RECV];
	int nrecv, npos, nlen, i;

	unsigned long msglen;
	unsigned long msgtype;
	unsigned long sender;
	unsigned long receiver;
	char bodybuf[COMM_PDU_SIZE - 16];
	TPacket packet;

	char szLocalIP[64];
	unsigned short uLocalPort;
	char szForeignIP[64];
	unsigned short uForeignPort;

	time_t now;
	char LoginBuf[16];


	if (lpParam == NULL)
	{
		g_CommLog.Print("[系统错误] 服务端连接线程参数为空");
		return 0;
	}

	memset(param, 0, 80);
	memcpy(param, lpParam, 80);
	delete [] (char *)(lpParam);
	nParam = *(unsigned long *)&param[0];
	comm = (TComm *)nParam;
	nParam = *(unsigned long *)&param[4];
	consock = (SOCKET)nParam;

	if (comm == NULL)
	{
		g_CommLog.Print("[系统错误] 服务端连接线程接参数中包含的通信类指针为空");
		return 0;
	}

	if (consock == INVALID_SOCKET)
	{
		g_CommLog.Print("[系统错误] 服务端连接线程参数中包含的连接SOCKET无效");
		return 0;
	}

	TLink *pLink = NULL;

	pLink = comm->AssignLink();
	if (pLink == NULL)
	{
		closesocket(consock);
		g_CommLog.Print("[应用错误] 服务端连接线程未能分配到连接资源");
		return 0;
	}

	pLink->m_ConnectType = 2;
	comm->Lock();
	pLink->m_SocketID = consock;
	comm->UnLock();

	now = time(NULL);
	pLink->m_ConnectTime = now;
	pLink->m_RecvTime = now;
	pLink->m_SendTime = 0;

	FD_ZERO(&newrset);
	FD_SET(pLink->m_SocketID, &newrset);
	nrecv = 0;

	addrlen = sizeof(struct sockaddr);
	getpeername(pLink->m_SocketID, (struct sockaddr *)&cliaddr, (socklen_t *)&addrlen);
	
	strncpy(pLink->m_ForeignIP, inet_ntoa(cliaddr.sin_addr), sizeof(pLink->m_ForeignIP) - 1);
	pLink->m_ForeignPort = ntohs(cliaddr.sin_port);

	addrlen = sizeof(struct sockaddr);
	getsockname(pLink->m_SocketID, (struct sockaddr *)&cliaddr, (socklen_t *)&addrlen);
	
	strncpy(pLink->m_LocalIP, inet_ntoa(cliaddr.sin_addr), sizeof(pLink->m_LocalIP) - 1);
	pLink->m_LocalPort = ntohs(cliaddr.sin_port);

	memset(szLocalIP, 0, sizeof(szLocalIP));
	memset(szForeignIP, 0, sizeof(szForeignIP));
	strncpy(szLocalIP, pLink->m_LocalIP, sizeof(szLocalIP));
	strncpy(szForeignIP, pLink->m_ForeignIP, sizeof(szForeignIP));
	uLocalPort = pLink->m_LocalPort;
	uForeignPort = pLink->m_ForeignPort;

	comm->AddThreadCount();
	g_CommLog.Print("[提示信息] 服务端连接线程(%s:%u -> %s:%u):正常启动", szForeignIP, uForeignPort, szLocalIP, uLocalPort);

	while (comm->m_Stoped == 0)
	{
		// 断开长时间无数据的连接
		if (pLink->m_SocketID != INVALID_SOCKET)
		{
			now = time(NULL);
			if ((now - pLink->m_RecvTime) >= 300)
			{
				g_CommLog.Print("[提示信息] 服务端连接线程(%s:%u -> %s:%u):长时间无接收数据", szForeignIP, uForeignPort, szLocalIP, uLocalPort);
				break;
			}
		}

		// 断开长时间不注册的连接
		if (pLink->m_SocketID != INVALID_SOCKET)
		{
			if (pLink->m_ModuleID == 0)
			{
				now = time(NULL);
				if ((now - pLink->m_ConnectTime) >= 60)
				{
					g_CommLog.Print("[提示信息] 服务端连接线程(%s:%u -> %s:%u):长时间无注册", szForeignIP, uForeignPort, szLocalIP, uLocalPort);
					break;
				}
			}
		}

		// 发送链路握手消息
		if (pLink->m_SocketID != INVALID_SOCKET)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -