📄 cmd_control.c
字号:
{
pthread_mutex_lock(&lpCRCUser->mtUser);
LPANS_PACKET lpCRCAns = CreateAnsPacket(pReq, lpCRCUser, m_wSeq++);
lpCRCAns->pHeader->dwDst = lpCRCUser->dwID;
lpCRCAns->pHeader->dwSrc = 0;
lpCRCAns->pHeader->wCmd = CMD_REQ_CRC;
// 填充CRC两端用户信息
WriteData(lpCRCAns, &dwSrcUser, 4);
WriteData(lpCRCAns, &dwDesUser, 4);
// 填充请求方信息
WriteData(lpCRCAns, &dwSrcUser, 4);
WriteData(lpCRCAns, &pReq->pUser->dwLocalIP, 4);
WriteData(lpCRCAns, &pReq->pUser->wLocalPort, 2);
WriteData(lpCRCAns, &pReq->pUser->dwNatIP , 4);
WriteData(lpCRCAns, &pReq->pUser->wNatPort, 2);
// 填充目的方信息
WriteData(lpCRCAns, &dwDesUser, 4);
WriteData(lpCRCAns, &lpDstUser->dwLocalIP, 4);
WriteData(lpCRCAns, &lpDstUser->wLocalPort, 2);
WriteData(lpCRCAns, &lpDstUser->dwNatIP , 4);
WriteData(lpCRCAns, &lpDstUser->wNatPort, 2);
SendPacket(lpCRCAns);
LogOut(KLOG_INFO, "向CRC中转用户%d发送%d---%d", lpCRCUser->dwID, dwSrcUser, dwDesUser);
// 填充选中的CRC中转用户信息
WriteData(lpSrcAns, &lpCRCUser->dwID, 4);
WriteData(lpSrcAns, &lpCRCUser->dwLocalIP, 4);
WriteData(lpSrcAns, &lpCRCUser->wLocalPort, 2);
WriteData(lpSrcAns, &lpCRCUser->dwNatIP, 4);
WriteData(lpSrcAns, &lpCRCUser->wNatPort, 2);
wUsersRet++;
pthread_mutex_unlock(&lpCRCUser->mtUser);
}
}
pthread_mutex_unlock(&g_mtKeepAlive);
LogOut(KLOG_INFO, "+++++++ Complete OnCRC %d++++++++", wUsersRet);//*/
// 重新写入返回用户数据量
memcpy(lpSrcAns->buf + P2P_HDR_LEN + 12, &wUsersRet, 2);
SendPacket(lpSrcAns);
// 填充发往目标用户的ACK_CRC包,并发送
memcpy(lpDesAns->pData, lpSrcAns->pData, lpSrcAns->pHeader->wDatalen - P2P_HDR_LEN);
lpDesAns->pHeader->wDatalen = lpSrcAns->pHeader->wDatalen;
SendPacket(lpDesAns);
}
/*
 * Handles CMD_EST_CRC: a relay user reports that it has established a CRC
 * connection.
 *
 * When the sender can be found by ID, its wRefCRC counter is incremented
 * under the user's mutex. An ACK is then sent back regardless of whether the
 * lookup succeeded.
 */
static void OnEstCRC(LPREQ_PACKET pReq)
{
    LPP2P_USER lpRelay = GetHashUserByID(pReq->pHeader->dwSrc);
    LPANS_PACKET lpAck;

    if (lpRelay != NULL)
    {
        pthread_mutex_lock(&lpRelay->mtUser);
        ++lpRelay->wRefCRC;
        pthread_mutex_unlock(&lpRelay->mtUser);
    }

    lpAck = CreateAckPacket(pReq);
    if (lpAck != NULL)
    {
        SendPacket(lpAck);
        free(lpAck);
        return;
    }

    LogOut(KLOG_ERROR, "Function(OnEstCRC) can't alloc memory");
}
/*
 * Handles CMD_REL_CRC: a relay user reports that it has released a CRC
 * connection.
 *
 * When the sender can be found by ID, its wRefCRC counter is decremented
 * (clamped through max(0, ...)) under the user's mutex. An ACK is then sent
 * back regardless of whether the lookup succeeded.
 */
static void OnRelCRC(LPREQ_PACKET pReq)
{
    LPP2P_USER lpRelay = GetHashUserByID(pReq->pHeader->dwSrc);
    LPANS_PACKET lpAck;

    if (lpRelay != NULL)
    {
        pthread_mutex_lock(&lpRelay->mtUser);
        lpRelay->wRefCRC = max(0, lpRelay->wRefCRC - 1);
        pthread_mutex_unlock(&lpRelay->mtUser);
    }

    lpAck = CreateAckPacket(pReq);
    if (lpAck != NULL)
    {
        SendPacket(lpAck);
        free(lpAck);
        return;
    }

    LogOut(KLOG_ERROR, "Function(OnRelCRC) can't alloc memory");
}
/*
 * Handles CMD_UNREGISTER: removes the sending user from the server.
 *
 * If the sender is not registered nothing happens. Otherwise the user's NAT
 * address is captured, the user is cleared with ClearUser(NOT_REF_COUNT, ...),
 * and an ACK is sent straight to the captured address with sendto() instead
 * of SendPacket().
 * NOTE(review): the direct sendto() is presumably needed because the user
 * record can no longer be relied on after ClearUser -- confirm.
 */
static void OnUnregister(LPREQ_PACKET pReq)
{
    LPP2P_USER lpUser;
    LPANS_PACKET lpAns;
    struct sockaddr_in addr;

    lpUser = GetHashUserByID(pReq->pHeader->dwSrc);
    if (!lpUser)
        return;

    /* Zero the whole structure so no uninitialised padding (sin_zero)
     * is handed to the kernel. */
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = lpUser->dwNatIP;
    addr.sin_port = lpUser->wNatPort;

    lpAns = CreateAckPacket(pReq);

    /* The user is removed whether or not the ACK could be allocated. */
    ClearUser(NOT_REF_COUNT, lpUser);

    if (!lpAns)
    {
        LogOut(KLOG_ERROR, "Function(OnUnregister) can't alloc memory");
        return;
    }

    PacketEncode(lpAns->pHeader);
    if (sendto(g_iSocksvr, (char *)lpAns->buf, lpAns->pHeader->wDatalen,
               0, (struct sockaddr *)&addr, sizeof(addr)) < 0)
    {
        LogOut(KLOG_ERROR, "Function(OnUnregister) sendto failed");
    }
    free(lpAns);
}
/*
 * Handles CMD_ACK: the client acknowledges a packet the server sent earlier.
 *
 * Walks the user's pending answer list under the user's mutex and marks the
 * first entry whose wAckseq equals the request's wAckseq as sent
 * (bSended = true). Does nothing when the request carries no user.
 */
static void OnAck(LPREQ_PACKET pReq)
{
    struct list_head *pNode;
    LPP2P_USER lpOwner = pReq->pUser;

    if (lpOwner == NULL)
        return;

    pthread_mutex_lock(&lpOwner->mtUser);
    LIST_FOR_EACH(pNode, &lpOwner->listAns) /* scan the answer queue */
    {
        LPANS_PACKET lpPending = LIST_ENTRY(pNode, ANS_PACKET, listUser);

        if (lpPending->pHeader->wAckseq != pReq->pHeader->wAckseq)
            continue;

        LogOut(KLOG_DEBUG, "OnAck uin:%u cmd:%u seq:%u ack:%u ",
               lpPending->pHeader->dwDst,
               lpPending->pHeader->wCmd,
               lpPending->pHeader->wSeq,
               lpPending->pHeader->wAckseq);
        lpPending->bSended = true;
        break;
    }
    pthread_mutex_unlock(&lpOwner->mtUser);
}
/*
 * Handles CMD_KEEPALIVE.
 *
 * Under g_mtKeepAlive, the user is unlinked from its current position,
 * given a new expiry of (current second + KEEPALIVE_TIMEOUT) and appended to
 * the tail of g_KeepAliveList. An ACK is then sent; per the original note the
 * peer is not expected to answer it.
 */
static void OnKeepAlive(LPREQ_PACKET pReq)
{
    struct timeval tvNow;
    time_t tSeconds;
    LPP2P_USER lpOwner;
    LPANS_PACKET lpAck;

    gettimeofday(&tvNow, NULL);
    tSeconds = tvNow.tv_sec;
    lpOwner = pReq->pUser;

    /* Re-queue the user at the tail with a refreshed deadline. */
    pthread_mutex_lock(&g_mtKeepAlive);
    list_del(&lpOwner->listItem);
    lpOwner->dwExpire = tSeconds + KEEPALIVE_TIMEOUT;
    list_add_tail(&lpOwner->listItem, &g_KeepAliveList);
    pthread_mutex_unlock(&g_mtKeepAlive);

    lpAck = CreateAckPacket(pReq);
    if (lpAck != NULL)
    {
        SendPacket(lpAck);
        free(lpAck);
        return;
    }

    LogOut(KLOG_ERROR, "function(OnKeepAlive) can't alloc memory");
}
/*
 * Handles CMD_NORMAL_PACKET: the request is simply acknowledged.
 * Logs an error and returns if the ACK packet cannot be allocated.
 */
static void OnNormalPacket(LPREQ_PACKET pReq)
{
    LPANS_PACKET lpAck = CreateAckPacket(pReq);

    if (lpAck != NULL)
    {
        SendPacket(lpAck);
        free(lpAck);
        return;
    }

    LogOut(KLOG_ERROR, "function(OnNormalPacket) can't alloc memory");
}
/*
 * Handles CMD_SWITCH: forwards the request payload to the destination user
 * named in the header and reports the outcome to the sender.
 *
 * The ACK returned to the sender carries a 2-byte status:
 *   ANS_NOT_REG - the destination user is not registered;
 *   ANS_FAIL    - the request length is invalid or the forward packet could
 *                 not be created;
 *   ANS_OK      - the payload was handed to SendPacket for the destination.
 *
 * NOTE(review): lpSwitch is not freed here, matching the original code;
 * presumably CreateAnsPacket/SendPacket keep ownership of it -- confirm.
 */
static void OnSwitch(LPREQ_PACKET pReq)
{
    LPP2P_USER lpUser;
    LPANS_PACKET lpAns, lpSwitch;

    lpAns = CreateAckPacket(pReq);
    if (!lpAns)
    {
        LogOut(KLOG_ERROR, "function(OnSwitch) can't alloc memory");
        return;
    }

    lpUser = GetHashUserByID(pReq->pHeader->dwDst);
    if (!lpUser) /* destination has not registered yet */
    {
        LogOut(KLOG_ERROR, "uin:%u hadn't register", pReq->pHeader->dwDst);
        WriteData(lpAns, (void*)&ANS_NOT_REG, 2);
        SendPacket(lpAns);
        free(lpAns);
        return;
    }

    /* wDatalen comes from the wire. If it is smaller than the header size,
     * "wDatalen - P2P_HDR_LEN" would wrap to a huge memcpy length, so such
     * a request is rejected instead of forwarded. */
    if (pReq->pHeader->wDatalen < P2P_HDR_LEN)
    {
        LogOut(KLOG_ERROR, "switch packet from uin:%u has invalid length %u",
               pReq->pHeader->dwSrc, (unsigned int)pReq->pHeader->wDatalen);
        WriteData(lpAns, (void*)&ANS_FAIL, 2);
        SendPacket(lpAns);
        free(lpAns);
        return;
    }

    lpSwitch = CreateAnsPacket(pReq, lpUser, m_wSeq++);
    if (lpSwitch)
    {
        /* Copy the payload verbatim and keep the original total length. */
        memcpy(lpSwitch->pData, pReq->pData,
               pReq->pHeader->wDatalen - P2P_HDR_LEN);
        lpSwitch->pHeader->wDatalen = pReq->pHeader->wDatalen;
        SendPacket(lpSwitch);
        WriteData(lpAns, (void*)&ANS_OK, 2);
    }
    else
    {
        LogOut(KLOG_ERROR, "Send to uin:%u fail", pReq->pHeader->dwDst);
        WriteData(lpAns, (void*)&ANS_FAIL, 2);
    }

    SendPacket(lpAns);
    free(lpAns);
}
/*
 * Records sequence number wSeq in the user's receive-window bitmap.
 *
 * The bitmap holds one bit per sequence number (byte wSeq / 8, bit wSeq % 8).
 * A sequence number of 0 first clears the whole bitmap. The user's mutex is
 * held for the duration of the call.
 *
 * Returns true if the bit was not set before (and sets it), false if it was
 * already set, i.e. the sequence number has been seen since the last reset.
 *
 * NOTE(review): the index wSeq / 8 is not bounds-checked; this assumes
 * pUser->Window is large enough for every sequence number that can arrive
 * -- confirm against the structure definition.
 */
bool SetWindow(WORD wSeq,LPP2P_USER pUser)
{
    bool bFirstSeen;
    BYTE *pSlot;
    BYTE bit;

    pthread_mutex_lock(&pUser->mtUser);

    if (wSeq == 0)
        memset(pUser->Window, 0, sizeof(pUser->Window));

    pSlot = &pUser->Window[wSeq / 8];
    bit = (1 << (wSeq % 8));

    bFirstSeen = (((*pSlot) & bit) == 0);
    if (bFirstSeen)
        (*pSlot) |= bit;

    pthread_mutex_unlock(&pUser->mtUser);
    return bFirstSeen;
}
/*
 * Processes one request whose command code is already known to be below
 * CMD_END.
 *
 * CMD_ACK goes straight to OnAck. Every other command is first recorded in
 * the sender's receive window; a repeated sequence number (except for
 * CMD_REG_USER) is only re-acknowledged and not dispatched again. Remaining
 * requests are routed to their On* handler.
 *
 * The caller keeps ownership of pReq and releases it afterwards.
 */
static void DispatchRequest(LPREQ_PACKET pReq)
{
    LPANS_PACKET lpDupAck;

    /* NOTE(review): 6 and 7 are raw command codes excluded from debug
     * logging; presumably frequent commands -- confirm and replace with the
     * matching CMD_* names. */
    if (pReq->pHeader->wCmd != 6 && pReq->pHeader->wCmd != 7)
    {
        LogOut(KLOG_DEBUG, "process src:%u cmd:%u seq:%u ack:%u",
               pReq->pHeader->dwSrc,
               pReq->pHeader->wCmd,
               pReq->pHeader->wSeq,
               pReq->pHeader->wAckseq );
    }

    if (pReq->pHeader->wCmd == CMD_ACK)
    {
        OnAck(pReq);
        return;
    }

    /* SetWindow is always evaluated first so the sequence number is recorded
     * even for CMD_REG_USER, which is never treated as a duplicate. */
    if (!SetWindow(pReq->pHeader->wSeq,pReq->pUser) &&
        pReq->pHeader->wCmd != CMD_REG_USER)
    {
        LogOut(KLOG_INFO, "receive duplicate packet src:%u cmd:%u seq:%d ack:%u",
               pReq->pHeader->dwSrc,
               pReq->pHeader->wCmd,
               pReq->pHeader->wSeq,
               pReq->pHeader->wAckseq );

        /* Re-acknowledge the duplicate if an ACK can be built. */
        lpDupAck = CreateAckPacket(pReq);
        if (lpDupAck)
        {
            SendPacket(lpDupAck);
            free(lpDupAck);
        }
        return;
    }

    switch (pReq->pHeader->wCmd)
    {
    case CMD_REG_USER:      OnRegUser(pReq);      break;
    case CMD_REQ_USERINFO:  OnUserInfo(pReq);     break;
    case CMD_REQ_HOLE:      OnHole(pReq);         break;
    case CMD_SWITCH:        OnSwitch(pReq);       break;
    case CMD_UNREGISTER:    OnUnregister(pReq);   break;
    case CMD_KEEPALIVE:     OnKeepAlive(pReq);    break;
    case CMD_NORMAL_PACKET: OnNormalPacket(pReq); break;
    case CMD_REQ_CRC:       OnCRC(pReq);          break;
    case CMD_EST_CRC:       OnEstCRC(pReq);       break;
    case CMD_REL_CRC:       OnRelCRC(pReq);       break;
    default:
        LogOut(KLOG_ERROR, "Can't process %d command", pReq->pHeader->wCmd);
        break;
    }
}

/*
 * Worker loop: pulls request packets with GetReqPacket() until g_bFinished
 * is set and processes each one.
 *
 * A request whose attached user ID no longer matches the header's source ID
 * is logged and freed without further processing. Requests with a command
 * code at or above CMD_END are dropped silently. In every other case the
 * request is dispatched, then the user is passed to
 * ClearUser(REF_COUNT, ...) and the request is freed.
 *
 * pArg is unused.
 */
static void RunControl(void *pArg)
{
    (void)pArg;

    while (!g_bFinished)
    {
        LPREQ_PACKET pReq = GetReqPacket();

        if (!pReq)
            continue;

        if (pReq->pHeader->dwSrc != pReq->pUser->dwID)
        {
            /* The packet's owner has been released in the meantime, so the
             * request cannot be processed. */
            LogOut(KLOG_ERROR, "Packet's uin:%u must be release, ignore it", pReq->pUser->dwID);
            free(pReq);
            continue;
        }

        if (pReq->pHeader->wCmd < CMD_END)
            DispatchRequest(pReq);

        ClearUser(REF_COUNT, pReq->pUser);
        free(pReq);
    }
}
/* Destroys the global answer mutex; only ever invoked through pthread_once. */
static void DestroyGlobalAnswerMutex(void)
{
    pthread_mutex_destroy(&m_mtGlobalAnswer);
}

/*
 * Thread cleanup handler registered by the worker threads.
 *
 * It may be invoked once per worker thread. Destroying a mutex that has
 * already been destroyed is undefined behaviour, so the destruction is
 * funnelled through pthread_once and happens at most one time per process.
 *
 * pArg is unused.
 */
static void ExitControl(void *pArg )
{
    static pthread_once_t s_onceDestroy = PTHREAD_ONCE_INIT;

    (void)pArg;
    pthread_once(&s_onceDestroy, DestroyGlobalAnswerMutex);
}
/*
 * Thread entry point for the request workers.
 *
 * Enables deferred cancellation, registers ExitControl as a cleanup handler
 * and runs the RunControl loop. The handler is popped without being executed
 * on a normal return, so it only runs if the thread is cancelled or exits
 * through pthread_exit while inside RunControl.
 *
 * Returns NULL as the thread's exit value.
 */
static void *WorkerThread(void *pArg)
{
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);

    pthread_cleanup_push(ExitControl, (void *)pArg );
    /* core processing loop */
    RunControl(pArg);
    pthread_cleanup_pop(0);

    /* A start routine must return a value; falling off the end of a
     * non-void function leaves the thread's exit status indeterminate. */
    return NULL;
}
/*
 * Initialises the control module and starts its threads.
 *
 * Seeds the outgoing sequence and ACK-sequence counters with rand() values
 * masked to 14 bits (0..0x3fff), initialises the global answer queue and its
 * mutex, then starts one CheckAnswerQueue thread, g_iWorkThreads
 * WorkerThread threads and one CheckKeepAlive thread, in that order.
 */
void InitControl()
{
    /* Starting sequence numbers, limited to 14 bits. */
    m_wSeq = rand() & 0x3fff;
    m_wAckSeq = rand() & 0x3fff;

    /* Shared answer queue and its lock must exist before any thread runs. */
    INIT_LIST_HEAD(&g_quAnswer);
    pthread_mutex_init(&m_mtGlobalAnswer, NULL);

    CreateThreads(CheckAnswerQueue, NULL, 1);
    CreateThreads(WorkerThread, NULL, g_iWorkThreads);
    CreateThreads(CheckKeepAlive, NULL, 1);
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -