📄 mothread.cpp
字号:
// MoThread1.cpp: implementation of the CMoThread class.
//
//////////////////////////////////////////////////////////////////////
#include "stdafx.h"
#include "MoThread.h"
#include "ServiceAction.h"
#include "MoSocket.h"
#ifdef _DEBUG
#undef THIS_FILE
static char THIS_FILE[]=__FILE__;
#define new DEBUG_NEW
#endif
//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////
// Timing constants. The original comments equate 120000 with 120 seconds, so the
// values appear to be in milliseconds. None of them is referenced in the visible
// part of this file.
#define WAITDELIVER 120000
#define WAITACTIVERSP 120000 // wait 120 seconds for the ActiveTest response
#define RECONNECTTIME 1000 // automatic reconnect interval
#define ACTIVETSTTIME 20000 // link check interval. NOTE(review): the original comment said 10 seconds, but on the millisecond scale above 20000 is 20 seconds - confirm which is intended
// Constructs the MO thread object.
//   pParentAction   - owning service action; used later for all status/error reporting.
//   DeliverWaitTime - time to wait for a Deliver packet (differs between callers).
// No queue or socket is created here; that happens in InitQueue()/InitSocket().
CMoThread::CMoThread(CServiceAction* pParentAction,UINT DeliverWaitTime)
{
    // Initialise COM for the constructing thread as apartment-threaded.
    // The first argument is reserved and must be NULL; the HRESULT is not checked.
    ::CoInitializeEx(NULL,COINIT_APARTMENTTHREADED);

    // Values supplied by the caller.
    m_pParentAction=pParentAction;
    m_WaitDeliverTime=DeliverWaitTime;

    // Connection settings: nothing configured, no socket yet.
    m_pMoSocket=NULL;
    m_LocalPort=0;
    m_ServiceNumber="";     // special service number
    m_ProvinceNum=0;

    // Connection state (0 is rendered as "not connected" by GetConnectStatus).
    m_ConnectStatus=0;
    m_ReconnectEvent=0;     // automatic reconnect
    m_ReconnectTimes=0;
    m_NotRecDeliver=0;
    m_NotWaitMoActive=0;

    // Traffic counters and options.
    m_DeliverMsgs=0;
    m_DeliveredMsgs=0;
    m_bSavereport=FALSE;
}
// Destructor. Performs no cleanup itself: within this file the province records,
// queue handles and the socket are released by ExitThread().
// NOTE(review): there is no CoUninitialize() here to balance the CoInitializeEx()
// call made in the constructor - confirm whether that is intentional.
CMoThread::~CMoThread()
{
}
// Initialisation entry point: sets up the MSMQ queues first, then the socket.
// The BOOL results of both steps are deliberately not acted upon here; each
// step reports its own failures through ReportError().
void CMoThread::InitChildClass()
{
    InitQueue();
    InitSocket();
}
// Creates the MSMQ objects and opens the send queues for every configured province.
//
// For each entry of m_ProvinceArray this creates the queue-info object and the two
// reusable message objects, then opens the MO queue (m_InQueuePath) and the status
// report queue (m_ReportQueuePath) for sending with MQ_DENY_NONE sharing.
//
// Returns FALSE as soon as a _com_error is raised (provinces after the failing one
// are left uninitialised), TRUE otherwise. Note that TRUE is also returned when a
// report queue pointer came back NULL; that case is only reported.
//
// NOTE(review): every notice, including the success one, goes through ReportError(),
// which tags it with the error codes "02"/"03" - confirm that this is wanted.
BOOL CMoThread::InitQueue()
{
    // Initialise COM for the calling thread (apartment-threaded); result not checked.
    ::CoInitializeEx(
        NULL,                      // reserved, always NULL
        COINIT_APARTMENTTHREADED
    );

    // Declared outside the try block so the catch handler can name the failing province.
    int i=0;
    try{
        for(i=0;i<m_ProvinceArray.GetSize();i++)
        {
            StructProvince* pProvince=m_ProvinceArray.GetAt(i);

            pProvince->m_QueueInfoPtr.CreateInstance(__uuidof(MSMQQueueInfo));
            if(pProvince->m_QueueInfoPtr==NULL)
            {
                // Only reported here. The dereference further down is then expected to
                // raise _com_error (E_POINTER) and end up in the catch handler -
                // assumes m_QueueInfoPtr is a _com_ptr_t smart pointer; confirm.
                CString ErrorInfo;
                ErrorInfo.Format("Init mo QueueInfoPtr Error");
                ReportError(ErrorInfo,i);
            }
            pProvince->m_MessagePtr.CreateInstance(__uuidof(MSMQMessage));
            pProvince->m_ReportMessage.CreateInstance(__uuidof(MSMQMessage));

            // The same queue-info object is reused: point it at the MO queue, open it,
            // then re-point it at the report queue and open that one.
            pProvince->m_QueueInfoPtr->PutFormatName(_bstr_t(pProvince->m_InQueuePath));
            pProvince->m_QueuePtr=pProvince->m_QueueInfoPtr->Open(MQ_SEND_ACCESS,MQ_DENY_NONE);
            pProvince->m_QueueInfoPtr->PutFormatName(_bstr_t(pProvince->m_ReportQueuePath));
            pProvince->m_ReportQueuePtr=pProvince->m_QueueInfoPtr->Open(MQ_SEND_ACCESS,MQ_DENY_NONE); // status report queue

            if(pProvince->m_ReportQueuePtr==NULL)
            {
                CString ErrorInfo;
                // Explicit LPCTSTR cast: a CString must not be passed through "..." as an object.
                ErrorInfo.Format("Init ReportQueue Object Error:%s",static_cast<LPCTSTR>(pProvince->m_ReportQueuePath));
                ErrorInfo+="mo path:";
                // Use this province's own MO queue path; the class-level m_InQueuePath
                // is not filled in by InitConnectParam().
                ErrorInfo+=pProvince->m_InQueuePath;
                ReportError(ErrorInfo,i);
                // Do not announce success for a province whose report queue failed.
                continue;
            }

            CString Info;
            Info.Format("init mo queue succ");
            ReportError(Info,i);
        }
    }
    catch(_com_error& e)
    {
        CMsmqerr errtemp;
        CString ErrorInfo;
        ErrorInfo.Format("Init MO queue error:%s",errtemp.GetErrText(e));
        ReportError(ErrorInfo,i);
        return FALSE;
    }
    return TRUE;
}
void CMoThread::ExitThread()
{
int i;
for(i=0;i<m_ProvinceArray.GetSize();i++)
{
if(m_ProvinceArray.GetAt(i)->m_QueuePtr!=NULL)
m_ProvinceArray.GetAt(i)->m_QueuePtr->Close();
delete m_ProvinceArray.GetAt(i);
m_ProvinceArray.RemoveAll();
}
if(m_pMoSocket!=NULL)
delete m_pMoSocket;
}
// Initialises the socket.
// Allocates the CMoSocket, registers every province's remote IP with it and then
// calls CreateMoSocket(). The outcome is reported once per province: through
// SaveStatusInfo() with type codes "00"/"00" on success, through ReportError() on
// failure. Returns TRUE on success and FALSE on failure; on failure the socket
// object stays allocated.
BOOL CMoThread::InitSocket()
{
    m_pMoSocket=new CMoSocket;
    m_pMoSocket->m_ParentThread=this;

    int nProvince;
    for(nProvince=0;nProvince<m_ProvinceArray.GetSize();nProvince++)
        m_pMoSocket->InitProvince(m_ProvinceArray.GetAt(nProvince)->m_csRemoteIP,nProvince);

    CString ReportContent;

    // Failure path first: tell every province and give up.
    if(!m_pMoSocket->CreateMoSocket(m_LocalPort,m_csRemoteIP,m_MoPassword,NULL))
    {
        ReportContent.Format("Fail to CreateSocket:%d",m_LocalPort);
        for(nProvince=0;nProvince<m_ProvinceArray.GetSize();nProvince++)
            ReportError(ReportContent,nProvince);
        return FALSE;
    }

    // Success: record the type codes in the members and notify every province.
    m_MsgType="00";
    m_ChildType="00";
    ReportContent.Format("Success to CreateSocket:%d",m_LocalPort);
    for(nProvince=0;nProvince<m_ProvinceArray.GetSize();nProvince++)
        m_pParentAction->SaveStatusInfo(m_MsgType,m_ChildType,ReportContent,nProvince);
    return TRUE;
}
// Reports an error.
// Hands csErrorInfo to the parent action's SaveStatusInfo() for the province at
// Index, tagged with message type "02" and child type "03".
void CMoThread::ReportError(CString &csErrorInfo,int Index)
{
    CString ErrMsgType("02");
    CString ErrChildType("03");
    m_pParentAction->SaveStatusInfo(ErrMsgType,ErrChildType,csErrorInfo,Index);
}
// Reports an ordinary (non-error) message.
// Forwards the caller-supplied message type, child type and content unchanged to
// the parent action's SaveStatusInfo() for the province at Index.
void CMoThread::ReportNormalInfo(CString &MsgType, CString &ChildType, CString &Content,int Index)
{
m_pParentAction->SaveStatusInfo(MsgType,ChildType,Content,Index);
}
// Handles a received Deliver message.
//   DestNumber - destination number of the message.
//   SrcNumber  - sender number; messages with fewer than 11 characters here are rejected.
//   RecContent - message text; trimmed in place (the caller sees the trimmed value).
//   Index      - position of the province in m_ProvinceArray.
//   LinkID     - link identifier, copied into the log line and the queue label.
// The message is logged via SaveStatusInfo() ("00"/"03"), counted in m_DeliverMsgs and
// written to the province's MO queue with the label
//   00_<province>_<sender>_<suffix>_<LinkID>
// where <suffix> is what follows the province's server number in DestNumber, or the
// literal "no" when nothing follows. Queue errors are reported, not propagated.
void CMoThread::OnDeliver(CString &DestNumber, CString &SrcNumber, CString &RecContent,int Index,CString LinkID)
{
    // Guard: refuse sender numbers that are too short.
    if(SrcNumber.GetLength()<11)
    {
        CString csBadNumber;
        csBadNumber.Format("Receive Wrong PhoneNum:%s",SrcNumber);
        ReportError(csBadNumber,Index);
        return;
    }

    // Log the raw (untrimmed) message and count it.
    CString csLogLine;
    csLogLine.Format("Recv:%s:%s:%s:%s",SrcNumber,DestNumber,RecContent,LinkID);
    CString MsgType("00");
    CString ChildType("03");
    m_pParentAction->SaveStatusInfo(MsgType,ChildType,csLogLine,Index);
    m_DeliverMsgs++;

    // Write to the queue.
    StructProvince* pProvince=m_ProvinceArray.GetAt(Index);

    // Everything after the server-number prefix; "no" stands in for an empty remainder.
    CString csSuffix=DestNumber.Mid(pProvince->m_csServerNum.GetLength());
    if(csSuffix.IsEmpty())
        csSuffix="no";

    CString csLabel;
    csLabel.Format("00_%s_%s_%s_%s",pProvince->m_csProvince,SrcNumber,csSuffix,LinkID);

    try{
        pProvince->m_MessagePtr->Label=_bstr_t(csLabel);
        RecContent.TrimLeft();
        RecContent.TrimRight();
        pProvince->m_MessagePtr->PutBody(_variant_t(RecContent));
        pProvince->m_MessagePtr->Send(pProvince->m_QueuePtr);
    }
    catch(_com_error&e)
    {
        CMsmqerr errtemp;
        CString ErrorInfo;
        ErrorInfo.Format("Send to queue error:%s",errtemp.GetErrText(e));
        ReportError(ErrorInfo,Index);
    }
}
//收到DeliverReport
void CMoThread::OnDeliverReport(char& Stat, char* MsgID,CString& MobileNum,int Index)
{
CString csDisplayInfo,MsgType,ChildType;
__int64 iMsgID=0;
memcpy(&iMsgID,MsgID+4,8);
char Temp[20];
_i64toa(iMsgID,Temp,16);
csDisplayInfo.Format("Deliver Report-MsgID:%s-Stat:%d-Mobile:%s",
Temp,Stat,MobileNum);
MsgType="00";
ChildType="04";
m_pParentAction->SaveStatusInfo(MsgType,ChildType,csDisplayInfo,Index);
if(Stat==0) //发送成功
m_DeliveredMsgs++;
CString csLabel;
if(MobileNum.Left(2)=="86")
MobileNum=MobileNum.Right(MobileNum.GetLength()-2);
try{
csLabel.Format("02_%s_%s_%s_%d_0000_0000",m_ProvinceArray.GetAt(Index)->m_csProvince,MobileNum,Temp,Stat);
m_ProvinceArray.GetAt(Index)->m_ReportMessage->Label=_bstr_t(csLabel);
m_ProvinceArray.GetAt(Index)->m_ReportMessage->Body=_variant_t("");
m_ProvinceArray.GetAt(Index)->m_ReportMessage->Send(m_ProvinceArray.GetAt(Index)->m_ReportQueuePtr);
}
catch(_com_error& e)
{
CMsmqerr errtemp;
CString ErrorInfo;
ErrorInfo.Format("Write SubmitResponse to Report queue error:%s",errtemp.GetErrText(e));
ReportError(ErrorInfo,Index);
}
}
// Copies the two running counters to the caller.
//   DeliverMsgs   - receives m_DeliverMsgs (incremented in OnDeliver).
//   DeliveredMsgs - receives m_DeliveredMsgs (incremented in OnDeliverReport when Stat is 0).
void CMoThread::GetDeliverMsgs(UINT &DeliverMsgs, UINT &DeliveredMsgs)
{
DeliverMsgs=m_DeliverMsgs;
DeliveredMsgs=m_DeliveredMsgs;
}
// Initialisation: registers one province's connection parameters.
// Allocates a StructProvince, fills it from the arguments and appends it to
// m_ProvinceArray (the record is freed in ExitThread()). m_ProvinceNum is refreshed
// to the array size and m_LocalPort is overwritten with this province's port, so
// the port of the most recently added province is the one that remains.
// ICPID, UserName, Password and ServiceNumber are accepted but not used in this body.
void CMoThread::InitConnectParam(CString &Province,CString &RemoteIP, UINT &LocalPort, CString &ICPID, CString &UserName, CString &Password, CString &ServiceNumber,int Index,CString InQueuePath,CString ReportQueuePath,CString csServiceNum)
{
    StructProvince* pNewProvince=new StructProvince;

    // Identity and network endpoint.
    pNewProvince->m_Index=Index;
    pNewProvince->m_csProvince=Province;
    pNewProvince->m_csServerNum=csServiceNum;
    pNewProvince->m_csRemoteIP=RemoteIP;
    pNewProvince->m_LocalPort=LocalPort;

    // Queue paths opened later by InitQueue().
    pNewProvince->m_InQueuePath=InQueuePath;
    pNewProvince->m_ReportQueuePath=ReportQueuePath;

    m_ProvinceArray.Add(pNewProvince);

    m_LocalPort=LocalPort;
    m_ProvinceNum=m_ProvinceArray.GetSize();
}
// Timer callback. The body is empty, so EventID is ignored and nothing happens.
void CMoThread::OnTimer(UINT EventID)
{
}
// Stores the caller's flag in m_bSavereport (initialised to FALSE by the constructor).
// The flag is not read anywhere in the visible part of this file.
void CMoThread::SaveReport(BOOL bSavereport)
{
m_bSavereport=bSavereport;
}
// Gets the connection status as display text.
// Maps m_ConnectStatus to a string: 0 - not connected, 1 - connection normal,
// 2 - automatic reconnect. For any other value csConnectStatus is left unchanged.
void CMoThread::GetConnectStatus(CString &csConnectStatus)
{
    switch(m_ConnectStatus)
    {
    case 0:
        csConnectStatus="未连接";
        break;
    case 1:
        csConnectStatus="连接正常";
        break;
    case 2:
        csConnectStatus="自动重连";
        break;
    default:
        // Unknown state: keep whatever the caller passed in.
        break;
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -