📄 upreceiver_1117.cpp
字号:
/********************************************************************** FileName : UpReceiver.cpp Description : 上节点接收(线程)模块 Version : 1.0 Date : 2003年6月17日 Author : 刘荣辉 Other : ***********************************************************************/#include "UpNode.h"void * CUpNode::UpReceiver(void *node){ int threadid; int RetCode; char sSysEvent[SYS_EVENT_LEN]; //记录系统事件的字符串 int PackTag; //PackTag为-1表示应答包,为0表示正确的请求包 // >0表示错误的请求包(不放入接收队列,但返回应答包,仍维持连接) // 为999表示需要重新连接 void **pPack; char *Pack,*RspPack; void **pRspPack; unsigned int PackType; RecvQUnit RUnit; SendQUnit SUnit; time_t nowtime; threadid = pthread_self(); pthread_detach(threadid); sprintf(sSysEvent,"LOG: UpReceiver[%d] for UpNode[%s] is started.",threadid,NodeCode); WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); //写系统日志 pPack = (void **)malloc(sizeof(char **)); pRspPack = (void **)malloc(sizeof(char **)); while(1) //不断从上节点接收数据包放入接收队列 { #ifdef DEBUG //printf("\n UpReceiver: Waiting for a pack...\n"); #endif RetCode = TcpSock.RecvPack(pPack,MAX_PACK_SIZE); //接收一个完整包 Pack = (char *)*pPack; if(RetCode) { TcpSock.Close_sock(); State = 1; //节点状态为正在连接 if(RetCode==-1) { if(ToExit) { State=2; break; } else sprintf(sSysEvent,"Error[%d]: Connection with UpNode[%s] lost!Reconnecting.",errno,NodeCode); } else if(RetCode==-2) sprintf(sSysEvent,"Error: Packet length from UpNode[%s] is abnormal! Reconnecting.",NodeCode); else sprintf(sSysEvent,"Error[%d]: when RecvPack return.Reconnecting UpNode[%s].",RetCode,NodeCode); if(!ToExit) WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); break; //return (void *)NULL; //退出接收模块 } //-----------------debug---------------------- //printf("\nUpReceiver: RecvPack RetCode=%d,\n",RetCode); unsigned char *ptmp; ptmp=(unsigned char *)Pack; int len; len = ntohl(*(int *)Pack); /*printf("\n===========UpReceiver: checking a Pack============= \n"); printf("PackLen=[%d]\n",len); for(int i=0;i<len;i++) { printf("%02x ",*(ptmp+i)); if(((i+1)%4)==0) printf(" "); if((i+1)==12) printf("\n"); } printf("\n=========================================== \n");*/ if(len > 13) { if(Protocol==0) printf("\n=====UpReceiver: checking a CMPP_Pack Received==== \n"); if(Protocol==1) printf("\n=====UpReceiver: checking a SMGP_Pack Received==== \n"); printf("PackLen=[%d]\n",len); for(int i=0;i<len;i++) { printf("%02x ",*(ptmp+i)); if(((i+1)%4)==0) printf(" "); if((i+1)==12) printf("\n"); } printf("\n=========================================== \n"); } /*for(int i=0;i<len;i++) { printf("%d ",*(ptmp+i)); if(((i+1)%4)==0) printf(" "); if((i+1)==12) printf("\n"); } printf("\n=========================================== \n"); //--------------------debug end-------------------*/ //--------------------------应答处理----------------------------- PackType = ntohl(*(unsigned int *)(Pack+4)); PackTag = -1; if(PackType<0x80000000) //若为请求包,则形成应答包,返回值为状态码 { PackTag = MakeRspPack(Pack, pRspPack); RspPack = (char *)*pRspPack; /* if(PackTag) { sprintf(sSysEvent,"Warning: Got a bad Pack[Type=%d] from UpNode[%s],Result=[%d]!",PackType,NodeCode,PackTag); WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); }*/ /*/-----------------debug---------------------- if(RspPack) { printf("\n=========checking the RspPack made========= \n"); unsigned char *ptmp; ptmp=(unsigned char *)RspPack; int len; len = ntohl(*(int *)RspPack); for(int i=0;i<len;i++) { printf("%02x ",*(ptmp+i)); if(((i+1)%4)==0) printf(" "); if((i+1)==12) printf("\n"); } printf("\n=========================================== \n"); if(len>13) { for(int i=0;i<12;i++) { printf("%d ",*(ptmp+i)); if(((i+1)%4)==0) printf(" "); if((i+1)==12) printf("\n"); } printf("\n=========================================== \n"); } }//if(RspPack) //--------------------debug end------------------- */ }//if(PackType<0x80000000) #ifdef DEBUG if(Protocol==0 && PackType == CMPP_ACTIVE_TEST) { printf("\n UpReceiver: Got an CMPP_ActiveTest!\n"); } else if(Protocol==0 && PackType == CMPP_ACTIVE_TEST_RESP) { printf("\n UpReceiver: Got an CMPP_ActiveTest_Rsp!\n"); } #endif if(PackTag>=999) //收到的包严重错误,需要重新建立连接 { free(Pack); //释放请求包所占内存 TcpSock.Close_sock(); State = 1; //节点状态为正在连接 if(PackTag==1000) sprintf(sSysEvent,"Error[%d]: Got Terminate/Exit Pack from UpNode[%s]! Reconnecting.",PackTag,NodeCode); else sprintf(sSysEvent,"Error[%d]: Got error PDU from UpNode[%s]! Reconnecting.",PackTag,NodeCode); WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); break; //return (void *)NULL; //退出接收模块 } //将收到的正确Deliver包写入日志文件 if(PackTag==0) { if(Protocol==0 && PackType == CMPP_DELIVER) //CMPP_Deliver { CMPP_Deliver *pDeliver = (CMPP_Deliver *)Pack; Cmpp_Deliver_Log DeliverUpLog; WrSystemLog->getTime(DeliverUpLog.RecvTime); DeliverUpLog.SrcNode = NodeID; DeliverUpLog.Deliver = pDeliver; if(!pDeliver->Registered_Delivery) //写入Deliver日志 { //编码格式转换Unicode->GBK,并修改消息长度和包长度 if(pDeliver->Msg_Fmt==8) { int OldMsgLen; OldMsgLen = pDeliver->Msg_Length; char tmpReserve[8]; memcpy(tmpReserve,&pDeliver->Deliver_Msg.Msg_Content[pDeliver->Msg_Length],8); /*printf("\n UpReceiver: Content=[%s],Unicode= [",pDeliver->Deliver_Msg.Msg_Content); for(int a=0; a<pDeliver->Msg_Length; a++) printf("%02x ",(unsigned char)pDeliver->Deliver_Msg.Msg_Content[a]); printf("]\n");*/ pDeliver->Msg_Length = CFunc::CodeConvert("UTF-16BE","GB2312",pDeliver->Deliver_Msg.Msg_Content,pDeliver->Deliver_Msg.Msg_Content,pDeliver->Msg_Length); //pDeliver->Msg_Length = CFunc::CodeConvert("Unicode","GB2312",pDeliver->Deliver_Msg.Msg_Content,pDeliver->Deliver_Msg.Msg_Content,pDeliver->Msg_Length); memcpy(&pDeliver->Deliver_Msg.Msg_Content[pDeliver->Msg_Length],tmpReserve,8); pDeliver->Head.Total_Length = htonl(ntohl(pDeliver->Head.Total_Length) + pDeliver->Msg_Length - OldMsgLen); pDeliver->Msg_Fmt = 15; //printf("\n UpReceiver: Unicode->GB2312 is done! \n"); /*printf("\n UpReceiver: GB2312= ["); for(int a=0; a<pDeliver->Msg_Length; a++) printf("%02x ",(unsigned char)pDeliver->Deliver_Msg.Msg_Content[a]); printf("]\n");*/ } /*printf("\n Content=["); for(int a=0; a<pDeliver->Msg_Length; a++) printf("%c ",pDeliver->Deliver_Msg.Msg_Content[a]); printf("].\n");*/ /*printf("\n==============UpReceiver: After Convert=================\n"); ptmp=(unsigned char *)Pack; len = ntohl(*(int *)Pack); for(int i=0;i<len;i++) { printf("%02x ",*(ptmp+i)); if(((i+1)%4)==0) printf(" "); if((i+1)==12) printf("\n"); } printf("\n=========================================== \n");*/ if((RetCode = WrDeliverLog->WriteLog(&DeliverUpLog,CMPP_DELIVER_LOG))>0) { sprintf(sSysEvent,"Error[%d]: UpReceiver Writing Cmpp_Deliver_Log File!",RetCode); WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); /*写系统日志*/ break; } #ifdef DEBUG //printf("\n UpReceiver: Cmpp_Deliver_UpLog is Writen sucessfully!\n"); #endif } else //写入Report日志 { if((RetCode = WrReportLog->WriteLog(&DeliverUpLog,CMPP_REPORT_LOG))>0) { sprintf(sSysEvent,"Error[%d]: UpReceiver Writing Cmpp_Report_Log File!",RetCode); WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); /*写系统日志*/ break; } #ifdef DEBUG printf("\n UpReceiver: Cmpp_Report Log file is Writen sucessfully!\n"); #endif } }//CMPP_Deliver else if(Protocol==1 && PackType == REQUEST_ID_DELIVER) //SMGP { SMGP_Deliver *pDeliver = (SMGP_Deliver *)Pack; Smgp_Deliver_Log DeliverUpLog; WrSystemLog->getTime(DeliverUpLog.RecvTime); DeliverUpLog.SrcNode = NodeID; DeliverUpLog.Deliver = pDeliver; if(!pDeliver->nIsReport) //写入Deliver日志 { if((RetCode = WrDeliverLog->WriteLog(&DeliverUpLog,SMGP_DELIVER_LOG))>0) { sprintf(sSysEvent,"Error[%d]: UpReceiver Writing Smgp_Deliver_UpLog File!",RetCode); WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); /*写系统日志*/ break; } #ifdef DEBUG printf("\n UpReceiver: Smgp_Deliver_UpLog is Writen sucessfully!\n"); #endif } else //写入Report日志 { if((RetCode = WrReportLog->WriteLog(&DeliverUpLog,SMGP_REPORT_LOG))>0) { sprintf(sSysEvent,"Error[%d]: UpReceiver Writing Smgp_Report Log File!",RetCode); WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); /*写系统日志*/ break; } #ifdef DEBUG printf("\n UpReceiver: Smgp_Report Log file is Writen sucessfully!\n"); #endif } }//SMGP }//写接收日志文件 //收到的正确请求包(除ActiveTest外)或应答包,形成接收队列单元,放入接收队列 if(PackTag<=0) { RUnit.Pack = Pack; time(&nowtime); RUnit.RecvTime = nowtime; //RUnit.Protocol = Protocol; if(State && ToExit) { State = 2; //节点状态为暂时不再连接 break; } while(RecvQ->Put(RUnit)==false) { if(State && ToExit) { State = 2; //节点状态为暂时不再连接 goto UpRecvEnd; } sprintf(sSysEvent,"Warning: RecvQ of UpNode[%s] is full!Recv_Q_Full_Wait=[%d]!",NodeCode,Recv_Q_Full_Wait); WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); usleep(Recv_Q_Full_Wait); } #ifdef DEBUG //printf("\n UpReceiver: A pack[PackTag=%d] is put into RecvQ.\n",PackTag); #endif } //待发应答包,形成发送队列单元,放入发送队列 if(PackTag>=0 && RspPack!=NULL) { //SUnit.Protocol = Protocol; SUnit.Pack = RspPack; //SUnit.SrcRecvTime SUnit.SrcNode = NodeID; //SUnit.ID_Relate time(&nowtime); SUnit.SendTime = nowtime; SUnit.iResent = -1; //-1为应答包,发送成功就删除 //SUnit.SrcSequence = 0; while(SendQ->Put(SUnit)==false) { if(State && ToExit) { State = 2; //节点状态为暂时不再连接 goto UpRecvEnd; } sprintf(sSysEvent,"Warning: SendQ of UpNode[%s] is full!Send_Q_Full_Wait=[%d]!",NodeCode,Send_Q_Full_Wait); WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); usleep(Send_Q_Full_Wait); } #ifdef DEBUG //printf("\n UpReceiver: A RspPack is put into SendQ.\n"); #endif } if(State && ToExit) { State = 2; //节点状态为暂时不再连接 break; } if(State) //用于当UpSender先发现连接断开的情况 break; /*/--------- 测试: 模拟连接断开 ------------- if(SeqId%10 == 0) { printf("\n ================UpReceiver: connection is cut for testing!========\n"); TcpSock.Close_sock(); State = 1; //节点状态为正在连接 break; }*/ //--------- 测试: 模拟连接断开 ------------- }//while(1)UpRecvEnd: free(pPack); //释放请求包指针的指针本身所占内存 free(pRspPack); //释放应答包指针的指针本身所占内存 ReadThr = 0; if(ToExit) { State = 2; //节点状态为暂时不再连接 sprintf(sSysEvent,"LOG: UpReceiver of UpNode[%s] is ended!",NodeCode); } else sprintf(sSysEvent,"Warning: UpReceiver of UpNode[%s] QUIT! ",NodeCode); WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); return (void *)NULL;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -