📄 uppackdeal.cpp
字号:
/********************************************************************** FileName : UpPackDeal.cpp Description : 上端收到的CMPP/SMGP包的路由处理(线程)模块 Version : 1.0 Date : 2003年7月7日 Author : 刘荣辉 Other : MO路由转发/写日志/写数据库***********************************************************************/#include "GateWay.h"//==========================================================================//===========================上端MO包的路由处理=============================//==========================================================================void * CGateWay::UpPackDeal(RecvQUnit * pUnit,int NodeNum){ int RetCode; /*/-----------------debug---------------------- printf("\n=========UpPackDeal:checking a Pack from RecvQ======== \n"); unsigned char *ptmp; ptmp=(unsigned char *)pUnit->Pack; int len; len = ntohl(*(int *)pUnit->Pack); printf("PackLen=[%d]\n",len); for(int i=0;i<len;i++) { printf("%02x ",*(ptmp+i)); if(((i+1)%4)==0) printf(" "); if((i+1)==12) printf("\n"); } printf("\n=========================================== \n");*/ /*for(int i=0;i<len;i++) { printf("%d ",*(ptmp+i)); if(((i+1)%4)==0) printf(" "); if((i+1)==12) printf("\n"); } printf("\n=========================================== \n"); //--------------------debug end-------------------*/ if(UpNode[NodeNum].Protocol==0) //CMPP { RetCode = UpCMPPDeal(pUnit,NodeNum); //CMPP上行包处理函数 } else if(UpNode[NodeNum].Protocol==1) //SMGP { RetCode = UpSMGPDeal(pUnit,NodeNum); //SMGP上行包处理函数 } free(pUnit); //在Up_RecvQ_Poller()中malloc,在此free是为了多线程! #ifdef DEBUG //printf("\nUpPackDeal: A Pack is processed!\n"); #endif return (void *)NULL;}//=====================================================================================//==================================CMPP上行包处理函数=================================//=====================================================================================int CGateWay::UpCMPPDeal(RecvQUnit * pUnit,int NodeNum){ int PackType; int Sequence; int RetCode; struct tm *tmptime; char timebuf[16]; time_t nowtime; char sSysEvent[SYS_EVENT_LEN]; //CFunc MyFun; WrSystemLog->getTime(timebuf); time(&nowtime); PackType = ntohl(*(int *)(pUnit->Pack + 4)); Sequence = ntohl(*(int *)(pUnit->Pack + 8)); switch(PackType) { case CMPP_DELIVER: { CMPP_Deliver *pDeliver; pDeliver = (CMPP_Deliver *)pUnit->Pack; /*//编码格式转换Unicode->GBK,并修改消息长度和包长度 if(pDeliver->Msg_Fmt==8) { int OldMsgLen; OldMsgLen = pDeliver->Msg_Length; printf("\n UpCMPPDeal: Content=[%s],Unicode= [",pDeliver->Deliver_Msg.Msg_Content); for(int a=0; a<pDeliver->Msg_Length; a++) printf("%02x ",(unsigned char)pDeliver->Deliver_Msg.Msg_Content[a]); printf("]\n"); pDeliver->Msg_Length = CFunc::CodeConvert("UTF-16BE","GB2312",pDeliver->Deliver_Msg.Msg_Content,pDeliver->Deliver_Msg.Msg_Content,pDeliver->Msg_Length); //pDeliver->Msg_Length = CFunc::CodeConvert("Unicode","GB2312",pDeliver->Deliver_Msg.Msg_Content,pDeliver->Deliver_Msg.Msg_Content,pDeliver->Msg_Length); pDeliver->Head.Total_Length = htonl(ntohl(pDeliver->Head.Total_Length) + pDeliver->Msg_Length - OldMsgLen); printf("\n UpCMPPDeal: Unicode->GB2312 is done! \n"); printf("\n UpCMPPDeal: GB2312= ["); for(int a=0; a<pDeliver->Msg_Length; a++) printf("%02x ",(unsigned char)pDeliver->Deliver_Msg.Msg_Content[a]); printf("]\n"); }*/ if((UpNode[NodeNum].ShowLog==3) || (UpNode[NodeNum].ShowLog==1)) //显示收到的Deliver包 { printf("\n==============收到用户发来的短信============\n"); printf("接收时间:%s\n用户手机号:%s\n目的号码:%s\n短信内容:%s",timebuf,pDeliver->Src_terminal_Id,pDeliver->Dest_Id,pDeliver->Deliver_Msg.Msg_Content); printf("\n============================================ \n"); } //判断是否发给本网关系统的短信 if(strncmp(pDeliver->Dest_Id, UpNode[NodeNum].SP_Code, strlen(UpNode[NodeNum].SP_Code))==0) { //判断是否统一业务测试指令 if(pDeliver->Msg_Fmt==0 && pDeliver->Msg_Length==strlen(CMCCTEST) && UpNode[NodeNum].Protocol==CMPP_PROTOCOL) { char tmpContent[20]; for(int i=0;i<pDeliver->Msg_Length;i++) tmpContent[i]=toupper(pDeliver->Deliver_Msg.Msg_Content[i]); if(strncmp(tmpContent,CMCCTEST,strlen(CMCCTEST))==0) UpNode[NodeNum].CMCCTestRsp(pDeliver); //对统一业务测试指令的回复 free(pDeliver); break; //本case处理完毕 } } //-----------------根据目的CP服务代码路由------------------- char DestCPCode[10]; memset(DestCPCode,0,sizeof(DestCPCode)); strncpy(DestCPCode, pDeliver->Dest_Id, CPCODE_LEN); int ArrayNum; for(ArrayNum=0; ArrayNum<DnNodeNum; ArrayNum++) { if(strcmp(CP_Array[ArrayNum].ServiceCode, DestCPCode)==0) break; } if(ArrayNum==DnNodeNum) //目的CP不存在 { sprintf(sSysEvent,"Warning: DestCPCode[%s] does NOT exist in System, a CMPP_Deliver abandoned!",DestCPCode); WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); free(pDeliver); break; //本case处理完毕 } //-------------------对于本地业务------------------- if(CP_Array[ArrayNum].InterfaceType==2) { //写入本地业务接口数据表 MyDB->WrMO_Interface(CMPP_PROTOCOL,pDeliver,CP_Array[ArrayNum].MOTable); //----------构造CMPP_Deliver上行转发记录----------- Cmpp_MO_Log MoLog; struct tm *tmptime; tmptime = localtime( &pUnit->RecvTime); sprintf( MoLog.SrcRecvTime, "%d%02d%02d%02d%02d%02d", tmptime->tm_year + 1900, tmptime->tm_mon + 1, tmptime->tm_mday, tmptime->tm_hour, tmptime->tm_min, tmptime->tm_sec); MoLog.SrcNode = UpNode[NodeNum].NodeID; memcpy(&MoLog.SrcMsgId , &pDeliver->Msg_Id, sizeof(pDeliver->Msg_Id)); MoLog.DestNode = atoi(DestCPCode); WrSystemLog->getTime(MoLog.SentTime); MoLog.Result = 0; MoLog.RelayTimes = 0; MoLog.Deliver = pDeliver; //写CMPP_Deliver上行转发记录表(CMPP_MO_Log) MyDB->WrCMPP_MOLog(&MoLog,TAB_CMPP_MO); free(pDeliver); break; //本case处理完毕 } //---------------对于HTTP接入的CP--------------- else if(CP_Array[ArrayNum].InterfaceType==1) { //写入HTTP接口数据表,并写入CMPP_MO_Log????????????? free(pDeliver); break; //本case处理完毕 } //---------------------对于CMPP接入的CP--------------------- CDnNode * TheDnNode; TheDnNode = (CDnNode *) Code_CP_Map.Find(atoi(DestCPCode)); //若目的下节点不需要状态报告 if(TheDnNode!=NULL && pDeliver->Registered_Delivery==1 && TheDnNode->WantReport==0) break; //不继续转发,Deliver包处理(case)结束 if(pDeliver->Registered_Delivery) { //提取以前存在数据库中的(返回给下节点的Submit_Rsp中的)MsgID long long MsgId_Echo; int RecordId; char SqlCommand[500]; memset(SqlCommand,0,sizeof(SqlCommand)); sprintf(SqlCommand,"select Id,MsgId_Echo from %s where MsgID=%lld and Dest_terminal_Id='%s' and Stat<>'' LIMIT 3;",\ TAB_CMPP_MT, pDeliver->Deliver_Msg.Report.Msg_Id, pDeliver->Deliver_Msg.Report.Dest_terminal_Id); RetCode = MyDB->GetMsgId_Echo(SqlCommand, &RecordId, &MsgId_Echo); if(RetCode!=1) { sprintf(sSysEvent,"Warning: [>=%d] Records in table(%s) where MsgID=%lld and Dest_terminal_Id='%s'.",\ RetCode, TAB_CMPP_MT, pDeliver->Deliver_Msg.Report.Msg_Id, pDeliver->Deliver_Msg.Report.Dest_terminal_Id); WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); } //改写CMPP_Submit转发记录表中的report内容 char RecvTime[15]; CFunc::TimeToStr(pUnit->RecvTime, RecvTime); char Pack_MsgId[21]; memset(Pack_MsgId,0,sizeof(Pack_MsgId)); mysql_real_escape_string(&MyDB->conn,Pack_MsgId,(char *)&pDeliver->Msg_Id,sizeof(pDeliver->Msg_Id)); memset(SqlCommand,0,sizeof(SqlCommand)); sprintf(SqlCommand,"update %s set ReportTime='%s',Report_MsgID='%s',Stat='%s',Done_time='%s',Dest_Id='%s',SMSC_sequence='%d' where Id=%d and MsgID=%lld;",\ TAB_CMPP_MT,RecvTime,Pack_MsgId,pDeliver->Deliver_Msg.Report.Stat,\ pDeliver->Deliver_Msg.Report.Done_time,pDeliver->Deliver_Msg.Report.Dest_terminal_Id,\ pDeliver->Deliver_Msg.Report.SMSC_sequence,RecordId,pDeliver->Deliver_Msg.Report.Msg_Id); RetCode = MyDB->DBModify(SqlCommand); if(RetCode!=1) { sprintf(sSysEvent,"Warning: [>=%d] Records in table(%s) where Id=%d and MsgID=%lld.",\ RetCode, TAB_CMPP_MT, RecordId,pDeliver->Deliver_Msg.Report.Msg_Id); WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); } //修改待转发的Report中的MsgID pDeliver->Deliver_Msg.Report.Msg_Id = MsgId_Echo; #ifdef DEBUG printf("\nUpCMPPDeal: A CMPP_report is dealed.\n"); #endif } //-----------------------转发-------------------------- long long SrcMsgId = pDeliver->Msg_Id; //修改Deliver包本身的MsgId pDeliver->Msg_Id = GetMsgId(); int QisFull=0; if(TheDnNode!=NULL && TheDnNode->State==0) //目的下节点连接正常 { //构造发送队列单元,放入目的下节点的发送队列 SendQUnit SUnit; SUnit.Pack = (char *)pDeliver; SUnit.SrcRecvTime = pUnit->RecvTime; SUnit.SrcNode = UpNode[NodeNum].NodeID; //SUnit.SrcSequence = pDeliver->Head.Sequence_Id; //原始请求包流水号 memcpy(&SUnit.ID_Relate, &SrcMsgId, sizeof(SrcMsgId)); SUnit.SendTime = nowtime; SUnit.iResent = CMPP_SEND_RETRIES; //SUnit.Protocol = CMPP_PROTOCOL; SUnit.RelayTimes = 0; //修改流水号 pDeliver->Head.Sequence_Id = htonl(TheDnNode->GetSequence()); //TheDnNode->SendQ->Wait_Put(SUnit); if(TheDnNode->SendQ->Put(SUnit)==false) { QisFull=1; //目的下节点的发送队列已满,暂存数据库 sprintf(sSysEvent,"Warning: SendQ of DnNode[%s] is full, A Pack is rejected!",TheDnNode->ServiceCode); WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); } else { #ifdef DEBUG printf("\n UpCMPPDeal: A Deliver is put into SendQ of DnNode[%s] .\n",TheDnNode->ServiceCode); #endif } } //目的下节点不在线,或者连接不正常,或者目的发送队列已满 if(TheDnNode==NULL || TheDnNode->State!=0 || QisFull) { Cmpp_MO_Log MoLog; tmptime = localtime( &pUnit->RecvTime); sprintf( MoLog.SrcRecvTime, "%d%02d%02d%02d%02d%02d", tmptime->tm_year + 1900, tmptime->tm_mon + 1, tmptime->tm_mday, tmptime->tm_hour, tmptime->tm_min, tmptime->tm_sec); MoLog.SrcNode = UpNode[NodeNum].NodeID; //MoLog.SrcSequence = pDeliver->Head.Sequence_Id; //原始请求包流水号 MoLog.SrcMsgId = SrcMsgId; if(TheDnNode==NULL) { MoLog.DestNode = atoi(DestCPCode); (char)MoLog.Result = NOT_ONLINE; printf("\n Warning:: UpCMPPDeal: the DnNode[%s] is not in the Code_CP_Map!\n",DestCPCode); } else { MoLog.DestNode = TheDnNode->NodeID; if(TheDnNode->State!=0) { (char)MoLog.Result = CONN_ABNORMAL; printf("\n Warning:: UpCMPPDeal: the DnNode[%s] is not online!\n",DestCPCode); } else if(QisFull) (char)MoLog.Result = SENDQ_FULL; } time_t Schedule = (int)pUnit->RecvTime + Relay_Interval; tmptime = localtime( &Schedule); sprintf( MoLog.SentTime, "%d%02d%02d%02d%02d%02d", tmptime->tm_year + 1900, tmptime->tm_mon + 1, tmptime->tm_mday, tmptime->tm_hour, tmptime->tm_min, tmptime->tm_sec); MoLog.RelayTimes = 0; MoLog.Deliver = pDeliver; //写入上行转发记录缓存表MO_Queue MyDB->WrMO_Queue(&MoLog,TAB_MO_Queue); sprintf(sSysEvent,"Warning: A CMPP_Deliver to DnNode[%d] is saved to MO_Queue temporarily!",MoLog.DestNode); WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); } break; } //================================================================ case CMPP_SUBMIT_RESP: { CMPP_Submit_Resp *pSubmitRsp; pSubmitRsp = (CMPP_Submit_Resp *)pUnit->Pack; //从已发送队列中找出并删除对应的等应答的单元 SendQUnit SUnit; if(UpNode[NodeNum].SentQ.FindDel(ntohl(pSubmitRsp->Head.Sequence_Id), &SUnit) == false) { sprintf(sSysEvent," Error: UpCMPPDeal Can't find the corresponding CMPP_Submit Pack[%d]!\n",ntohl(pSubmitRsp->Head.Sequence_Id)); WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); } CMPP_Submit *pSubmit = (CMPP_Submit *)SUnit.Pack; //memcpy(pSubmit->Msg_Id,pSubmitRsp->Msg_Id,sizeof(pSubmit->Msg_Id)); pSubmit->Msg_Id = pSubmitRsp->Msg_Id; //修改Submit结构体(Msg_Id字段) //-----------构造CMPP_Submit下行转发记录------------- Cmpp_MT_Log MtLog; tmptime = localtime( &SUnit.SrcRecvTime); sprintf( MtLog.SrcRecvTime, "%d%02d%02d%02d%02d%02d", tmptime->tm_year + 1900, tmptime->tm_mon + 1, tmptime->tm_mday, tmptime->tm_hour, tmptime->tm_min, tmptime->tm_sec); MtLog.SrcNode = SUnit.SrcNode; //原始请求包的流水号 //MtLog.SrcSequence = SUnit.SrcSequence; memcpy(&MtLog.MsgId_Echo, &SUnit.ID_Relate, sizeof(MtLog.MsgId_Echo)); MtLog.DestNode = UpNode[NodeNum].NodeID; MtLog.Result = pSubmitRsp->Result; MtLog.RelayTimes = SUnit.RelayTimes + 1; MtLog.Submit = pSubmit; //检查应答状态,决定是否重发 switch(pSubmitRsp->Result) { case 0://正确应答 { /*if((LOG_DB==0) || (LOG_DB==2)) //写入Submit转发日志文件 { if((RetCode = UpNode[NodeNum].WrSubmitLog->WriteLog(&MtLog,CMPP_SUBMIT_LOG))>0) { sprintf(sSysEvent,"Error[%d]: UpCMPPDeal Writing CMPP_Submit pack into Log File!",RetCode); WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); //写系统日志 break; } }*/ if(LOG_DB > 0) //将MT转发记录写入数据表 { tmptime = localtime( &pUnit->RecvTime); sprintf( MtLog.SentTime, "%d%02d%02d%02d%02d%02d", tmptime->tm_year + 1900, tmptime->tm_mon + 1, tmptime->tm_mday, tmptime->tm_hour, tmptime->tm_min, tmptime->tm_sec); MyDB->WrCMPP_MTLog(&MtLog,TAB_CMPP_MT); //写CMPP_Submit下行转发记录表 } //------------------------------------------------------------ free(SUnit.Pack); //释放Submit包所占内存 #ifdef DEBUG printf("\n UpCMPPDeal[%s]: A successfully Sent CMPP_Submit[%d] is cleaned from SentQ and writen into Submit_UpLog File!\n",timebuf,ntohl(pSubmitRsp->Head.Sequence_Id)); #endif break; } case 1: case 2: case 3: case 4: case 5: case 6: case 7: case 8: default: //各种错误响应码 { if(SUnit.iResent) //可重发的请求包,或者发送失败的应答包 { //存到待发送队列 UpNode[NodeNum].SendQ->Wait_Put(SUnit); /*while(UpNode[NodeNum].SendQ->Put(SUnit)==false) { sprintf(sSysEvent,"Warning: Send Queue is full!Send_Q_Full_Wait=[%d]!",Send_Q_Full_Wait); WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); usleep(Send_Q_Full_Wait); }*/ #ifdef DEBUG printf("\n UpCMPPDeal[%s]: A CMPP_Submit Pack[%d] is transmited from SentQ to SendQ.\n",timebuf,ntohl(pSubmitRsp->Head.Sequence_Id)); #endif } else //最大发送次数已满,仍未收到正确的应答包 { if(LOG_DB > 0) //将Submit_Failed转发记录写入数据表,等待重发
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -