⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 assure.c

📁 自己开发基于P2P通讯的网络服务器
💻 C
字号:
/*********************************************************************
 * 模块名称:assure
 * 说明:确保UDP包可以准确发送到对方的模块
 * 其它说明: 
 * 作者: 刘青山 
 * 时间 : 2004-09-26 7:13:30 
*********************************************************************/
#include "assure.h"
#include "cmd_control.h"
#include "netcomm.h"
#include "memory.h"
#include "natsvr.h"
#include "log.h"
#include <time.h>

static pthread_mutex_t  qlock;
static LPP2P_USER qhead = NULL;
static LPP2P_USER qtail = NULL;

static unsigned long	TOTAL_GUYS = 0;
static unsigned long	TOTAL_CONTEXTS = 0;

LIST_HEAD(g_KeepAliveList);
pthread_mutex_t		g_mtKeepAlive ;
// 清除应答包内存
void ClearAnsPacket(LPANS_PACKET pAns)
{
	LogOut(KLOG_DEBUG, "ClearAnsPacket uin:%u cmd:%u seq:%u ack:%u",
		pAns->pHeader->dwSrc,
		pAns->pHeader->wCmd,
		pAns->pHeader->wSeq,
		pAns->pHeader->wAckseq
		 );
	if (pAns->pUser)
	{
		pthread_mutex_lock(&pAns->pUser->mtUser);
		if (pAns->pUser)
		{
			list_del(&pAns->listUser);
		}
		pthread_mutex_unlock(&pAns->pUser->mtUser);
	}
	free(pAns);
}

// 清除用户数据内存
void ClearUser(int iType, LPP2P_USER pUser)
{
	struct list_head *pos ;
	bool	bCanDel = false;
	
	pthread_mutex_lock(&pUser->mtUser);
	switch (iType)
	{
		case NOT_REF_COUNT:
			pUser->byStatus	= STATUS_DEAD; // The user don't keep alive any more.
			if (pUser->iRefCount <= 0)
			{
				bCanDel = true;
			}
			break;
		case REF_COUNT:
			pUser->iRefCount--;
			if (pUser->byStatus	== STATUS_DEAD && pUser->iRefCount <= 0)
			{
				bCanDel = true;
			}
			break;
		case FORCE_DEL:
			bCanDel = true;
			break;
		default:
			LogOut(KLOG_ERROR, "Recycle error, uin:%u", pUser->dwID);
			break;
	}

	if (!bCanDel)
	{
		pthread_mutex_unlock(&pUser->mtUser);
		return;
	}

	LIST_FOR_EACH(pos, &pUser->listAns)	// 清除应答队列
	{
		LPANS_PACKET  pAns = LIST_ENTRY(pos, ANS_PACKET, listUser);
		pAns->bSended = true;
		pAns->pUser   = NULL ;
	}
	pthread_mutex_unlock(&pUser->mtUser);
	ClearHashByUser(pUser);

	pthread_mutex_lock(&g_mtKeepAlive);
	list_del(&pUser->listItem);
	pthread_mutex_unlock(&g_mtKeepAlive);

	LogOut(KLOG_INFO, "Recycle user:%p type:%d uin:%u number:%u context:%u", 
		pUser, iType, pUser->dwID, TOTAL_GUYS, TOTAL_CONTEXTS);
	pthread_mutex_lock(&pUser->mtUser);

	pUser->dwID			= 0;		// KK ID
	pUser->dwLocalIP	= 0;		// 本地IP
	pUser->dwNatIP		= 0;		// NAT IP
	pUser->wLocalPort	= 0;		// 本地 PORT
	pUser->wNatPort		= 0;		// NAT PORT
	pUser->byStatus		= STATUS_DEAD;
	pUser->wRefCRC		= 0;
	memset(pUser->Window, 0, sizeof(pUser->Window));

    pUser->next = NULL;
	INIT_LIST_HEAD(&pUser->listAns) ;	// 应答队列需要初始化
	INIT_LIST_HEAD(&pUser->ipport);
	INIT_LIST_HEAD(&pUser->uin);
	INIT_LIST_HEAD(&pUser->listItem);

	pthread_mutex_unlock(&pUser->mtUser);

    pthread_mutex_lock(&qlock);
    if (qtail) {
        qtail->next = pUser;
    } else {
        qhead = pUser;
    }
    qtail = pUser;
 	TOTAL_GUYS--;
    pthread_mutex_unlock(&qlock);

}

LPP2P_USER GetUserContext()
{
    LPP2P_USER context = NULL;

    /* Grab a context off the queue */
    pthread_mutex_lock(&qlock);
    if (qhead) 
	{
        context = qhead;
        qhead   = qhead->next;
        if (!qhead)
            qtail = NULL;
    }
	TOTAL_GUYS++;
    pthread_mutex_unlock(&qlock);

    if (!context)
	{
		context = (LPP2P_USER)malloc(sizeof(P2P_USER));
		if (!context)
		{
			LogOut(KLOG_WARN, "GetUserContext system memory is not enough");
			return 0;
		}
		memset(context, 0, sizeof(P2P_USER));
		pthread_mutex_init(&context->mtUser, NULL);
		INIT_LIST_HEAD(&context->listAns) ;	// 应答队列需要初始化
		INIT_LIST_HEAD(&context->ipport);
		INIT_LIST_HEAD(&context->uin);
		INIT_LIST_HEAD(&context->listItem);
		TOTAL_CONTEXTS++;
	}
	LogOut(KLOG_INFO, "Get user context:%p numbers:%u context:%u", 
		context, TOTAL_GUYS, TOTAL_CONTEXTS);

	context->byStatus   = STATUS_INIT;
	context->iRefCount	= 0;
	context->wRefCRC	= 0;

	struct timeval	tv;
	time_t			now ;

	gettimeofday(&tv, NULL);
	now = tv.tv_sec ;
	context->dwExpire = now + KEEPALIVE_TIMEOUT;

	pthread_mutex_lock(&g_mtKeepAlive);
	list_add_tail(&context->listItem, &g_KeepAliveList);
	pthread_mutex_unlock(&g_mtKeepAlive);

	return context;
}

static void ExitCheck(void *pArg )
{
	struct list_head *pos, *head;
	LPANS_PACKET	lpAns;
	LPP2P_USER	lpUser = NULL;

	head = &g_KeepAliveList;
	pthread_mutex_lock(&g_mtKeepAlive);
	while ((pos = head->next) != head) 
	{
		lpUser = LIST_ENTRY(pos, P2P_USER, listItem);
		list_del(pos);
		free(lpUser);
	}
	pthread_mutex_unlock(&g_mtKeepAlive);

	pthread_mutex_lock(&qlock);
	while (qhead)
	{
		LPP2P_USER	lpContext = NULL;
		
		if (qhead) 
		{
			lpContext = qhead;
			qhead   = qhead->next;
			if (!qhead)
				qtail = NULL;
			TOTAL_CONTEXTS--;
			free(lpContext);
		}
	}
	pthread_mutex_unlock(&qlock);

	head = &g_quAnswer;
	LockGlobalAnswer();
	while ((pos = head->next) != head) 
	{
		lpAns = LIST_ENTRY(pos, ANS_PACKET, listGlobal);
		list_del(pos);
		free(lpAns);
	}
	UnlockGlobalAnswer();

	pthread_mutex_destroy(&qlock);
}

void *CheckAnswerQueue(void *pArg)
{
	struct list_head *pos;
	struct list_head *head = &g_quAnswer;

	struct timeval	tv;
	LPANS_PACKET	pAns;
	time_t			now ;

	pthread_mutex_init(&qlock, NULL);

	pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);

    pthread_cleanup_push(ExitCheck, (void *)pArg );

	while (!g_bFinished)
	{
		if (list_empty(head))
		{
			usleep(1000000);
			continue;
		}
		LockGlobalAnswer();
		if (list_empty(head))
		{
			UnlockGlobalAnswer();
			usleep( 1000000 );
			continue;
		}
		pos  = head->next ;
		pAns = LIST_ENTRY(pos, ANS_PACKET, listGlobal);
		list_del(pos);
		UnlockGlobalAnswer();
		
		gettimeofday(&tv, NULL);
		now = tv.tv_sec ;

		if (!pAns->dwExpire || pAns->dwExpire > now)
		{
			if (pAns->bSended || pAns->pUser==NULL)
			{
				ClearAnsPacket(pAns);
			}
			else
			{
				LockGlobalAnswer();
				list_add_tail(&pAns->listGlobal, &g_quAnswer);	
				UnlockGlobalAnswer();
			}
			continue;
		}

		if (pAns->bSended || pAns->pUser==NULL)
		{
			ClearAnsPacket(pAns);
			continue;
		}

		if (pAns->byTry < 3) 
		{
			// Resend packet
			LogOut(KLOG_INFO, "Resend uin:%u cmd:%u seq:%u ack:%u timeout:%u seconds",
				pAns->pUser->dwID,
				pAns->pHeader->wCmd,
				pAns->pHeader->wSeq,
				pAns->pHeader->wAckseq,
				TXN_TIME[pAns->byTry]);

			pAns->byTry++;

			// queue to tail
			LockGlobalAnswer();
			list_add_tail(&pAns->listGlobal,&g_quAnswer);	
			UnlockGlobalAnswer();

			SendPacket(pAns);
		} 
		else 
		{
			LPP2P_USER	lpUser = pAns->pUser;
			// Maximum attempts reached, delete it!
			LogOut(KLOG_INFO, "Delete uin:%u cmd:%u seq:%u ack:%u try %u times", 
				pAns->pUser->dwID,
				pAns->pHeader->wCmd,
				pAns->pHeader->wSeq,
				pAns->pHeader->wAckseq,
				pAns->byTry);
			ClearAnsPacket(pAns);
		}
		usleep(100);
	}
	pthread_cleanup_pop(0);

}

void *CheckKeepAlive(void *pArg)
{
	struct list_head *pos;
	struct list_head *head = &g_KeepAliveList;

	struct timeval	tv;
	time_t			now ;

	pthread_mutex_init(&g_mtKeepAlive, NULL);

	pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);

	while (!g_bFinished)
	{
		LPP2P_USER	lpUser;

		gettimeofday(&tv, NULL);
		now = tv.tv_sec ;

		lpUser = NULL;
		pthread_mutex_lock(&g_mtKeepAlive);
		if ((pos = head->next) != head) 
		{
			lpUser = LIST_ENTRY(pos, P2P_USER, listItem);
			// fprintf(stderr, "Checking user[%u]\n", lpUser->dwID);
			if (lpUser->dwExpire > now)
				lpUser = NULL;
		}
		pthread_mutex_unlock(&g_mtKeepAlive);
		
		if (lpUser) 
		{
			LogOut(KLOG_INFO, "Force delete uin:%u", lpUser->dwID);
			ClearUser(FORCE_DEL, lpUser);
			continue;
		}
		while (TOTAL_CONTEXTS - TOTAL_GUYS > 20)
		{
			LPP2P_USER	lpContext = NULL;

			if (!qhead) 
			{
				LogOut(KLOG_ERROR, "Error in free context:%u guy:%u",
					TOTAL_CONTEXTS, TOTAL_GUYS);
				break;
			}
			pthread_mutex_lock(&qlock);
			if (qhead) 
			{
				lpContext = qhead;
				qhead   = qhead->next;
				if (!qhead)
					qtail = NULL;
				TOTAL_CONTEXTS--;
				free(lpContext);
			}
			pthread_mutex_unlock(&qlock);
			
		}

		usleep(1000000);
	}

	pthread_mutex_destroy(&g_mtKeepAlive) ;

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -