📄 dnpoller.cpp
字号:
/**********************************************************************
FileName : DnPoller.cpp
Description : 下端轮询模块
Version : 1.0
Date : 2003年9月19日
Author : 刘荣辉
Other :
***********************************************************************/
#include "GateWay.h"
//void * CGateWay::Dn_SentQ_Poller(void *)
//void * CGateWay::Dn_RecvQ_Poller(void *)
//==========================================================================
//===================下端已发送队列定时(轮询)线程函数=====================
//==========================================================================
//轮询下节点的已发送队列,确定重发,定时ActiveTest
void * CGateWay::Dn_SentQ_Poller(void * pGateway)
{
int RetCode, threadid;
char sSysEvent[SYS_EVENT_LEN]; //记录系统事件的字符串
int iCheckQ;
int i;
CGateWay *Gateway;
CDnNode *DnNode[MAX_DNNODE];
int Node_Num;
Gateway=(CGateWay *)pGateway;
threadid = pthread_self();
pthread_detach(threadid);
sprintf(sSysEvent,"LOG: Dn_SentQ_Poller[%d] is started.",threadid);
Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); //写系统日志
iCheckQ=0;
while(1)
{
if(!Gateway->ToExit)
sleep(1); //每秒钟检查一次队列,且作为ActiveTest的单位时间
else
usleep(200000); //等待上节点连接关闭
if (iCheckQ==CHECK_Q_TIME)
{
//获取所有的在线下节点
Node_Num=Gateway->Sock_CP_Map.GetAllNode((char **)DnNode,MAX_DNNODE);
if(!Node_Num) //无在线的下节点
{
sleep(1);
if(ToExit)
break;
continue;
}
for (i=0; i<Node_Num; i++)
{
DnNode[i]->DnSentQCheck(Gateway->MyDB,Gateway->Relay_Interval); //检查已发送队列,确定是否重发
//判断是否ActiveTest的时间已到
if(DnNode[i]->WaitTest >= DnNode[i]->TestTime && ToExit==0)
{
RetCode = DnNode[i]->PutActiveTest(); //要马上返回,不能阻塞
DnNode[i]->WaitTest = 0;
}
else
DnNode[i]->WaitTest += CHECK_Q_TIME;
}
iCheckQ=0;
}
else iCheckQ++;
}//while(1)
if(ToExit)
strcpy(sSysEvent,"LOG: Dn_SentQ_Poller of Gateway is ended! ");
else
strcpy(sSysEvent,"Error: Dn_SentQ_Poller of Gateway QUIT! ");
Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG);
return (void *)NULL;
}
//==========================================================================
//====================下端接收队列轮询/调度处理线程=========================
//==========================================================================
void * CGateWay::Dn_RecvQ_Poller(void *pGateway)
{
int RetCode,threadid;
char sSysEvent[SYS_EVENT_LEN];
RecvQUnit *ReqUnit,*RspUnit;
int GotWork;
CGateWay *Gateway;
Gateway=(CGateWay *)pGateway;
threadid = pthread_self();
pthread_detach(threadid);
sprintf(sSysEvent,"LOG: Dn_RecvQ_Poller[%d] is started.",threadid);
Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); //写系统日志
int ThreadExit=0; //线程退出标志,TSD
pthread_setspecific(ThreadKey,(void *)&ThreadExit);
sigset_t MySigset;
sigfillset(&MySigset);
pthread_sigmask(SIG_BLOCK, &MySigset, NULL); //先阻塞所有信号
sigemptyset(&MySigset);
sigaddset(&MySigset, SIGUSR2); //仅仅开放SIGUSR2
pthread_sigmask(SIG_UNBLOCK, &MySigset, NULL); //开放信号
//sigset(SIGUSR2, CGateWay::SetTSD);
struct sigaction mySigact;
mySigact.sa_handler = CGateWay::SetTSD; //信号处理函数
mySigact.sa_mask = MySigset;
mySigact.sa_flags = 0;
sigaction(SIGUSR2, &mySigact, NULL);
Map_Pos Pos;
CDnNode *theDnNode;
Gateway->Sock_CP_Map.PosInit(&Pos);
GotWork = 0;
//sleep(1000);goto DnPollEnd;
while(1) //不断轮询各个下节点的接收队列
{
if(ThreadExit) //检查线程退出标志
goto DnPollEnd; //立刻退出
//获取下一个下节点的指针
theDnNode=NULL;
RetCode = Gateway->Sock_CP_Map.GetNext((char **)&theDnNode, &Pos);
if(theDnNode==NULL) //Map中已经没有一个元素了
{
//printf("\n Dn_RecvQ_Poller: NO logined DnNode now,RetCode=[%d]!\n",RetCode);
if (RetCode<0) //读到的是最后一个或者早已经为空,下次将从头开始取
{
if(!GotWork) //若本次轮询没有获得一个包
usleep(Gateway->RecvQ_Empty_Wait);
GotWork = 0;
}
continue;
}
//--------------请求包队列处理-----------------
if(!Gateway->ToExit)
{
ReqUnit = (RecvQUnit *)malloc(sizeof(RecvQUnit)); //以防下面多线程处理时的覆盖
if(theDnNode->RecvQ->Get(*ReqUnit)==false)
free(ReqUnit);
else
{
GotWork++; //本次轮询获得了数据包
#ifdef DEBUG
//printf("\n Dn_RecvQ_Poller: Got a req_pack from RecvQ of DnNode[%s].\n",theDnNode->ServiceCode);
#endif
//从线程池中取一空闲线程进行包的路由处理?????????
Gateway->DnCMPPDeal(ReqUnit, theDnNode);
}
}
if(ThreadExit) //检查线程退出标志
goto DnPollEnd; //立刻退出
//--------------应答包队列处理-----------------
RspUnit = (RecvQUnit *)malloc(sizeof(RecvQUnit)); //以防下面多线程处理时的覆盖
if(theDnNode->RspQ->Get(*RspUnit)==false)
free(RspUnit);
else
{
GotWork++; //本次轮询获得了数据包
#ifdef DEBUG
//printf("\n Dn_RecvQ_Poller: Got a rsp_pack from RecvQ of DnNode[%s].\n",theDnNode->ServiceCode);
#endif
//从线程池中取一空闲线程进行包的路由处理?????????
Gateway->DnCMPPDeal(RspUnit, theDnNode);
}
}//while(1)
DnPollEnd:
if(Gateway->ToExit)
strcpy(sSysEvent,"LOG: Dn_RecvQ_Poller of Gateway is ended! ");
else
strcpy(sSysEvent,"Error: Dn_RecvQ_Poller of Gateway is ended! ");
Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG);
return (void *)NULL;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -