📄 uppoller.cpp
字号:
/********************************************************************** FileName : UpPoller.cpp Description : 上端轮询模块 Version : 1.0 Date : 2003年7月2日 Author : 刘荣辉 Other : ***********************************************************************/#include "GateWay.h"//void * CGateWay::Up_SentQ_Poller(void *)//void * CGateWay::Up_RecvQ_Poller(void *)//==========================================================================//===================上端已发送队列定时(轮询)线程函数=====================//==========================================================================//轮询上节点的已发送队列,确定重发,定时ActiveTestvoid * CGateWay::Up_SentQ_Poller(void * pGateway){ int threadid; char sSysEvent[SYS_EVENT_LEN]; //记录系统事件的字符串 int iCheckQ,iActive[UPNODE_NUM]; int i; CGateWay *Gateway; Gateway=(CGateWay *)pGateway; threadid = pthread_self(); pthread_detach(threadid); sprintf(sSysEvent,"LOG: Up_SentQ_Poller[%d] is started.",threadid); Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); //写系统日志 iCheckQ=0; for(i=0;i<UPNODE_NUM;i++) iActive[i]=0; #ifdef DEBUG sleep(2); Gateway->UpNode[0].PutActiveTest(); //printf("\n Up_SentQ_Poller: Have put an ActiveTest pack. \n"); #endif int AliveUpNode; while(1) { if(!Gateway->ToExit) sleep(1); //每秒钟检查一次队列,且作为ActiveTest的单位时间,勿轻易改,否则影响ActiveTest的时间!!! else usleep(200000); //等待上节点连接关闭 if(iCheckQ==CHECK_Q_TIME) { AliveUpNode=0; for(i=0;i<UPNODE_NUM;i++) //检查所有在线上节点的已发送队列,确定是否重发 { if(Gateway->UpNode[i].State==0) { #ifdef DEBUG int SendQSize = Gateway->UpNode[i].SendQ->GetLen(); int SentQSize = Gateway->UpNode[i].SentQ.GetSize(); if(SendQSize!=0 || SentQSize!=0) { printf("\n Up_SentQ_Poller: UpNode[%d]: SendQ=[%d], SentQ=[%d].\n",Gateway->UpNode[i].NodeID,SendQSize,SentQSize); } #endif Gateway->UpNode[i].UpSentQCheck(Gateway->MyDB,Gateway->Relay_Interval); AliveUpNode++; #ifdef DEBUG //printf("\n Up_SentQ_Poller: after UpSentQCheck.\n"); #endif } } if(AliveUpNode==0 && Gateway->ToExit) break; //退出线程 iCheckQ=0; } else iCheckQ++; if(!Gateway->ToExit) { for(i=0;i<UPNODE_NUM;i++) { //往所有在线上节点的发送队列中放ActiveTest包 if(iActive[i] >= Gateway->UpNode[i].ActiveTest_Time) { if(Gateway->UpNode[i].State==0) { #ifdef DEBUG //printf("\n Up_SentQ_Poller: before PutActiveTest, period=[%d].\n",Gateway->UpNode[i].ActiveTest_Time); #endif int Sequence; Sequence = Gateway->UpNode[i].PutActiveTest(); #ifdef DEBUG //printf("\n Up_SentQ_Poller: An ActiveTest Pack[%d] is put into SendQ.\n",Sequence); #endif } iActive[i]=0; } else iActive[i]++; } } }//while(1) if(Gateway->ToExit) strcpy(sSysEvent,"LOG: Up_SentQ_Poller of Gateway is ended! "); else strcpy(sSysEvent,"Error: Up_SentQ_Poller of Gateway QUIT! "); Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); return (void *)NULL;}//==========================================================================//=========================上端接收队列轮询/调度处理线程====================//==========================================================================void * CGateWay::Up_RecvQ_Poller(void *pGateway){ int NodeNum,threadid; char sSysEvent[SYS_EVENT_LEN]; RecvQUnit *ReqUnit, *RspUnit; int GotWork; CGateWay *Gateway; Gateway=(CGateWay *)pGateway; threadid = pthread_self(); pthread_detach(threadid); sprintf(sSysEvent,"LOG: Up_RecvQ_Poller[%d] is started.",threadid); Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); //写系统日志 int ThreadExit=0; //线程退出标志,TSD pthread_setspecific(ThreadKey,(void *)&ThreadExit); sigset_t MySigset; sigfillset(&MySigset); pthread_sigmask(SIG_BLOCK, &MySigset, NULL); //先阻塞所有信号 sigemptyset(&MySigset); sigaddset(&MySigset, SIGUSR2); //仅仅开放SIGUSR2 pthread_sigmask(SIG_UNBLOCK, &MySigset, NULL); //开放信号 struct sigaction mySigact; mySigact.sa_handler = CGateWay::SetTSD; //信号处理函数 mySigact.sa_mask = MySigset; mySigact.sa_flags = 0; sigaction(SIGUSR2, &mySigact, NULL);//sleep(1000);goto UpPollEnd; while(1) //不断轮询各个上节点的接收队列 { GotWork = 0; for(NodeNum=0; NodeNum<UPNODE_NUM; NodeNum++) //轮询各个上节点的接收队列 { if(ThreadExit) //检查线程退出标志 goto UpPollEnd; //立刻退出 //--------------请求包队列处理----------------- if(!Gateway->ToExit) { ReqUnit = (RecvQUnit *)malloc(sizeof(RecvQUnit)); //以防下面多线程处理时的覆盖 if(Gateway->UpNode[NodeNum].RecvQ->Get(*ReqUnit)) { GotWork++; //本次轮询获得了数据包 #ifdef DEBUG //printf("\n Up_RecvQ_Poller: Got a req_pack from RecvQ of UpNode.\n"); #endif //从线程池中取一空闲线程进行包的路由处理????????? Gateway->UpPackDeal(ReqUnit,NodeNum); } else //如果没有从队列中取到数据包 free(ReqUnit); } //--------------应答包队列处理----------------- RspUnit = (RecvQUnit *)malloc(sizeof(RecvQUnit)); //以防下面多线程处理时的覆盖 if(Gateway->UpNode[NodeNum].RspQ->Get(*RspUnit)) { GotWork++; //本次轮询获得了数据包 #ifdef DEBUG //printf("\n Up_RecvQ_Poller: Got a rsp_pack from RspQ of UpNode.\n"); #endif //从线程池中取一空闲线程进行包的路由处理????????? Gateway->UpPackDeal(RspUnit,NodeNum); } else //如果没有从队列中取到数据包 free(RspUnit); } if(ThreadExit) //检查线程退出标志 goto UpPollEnd; //立刻退出 if(!GotWork) //若本次轮询没有获得数据包 usleep(Gateway->RecvQ_Empty_Wait); }//while(1)UpPollEnd: if(Gateway->ToExit) strcpy(sSysEvent,"LOG: Up_RecvQ_Poller of Gateway is ended! "); else strcpy(sSysEvent,"Error: Up_RecvQ_Poller of Gateway is ended! "); Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); return (void *)NULL;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -