📄 dnreceiver.cpp
字号:
/**********************************************************************
FileName : DnReceiver.cpp
Description : 下端接收线程(所有下节点共用)
Version : 1.0
Date : 2003年9月11日
Author : 刘荣辉
Other : 轮询套接字集,接收数据包到对应的下节点接收队列
***********************************************************************/
#include "GateWay.h"
void * CGateWay::DnReceiver(void * pGateway)
{
int i,RetCode;
int threadid;
char sSysEvent[SYS_EVENT_LEN];
struct timeval Sel_TO; //select的超时参数
fd_set SockSet; //下端连接套接字集
int SockNum, iMaxSock;
CGateWay *Gateway;
CDnNode * TheDnNode;
int PackTag;
//PackTag为-1表示应答包,为0表示正确的请求包
// >0表示错误的请求包(不放入接收队列,但返回应答包,仍维持连接)
// 为999表示需要重新连接
void **pPack;
char *Pack,*RspPack;
void **pRspPack;
RecvQUnit RUnit;
SendQUnit SUnit;
time_t nowtime;
Gateway=(CGateWay *)pGateway;
threadid = pthread_self();
pthread_detach(threadid);
sprintf(sSysEvent,"LOG: DnReceiver[%d] for all DownNodes is started.",threadid);
Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); //写系统日志
pPack = (void **)malloc(sizeof(char **));
pRspPack = (void **)malloc(sizeof(char **));
while(1) //不断对下节点套接字集多路复用接收
{
Sel_TO.tv_sec = (int)Gateway->Sel_Timeout / 1000000;
Sel_TO.tv_usec = (int)Gateway->Sel_Timeout % 1000000;
FD_ZERO(&SockSet); //清空临时下端连接套接字集
for(i=0,iMaxSock=0; i<=Gateway->MaxSock; i++)
{
if(FD_ISSET(i, &Gateway->DnSockSet)) //套接字集拷贝
{
FD_SET(i, &SockSet);
iMaxSock = i;
}
}
//Gateway->MaxSock要加互斥锁????????????????
//最多只是一次select错,下次便会根据最新的套接字集进行更新操作!!!
Gateway->MaxSock = iMaxSock; //实际的最大套接字值
if(Gateway->MaxSock==0 && Gateway->ToExit)
break;
//连接异常事件的监听处理??????????????????????????
select(Gateway->MaxSock+1, &SockSet, NULL, NULL, &Sel_TO);
if(Gateway->MaxSock==0 && Gateway->ToExit)
break;
for(SockNum=0; SockNum <= Gateway->MaxSock; SockNum++) //逐个检查Socket是否可读
{
if(FD_ISSET(SockNum, &SockSet)) //检查Socket是否可读
{
TheDnNode = (CDnNode *) Gateway->Sock_CP_Map.Find(SockNum); //根据套接字查找下节点
if(TheDnNode==NULL) //没在映射表中查到此Socket
{
sprintf(sSysEvent,"Warning: DnReceiver: Socket[%d] isn't found in the Sock_CP_Map!",SockNum);
Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG);
break;
}
//--------------------接收数据包-------------------------
//是否会收到一部分数据包便阻塞很久?????????????????
RetCode = TheDnNode->TcpSock.RecvPack(pPack,MAX_PACK_SIZE); //接收一个完整包
Pack = (char *)*pPack;
if(RetCode) //接收时异常
{
TheDnNode->State = 2; //节点状态为连接已断
FD_CLR(TheDnNode->TcpSock.sock, &Gateway->DnSockSet); //从套接字集中清除
if(TheDnNode->TcpSock.sock == Gateway->MaxSock) //若最大的套接字失效
{
Gateway->MaxSock--;
}
//从Socket_CP映射表中删除
int DelNum = Gateway->Sock_CP_Map.Delete(TheDnNode->TcpSock.sock);
if(DelNum!=1)
{
sprintf(sSysEvent,"Warning: [%d] Sock_CP pair for DnNode[%s] is deleted!",DelNum,TheDnNode->ServiceCode);
Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG);
}
TheDnNode->TcpSock.Close_sock();
if(RetCode==-1)
{
if(!Gateway->ToExit)
sprintf(sSysEvent,"Error[%d]: DnReceiver: Connection with DnNode[%s] lost!",errno,TheDnNode->ServiceCode);
}
else if(RetCode==-2)
sprintf(sSysEvent,"Error: Packet length from DnNode[%s] is abnormal! Connection is CUT!",TheDnNode->ServiceCode);
else
sprintf(sSysEvent,"Error[%d]: when RecvPack return, Connection with DnNode[%s] lost!",RetCode,TheDnNode->ServiceCode);
if(!Gateway->ToExit)
Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG);
continue; //继续检查其他Socket
}
//--------------------------应答处理-----------------------------
unsigned int PackType = ntohl(*(unsigned int *)(Pack+4));
PackTag = -1;
if(PackType<0x80000000) //若为请求包,则形成应答包,返回值为状态码
{
PackTag = TheDnNode->MakeRspPack(Pack, pRspPack);
RspPack = (char *)*pRspPack;
}//if(PackType),若为请求包
if(PackTag>=999) //收到的包严重错误,需要重新建立连接
{
TheDnNode->State = 2; //节点状态为连接已断
free(Pack); //释放请求包所占内存
FD_CLR(TheDnNode->TcpSock.sock, &Gateway->DnSockSet); //从套接字集中清除
if(TheDnNode->TcpSock.sock == Gateway->MaxSock) //若最大的套接字失效
Gateway->MaxSock--;
//从Socket_CP映射表中删除
int DelNum = Gateway->Sock_CP_Map.Delete(TheDnNode->TcpSock.sock);
if(DelNum!=1)
{
sprintf(sSysEvent,"Warning: [%d] Sock_CP pair for DnNode[%s] is deleted!",DelNum,TheDnNode->ServiceCode);
Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG);
}
TheDnNode->TcpSock.Close_sock();
if(PackTag==1000)
sprintf(sSysEvent,"Error[%d]: Got Terminate Pack from DnNode[%s].Connection lost.",PackTag,TheDnNode->ServiceCode);
else
sprintf(sSysEvent,"Error[%d]: Got error PDU from DnNode[%s].Connection lost.",PackTag,TheDnNode->ServiceCode);
Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG);
continue; //继续检查其他Socket
}//收到的包严重错误
//将收到的正确CMPP_Submit包写入日志文件
if(PackTag==0 && PackType == CMPP_SUBMIT)
{
CMPP_Submit *pSubmit = (CMPP_Submit *)Pack;
Cmpp_Submit_Log SubmitDnLog;
Gateway->WrSystemLog->getTime(SubmitDnLog.RecvTime);
SubmitDnLog.SrcNode = TheDnNode->NodeID;
//--------修改CMPP_Submit_Resp中的MsgID--------
((CMPP_Submit_Resp *)RspPack)->Msg_Id = Gateway->GetMsgId();
RUnit.MsgId_Echo = ((CMPP_Submit_Resp *)RspPack)->Msg_Id;
SubmitDnLog.MsgId_Echo = RUnit.MsgId_Echo;
//----------------------------------------
SubmitDnLog.Submit = pSubmit;
RetCode = Gateway->WrDnSubmitLog->WriteLog(&SubmitDnLog,CMPP_SUBMIT_LOG);
if(RetCode>0)
{
sprintf(sSysEvent,"Error[%d]: DnReceiver Writing Cmpp_Submit_Log File!",RetCode);
Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); /*写系统日志*/
break;
}
#ifdef DEBUG
printf("\n DnReceiver: Cmpp_Submit_Log is Writen sucessfully!\n");
#endif
}//正确Submit包写入日志
time(&nowtime);
//收到的正确请求包(除ActiveTest外)或应答包,形成接收队列单元,放入接收队列
if(PackTag<=0)
{
RUnit.Pack = Pack;
RUnit.RecvTime = nowtime;
if(PackType<0x80000000) //放进请求包接收缓冲队列
{
if(TheDnNode->RecvQ->Put(RUnit)==false)
{
sprintf(sSysEvent,"Warning: RecvQ of DnNode[%s] is full, A Pack is rejected!",TheDnNode->ServiceCode);
Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG);
continue; //继续检查其他Socket
}
#ifdef DEBUG
//printf("\n DnReceiver: Got a Req_pack from DnNode[%s],PackTag=[%d].\n",TheDnNode->ServiceCode,PackTag);
#endif
}
else //放进应答包接收缓冲队列
{
if(TheDnNode->RspQ->Put(RUnit)==false)
{
sprintf(sSysEvent,"Warning: RspQ of DnNode[%s] is full, A Pack is rejected!",TheDnNode->ServiceCode);
Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG);
continue; //继续检查其他Socket
}
#ifdef DEBUG
//printf("\n DnReceiver: Got a Rsp_pack from DnNode[%s],PackTag=[%d].\n",TheDnNode->ServiceCode,PackTag);
#endif
}
}
//待发应答包,形成发送队列单元,放入发送队列
if(PackTag>=0 && RspPack!=NULL)
{
SUnit.Pack = RspPack;
//SUnit.SrcRecvTime
SUnit.SrcNode = TheDnNode->NodeID;
//SUnit.ID_Relate
SUnit.SendTime = nowtime;
SUnit.iResent = -1; //-1为应答包,发送成功就删除
//SUnit.SrcSequence = 0;
if(TheDnNode->SendQ->Put(SUnit)==false)
{
sprintf(sSysEvent,"Warning: SendQ of DnNode[%s] is full, A Pack is rejected!",TheDnNode->ServiceCode);
Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG);
continue; //继续检查其他Socket
}
#ifdef DEBUG
//printf("\n DnReceiver: A RspPack to DnNode[%s] is put into SendQ.\n",TheDnNode->ServiceCode);
#endif
}
}//if(FD_ISSET)
}//for(SockNum)
}//while(1)
Gateway->DnRecvThr = 0;
free(pPack); //释放请求包指针的指针本身所占内存
free(pRspPack); //释放应答包指针的指针本身所占内存
if(Gateway->ToExit)
strcpy(sSysEvent,"LOG: DnReceiver for all DownNodes is ended!");
else
strcpy(sSysEvent,"Error: DnReceiver for all DownNodes QUIT!");
Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG);
return (void *)NULL;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -