📄 mtforward.cpp
字号:
/********************************************************************** FileName : MTForword.cpp Description : 短消息下行分发线程函数 Version : 1.0 Date : 2003年7月8日 Author : 刘荣辉 Other : 从数据库中提取MT短消息分发给各个上节点***********************************************************************/#include "GateWay.h"void * CGateWay::MTForword(void *pGateway){ int threadid,i,RetCode; char sSysEvent[SYS_EVENT_LEN]; char SqlCommand[500]; int AliveNum; CGateWay *Gateway; SendQUnit SUnit; Gateway=(CGateWay *)pGateway; threadid = pthread_self(); pthread_detach(threadid); sprintf(sSysEvent,"LOG: MTForword[%d] is started.",threadid); Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); //写系统日志 Cmpp_MT_Log *MtLog; MtLog = (Cmpp_MT_Log *)malloc(sizeof(Cmpp_MT_Log) * Gateway->Max_MT_Fetch); usleep(Gateway->MT_DB_Interval); //读数据库周期 while(1) //不断从数据库中提取记录进行转发 { sprintf(SqlCommand,"select * from %s where DestNode in(-1",TAB_MT_Queue); AliveNum = 0; for(i=0;i<UPNODE_NUM;i++) //提取的记录的目的下节点必须是当前在线的 { if(Gateway->ToExit) //若收到系统退出命令 goto MtForwardEnd; if(Gateway->UpNode[i].State==0) { strcat(SqlCommand, ","); char tmp[10]; strcat(SqlCommand, CFunc::lrhitoa(Gateway->UpNode[i].NodeID,tmp)); AliveNum++; } } if(AliveNum == 0) { usleep(Gateway->MT_DB_Interval); //读数据库周期 continue; } int RecordNum = Gateway->Max_MT_Fetch; sprintf(&SqlCommand[strlen(SqlCommand)],") and ScheduleTime<now() and RelayTimes<=%d LIMIT %d;", Gateway->Max_RelayTimes,RecordNum); for(;;) { if(Gateway->MtDB == NULL) { printf("\n Database connection is not initialized!\n"); if(Gateway->ToExit) //若收到系统退出命令 goto MtForwardEnd; else usleep(Gateway->MT_DB_Interval); //读数据库周期 } else break; } RetCode = Gateway->MtDB->ReadMT_Queue(SqlCommand, MtLog, &RecordNum); if(RetCode) //从数据库中获取待发记录失败 { sprintf(sSysEvent,"Error[%d]: Geting records in MT_Queue from database!",RetCode); Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); //写系统日志 printf("\n 数据库操作出错!MTForword程序退出!\n"); break; //??????????????????????????? } #ifdef DEBUG if(RecordNum) { printf("\n MTForword: SQL=[%s]. \n",SqlCommand); printf("\n MTForword: Got records from MT_Queue,RecordNum=[%d],RetCode=[%d].....\n",RecordNum,RetCode); } #endif for(i=0;i<RecordNum;i++) { //逐条将待处理短信形成待发送队列单元 SUnit.SrcRecvTime = CFunc::StrToTime(MtLog[i].SrcRecvTime); SUnit.SrcNode = MtLog[i].SrcNode; memcpy(SUnit.ID_Relate, &MtLog[i].MsgId_Echo, sizeof(MtLog[i].MsgId_Echo)); time_t nowtime; time(&nowtime); SUnit.SendTime = nowtime; //if(DestNode???) //根据目的上节点的节点编号判断协议类型 SUnit.iResent = CMPP_SEND_RETRIES; //else // SUnit.iResent = SMGP_SEND_RETRIES; SUnit.RelayTimes = MtLog[i].RelayTimes; SUnit.Pack = (char *) MtLog[i].Submit; //-----------------下行路由-------------------- int NodeNum; for(NodeNum=0; NodeNum < UPNODE_NUM; NodeNum++) { if(Gateway->UpNode[NodeNum].NodeID == MtLog[i].DestNode) break; } if(NodeNum==UPNODE_NUM || Gateway->UpNode[NodeNum].State) //目的节点不在线 { free(MtLog[i].Submit); //释放数据包所占内存,在ReadMT_Queue()中malloc continue; //此短信不作转发处理 } //---------修改流水号--------- MtLog[i].Submit->Head.Sequence_Id = htonl(Gateway->UpNode[NodeNum].GetSequence()); if(Gateway->ToExit) //若收到系统退出命令 { for(int j=i; j<RecordNum; j++) free(MtLog[j].Submit); //释放数据包所占内存,在ReadMT_Queue()中malloc goto MtForwardEnd; } //-----------------转发尝试-------------------- if(Gateway->UpNode[NodeNum].SendQ->Put(SUnit)==false) { if(Gateway->ToExit) //若收到系统退出命令 { for(int j=i; j<RecordNum; j++) free(MtLog[j].Submit); //释放数据包所占内存,在ReadMT_Queue()中malloc goto MtForwardEnd; } sprintf(sSysEvent,"Warning: SendQ of UpNode[%d] is full, A Pack is rejected!",Gateway->UpNode[NodeNum].NodeID); Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); free(MtLog[i].Submit); //释放数据包所占内存,在ReadMT_Queue()中malloc continue; //此短信不作转发处理 } else { #ifdef DEBUG printf("\n MTForword: Retransmiting a Deliver to UpNode[%d].\n",Gateway->UpNode[NodeNum].NodeID); #endif } //--------根据Id删除原MT_Queue表中的记录-------- memset(SqlCommand,0,sizeof(SqlCommand)); sprintf(SqlCommand,"Delete from %s where Id='%d';",TAB_MT_Queue,MtLog[i].Id); int DelNum = Gateway->MtDB->DBModify(SqlCommand); if(DelNum!=1) { #ifdef DEBUG printf("\n MTForword: [%d] Records are deleted from TAB_MT_Queue[%s].\n",DelNum,TAB_MT_Queue); #endif } if(Gateway->ToExit) //若收到系统退出命令 { for(int j=i+1; j<RecordNum; j++) free(MtLog[j].Submit); //释放数据包所占内存,在ReadMT_Queue()中malloc goto MtForwardEnd; } }//for处理下一条记录 if(Gateway->ToExit) //若收到系统退出命令 goto MtForwardEnd; else if(RecordNum < Gateway->Max_MT_Fetch/10) //实际获取的记录数小于一定数量则延时 usleep(Gateway->MT_DB_Interval); }//while(1)MtForwardEnd: free(MtLog); if(Gateway->ToExit) strcpy(sSysEvent,"LOG: MTForword is ended!"); else strcpy(sSysEvent,"Error: MTForword QUIT!"); Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); return (void *)NULL;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -