📄 netcomm.c
字号:
/*********************************************************************
* 模块名称:netcomm
* 说明:网络通讯模块,监听网络事件,接收请求数据包
* 其它说明:
* 作者: 刘青山
* 时间 : 2004-09-23 21:13:30
*********************************************************************/
#include <time.h>
#include "natsvr.h"
#include "netcomm.h"
#include "log.h"
#include "hash.h"
#include "memory.h"
#include "assure.h"
#include "cmd_control.h"
int g_iSocksvr = 0 ;
short TXN_TIME[]={4,6,12,24};
static int m_iListenPort = 8201;
static LIST_HEAD(m_lsReqQueue);
static pthread_mutex_t m_mtReq;
static sem_t m_semReq;
void InitNetQueue()
{
INIT_LIST_HEAD( &m_lsReqQueue );
pthread_mutex_init(&m_mtReq, NULL);
sem_init(&m_semReq, 0, 0);
}
static int Bind()
{
struct sockaddr_in addr;
if (g_iSocksvr > 0)
close( g_iSocksvr ) ;
g_iSocksvr = socket(AF_INET, SOCK_DGRAM, 0);
if (g_iSocksvr < 0)
{
LogOut(KLOG_ERROR, "socket() failed\n");
return 0;
}
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_ANY);;
addr.sin_port = htons(m_iListenPort);
BOOL bValue = true;
setsockopt(g_iSocksvr, SOL_SOCKET, SO_REUSEADDR, (char *)&bValue, sizeof(bValue));
if ( bind(g_iSocksvr, (struct sockaddr *) &addr, sizeof(addr)) < 0 )
{
LogOut(KLOG_ERROR, "bind(): Can not bind on %s:%d\n",
"localhost", m_iListenPort );
goto failed;
}
return 1 ;
failed:
if (g_iSocksvr >= 0)
close(g_iSocksvr);
return 0;
}
static int PutPacket( LPP2P_USER pUser, BYTE *pReceive, int n)
{
LPREQ_PACKET pReq ;
pReq = (LPREQ_PACKET)malloc(sizeof(REQ_PACKET)) ;
if (!pReq)
{
return 0 ;
}
memset(pReq, 0, sizeof(REQ_PACKET));
memcpy(pReq->buf, pReceive, n) ;
INIT_LIST_HEAD(&pReq->listItem);
pReq->pUser = pUser ;
pReq->pHeader = (LPP2P_HDR)pReq->buf ;
pReq->pData = pReq->buf + P2P_HDR_LEN ;
// Check if we can put packet into queue
// if fail, we return soon
if (pUser->dwID == pReq->pHeader->dwSrc && pUser->byStatus != STATUS_DEAD)
{
pthread_mutex_lock(&pUser->mtUser);
if (pUser->dwID == pReq->pHeader->dwSrc && pUser->byStatus != STATUS_DEAD)
{
pUser->iRefCount++;
}
else
{
pthread_mutex_unlock(&pUser->mtUser);
free(pReq);
return 0 ;
}
pthread_mutex_unlock(&pUser->mtUser);
}
else
{
LogOut(KLOG_INFO, "PutPacket fail, uin:%u src:%u Status:%u",
pUser->dwID,
pReq->pHeader->dwSrc,
pUser->byStatus);
free(pReq);
return 0 ;
}
pthread_mutex_lock(&m_mtReq);
list_add_tail(&pReq->listItem, &m_lsReqQueue);
pthread_mutex_unlock(&m_mtReq);
sem_post(&m_semReq);
return 1;
}
LPREQ_PACKET GetReqPacket()
{
LPREQ_PACKET pReq ;
struct list_head *pos ;
pReq = NULL ;
sem_wait(&m_semReq);
pthread_mutex_lock(&m_mtReq);
if (!list_empty(&m_lsReqQueue))
{
pos = m_lsReqQueue.next ;
pReq = LIST_ENTRY(pos, REQ_PACKET, listItem) ;
list_del(pos) ;
}
pthread_mutex_unlock(&m_mtReq);
return pReq ;
}
static int PacketDecode(LPP2P_HDR pHdr)
{
char buf[256];
int ret;
BYTE hmac[MAC_LEN];
int len = pHdr->wDatalen - P2P_HDR_LEN;
BYTE *p = (BYTE*)pHdr + P2P_HDR_LEN;
// Check signature
// clock_t start, finish;
// double duration;
// start = clock();
// Check signature
sprintf(buf, "%u-%s-%u-%u", pHdr->dwDst, SECRET_KEY, pHdr->wAckseq, pHdr->wSeq);
hmac_md5((unsigned char*)buf, strlen(buf),
(unsigned char*)SECRET_KEY, KEY_LEN, hmac);
// finish = clock();
// duration = (double)(finish - start) / CLOCKS_PER_SEC;
// LogOut(KLOG_INFO, "=======PacketDecode use %f seconds======\n", duration);
if ((ret = memcmp(pHdr->HMac, hmac, MAC_LEN)))
{
LogOut(KLOG_INFO, "uin:%u decode error, signature is not equal\n", pHdr->dwSrc);
return ret;
}
// check length of the packet
if (!len)
return 0;
// Check code
BYTE i = ((pHdr->wCheck >> 8) & 0xff);
if (i >= len)
{
LogOut(KLOG_INFO, "uin:%u decode error, exceed the range of data\n", pHdr->dwSrc);
return 1;
}
BYTE v = (pHdr->wCheck & 0xff);
if (p[i] != v)
{
LogOut(KLOG_INFO, "uin:%u decode error, check code is not equal\n", pHdr->dwSrc);
return 1;
}
// If need to decrypt, we do it
/*
switch (pHdr->byDesLevel)
{
case DL_NO_DES:
break;
case DL_DES_DATA:
{
int n = len;
while (n > 0) {
dedes((char *)p);
p += 8;
n -= 8;
}
p = (BYTE*)pHdr + P2P_HDR_LEN;
break;
}
default:
LogOut(KLOG_INFO, "uin:%u decode error, don't support decrypt level %d\n",
pHdr->dwSrc, pHdr->byDesLevel);
return 1;
}
*/
return 0;
}
static int DirectSendTo(LPANS_PACKET pAns, DWORD dwNatIP, WORD wNatPort)
{
struct sockaddr_in addr;
LogOut(KLOG_DEBUG, "DirectSendTo uin:%u cmd:%u seq:%u ack:%u",
pAns->pHeader->dwDst,
pAns->pHeader->wCmd,
pAns->pHeader->wSeq,
pAns->pHeader->wAckseq );
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = dwNatIP ;
addr.sin_port = wNatPort ;
PacketEncode(pAns->pHeader);
return sendto(g_iSocksvr, (char *)pAns->buf,pAns->pHeader->wDatalen,
0, (struct sockaddr *)&addr, sizeof(addr));
}
// 这里必须是单线程被调用,按照UDP的SELECT方法,应该是一个线程
static int OnReceive( )
{
LPP2P_HDR lpHeader ;
LPP2P_USER pUser;
static BYTE buf[P2P_DATA_LEN] ;
struct sockaddr_in addr;
socklen_t len = sizeof(addr);
int n = recvfrom(g_iSocksvr, buf, P2P_DATA_LEN, 0, (struct sockaddr *)&addr, &len);
DWORD ip = addr.sin_addr.s_addr;
WORD port = addr.sin_port;
if ( n < (int)P2P_HDR_LEN )
{
pUser = GetHashUser(ip,port) ;
if (pUser)
{
ClearUser(NOT_REF_COUNT, pUser);
}
LogOut(KLOG_INFO, "OnReceive(%s) illegal packet head,len:%d[%d]",inet_ntoa(addr.sin_addr), n, P2P_HDR_LEN );
MemDump( (BYTE*)buf,n );
return 0;
}
lpHeader = (LPP2P_HDR)buf ;
if (lpHeader->wCmd != 6 && lpHeader->wCmd != 7)
{
LogOut(KLOG_DEBUG, "Receive from[%s:%d] uin:%u Cmd:%u",
inet_ntoa(addr.sin_addr), ntohs(port), lpHeader->dwSrc, lpHeader->wCmd);
}
// 版本是否对
if (lpHeader->byVer != NAT_VER ||
!(lpHeader->wCmd > CMD_START && lpHeader->wCmd< CMD_END) ||
PacketDecode(lpHeader))
{
LogOut(KLOG_INFO, "illegal packet, uin:%u cmd:%u\n",lpHeader->dwSrc, lpHeader->wCmd );
return 0;
}
pUser = GetHashUser(ip, port) ;
// pUser = GetHashUserByID(lpHeader->dwSrc);
if (lpHeader->wDatalen > P2P_DATA_LEN)
{
LogOut(KLOG_WARN, "User data length:%d uin:%u error", lpHeader->wDatalen, lpHeader->dwSrc);
return 0 ;
}
if (lpHeader->wCmd == CMD_REG_USER)
{
if (!pUser)// 注册新用户,在这里处理
{
LOG_IN:
pUser = GetUserContext();
if (!pUser)
{
LogOut(KLOG_WARN, "OnReceive() system memory is not enough" );
return 0;
}
BYTE *p = buf ;
p += P2P_HDR_LEN ;
pUser->byStatus = STATUS_AUTHING;
pUser->dwID = lpHeader->dwSrc ;
pUser->dwNatIP = ip ;
pUser->wNatPort = port ;
pUser->dwLocalIP= *(DWORD*)p ;
p+= 4 ;
pUser->wLocalPort= *(WORD*)p ;
#ifdef _DEBUG
DWORD dwIP = pUser->dwLocalIP;
WORD wPort= ntohs(pUser->wLocalPort);
LogOut(KLOG_DEBUG, "Reg uin:%u NAT[%s:%d][%u:%u] LOCAL[%u.%u.%u.%u:%u]",
lpHeader->dwSrc,
inet_ntoa(addr.sin_addr),ntohs(port),ip,port,
NIPQUAD(dwIP),wPort );
#endif
PutHashUser(pUser, ip, port);
if (!PutPacket(pUser, buf, n))
ClearUser(NOT_REF_COUNT, pUser);
}
else if (pUser->dwID == lpHeader->dwSrc)
{
if (pUser->byStatus == STATUS_AUTHING || pUser->byStatus == STATUS_ALIVE)
{
LPANS_PACKET lpAns;
lpAns = (LPANS_PACKET)malloc(sizeof(ANS_PACKET));
if (!lpAns)
{
LogOut(KLOG_ERROR, "function(CreateAckPacket) can't alloc memory");
return 0;
}
memset(lpAns, 0, sizeof(ANS_PACKET));
memcpy(lpAns->buf, lpHeader ,P2P_HDR_LEN);
lpAns->pHeader = (LPP2P_HDR)lpAns->buf ;
lpAns->pHeader->wDatalen = P2P_HDR_LEN ;
lpAns->pHeader->dwSrc = 0;
lpAns->pHeader->dwDst = lpHeader->dwSrc ;
lpAns->pHeader->wCmd = CMD_ACK;
DirectSendTo(lpAns, ip, port);
free(lpAns);
}
else if (pUser->byStatus == STATUS_DEAD)
{
LogOut(KLOG_ERROR, "uin:%u has died, we relogin", lpHeader->dwSrc);
goto LOG_IN;
}
}
else if (pUser->dwID != lpHeader->dwSrc)
{
ClearUser(NOT_REF_COUNT, pUser);
goto LOG_IN;
}
}
else if (pUser)
{
PutPacket(pUser, buf, n);
}
return 1;
}
unsigned char* PacketEncode(LPP2P_HDR pHdr)
{
char buf[256];
int len = pHdr->wDatalen - P2P_HDR_LEN;
char *p = (char*)pHdr + P2P_HDR_LEN;
// Set signature
sprintf(buf, "%u-%s-%u-%u", pHdr->dwDst, SECRET_KEY, pHdr->wAckseq, pHdr->wSeq);
hmac_md5((unsigned char*)buf, strlen(buf),
(unsigned char*)SECRET_KEY, KEY_LEN, pHdr->HMac);
// Encrypt data
/*
switch (pHdr->byDesLevel)
{
case DL_NO_DES:
break;
case DL_DES_DATA:
{
int n = len;
while (n > 0)
{
if (n < 8)
{
BYTE *cursor = (BYTE*)pHdr + pHdr->wDatalen;
int d = 8 - n;
memset(p + n, 0, d);
cursor += d;
}
endes(p);
p += 8;
n -= 8;
}
break;
}
default:
LogOut(KLOG_DEBUG, "Encode error, don't support decrypt level %d\n", pHdr->byDesLevel);
return NULL;
}
*/
// Generate check code
if (len <= 0)
return NULL;
BYTE i = (rand() & 0xff) % len;
BYTE v = p[i];
WORD cc = ((i << 8) | v);
pHdr->wCheck = cc;
return pHdr->HMac;
}
int SendPacket( LPANS_PACKET pAns )
{
struct sockaddr_in addr;
LPP2P_USER lpUser ;
struct timeval tv;
lpUser = pAns->pUser ;
if(!lpUser)
return 0 ;
//Not For KeepAlive Or Ack Packets
if (pAns->pHeader->wCmd != 6 && pAns->pHeader->wCmd != 7)
{
LogOut(KLOG_DEBUG, "SendPacket uin:%u cmd:%u seq:%u ack:%u",
pAns->pHeader->dwDst,
pAns->pHeader->wCmd,
pAns->pHeader->wSeq,
pAns->pHeader->wAckseq );
}
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = lpUser->dwNatIP ;
addr.sin_port = lpUser->wNatPort ;
gettimeofday(&tv, NULL);
pAns->dwExpire = tv.tv_sec + TXN_TIME[pAns->byTry];
PacketEncode(pAns->pHeader);
return sendto(g_iSocksvr, (char *)pAns->buf,pAns->pHeader->wDatalen,
0, (struct sockaddr *)&addr, sizeof(addr));
}
int NetRuning( )
{
int iRet = 0 ;
while(!g_bFinished)
{
int maxfd ;
int n;
struct timeval to;
fd_set readfds;
fd_set writefds;
iRet = Bind() ;
if (!iRet)
{
LogOut(KLOG_ERROR, "can't binding port[%d]!",m_iListenPort);
usleep(1000000);
continue;
}
maxfd = g_iSocksvr;
while(!g_bFinished)
{
FD_ZERO(&readfds);
FD_ZERO(&writefds);
FD_SET(g_iSocksvr, &readfds);
to.tv_usec = 0;
to.tv_sec = 1;
n = select(maxfd + 1, &readfds, &writefds, NULL, &to);
if (n > 0)
{
if (FD_ISSET(g_iSocksvr, &readfds))
{
OnReceive( ) ;
}
}
else if (n < 0)
{
if (errno == EINTR)
continue;
LogOut(KLOG_ERROR, "select() failed\n");
break;
}
}
}
for(iRet=0 ; iRet< g_iWorkThreads; iRet++)
sem_post(&m_semReq);
sem_destroy(&m_semReq);
pthread_mutex_destroy(&m_mtReq) ;
LogOut(KLOG_WARN, "nat service halt\n");
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -