⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 netcomm.c

📁 自己开发基于P2P通讯的网络服务器
💻 C
字号:
/*********************************************************************
 * 模块名称:netcomm
 * 说明:网络通讯模块,监听网络事件,接收请求数据包
 * 其它说明: 
 * 作者: 刘青山 
 * 时间 : 2004-09-23 21:13:30 
*********************************************************************/
#include <time.h>
#include "natsvr.h"
#include "netcomm.h"
#include "log.h"
#include "hash.h"
#include "memory.h"
#include "assure.h"
#include "cmd_control.h"

int g_iSocksvr = 0 ;
short TXN_TIME[]={4,6,12,24};

static int	m_iListenPort = 8201;
static LIST_HEAD(m_lsReqQueue);

static pthread_mutex_t		m_mtReq;
static sem_t				m_semReq;

void InitNetQueue()
{
	INIT_LIST_HEAD( &m_lsReqQueue );
	pthread_mutex_init(&m_mtReq, NULL);
	sem_init(&m_semReq, 0, 0);
}

static int Bind()
{
	struct sockaddr_in addr;
	
	if (g_iSocksvr > 0)
		close( g_iSocksvr ) ;
	g_iSocksvr = socket(AF_INET, SOCK_DGRAM, 0);
	if (g_iSocksvr < 0) 
	{
		LogOut(KLOG_ERROR, "socket() failed\n");
		return 0;
	}

	memset(&addr, 0, sizeof(addr));
	addr.sin_family = AF_INET;
	addr.sin_addr.s_addr = htonl(INADDR_ANY);;
	addr.sin_port = htons(m_iListenPort);

	BOOL bValue = true;
	setsockopt(g_iSocksvr, SOL_SOCKET, SO_REUSEADDR, (char *)&bValue, sizeof(bValue));
	if ( bind(g_iSocksvr, (struct sockaddr *) &addr, sizeof(addr)) < 0 ) 
	{
		LogOut(KLOG_ERROR, "bind(): Can not bind on %s:%d\n",
			"localhost", m_iListenPort );
		goto failed;
	}

	return 1 ;

failed:
	if (g_iSocksvr >= 0)
		close(g_iSocksvr);
	return 0;
}

static int PutPacket( LPP2P_USER pUser, BYTE *pReceive, int n)
{
	LPREQ_PACKET	pReq ;
	
	pReq = (LPREQ_PACKET)malloc(sizeof(REQ_PACKET)) ;
	if (!pReq)
	{
		return 0 ;
	}
	memset(pReq, 0, sizeof(REQ_PACKET));
	memcpy(pReq->buf, pReceive, n) ;
	INIT_LIST_HEAD(&pReq->listItem);

	pReq->pUser	  = pUser ;
	pReq->pHeader = (LPP2P_HDR)pReq->buf ;
	pReq->pData   = pReq->buf + P2P_HDR_LEN ;

	// Check if we can put packet into queue
	// if fail, we return soon
	if (pUser->dwID == pReq->pHeader->dwSrc && pUser->byStatus != STATUS_DEAD)
	{
		pthread_mutex_lock(&pUser->mtUser);
		if (pUser->dwID == pReq->pHeader->dwSrc && pUser->byStatus != STATUS_DEAD)
		{
			pUser->iRefCount++;
		}
		else
		{
			pthread_mutex_unlock(&pUser->mtUser);
			free(pReq);
			return 0 ;
		}
		pthread_mutex_unlock(&pUser->mtUser);
	}
	else
	{
		LogOut(KLOG_INFO, "PutPacket fail, uin:%u src:%u Status:%u", 
			pUser->dwID, 
			pReq->pHeader->dwSrc, 
			pUser->byStatus);
		free(pReq);
		return 0 ;
	}

	pthread_mutex_lock(&m_mtReq);
	list_add_tail(&pReq->listItem, &m_lsReqQueue);
	pthread_mutex_unlock(&m_mtReq);

	sem_post(&m_semReq);

	return 1; 
}

LPREQ_PACKET GetReqPacket() 
{
	LPREQ_PACKET		pReq ;
	struct list_head	*pos ;

	pReq = NULL ;

	sem_wait(&m_semReq);

	pthread_mutex_lock(&m_mtReq);
	if (!list_empty(&m_lsReqQueue))
	{
		pos = m_lsReqQueue.next ;
		pReq = LIST_ENTRY(pos, REQ_PACKET, listItem) ;
		list_del(pos) ;		
	}
	pthread_mutex_unlock(&m_mtReq);

	return pReq ;
}


static int PacketDecode(LPP2P_HDR pHdr)
{
	char buf[256];
	int	 ret;
	BYTE hmac[MAC_LEN];
	int	 len = pHdr->wDatalen - P2P_HDR_LEN;
	BYTE *p  = (BYTE*)pHdr + P2P_HDR_LEN;

	// Check signature
//	clock_t start, finish;
//	double  duration;
//	start = clock();
	
	// Check signature
	sprintf(buf, "%u-%s-%u-%u", pHdr->dwDst, SECRET_KEY, pHdr->wAckseq, pHdr->wSeq);
	hmac_md5((unsigned char*)buf, strlen(buf), 
		(unsigned char*)SECRET_KEY, KEY_LEN, hmac);
//	finish = clock();
//	duration = (double)(finish - start) / CLOCKS_PER_SEC;
//	LogOut(KLOG_INFO, "=======PacketDecode use %f seconds======\n", duration);
	if ((ret = memcmp(pHdr->HMac, hmac, MAC_LEN)))
	{
		LogOut(KLOG_INFO, "uin:%u decode error, signature is not equal\n", pHdr->dwSrc);
		return ret; 
	}
	// check length of the packet
	if (!len)
		return 0;

	// Check code
	BYTE i = ((pHdr->wCheck >> 8) & 0xff);
	if (i >= len)
	{
		LogOut(KLOG_INFO, "uin:%u decode error, exceed the range of data\n", pHdr->dwSrc);
		return 1;
	}
	BYTE v = (pHdr->wCheck & 0xff);
	if (p[i] != v)
	{
		LogOut(KLOG_INFO, "uin:%u decode error, check code is not equal\n", pHdr->dwSrc);
		return 1;
	}
	// If need to decrypt, we do it
/*
	switch (pHdr->byDesLevel)
	{
	case DL_NO_DES:
		break;
	case DL_DES_DATA:
	{
		int n = len;
		while (n > 0) {
			dedes((char *)p);
			p += 8;
			n -= 8;
		}
		p = (BYTE*)pHdr + P2P_HDR_LEN;
		break;
	}
	default:
		LogOut(KLOG_INFO, "uin:%u decode error, don't support decrypt level %d\n", 
		pHdr->dwSrc, pHdr->byDesLevel);
		return 1; 
	}
*/
	return 0;
}
static int DirectSendTo(LPANS_PACKET pAns, DWORD dwNatIP, WORD wNatPort) 
{
	struct	sockaddr_in addr;

	LogOut(KLOG_DEBUG, "DirectSendTo uin:%u cmd:%u seq:%u ack:%u",
		pAns->pHeader->dwDst,
		pAns->pHeader->wCmd,
		pAns->pHeader->wSeq,
		pAns->pHeader->wAckseq );

	addr.sin_family = AF_INET;
	addr.sin_addr.s_addr = dwNatIP ;
	addr.sin_port = wNatPort ;

	PacketEncode(pAns->pHeader);
	return sendto(g_iSocksvr, (char *)pAns->buf,pAns->pHeader->wDatalen, 
				  0, (struct sockaddr *)&addr, sizeof(addr));
}

// 这里必须是单线程被调用,按照UDP的SELECT方法,应该是一个线程
static int OnReceive( )
{
	LPP2P_HDR	lpHeader ;
	LPP2P_USER	pUser;
	static BYTE	buf[P2P_DATA_LEN] ; 
	struct sockaddr_in addr;

	socklen_t len = sizeof(addr);
	int n = recvfrom(g_iSocksvr, buf, P2P_DATA_LEN, 0, (struct sockaddr *)&addr, &len);

	DWORD	ip  = addr.sin_addr.s_addr;
	WORD	port = addr.sin_port;
	if ( n < (int)P2P_HDR_LEN )
	{
		pUser = GetHashUser(ip,port) ;
		if (pUser)
		{
			ClearUser(NOT_REF_COUNT, pUser);
		}
		LogOut(KLOG_INFO, "OnReceive(%s) illegal packet head,len:%d[%d]",inet_ntoa(addr.sin_addr), n, P2P_HDR_LEN );
		MemDump( (BYTE*)buf,n );
		return 0;
	}

	lpHeader = (LPP2P_HDR)buf ;
	if (lpHeader->wCmd != 6 && lpHeader->wCmd != 7)
	{
		LogOut(KLOG_DEBUG, "Receive from[%s:%d] uin:%u Cmd:%u", 
				inet_ntoa(addr.sin_addr), ntohs(port), lpHeader->dwSrc, lpHeader->wCmd);
	}

	// 版本是否对
	if (lpHeader->byVer != NAT_VER ||
		!(lpHeader->wCmd > CMD_START && lpHeader->wCmd< CMD_END) ||
		PacketDecode(lpHeader))
	{
		LogOut(KLOG_INFO, "illegal packet, uin:%u cmd:%u\n",lpHeader->dwSrc, lpHeader->wCmd );
		return 0;
	}
	pUser = GetHashUser(ip, port) ;
	// pUser = GetHashUserByID(lpHeader->dwSrc);
	if (lpHeader->wDatalen > P2P_DATA_LEN)
	{
		LogOut(KLOG_WARN, "User data length:%d uin:%u error", lpHeader->wDatalen, lpHeader->dwSrc);
		return 0 ;
	}

	if (lpHeader->wCmd == CMD_REG_USER)
	{
		if (!pUser)// 注册新用户,在这里处理
		{
LOG_IN:
			pUser = GetUserContext();
			if (!pUser)
			{
				LogOut(KLOG_WARN, "OnReceive() system memory is not enough" );
				return 0;
			}
			BYTE *p = buf ;
			p += P2P_HDR_LEN ;

			pUser->byStatus = STATUS_AUTHING;
			pUser->dwID     = lpHeader->dwSrc ;
			pUser->dwNatIP  = ip ;
			pUser->wNatPort = port ;
			pUser->dwLocalIP= *(DWORD*)p ; 
			p+= 4 ;
			pUser->wLocalPort= *(WORD*)p ;
			
	#ifdef _DEBUG
			DWORD	dwIP = pUser->dwLocalIP;
			WORD	wPort= ntohs(pUser->wLocalPort);

			LogOut(KLOG_DEBUG, "Reg uin:%u NAT[%s:%d][%u:%u] LOCAL[%u.%u.%u.%u:%u]",
				lpHeader->dwSrc,
				inet_ntoa(addr.sin_addr),ntohs(port),ip,port,
				NIPQUAD(dwIP),wPort );
	#endif
			PutHashUser(pUser, ip, port);
			if (!PutPacket(pUser, buf, n))
				ClearUser(NOT_REF_COUNT, pUser);
		}
		else if (pUser->dwID == lpHeader->dwSrc)
		{
			if (pUser->byStatus == STATUS_AUTHING || pUser->byStatus == STATUS_ALIVE)
			{
				LPANS_PACKET    lpAns;

				lpAns = (LPANS_PACKET)malloc(sizeof(ANS_PACKET));
				if (!lpAns)
				{
					LogOut(KLOG_ERROR, "function(CreateAckPacket) can't alloc memory");
					return 0;
				}
				memset(lpAns, 0, sizeof(ANS_PACKET));
				memcpy(lpAns->buf, lpHeader ,P2P_HDR_LEN);
				lpAns->pHeader = (LPP2P_HDR)lpAns->buf ;
				lpAns->pHeader->wDatalen = P2P_HDR_LEN ;
				lpAns->pHeader->dwSrc = 0;
				lpAns->pHeader->dwDst = lpHeader->dwSrc ;

				lpAns->pHeader->wCmd = CMD_ACK;

				DirectSendTo(lpAns, ip, port);
				free(lpAns);	
			}
			else if (pUser->byStatus == STATUS_DEAD)
			{
				LogOut(KLOG_ERROR, "uin:%u has died, we relogin", lpHeader->dwSrc);
				goto LOG_IN;
			}
		}
		else if (pUser->dwID != lpHeader->dwSrc)
		{
			ClearUser(NOT_REF_COUNT, pUser);
			goto LOG_IN;
		}
	}
	else if (pUser)
	{
		PutPacket(pUser, buf, n);
	}
	return 1; 
}

unsigned char* PacketEncode(LPP2P_HDR pHdr)
{
	char buf[256];
	int len = pHdr->wDatalen - P2P_HDR_LEN;
	char *p = (char*)pHdr + P2P_HDR_LEN;
	// Set signature
	sprintf(buf, "%u-%s-%u-%u", pHdr->dwDst, SECRET_KEY, pHdr->wAckseq, pHdr->wSeq);
	hmac_md5((unsigned char*)buf, strlen(buf), 
		(unsigned char*)SECRET_KEY, KEY_LEN, pHdr->HMac);

	// Encrypt data
/*
	switch (pHdr->byDesLevel)
	{
	case DL_NO_DES:
		break;
	case DL_DES_DATA:
	{
		int n = len;
		while (n > 0) 
		{
			if (n < 8) 
			{
				BYTE *cursor = (BYTE*)pHdr + pHdr->wDatalen;
				int d = 8 - n;
				memset(p + n, 0, d);
				cursor += d;
			}
			endes(p);
			p += 8;
			n -= 8;
		}
		break;
	}
	default:
		LogOut(KLOG_DEBUG, "Encode error, don't support decrypt level %d\n", pHdr->byDesLevel);
		return NULL; 
	}
*/
	// Generate check code
	if (len <= 0)
		return NULL;
	BYTE i = (rand() & 0xff) % len;
	BYTE v = p[i];
	WORD cc = ((i << 8) | v);
	pHdr->wCheck = cc;
	return pHdr->HMac;
}

int	SendPacket( LPANS_PACKET pAns ) 
{
	struct	sockaddr_in addr;
	LPP2P_USER	lpUser ;
	struct timeval	tv;

	lpUser = pAns->pUser ;
	if(!lpUser)
		return 0 ;

	//Not For KeepAlive Or Ack Packets
	if (pAns->pHeader->wCmd != 6 && pAns->pHeader->wCmd != 7)
	{
		LogOut(KLOG_DEBUG, "SendPacket uin:%u cmd:%u seq:%u ack:%u",
			pAns->pHeader->dwDst,
			pAns->pHeader->wCmd,
			pAns->pHeader->wSeq,
			pAns->pHeader->wAckseq );
	}	

	addr.sin_family = AF_INET;
	addr.sin_addr.s_addr = lpUser->dwNatIP ;
	addr.sin_port = lpUser->wNatPort ;

	gettimeofday(&tv, NULL);
	pAns->dwExpire = tv.tv_sec + TXN_TIME[pAns->byTry];

	PacketEncode(pAns->pHeader);
	return sendto(g_iSocksvr, (char *)pAns->buf,pAns->pHeader->wDatalen, 
				  0, (struct sockaddr *)&addr, sizeof(addr));
}

int NetRuning( ) 
{
	int iRet = 0 ;

	while(!g_bFinished)
	{
		int		maxfd ;
		int		n;
		struct	timeval to;
		fd_set	readfds;
		fd_set	writefds;
		
		iRet = Bind() ;
		if (!iRet)
		{
			LogOut(KLOG_ERROR, "can't binding port[%d]!",m_iListenPort);
			usleep(1000000);
			continue;
		}
		maxfd = g_iSocksvr;
		while(!g_bFinished)
		{
			FD_ZERO(&readfds);
			FD_ZERO(&writefds);

			FD_SET(g_iSocksvr, &readfds);
			
			to.tv_usec = 0;
			to.tv_sec = 1;

			n = select(maxfd + 1, &readfds, &writefds, NULL, &to);

			if (n > 0) 
			{
				if (FD_ISSET(g_iSocksvr, &readfds))
				{
					
					OnReceive( ) ;
				}
			} 
			else if (n < 0) 
			{
				if (errno == EINTR)
					continue;

				LogOut(KLOG_ERROR, "select() failed\n");
				break;
			}
		}
	}
	for(iRet=0 ; iRet< g_iWorkThreads; iRet++)
		sem_post(&m_semReq);

	sem_destroy(&m_semReq);
	pthread_mutex_destroy(&m_mtReq) ;
	LogOut(KLOG_WARN, "nat service halt\n");
}






⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -