📄 moforward.cpp
字号:
/**********************************************************************
FileName : MoForword.cpp
Description : 下端短消息分发线程函数
Version : 1.0
Date : 2003年7月8日
Author : 刘荣辉
Other : 从数据库中提取MO短消息分发给各个下节点
***********************************************************************/
#include "GateWay.h"
void * CGateWay::MOForword(void *pGateway)
{
int threadid;
int i,RetCode;
char sSysEvent[SYS_EVENT_LEN];
CGateWay *Gateway;
char SqlCommand[500];
SendQUnit SUnit;
int AliveNum;
//void * pSubmit;
//int Sequence;
Gateway=(CGateWay *)pGateway;
threadid = pthread_self();
pthread_detach(threadid);
sprintf(sSysEvent,"LOG: MOForword[%d] is started.",threadid);
Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); //写系统日志
Map_Pos Pos;
CDnNode *theDnNode;
Gateway->Sock_CP_Map.PosInit(&Pos);
Cmpp_MO_Log *MoLog;
MoLog = (Cmpp_MO_Log *)malloc(sizeof(Cmpp_MO_Log) * Gateway->Max_MO_Fetch);
usleep(Gateway->MO_DB_Interval); //读数据库周期
//sleep(1000); goto MoForwardEnd;
while(1) //不断从数据库中提取记录进行转发
{
sprintf(SqlCommand,"select * from %s where DestNode in(-1",TAB_MO_Queue);
AliveNum = 0;
for(i=0;;i++) //提取的记录的目的下节点必须是当前在线的
{
if(Gateway->ToExit)
goto MoForwardEnd;
//获取下一个在线下节点的指针
theDnNode=NULL;
RetCode = Gateway->Sock_CP_Map.GetNext((char **)&theDnNode, &Pos);
if(theDnNode==NULL)
{
strcat(SqlCommand, ") ");
break;
}
AliveNum++;
strcat(SqlCommand, ",");
char tmpstr[15];
strcat(SqlCommand, CFunc::lrhitoa(theDnNode->NodeID,tmpstr));
}
if(AliveNum == 0)
{
if(!Gateway->ToExit)
usleep(Gateway->MO_DB_Interval); //读数据库周期
continue;
}
int RecordNum = Gateway->Max_MO_Fetch;
sprintf(&SqlCommand[strlen(SqlCommand)],"and ScheduleTime<now() and RelayTimes<=%d LIMIT %d;", Gateway->Max_RelayTimes,RecordNum);
for(;;)
{
if(Gateway->MoDB == NULL)
{
printf("\n Database connection is not initialized!\n");
if(Gateway->ToExit)
goto MoForwardEnd;
usleep(Gateway->MO_DB_Interval); //读数据库周期
}
else break;
}
//printf("\n MOForword: SqlCommand=[%s]. MoLog=%d, sizeof(Cmpp_MO_Log)=%d, Gateway->Max_MO_Fetch=%d. \n",SqlCommand,MoLog,sizeof(Cmpp_MO_Log), Gateway->Max_MO_Fetch);
RetCode = Gateway->MoDB->ReadMO_Queue(SqlCommand, MoLog, &RecordNum);
if(RetCode) //从数据库中获取待发记录失败
{
sprintf(sSysEvent,"Error[%d]: Geting records in MO_Queue from database!",RetCode);
Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG); //写系统日志
printf("\n 数据库操作出错!MOForword程序退出!\n");
break;//??????????????????????????????
}
#ifdef DEBUG
if(RecordNum)
{
printf("\n MOForword: SQL=[%s]. \n",SqlCommand);
printf("\n MOForword: Got records from MO_Queue,RecordNum=[%d],RetCode=[%d].....\n",RecordNum,RetCode);
}
#endif
for(i=0;i<RecordNum;i++)
{
//逐条将待处理短信形成待发送队列单元
SUnit.SrcRecvTime = CFunc::StrToTime(MoLog[i].SrcRecvTime);
SUnit.SrcNode = MoLog[i].SrcNode;
memcpy(SUnit.ID_Relate, &MoLog[i].SrcMsgId, sizeof(MoLog[i].SrcMsgId));
time_t nowtime;
time(&nowtime);
SUnit.SendTime = nowtime;
SUnit.iResent = CMPP_SEND_RETRIES;
SUnit.RelayTimes = MoLog[i].RelayTimes;
SUnit.Pack = (char *) MoLog[i].Deliver;
//--------根据DestNode上行路由--------
CDnNode * DestDnNode;
DestDnNode = (CDnNode *) Gateway->Code_CP_Map.Find(MoLog[i].DestNode);
if(DestDnNode==NULL || DestDnNode->State!=0) //目的节点不在线
{
free(MoLog[i].Deliver); //释放数据包所占内存,在ReadMO_Queue()中malloc
continue; //此短信不作转发处理
}
//---------修改流水号---------
MoLog[i].Deliver->Head.Sequence_Id = htonl(DestDnNode->GetSequence());
if(Gateway->ToExit) //若收到系统退出命令
{
for(int j=i; j<RecordNum; j++)
free(MoLog[j].Deliver); //释放数据包所占内存,在ReadMO_Queue()中malloc
goto MoForwardEnd;
}
//-----------------转发尝试--------------------
if(DestDnNode->SendQ->Put(SUnit)==false)
{
if(Gateway->ToExit) //若收到系统退出命令
{
for(int j=i; j<RecordNum; j++)
free(MoLog[j].Deliver); //释放数据包所占内存,在ReadMO_Queue()中malloc
goto MoForwardEnd;
}
sprintf(sSysEvent,"Warning: SendQ of DnNode[%d] is full, A Pack is rejected!",DestDnNode->NodeID);
Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG);
free(MoLog[i].Deliver); //释放数据包所占内存,在ReadMO_Queue()中malloc
continue; //此短信不作转发处理
}
else
{
#ifdef DEBUG
printf("\n MoForward: Retransmiting a Deliver to DnNode[%d].\n",DestDnNode->NodeID);
#endif
}
//--------根据Id删除原MO_Queue表中的记录--------
memset(SqlCommand,0,sizeof(SqlCommand));
sprintf(SqlCommand,"Delete from %s where Id='%d';",TAB_MO_Queue,MoLog[i].Id);
int DelNum = Gateway->MoDB->DBModify(SqlCommand);
if(DelNum!=1)
{
#ifdef DEBUG
printf("\n MoForward: [%d] Records are deleted from TAB_MO_Queue[%s].\n",DelNum,TAB_MO_Queue);
#endif
}
if(Gateway->ToExit) //若收到系统退出命令
{
for(int j=i+1; j<RecordNum; j++)
free(MoLog[j].Deliver); //释放数据包所占内存,在ReadMO_Queue()中malloc
goto MoForwardEnd;
}
}//for处理下一条记录
if(Gateway->ToExit) //若收到系统退出命令
goto MoForwardEnd;
else if(RecordNum < Gateway->Max_MO_Fetch/10) //实际获取的记录数小于一定数量则延时
usleep(Gateway->MO_DB_Interval);
}//while(1)
MoForwardEnd:
free(MoLog);
if(Gateway->ToExit)
strcpy(sSysEvent,"LOG: MoForward is ended!");
else
strcpy(sSysEvent,"Error: MoForward QUIT!");
Gateway->WrSystemLog->WriteLog(sSysEvent,SYSTEMLOG);
return (void *)NULL;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -