⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dnpoller.cpp

📁 SMS gateway. SMS protocol for CHINA mobile, unicom, lingtong. Using mysql to exchange message.
💻 CPP
字号:
/**********************************************************************
  FileName            : DnPoller.cpp
  Description         : 下端轮询模块
  Version             : 1.0
  Date                : 2003年9月19日
  Author              : 刘荣辉
  Other               : 
***********************************************************************/

#include "GateWay.h"

//void * CGateWay::Dn_SentQ_Poller(void *)
//void * CGateWay::Dn_RecvQ_Poller(void *)

//==========================================================================
//===================下端已发送队列定时(轮询)线程函数=====================
//==========================================================================
//轮询下节点的已发送队列,确定重发,定时ActiveTest
void * CGateWay::Dn_SentQ_Poller(void * pGateway)
{
  int RetCode, threadid;
  char sSysEvent[SYS_EVENT_LEN];	//记录系统事件的字符串
  int iCheckQ;
  int i;
  CGateWay *Gateway;
  CDnNode *DnNode[MAX_DNNODE];
  int Node_Num;

  Gateway=(CGateWay *)pGateway;
  threadid = pthread_self();
  pthread_detach(threadid);
  sprintf(sSysEvent,"LOG: Dn_SentQ_Poller[%d] is started.",threadid);
  Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG);    //写系统日志

  iCheckQ=0;
  while(1)
  {
		if(!Gateway->ToExit)
			sleep(1);	//每秒钟检查一次队列,且作为ActiveTest的单位时间
		else
			usleep(200000);	//等待上节点连接关闭
		if (iCheckQ==CHECK_Q_TIME)
		{
			//获取所有的在线下节点
			Node_Num=Gateway->Sock_CP_Map.GetAllNode((char **)DnNode,MAX_DNNODE);
			if(!Node_Num)	//无在线的下节点
			{
				sleep(1);
				if(ToExit)
					break;
				continue;
			}
			for (i=0; i<Node_Num; i++)
			{
				DnNode[i]->DnSentQCheck(Gateway->MyDB,Gateway->Relay_Interval);	//检查已发送队列,确定是否重发
				//判断是否ActiveTest的时间已到
				if(DnNode[i]->WaitTest >= DnNode[i]->TestTime && ToExit==0)
				{
					RetCode = DnNode[i]->PutActiveTest();	//要马上返回,不能阻塞
					DnNode[i]->WaitTest = 0;
				}
				else
					DnNode[i]->WaitTest += CHECK_Q_TIME;
			}
			iCheckQ=0;
		}
		else iCheckQ++;

  }//while(1)

  if(ToExit)
	  strcpy(sSysEvent,"LOG: Dn_SentQ_Poller of Gateway is ended! ");
  else
	  strcpy(sSysEvent,"Error: Dn_SentQ_Poller of Gateway QUIT! ");
  Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG);

  return (void *)NULL;
}
//==========================================================================
//====================下端接收队列轮询/调度处理线程=========================
//==========================================================================
void * CGateWay::Dn_RecvQ_Poller(void *pGateway)
{
	int RetCode,threadid;
	char sSysEvent[SYS_EVENT_LEN];
	RecvQUnit *ReqUnit,*RspUnit;
	int GotWork;
	CGateWay *Gateway;

	Gateway=(CGateWay *)pGateway;
	threadid = pthread_self();
	pthread_detach(threadid);  
	sprintf(sSysEvent,"LOG: Dn_RecvQ_Poller[%d] is started.",threadid);
	Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG);    //写系统日志

	int ThreadExit=0;	//线程退出标志,TSD
	pthread_setspecific(ThreadKey,(void *)&ThreadExit);

	sigset_t MySigset;
	sigfillset(&MySigset);
	pthread_sigmask(SIG_BLOCK, &MySigset, NULL);	//先阻塞所有信号
	sigemptyset(&MySigset);
	sigaddset(&MySigset, SIGUSR2);	//仅仅开放SIGUSR2
	pthread_sigmask(SIG_UNBLOCK, &MySigset, NULL);	//开放信号
	//sigset(SIGUSR2, CGateWay::SetTSD);
	struct sigaction mySigact;
	mySigact.sa_handler = CGateWay::SetTSD;	//信号处理函数
    mySigact.sa_mask = MySigset;
    mySigact.sa_flags = 0;
    sigaction(SIGUSR2, &mySigact, NULL);

	Map_Pos Pos;
    CDnNode *theDnNode;
	Gateway->Sock_CP_Map.PosInit(&Pos);
	GotWork = 0;

//sleep(1000);goto DnPollEnd;

	while(1)	//不断轮询各个下节点的接收队列
	{
		if(ThreadExit)	//检查线程退出标志
			goto DnPollEnd;	//立刻退出
		//获取下一个下节点的指针
		theDnNode=NULL;
		RetCode = Gateway->Sock_CP_Map.GetNext((char **)&theDnNode, &Pos);
		if(theDnNode==NULL)	//Map中已经没有一个元素了
		{
			//printf("\n Dn_RecvQ_Poller: NO logined DnNode now,RetCode=[%d]!\n",RetCode);
			if (RetCode<0)	//读到的是最后一个或者早已经为空,下次将从头开始取
			{
				if(!GotWork)	//若本次轮询没有获得一个包
					usleep(Gateway->RecvQ_Empty_Wait);
				GotWork = 0;
			}
			continue;
		}
		
		//--------------请求包队列处理-----------------
		if(!Gateway->ToExit)
		{
			ReqUnit = (RecvQUnit *)malloc(sizeof(RecvQUnit));	//以防下面多线程处理时的覆盖
			if(theDnNode->RecvQ->Get(*ReqUnit)==false)
				free(ReqUnit);
			else
			{
				GotWork++;	//本次轮询获得了数据包
				#ifdef DEBUG
				//printf("\n Dn_RecvQ_Poller: Got a req_pack from RecvQ of DnNode[%s].\n",theDnNode->ServiceCode);
				#endif
				//从线程池中取一空闲线程进行包的路由处理?????????
				Gateway->DnCMPPDeal(ReqUnit, theDnNode);
			}
		}

		if(ThreadExit)	//检查线程退出标志
			goto DnPollEnd;	//立刻退出
		//--------------应答包队列处理-----------------
		RspUnit = (RecvQUnit *)malloc(sizeof(RecvQUnit));	//以防下面多线程处理时的覆盖
		if(theDnNode->RspQ->Get(*RspUnit)==false)
			free(RspUnit);
		else
		{
			GotWork++;	//本次轮询获得了数据包
			#ifdef DEBUG
			//printf("\n Dn_RecvQ_Poller: Got a rsp_pack from RecvQ of DnNode[%s].\n",theDnNode->ServiceCode);
			#endif
			//从线程池中取一空闲线程进行包的路由处理?????????
			Gateway->DnCMPPDeal(RspUnit, theDnNode);
		}

	}//while(1)

DnPollEnd:
	if(Gateway->ToExit)
		strcpy(sSysEvent,"LOG: Dn_RecvQ_Poller of Gateway is ended! ");
	else
		strcpy(sSysEvent,"Error: Dn_RecvQ_Poller of Gateway is ended! ");
    Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG);
	return (void *)NULL;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -