/* assure.c -- page header left over from a web code viewer ("Font size:" label). */
/*********************************************************************
* 模块名称:assure
* 说明:确保UDP包可以准确发送到对方的模块
* 其它说明:
* 作者: 刘青山
* 时间 : 2004-09-26 7:13:30
*********************************************************************/
#include "assure.h"
#include "cmd_control.h"
#include "netcomm.h"
#include "memory.h"
#include "natsvr.h"
#include "log.h"
#include <time.h>
/* Mutex held while the free-context queue (qhead/qtail) is modified.
 * It is initialised in CheckAnswerQueue() and destroyed in ExitCheck(). */
static pthread_mutex_t qlock;
/* Head of the singly linked (via ->next) queue of recycled user contexts. */
static LPP2P_USER qhead = NULL;
/* Tail of the same queue; NULL whenever the queue is empty. */
static LPP2P_USER qtail = NULL;
/* Incremented by GetUserContext() and decremented when ClearUser() recycles
 * a context, i.e. the number of contexts currently handed out. */
static unsigned long TOTAL_GUYS = 0;
/* Incremented when a context is malloc'ed and decremented when one is freed,
 * i.e. the number of contexts currently allocated (in use + queued). */
static unsigned long TOTAL_CONTEXTS = 0;
/* List of handed-out user contexts, linked through P2P_USER::listItem;
 * GetUserContext() appends to the tail, CheckKeepAlive() inspects the head. */
LIST_HEAD(g_KeepAliveList);
/* Mutex protecting g_KeepAliveList; initialised in CheckKeepAlive(). */
pthread_mutex_t g_mtKeepAlive ;
/*
 * Release the memory of an answer packet.
 *
 * If the packet is still attached to a user, it is first unlinked from that
 * user's answer list while holding the user's mutex, then freed.
 *
 * pAns: packet to release; must not be NULL and must already have been
 *       removed from the global answer queue by the caller.
 */
void ClearAnsPacket(LPANS_PACKET pAns)
{
    LPP2P_USER pOwner;

    LogOut(KLOG_DEBUG, "ClearAnsPacket uin:%u cmd:%u seq:%u ack:%u",
           pAns->pHeader->dwSrc,
           pAns->pHeader->wCmd,
           pAns->pHeader->wSeq,
           pAns->pHeader->wAckseq
          );

    /*
     * Snapshot the owner once.  ClearUser() sets pAns->pUser to NULL while
     * holding the owner's mutex, so re-reading pAns->pUser for the lock and
     * unlock calls could dereference NULL (the unlock would do so whenever
     * the owner detached the packet while we were waiting for the mutex).
     */
    pOwner = pAns->pUser;
    if (pOwner)
    {
        pthread_mutex_lock(&pOwner->mtUser);
        /* Re-check under the lock: only unlink if still attached. */
        if (pAns->pUser)
        {
            list_del(&pAns->listUser);
        }
        pthread_mutex_unlock(&pOwner->mtUser);
    }
    free(pAns);
}
/*
 * Release a user context and put it back on the free-context queue.
 *
 * iType selects the policy:
 *   NOT_REF_COUNT - mark the user STATUS_DEAD; recycle only if iRefCount <= 0.
 *   REF_COUNT     - drop one reference; recycle only if the user is already
 *                   STATUS_DEAD and iRefCount <= 0.
 *   FORCE_DEL     - recycle unconditionally.
 *   anything else - log an error and do nothing.
 *
 * When recycling, every answer packet on the user's list is marked bSended
 * and detached (pUser = NULL), the user is removed from the hash tables and
 * the keep-alive list, its fields are reset, and it is appended to the
 * free-context queue.
 */
void ClearUser(int iType, LPP2P_USER pUser)
{
    struct list_head *pos;
    bool bRecycle;

    pthread_mutex_lock(&pUser->mtUser);

    if (iType == NOT_REF_COUNT)
    {
        /* The user does not keep alive any more. */
        pUser->byStatus = STATUS_DEAD;
        bRecycle = (pUser->iRefCount <= 0);
    }
    else if (iType == REF_COUNT)
    {
        pUser->iRefCount--;
        bRecycle = (pUser->byStatus == STATUS_DEAD && pUser->iRefCount <= 0);
    }
    else if (iType == FORCE_DEL)
    {
        bRecycle = true;
    }
    else
    {
        LogOut(KLOG_ERROR, "Recycle error, uin:%u", pUser->dwID);
        bRecycle = false;
    }

    if (!bRecycle)
    {
        pthread_mutex_unlock(&pUser->mtUser);
        return;
    }

    /* Detach every pending answer packet from this user. */
    LIST_FOR_EACH(pos, &pUser->listAns)
    {
        LPANS_PACKET pPending = LIST_ENTRY(pos, ANS_PACKET, listUser);
        pPending->bSended = true;
        pPending->pUser = NULL ;
    }
    pthread_mutex_unlock(&pUser->mtUser);

    ClearHashByUser(pUser);

    /* Take the user off the keep-alive list. */
    pthread_mutex_lock(&g_mtKeepAlive);
    list_del(&pUser->listItem);
    pthread_mutex_unlock(&g_mtKeepAlive);

    LogOut(KLOG_INFO, "Recycle user:%p type:%d uin:%u number:%u context:%u",
           pUser, iType, pUser->dwID, TOTAL_GUYS, TOTAL_CONTEXTS);

    /* Wipe the context so it can be handed out again. */
    pthread_mutex_lock(&pUser->mtUser);
    pUser->dwID = 0;        /* KK ID */
    pUser->dwLocalIP = 0;   /* local IP */
    pUser->dwNatIP = 0;     /* NAT IP */
    pUser->wLocalPort = 0;  /* local port */
    pUser->wNatPort = 0;    /* NAT port */
    pUser->byStatus = STATUS_DEAD;
    pUser->wRefCRC = 0;
    memset(pUser->Window, 0, sizeof(pUser->Window));
    pUser->next = NULL;
    INIT_LIST_HEAD(&pUser->listAns) ;   /* the answer list must be re-initialised */
    INIT_LIST_HEAD(&pUser->ipport);
    INIT_LIST_HEAD(&pUser->uin);
    INIT_LIST_HEAD(&pUser->listItem);
    pthread_mutex_unlock(&pUser->mtUser);

    /* Append to the tail of the free-context queue. */
    pthread_mutex_lock(&qlock);
    if (!qtail)
    {
        qhead = pUser;
    }
    else
    {
        qtail->next = pUser;
    }
    qtail = pUser;
    TOTAL_GUYS--;
    pthread_mutex_unlock(&qlock);
}
/*
 * Obtain a user context, reusing one from the free-context queue when
 * possible and allocating a new one otherwise.
 *
 * The returned context has byStatus = STATUS_INIT, iRefCount = 0,
 * wRefCRC = 0, dwExpire = now + KEEPALIVE_TIMEOUT, and has been appended to
 * the tail of g_KeepAliveList.
 *
 * Returns the context, or NULL if memory allocation fails.
 *
 * TOTAL_GUYS and TOTAL_CONTEXTS are only modified while qlock is held, and
 * TOTAL_GUYS is only incremented once a context has actually been obtained,
 * so an allocation failure no longer leaves the counter permanently too high
 * and TOTAL_GUYS never exceeds TOTAL_CONTEXTS between lock releases.
 */
LPP2P_USER GetUserContext()
{
    LPP2P_USER context = NULL;
    unsigned long ulGuys = 0;
    unsigned long ulContexts = 0;
    struct timeval tv;
    time_t now;

    /* Grab a context off the queue */
    pthread_mutex_lock(&qlock);
    if (qhead)
    {
        context = qhead;
        qhead = qhead->next;
        if (!qhead)
        {
            qtail = NULL;
        }
        TOTAL_GUYS++;
        ulGuys = TOTAL_GUYS;
        ulContexts = TOTAL_CONTEXTS;
    }
    pthread_mutex_unlock(&qlock);

    if (!context)
    {
        /* Queue was empty: allocate and initialise a brand-new context. */
        context = (LPP2P_USER)malloc(sizeof(P2P_USER));
        if (!context)
        {
            LogOut(KLOG_WARN, "GetUserContext system memory is not enough");
            return NULL;
        }
        memset(context, 0, sizeof(P2P_USER));
        pthread_mutex_init(&context->mtUser, NULL);
        INIT_LIST_HEAD(&context->listAns) ;   /* the answer list must be initialised */
        INIT_LIST_HEAD(&context->ipport);
        INIT_LIST_HEAD(&context->uin);
        INIT_LIST_HEAD(&context->listItem);

        /* Account for the new context and its user in one critical section. */
        pthread_mutex_lock(&qlock);
        TOTAL_CONTEXTS++;
        TOTAL_GUYS++;
        ulGuys = TOTAL_GUYS;
        ulContexts = TOTAL_CONTEXTS;
        pthread_mutex_unlock(&qlock);
    }

    /* Casts make the arguments match the %u conversions (assuming LogOut is
     * printf-style); the counters themselves are unsigned long. */
    LogOut(KLOG_INFO, "Get user context:%p numbers:%u context:%u",
           context, (unsigned int)ulGuys, (unsigned int)ulContexts);

    context->byStatus = STATUS_INIT;
    context->iRefCount = 0;
    context->wRefCRC = 0;

    gettimeofday(&tv, NULL);
    now = tv.tv_sec ;
    context->dwExpire = now + KEEPALIVE_TIMEOUT;

    pthread_mutex_lock(&g_mtKeepAlive);
    list_add_tail(&context->listItem, &g_KeepAliveList);
    pthread_mutex_unlock(&g_mtKeepAlive);

    return context;
}
static void ExitCheck(void *pArg )
{
struct list_head *pos, *head;
LPANS_PACKET lpAns;
LPP2P_USER lpUser = NULL;
head = &g_KeepAliveList;
pthread_mutex_lock(&g_mtKeepAlive);
while ((pos = head->next) != head)
{
lpUser = LIST_ENTRY(pos, P2P_USER, listItem);
list_del(pos);
free(lpUser);
}
pthread_mutex_unlock(&g_mtKeepAlive);
pthread_mutex_lock(&qlock);
while (qhead)
{
LPP2P_USER lpContext = NULL;
if (qhead)
{
lpContext = qhead;
qhead = qhead->next;
if (!qhead)
qtail = NULL;
TOTAL_CONTEXTS--;
free(lpContext);
}
}
pthread_mutex_unlock(&qlock);
head = &g_quAnswer;
LockGlobalAnswer();
while ((pos = head->next) != head)
{
lpAns = LIST_ENTRY(pos, ANS_PACKET, listGlobal);
list_del(pos);
free(lpAns);
}
UnlockGlobalAnswer();
pthread_mutex_destroy(&qlock);
}
/*
 * Thread routine that services the global answer queue (g_quAnswer).
 *
 * Each iteration removes the packet at the head of the queue and then:
 *   - frees it if it is marked bSended or no longer has an owner;
 *   - puts it back at the tail if dwExpire is 0 or still in the future;
 *   - otherwise resends it (up to 3 tries, re-queued before sending) or,
 *     once byTry has reached 3, frees it.
 * The loop runs until g_bFinished becomes true.
 *
 * pArg: forwarded to the ExitCheck cleanup handler; otherwise unused.
 * Returns NULL.
 */
void *CheckAnswerQueue(void *pArg)
{
    struct list_head *pos;
    struct list_head *head = &g_quAnswer;
    struct timeval tv;
    LPANS_PACKET pAns;
    LPP2P_USER lpUser;
    time_t now ;

    pthread_mutex_init(&qlock, NULL);
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
    pthread_cleanup_push(ExitCheck, (void *)pArg );
    while (!g_bFinished)
    {
        /* Cheap unlocked peek first, then confirm under the lock. */
        if (list_empty(head))
        {
            usleep(1000000);
            continue;
        }
        LockGlobalAnswer();
        if (list_empty(head))
        {
            UnlockGlobalAnswer();
            usleep( 1000000 );
            continue;
        }
        pos = head->next ;
        pAns = LIST_ENTRY(pos, ANS_PACKET, listGlobal);
        list_del(pos);
        UnlockGlobalAnswer();

        gettimeofday(&tv, NULL);
        now = tv.tv_sec ;

        /*
         * Read the owner once.  ClearUser() may set pAns->pUser to NULL from
         * another thread; checking and then re-reading the field for the log
         * calls below could dereference NULL.  This narrows that window; it
         * does not replace proper synchronisation.
         */
        lpUser = pAns->pUser;

        /* Finished or orphaned packets are freed regardless of expiry. */
        if (pAns->bSended || lpUser == NULL)
        {
            ClearAnsPacket(pAns);
            continue;
        }

        /* Not due yet: rotate to the tail.
         * NOTE(review): when every queued packet is unexpired this path loops
         * without sleeping; confirm whether that busy-wait is intended. */
        if (!pAns->dwExpire || pAns->dwExpire > now)
        {
            LockGlobalAnswer();
            list_add_tail(&pAns->listGlobal, &g_quAnswer);
            UnlockGlobalAnswer();
            continue;
        }

        if (pAns->byTry < 3)
        {
            // Resend packet
            LogOut(KLOG_INFO, "Resend uin:%u cmd:%u seq:%u ack:%u timeout:%u seconds",
                   lpUser->dwID,
                   pAns->pHeader->wCmd,
                   pAns->pHeader->wSeq,
                   pAns->pHeader->wAckseq,
                   TXN_TIME[pAns->byTry]);
            pAns->byTry++;
            // queue to tail
            LockGlobalAnswer();
            list_add_tail(&pAns->listGlobal,&g_quAnswer);
            UnlockGlobalAnswer();
            SendPacket(pAns);
        }
        else
        {
            // Maximum attempts reached, delete it!
            LogOut(KLOG_INFO, "Delete uin:%u cmd:%u seq:%u ack:%u try %u times",
                   lpUser->dwID,
                   pAns->pHeader->wCmd,
                   pAns->pHeader->wSeq,
                   pAns->pHeader->wAckseq,
                   pAns->byTry);
            ClearAnsPacket(pAns);
        }
        usleep(100);
    }
    /* Argument 0 removes the handler without running it, so ExitCheck only
     * runs if this thread is cancelled.
     * NOTE(review): confirm that skipping the cleanup on a normal
     * g_bFinished exit is intended. */
    pthread_cleanup_pop(0);
    /* A void * thread routine must return a value; pthread_join() reads it. */
    return NULL;
}
/*
 * Thread routine that expires idle users and trims the free-context queue.
 *
 * Each iteration:
 *   1. Looks at the head of g_KeepAliveList; if its dwExpire is not in the
 *      future the user is force-deleted via ClearUser(FORCE_DEL, ...) and the
 *      loop restarts immediately.
 *   2. Otherwise, while more than 20 allocated contexts are not handed out
 *      (TOTAL_CONTEXTS - TOTAL_GUYS > 20), frees contexts from the head of
 *      the free-context queue.
 *   3. Sleeps one second.
 * The loop runs until g_bFinished becomes true.
 *
 * pArg: unused.
 * Returns NULL.
 */
void *CheckKeepAlive(void *pArg)
{
    struct list_head *pos;
    struct list_head *head = &g_KeepAliveList;
    struct timeval tv;
    time_t now ;

    pthread_mutex_init(&g_mtKeepAlive, NULL);
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
    while (!g_bFinished)
    {
        LPP2P_USER lpUser = NULL;

        gettimeofday(&tv, NULL);
        now = tv.tv_sec ;

        pthread_mutex_lock(&g_mtKeepAlive);
        if ((pos = head->next) != head)
        {
            lpUser = LIST_ENTRY(pos, P2P_USER, listItem);
            // fprintf(stderr, "Checking user[%u]\n", lpUser->dwID);
            if (lpUser->dwExpire > now)
            {
                lpUser = NULL;
            }
        }
        pthread_mutex_unlock(&g_mtKeepAlive);

        if (lpUser)
        {
            LogOut(KLOG_INFO, "Force delete uin:%u", lpUser->dwID);
            ClearUser(FORCE_DEL, lpUser);
            continue;
        }

        /* Trim surplus free contexts, one per pass through this loop. */
        for (;;)
        {
            LPP2P_USER lpContext = NULL;
            unsigned long ulContexts;
            unsigned long ulGuys;
            bool bSurplus;

            /*
             * Read the counters and the queue head under qlock.  The
             * "contexts > guys" guard keeps the unsigned subtraction from
             * wrapping to a huge value if TOTAL_GUYS is momentarily (or, after
             * counter drift, permanently) larger than TOTAL_CONTEXTS.
             */
            pthread_mutex_lock(&qlock);
            ulContexts = TOTAL_CONTEXTS;
            ulGuys = TOTAL_GUYS;
            bSurplus = (ulContexts > ulGuys && ulContexts - ulGuys > 20);
            if (bSurplus && qhead)
            {
                lpContext = qhead;
                qhead = qhead->next;
                if (!qhead)
                {
                    qtail = NULL;
                }
                TOTAL_CONTEXTS--;
            }
            pthread_mutex_unlock(&qlock);

            if (!bSurplus)
            {
                break;
            }
            if (!lpContext)
            {
                /* Counters claim a surplus but the queue is empty. */
                LogOut(KLOG_ERROR, "Error in free context:%u guy:%u",
                       (unsigned int)ulContexts, (unsigned int)ulGuys);
                break;
            }
            /* Free outside the lock; the context is no longer reachable. */
            free(lpContext);
        }
        usleep(1000000);
    }
    pthread_mutex_destroy(&g_mtKeepAlive) ;
    /* A void * thread routine must return a value; pthread_join() reads it. */
    return NULL;
}
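/*
 * (Residue from the web code viewer this file was copied from; not part of
 * the program.)
 * Keyboard shortcuts:
 *   Copy code           Ctrl + C
 *   Search code         Ctrl + F
 *   Full-screen mode    F11
 *   Toggle theme        Ctrl + Shift + D
 *   Show shortcuts      ?
 *   Increase font size  Ctrl + =
 *   Decrease font size  Ctrl + -
 */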