⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cmd_control.c

📁 自己开发基于P2P通讯的网络服务器
💻 C
📖 第 1 页 / 共 2 页
字号:
		{	
			pthread_mutex_lock(&lpCRCUser->mtUser);
			LPANS_PACKET lpCRCAns = CreateAnsPacket(pReq, lpCRCUser, m_wSeq++);
			
			lpCRCAns->pHeader->dwDst = lpCRCUser->dwID;
			lpCRCAns->pHeader->dwSrc = 0;
			lpCRCAns->pHeader->wCmd = CMD_REQ_CRC;
			// 填充CRC两端用户信息
			WriteData(lpCRCAns, &dwSrcUser, 4); 
			WriteData(lpCRCAns, &dwDesUser, 4);
			// 填充请求方信息
			WriteData(lpCRCAns, &dwSrcUser, 4);
			WriteData(lpCRCAns, &pReq->pUser->dwLocalIP, 4); 
			WriteData(lpCRCAns, &pReq->pUser->wLocalPort, 2); 
			WriteData(lpCRCAns, &pReq->pUser->dwNatIP , 4); 
			WriteData(lpCRCAns, &pReq->pUser->wNatPort, 2);
			// 填充目的方信息
			WriteData(lpCRCAns, &dwDesUser, 4);
			WriteData(lpCRCAns, &lpDstUser->dwLocalIP, 4); 
			WriteData(lpCRCAns, &lpDstUser->wLocalPort, 2); 
			WriteData(lpCRCAns, &lpDstUser->dwNatIP , 4); 
			WriteData(lpCRCAns, &lpDstUser->wNatPort, 2);
			SendPacket(lpCRCAns);
			LogOut(KLOG_INFO, "向CRC中转用户%d发送%d---%d", lpCRCUser->dwID, dwSrcUser, dwDesUser);

			// 填充选中的CRC中转用户信息
			WriteData(lpSrcAns, &lpCRCUser->dwID, 4); 
			WriteData(lpSrcAns, &lpCRCUser->dwLocalIP, 4); 
			WriteData(lpSrcAns, &lpCRCUser->wLocalPort, 2); 
			WriteData(lpSrcAns, &lpCRCUser->dwNatIP, 4); 
			WriteData(lpSrcAns, &lpCRCUser->wNatPort, 2); 
			wUsersRet++;

			pthread_mutex_unlock(&lpCRCUser->mtUser);
		}
	}
	pthread_mutex_unlock(&g_mtKeepAlive);

	LogOut(KLOG_INFO, "+++++++ Complete OnCRC %d++++++++", wUsersRet);//*/
	// 重新写入返回用户数据量
	memcpy(lpSrcAns->buf + P2P_HDR_LEN + 12, &wUsersRet, 2);
	SendPacket(lpSrcAns);
	
	// 填充发往目标用户的ACK_CRC包,并发送
	memcpy(lpDesAns->pData, lpSrcAns->pData, lpSrcAns->pHeader->wDatalen - P2P_HDR_LEN);
	lpDesAns->pHeader->wDatalen = lpSrcAns->pHeader->wDatalen;
	SendPacket(lpDesAns);
}

// 中转用户已建立CRC连接
static void OnEstCRC(LPREQ_PACKET pReq)
{
	LPP2P_USER		lpUser;
	LPANS_PACKET	lpAns;

	lpUser = GetHashUserByID(pReq->pHeader->dwSrc) ;
	if (lpUser)	
	{
		pthread_mutex_lock(&lpUser->mtUser);
		lpUser->wRefCRC++;
		pthread_mutex_unlock(&lpUser->mtUser);
	}

	lpAns = CreateAckPacket(pReq);
	if (!lpAns)
	{
		LogOut(KLOG_ERROR, "Function(OnEstCRC) can't alloc memory");
		return;
	}
	SendPacket(lpAns);
	free(lpAns);
}

// 中转用户已释放CRC连接
static void OnRelCRC(LPREQ_PACKET pReq)
{
	LPP2P_USER		lpUser;
	LPANS_PACKET	lpAns;
	
	lpUser = GetHashUserByID(pReq->pHeader->dwSrc) ;
	if (lpUser)	
	{
		pthread_mutex_lock(&lpUser->mtUser);
		lpUser->wRefCRC = max(0, lpUser->wRefCRC - 1);
		pthread_mutex_unlock(&lpUser->mtUser);
	}

	lpAns = CreateAckPacket(pReq);
	if (!lpAns)
	{
		LogOut(KLOG_ERROR, "Function(OnRelCRC) can't alloc memory");
		return;
	}
	SendPacket(lpAns);
	free(lpAns);
}

// 在服务器注销用户
static void OnUnregister(LPREQ_PACKET pReq)
{
	LPP2P_USER		lpUser ;
	LPANS_PACKET	lpAns ;

	lpUser = GetHashUserByID(pReq->pHeader->dwSrc) ;
	if (lpUser)	// 注销用户
	{
		struct	sockaddr_in addr;

		addr.sin_family = AF_INET;
		addr.sin_addr.s_addr = lpUser->dwNatIP ;
		addr.sin_port = lpUser->wNatPort ;
		lpAns = CreateAckPacket(pReq);
		if (!lpAns)
		{
			ClearUser(NOT_REF_COUNT, lpUser);
			return;
		}
		ClearUser(NOT_REF_COUNT, lpUser);
//		LogOut(KLOG_DEBUG, "SendPacket dst:%u cmd:%u seq:%u ack:%u",
//			lpAns->pHeader->dwDst,lpAns->pHeader->wCmd,lpAns->pHeader->wSeq,lpAns->pHeader->wAckseq );

		PacketEncode(lpAns->pHeader);
		sendto(g_iSocksvr, (char *)lpAns->buf, lpAns->pHeader->wDatalen, 
					  0, (struct sockaddr*)&addr, sizeof(addr));

		free(lpAns);
	}
}
// 客户端回应报文
static void OnAck(LPREQ_PACKET pReq)
{
	LPP2P_USER		lpUser ;
	struct list_head *pos ;

	lpUser = pReq->pUser;
	if (!lpUser)
		return;
	pthread_mutex_lock(&pReq->pUser->mtUser);
	LIST_FOR_EACH(pos, &lpUser->listAns)	// 检查应答队列
	{
		LPANS_PACKET  lpAns = LIST_ENTRY(pos, ANS_PACKET, listUser);
		if (lpAns->pHeader->wAckseq == pReq->pHeader->wAckseq)
		{
			LogOut(KLOG_DEBUG, "OnAck uin:%u cmd:%u seq:%u ack:%u ",
				lpAns->pHeader->dwDst,
				lpAns->pHeader->wCmd,
				lpAns->pHeader->wSeq,
				lpAns->pHeader->wAckseq
				 );
			lpAns->bSended = true;
			break;
		}
	}
	pthread_mutex_unlock(&pReq->pUser->mtUser);
}
// 响应保持连接,不需要对方回应
static void OnKeepAlive(LPREQ_PACKET pReq)
{
	LPANS_PACKET	lpAns ;
	LPP2P_USER		lpUser ;
	struct timeval	tv;
	time_t			now ;

	gettimeofday(&tv, NULL);
	now = tv.tv_sec ;
	lpUser = pReq->pUser;

	pthread_mutex_lock(&g_mtKeepAlive);
	list_del(&lpUser->listItem);
	lpUser->dwExpire = now + KEEPALIVE_TIMEOUT;
	list_add_tail(&lpUser->listItem, &g_KeepAliveList);
	pthread_mutex_unlock(&g_mtKeepAlive);

	lpAns = CreateAckPacket(pReq);
	if (!lpAns)
	{
		LogOut(KLOG_ERROR, "function(OnKeepAlive) can't alloc memory");
		return ;
	}
	SendPacket(lpAns);
	free(lpAns);
}
// 普通数据包处理
static void OnNormalPacket(LPREQ_PACKET pReq)
{
	LPANS_PACKET	lpAns ;

	lpAns = CreateAckPacket(pReq);
	if (!lpAns)
	{
		LogOut(KLOG_ERROR, "function(OnNormalPacket) can't alloc memory");
		return ;
	}
	SendPacket(lpAns);
	free(lpAns);
}
// 转发数据包
static void OnSwitch(LPREQ_PACKET pReq)
{
	LPP2P_USER		lpUser ;
	LPANS_PACKET	lpAns,lpSwitch ;

	lpAns = CreateAckPacket(pReq);
	if (!lpAns)
	{
		LogOut(KLOG_ERROR, "function(OnSwitch) can't alloc memory");
		return ;
	}
	lpUser = GetHashUserByID(pReq->pHeader->dwDst) ;
	if (!lpUser)	// 还没有注册过
	{
		LogOut(KLOG_ERROR, "uin:%u hadn't register", pReq->pHeader->dwDst);
		WriteData(lpAns, (void*)&ANS_NOT_REG, 2); 
		SendPacket(lpAns);
		free(lpAns);
		return;
	}

	lpSwitch = CreateAnsPacket(pReq,lpUser,m_wSeq++);
	if (lpSwitch)
	{
		memcpy(lpSwitch->pData, pReq->pData, pReq->pHeader->wDatalen - P2P_HDR_LEN)  ;
		lpSwitch->pHeader->wDatalen = pReq->pHeader->wDatalen;
		SendPacket(lpSwitch);
		WriteData(lpAns, (void*)&ANS_OK, 2); 
	}
	else
	{
		LogOut(KLOG_ERROR, "Send to uin:%u fail", pReq->pHeader->dwDst);
		WriteData(lpAns, (void*)&ANS_FAIL, 2); 
	}

	SendPacket(lpAns);
	free(lpAns);
}
bool SetWindow(WORD wSeq,LPP2P_USER pUser)
{
	BYTE *byte;
	BYTE mask;

	pthread_mutex_lock(&pUser->mtUser);

	byte = &pUser->Window[wSeq / 8];
	mask = (1 << (wSeq % 8));

	if ( wSeq == 0)
	{
		memset(pUser->Window, 0, sizeof(pUser->Window));
	}
	if ((*byte) & mask) {
		pthread_mutex_unlock(&pUser->mtUser);
		return false;
	}
	(*byte) |= mask;

	pthread_mutex_unlock(&pUser->mtUser);
	return true;
}

static void RunControl(void *pArg)
{
	while(!g_bFinished)
	{
		LPREQ_PACKET	pReq;
		LPANS_PACKET	lpAns;

		pReq = GetReqPacket();
		if (!pReq)
			continue;
		if (pReq->pUser->dwID != pReq->pHeader->dwSrc)
		{
			// The request packet's owner must be release,
			// so we can't process it
			LogOut(KLOG_ERROR, "Packet's uin:%u must be release, ignore it", pReq->pUser->dwID);
			free(pReq);
			continue;
		}
//		pthread_mutex_lock(&pReq->pUser->mtUser);
		if (pReq->pHeader->wCmd < CMD_END)
		{
			if (pReq->pHeader->wCmd != 6 && pReq->pHeader->wCmd != 7)
			{
			LogOut(KLOG_DEBUG, "process src:%u cmd:%u seq:%u ack:%u",
				pReq->pHeader->dwSrc,
				pReq->pHeader->wCmd,
				pReq->pHeader->wSeq,
				pReq->pHeader->wAckseq );
			}

			if (pReq->pHeader->wCmd != CMD_ACK)
			{
				if (!SetWindow(pReq->pHeader->wSeq,pReq->pUser) && 
					pReq->pHeader->wCmd != CMD_REG_USER)
				{
					LogOut(KLOG_INFO, "receive duplicate packet src:%u cmd:%u seq:%d ack:%u",
						pReq->pHeader->dwSrc,
						pReq->pHeader->wCmd,
						pReq->pHeader->wSeq,
						pReq->pHeader->wAckseq );

					lpAns = CreateAckPacket(pReq);
					if (!lpAns)
					{
//						pthread_mutex_unlock(&pReq->pUser->mtUser);
						ClearUser(REF_COUNT, pReq->pUser);
						free(pReq) ;
						continue;
					}

					SendPacket(lpAns);
					free(lpAns);

//					pthread_mutex_unlock(&pReq->pUser->mtUser);
					ClearUser(REF_COUNT, pReq->pUser);
					free(pReq) ;
					continue;
				}
				switch (pReq->pHeader->wCmd)
				{
					case CMD_REG_USER:
						OnRegUser(pReq);
						break;
					case CMD_REQ_USERINFO:
						OnUserInfo(pReq);
						break;
					case CMD_REQ_HOLE:
						OnHole(pReq);
						break;
					case CMD_SWITCH:
						OnSwitch(pReq);
						break;
					case CMD_UNREGISTER:
						OnUnregister(pReq);
						break;
					case CMD_KEEPALIVE:
						OnKeepAlive(pReq);
						break;
					case CMD_NORMAL_PACKET:
						OnNormalPacket(pReq);
						break;
					case CMD_REQ_CRC:
						OnCRC(pReq);
						break;
					case CMD_EST_CRC:
						OnEstCRC(pReq);
						break;
					case CMD_REL_CRC:
						OnRelCRC(pReq);
						break;
					default:
						LogOut(KLOG_ERROR, "Can't process %d command", pReq->pHeader->wCmd);
						break;
				}
			}
			else
			{
				OnAck(pReq);
			}
		}
//		pthread_mutex_unlock(&pReq->pUser->mtUser);
		ClearUser(REF_COUNT, pReq->pUser);
		free(pReq);
	}
	
}

static void ExitControl(void *pArg )
{
	// 会被多次调用,不过没什么问题
	pthread_mutex_destroy(&m_mtGlobalAnswer) ;

}

static void *WorkerThread(void *pArg)
{
	pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);

    pthread_cleanup_push(ExitControl, (void *)pArg );
	
    //	核心处理函数
	RunControl(pArg);
	
    pthread_cleanup_pop(0);
}

void InitControl()
{
	m_wSeq	  = (rand() & 0x3fff);
	m_wAckSeq = (rand() & 0x3fff);

	INIT_LIST_HEAD(&g_quAnswer);

	pthread_mutex_init(&m_mtGlobalAnswer, NULL);

	CreateThreads(CheckAnswerQueue, NULL, 1);
	CreateThreads(WorkerThread, NULL, g_iWorkThreads);

	CreateThreads(CheckKeepAlive, NULL, 1);
}



⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -