⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 uppoller.cpp

📁 SMS gateway. SMS protocol for CHINA mobile, unicom, lingtong. Using mysql to exchange message.
💻 CPP
字号:
/**********************************************************************  FileName            : UpPoller.cpp  Description         : 上端轮询模块  Version             : 1.0  Date                : 2003年7月2日  Author              : 刘荣辉  Other               : ***********************************************************************/#include "GateWay.h"//void * CGateWay::Up_SentQ_Poller(void *)//void * CGateWay::Up_RecvQ_Poller(void *)//==========================================================================//===================上端已发送队列定时(轮询)线程函数=====================//==========================================================================//轮询上节点的已发送队列,确定重发,定时ActiveTestvoid * CGateWay::Up_SentQ_Poller(void * pGateway){  int threadid;  char sSysEvent[SYS_EVENT_LEN];	//记录系统事件的字符串  int iCheckQ,iActive[UPNODE_NUM];  int i;  CGateWay *Gateway;  Gateway=(CGateWay *)pGateway;  threadid = pthread_self();  pthread_detach(threadid);  sprintf(sSysEvent,"LOG: Up_SentQ_Poller[%d] is started.",threadid);  Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG);    //写系统日志    iCheckQ=0;  for(i=0;i<UPNODE_NUM;i++)	  iActive[i]=0;  #ifdef DEBUG  sleep(2);  Gateway->UpNode[0].PutActiveTest();  //printf("\n Up_SentQ_Poller: Have put an ActiveTest pack. \n");  #endif    int AliveUpNode;  while(1)  {		if(!Gateway->ToExit)			sleep(1);	//每秒钟检查一次队列,且作为ActiveTest的单位时间,勿轻易改,否则影响ActiveTest的时间!!!		else			usleep(200000);	//等待上节点连接关闭		if(iCheckQ==CHECK_Q_TIME)		{			AliveUpNode=0;			for(i=0;i<UPNODE_NUM;i++)	//检查所有在线上节点的已发送队列,确定是否重发			{				if(Gateway->UpNode[i].State==0)				{					#ifdef DEBUG					int SendQSize = Gateway->UpNode[i].SendQ->GetLen();					int SentQSize = Gateway->UpNode[i].SentQ.GetSize();					if(SendQSize!=0 || SentQSize!=0)					{						printf("\n Up_SentQ_Poller: UpNode[%d]: SendQ=[%d], SentQ=[%d].\n",Gateway->UpNode[i].NodeID,SendQSize,SentQSize);					}					#endif					Gateway->UpNode[i].UpSentQCheck(Gateway->MyDB,Gateway->Relay_Interval);					AliveUpNode++;					#ifdef DEBUG					//printf("\n Up_SentQ_Poller: after UpSentQCheck.\n");					#endif				}			}			if(AliveUpNode==0 && Gateway->ToExit)				break;	//退出线程			iCheckQ=0;		}		else iCheckQ++;		if(!Gateway->ToExit)		{			for(i=0;i<UPNODE_NUM;i++)			{				//往所有在线上节点的发送队列中放ActiveTest包				if(iActive[i] >= Gateway->UpNode[i].ActiveTest_Time)				{					if(Gateway->UpNode[i].State==0)					{						#ifdef DEBUG						//printf("\n Up_SentQ_Poller: before PutActiveTest, period=[%d].\n",Gateway->UpNode[i].ActiveTest_Time);						#endif						int Sequence;						Sequence = Gateway->UpNode[i].PutActiveTest();						#ifdef DEBUG						//printf("\n Up_SentQ_Poller: An ActiveTest Pack[%d] is put into SendQ.\n",Sequence);						#endif					}					iActive[i]=0;				}				else iActive[i]++;			}		}  }//while(1)  if(Gateway->ToExit)	  strcpy(sSysEvent,"LOG: Up_SentQ_Poller of Gateway is ended! ");  else	  strcpy(sSysEvent,"Error: Up_SentQ_Poller of Gateway QUIT! ");  Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG);  return (void *)NULL;}//==========================================================================//=========================上端接收队列轮询/调度处理线程====================//==========================================================================void * CGateWay::Up_RecvQ_Poller(void *pGateway){	int NodeNum,threadid;	char sSysEvent[SYS_EVENT_LEN];	RecvQUnit *ReqUnit, *RspUnit;	int GotWork;	CGateWay *Gateway;	Gateway=(CGateWay *)pGateway;	threadid = pthread_self();	pthread_detach(threadid);  	sprintf(sSysEvent,"LOG: Up_RecvQ_Poller[%d] is started.",threadid);	Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG);    //写系统日志		int ThreadExit=0;	//线程退出标志,TSD	pthread_setspecific(ThreadKey,(void *)&ThreadExit);	sigset_t MySigset;	sigfillset(&MySigset);	pthread_sigmask(SIG_BLOCK, &MySigset, NULL);	//先阻塞所有信号	sigemptyset(&MySigset);	sigaddset(&MySigset, SIGUSR2);	//仅仅开放SIGUSR2	pthread_sigmask(SIG_UNBLOCK, &MySigset, NULL);	//开放信号	struct sigaction mySigact;	mySigact.sa_handler = CGateWay::SetTSD;	//信号处理函数    mySigact.sa_mask = MySigset;    mySigact.sa_flags = 0;    sigaction(SIGUSR2, &mySigact, NULL);//sleep(1000);goto UpPollEnd;	while(1)	//不断轮询各个上节点的接收队列	{		GotWork = 0;		for(NodeNum=0; NodeNum<UPNODE_NUM; NodeNum++)	//轮询各个上节点的接收队列		{			if(ThreadExit)	//检查线程退出标志				goto UpPollEnd;	//立刻退出			//--------------请求包队列处理-----------------			if(!Gateway->ToExit)			{				ReqUnit = (RecvQUnit *)malloc(sizeof(RecvQUnit));	//以防下面多线程处理时的覆盖				if(Gateway->UpNode[NodeNum].RecvQ->Get(*ReqUnit))				{					GotWork++;	//本次轮询获得了数据包					#ifdef DEBUG					//printf("\n Up_RecvQ_Poller: Got a req_pack from RecvQ of UpNode.\n");					#endif					//从线程池中取一空闲线程进行包的路由处理?????????					Gateway->UpPackDeal(ReqUnit,NodeNum);				}				else	//如果没有从队列中取到数据包					free(ReqUnit);			}			//--------------应答包队列处理-----------------			RspUnit = (RecvQUnit *)malloc(sizeof(RecvQUnit));	//以防下面多线程处理时的覆盖			if(Gateway->UpNode[NodeNum].RspQ->Get(*RspUnit))			{				GotWork++;	//本次轮询获得了数据包				#ifdef DEBUG				//printf("\n Up_RecvQ_Poller: Got a rsp_pack from RspQ of UpNode.\n");				#endif				//从线程池中取一空闲线程进行包的路由处理?????????				Gateway->UpPackDeal(RspUnit,NodeNum);			}			else	//如果没有从队列中取到数据包				free(RspUnit);		}		if(ThreadExit)	//检查线程退出标志			goto UpPollEnd;	//立刻退出		if(!GotWork)	//若本次轮询没有获得数据包			usleep(Gateway->RecvQ_Empty_Wait);	}//while(1)UpPollEnd:	if(Gateway->ToExit)		strcpy(sSysEvent,"LOG: Up_RecvQ_Poller of Gateway is ended! ");	else		strcpy(sSysEvent,"Error: Up_RecvQ_Poller of Gateway is ended! ");    Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG);	return (void *)NULL;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -