📄 emppsms.cpp
字号:
// EmppSms.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include "EmppSms.h"
#include "data.h"
#include "AmHttpSocket.h"
#include "process.h"
#ifdef _DEBUG
#define new DEBUG_NEW
#undef THIS_FILE
static char THIS_FILE[] = __FILE__;
#endif
// Worker-thread counts used by _tmain when it starts the processing threads.
// Number of threads running HanldeMtThread (outgoing message submission).
#define MT_THREADS 3
// Number of threads running HanldeMoThread (incoming message forwarding).
#define MO_THREADS 5
// Number of threads running HanldeReportThread (status-report persistence).
#define REPORT_THREADS 5
#include "Operator.h"
#include "SCmpp30Operator.h"
#include "yes.h"
// Helpers that copy an ADO field value (_variant_t) into a string destination,
// mapping a database NULL (VT_NULL) to an empty string.
//
// Both macros are wrapped in do { ... } while(0) so they behave as a single
// statement (safe inside an unbraced if/else), and they bind `src` to a local
// reference so the expression -- typically rs->GetCollect(...) -- is evaluated
// exactly once instead of twice.
//
// GENADOSTRING: dest is a writable char buffer.
// NOTE(review): the copy is an unbounded strcpy; the caller must guarantee that
// dest is large enough for the field value.
#define GENADOSTRING(dest,src) \
    do { \
        const _variant_t& adoFieldValue_ = (src); \
        if(adoFieldValue_.vt==VT_NULL) (dest)[0]=0; \
        else strcpy((dest),(char*)(_bstr_t)adoFieldValue_); \
    } while(0)
// GENADOCSTR: dest is assigned from an LPCTSTR (the file uses it with string
// members of SMSObject).
#define GENADOCSTR(dest,src) \
    do { \
        const _variant_t& adoFieldValue_ = (src); \
        if(adoFieldValue_.vt==VT_NULL) (dest)=""; \
        else (dest)=(LPCTSTR)(_bstr_t)adoFieldValue_; \
    } while(0)
// Thread entry points (started with _beginthread from _tmain).
void HanldeMoThread(LPVOID lp);
void HanldeMtThread(LPVOID lp);
void HanldeReportThread(LPVOID lp);
// Queue of CMPP_Deliver_Body* drained by HanldeMoThread; guarded by m_molock.
CPtrList m_molist;
// Queue of CMPP_Report_Body* drained by HanldeReportThread; guarded by m_reportlock.
CPtrList m_reportlist;
// Queue of SMSObject* filled by GetSMSFromDB and drained by HanldeMtThread;
// guarded by m_datalock.
CPtrList m_smslist;
// Gateway operator; created in _tmain (as CSCmpp30Operator) and deleted on shutdown.
COperator * op;
// HTTP client used by HanldeMoThread to post incoming messages to mourl.
CAmHttpSocket http;
// Target URL for incoming messages, read in _tmain from the "mourl" key of the
// [System] section of CONFIGFILE. Empty means incoming messages are not forwarded.
char mourl[250];
/////////////////////////////////////////////////////////////////////////////
// The one and only application object
CWinApp theApp;
// GetSMSFromDB keeps at most this many entries queued in m_smslist.
#define MAX_BUFFER 10
// Number of connections in dbpool.
#define MAX_DBS 5
// Only referenced from commented-out code in this file.
long m_totalcounts=0;
// Loop flag polled by every worker thread; cleared by the SHUTDOWN console command.
bool m_run=true;
// Not referenced anywhere in the visible part of this file.
#define MAX_SMSOBJECT 5
using namespace std;
// Serializes console output in ShowMsg.
CCriticalSection m_showlock;
// Guards m_smslist.
CCriticalSection m_datalock;
// Guards m_molist.
CCriticalSection m_molock;
// Guards m_reportlist.
CCriticalSection m_reportlock;
// Guards the scan-and-claim of dbpool[i].busy in GetDbFromPool.
CCriticalSection m_dblock;
// Pool of database connections shared by the MT and report threads.
CAdoDB dbpool[MAX_DBS];
void GetSMSFromDB(LPVOID lp);
// Initializes and connects every CAdoDB entry of dbpool.
//
// Both Initconn() and ConnectDB() are still called for every slot, exactly as
// before, but a failing call is now reported on stderr instead of being
// silently ignored. This function runs from _tmain before any worker thread is
// started, so plain stream output is sufficient here.
void InitdbPool()
{
    for(int i=0;i<MAX_DBS;i++)
    {
        if(!dbpool[i].Initconn())
            std::cerr<<"InitdbPool: Initconn failed for pool slot "<<i<<std::endl;
        if(!dbpool[i].ConnectDB())
            std::cerr<<"InitdbPool: ConnectDB failed for pool slot "<<i<<std::endl;
    }
}
// Claims a free connection from dbpool and returns it.
//
// The pool is scanned under m_dblock; the first entry whose busy flag is clear
// is marked busy and handed out. When every entry is busy the lock is released,
// the thread sleeps for 1 ms and the scan is repeated, so this call blocks
// until a connection becomes available. The caller gives the connection back
// by clearing its busy flag.
CAdoDB * GetDbFromPool()
{
    CAdoDB* claimed=NULL;
    while(claimed==NULL)
    {
        m_dblock.Lock();
        for(int slot=0;slot<MAX_DBS&&claimed==NULL;++slot)
        {
            if(!dbpool[slot].busy)
            {
                dbpool[slot].busy=true;
                claimed=&dbpool[slot];
            }
        }
        m_dblock.Unlock();
        // Nothing free: back off briefly before scanning again.
        if(claimed==NULL)
            Sleep(1);
    }
    return claimed;
}
// Percent-encodes every character of strSource.
//
// Each character is treated as one byte and emitted as "%XX" with two
// upper-case hexadecimal digits, so the result is exactly three times as long
// as the input. No character is passed through unescaped. An empty input
// yields an empty string.
//
// The output is written straight into a buffer of the final size using a
// hex-digit table; this avoids the non-standard itoa() call and the repeated
// string concatenation while producing the same text.
CString ConvertToUrlString(CString strSource)
{
    static const char kHexDigits[]="0123456789ABCDEF";
    const int length=strSource.GetLength();
    CString strDest;
    if(length<=0)
        return strDest;
    // GetBuffer's size excludes the terminator; ReleaseBuffer writes it.
    LPTSTR out=strDest.GetBuffer(length*3);
    for(int i=0;i<length;i++)
    {
        const BYTE b=(BYTE)strSource.GetAt(i);
        *out++='%';
        *out++=kHexDigits[b>>4];
        *out++=kHexDigits[b&0x0F];
    }
    strDest.ReleaseBuffer(length*3);
    return strDest;
}
// Placeholder: ignores obj and always returns an empty string.
CString GenUrl(SMSObject* obj)
{
    return CString("");
}
// Writes msg to stdout as "[YYYY-MM-DD HH:MM:SS] msg" followed by a newline.
//
// The whole operation runs under m_showlock so that lines produced by
// different worker threads are not interleaved.
void ShowMsg(CString msg)
{
    m_showlock.Lock();
    const CString stamp=CTime::GetCurrentTime().Format("%Y-%m-%d %H:%M:%S");
    CString strmsg;
    // CString arguments must be cast to LPCTSTR when passed for %s: Format is
    // variadic, so no implicit conversion takes place on its own.
    strmsg.Format("[%s] %s",(LPCTSTR)stamp,(LPCTSTR)msg);
    cout<<(LPCTSTR)strmsg<<endl;
    m_showlock.Unlock();
}
// Program entry point.
//
// Initializes MFC and COM, reads the "mourl" setting, opens the database pool,
// creates and starts the gateway operator, launches the worker threads and then
// serves a small console command loop:
//   SHUTDOWN - stops MO reception, clears m_run, waits 3 seconds and exits.
//   SEND     - prompts for a mobile number and a message (each a single
//              whitespace-delimited token) and submits it through the operator.
// Commands are matched case-insensitively; anything else is ignored.
//
// Returns 1 when MFC initialization fails, 0 otherwise.
int _tmain(int argc, TCHAR* argv[], TCHAR* envp[])
{
    int nRetCode = 0;
    // initialize MFC and print and error on failure
    if (!AfxWinInit(::GetModuleHandle(NULL), NULL, ::GetCommandLine(), 0))
    {
        cerr << _T("Fatal Error: MFC initialization failed") << endl;
        nRetCode = 1;
    }
    else
    {
        CString strHello;
        strHello.LoadString(IDS_HELLO);
        cout << (LPCTSTR)strHello << endl;
#if _WIN32_WINNT >= 0x0400
        HRESULT hRes = CoInitializeEx(NULL, COINIT_MULTITHREADED);
#else
        HRESULT hRes = CoInitialize(NULL);
#endif
        m_smslist.RemoveAll();
        GetPrivateProfileString("System","mourl","",mourl,sizeof(mourl),CONFIGFILE);
        InitSocket();
        InitdbPool();
        op=new CSCmpp30Operator(0);
        op->LoadSetting();
        op->InitOperator();
        op->StartMoRecv();

        // Declared once so the loops compile under both the old VC6 for-scope
        // rule and the standard one.
        int i;
        for(i=0;i<MT_THREADS;i++)
        {
            _beginthread(HanldeMtThread,0,(void*)i);
        }
        for(i=0;i<MO_THREADS;i++)
        {
            _beginthread(HanldeMoThread,0,(void*)i);
        }
        for(i=0;i<REPORT_THREADS;i++)
        {
            _beginthread(HanldeReportThread,0,(void*)i);
        }
        _beginthread(GetSMSFromDB,0,0);

        for(;;)
        {
            char command[100];
            command[0]=0;
            // width() caps the extraction at sizeof(command)-1 characters so an
            // over-long token cannot overflow the buffer.
            cin.width(sizeof(command));
            cin>>command;
            if(cin.fail())
            {
                // End of input or a read error: keep the service running, but
                // do not interpret an unset buffer and do not spin at full speed.
                cin.clear();
                Sleep(100);
                continue;
            }
            CString strCommand(command);
            if(strCommand.CompareNoCase("SHUTDOWN")==0)
            {
                op->StopMoRecv();
                m_run=false;
                // Give the worker threads time to notice m_run and leave their loops.
                Sleep(3000);
                break;
            }
            if(strCommand.CompareNoCase("SEND")==0)
            {
                char mobile[100];
                mobile[0]=0;
                cout<<"mobile:";
                cin.width(sizeof(mobile));
                cin>>mobile;
                char msg[200];
                msg[0]=0;
                cout<<"message:";
                cin.width(sizeof(msg));
                cin>>msg;
                if(cin.fail())
                {
                    // Incomplete input: skip the submit instead of sending garbage.
                    cin.clear();
                    continue;
                }
                op->Submit(msg,mobile,"555500961","555500961",NULL,NULL);
            }
        }
        delete op;
        // Only balance a COM initialization that actually succeeded.
        if(SUCCEEDED(hRes))
            CoUninitialize();
    }
    return nRetCode;
}
// Thread entry point that feeds m_smslist from the smslist table.
//
// Opens its own database connection (logging and returning on failure), resets
// rows left in state 1 back to state 0, and then, while m_run is set:
//   1. computes how many free slots m_smslist has (MAX_BUFFER minus its size);
//   2. selects up to that many rows with state=0 whose sendtime is null or due;
//   3. builds one SMSObject per row, marks the selected ids as state=1;
//   4. appends the objects to m_smslist for HanldeMtThread to consume.
// The loop polls with a 1 ms sleep. lp is unused.
void GetSMSFromDB(LPVOID lp)
{
    CAdoDB db;
    if(!db.Initconn())
    {
        ShowMsg("创建数据库对象失败,请重起本程序");
        return;
    }
    if(!db.ConnectDB())
    {
        ShowMsg("连接数据库失败,不能提取任务,请重起本程序");
        return;
    }
    ShowMsg("连接数据库成功,正在任务......");
    // Rows still in state 1 were claimed earlier but never completed; requeue them.
    CString sql="update smslist set state=0 where state=1";
    db.ExecuteSql((char*)(LPCTSTR)sql);
    CPtrList m_tempsmslist;
    CString m_delstring="";
    while(m_run)
    {
        m_delstring="";
        // m_smslist is modified by the MT threads under m_datalock, so its size
        // is read under the same lock.
        m_datalock.Lock();
        const int m_numbers=MAX_BUFFER-m_smslist.GetCount();
        m_datalock.Unlock();
        if(m_numbers<=0)
        {
            Sleep(1);
            continue;
        }
        m_tempsmslist.RemoveAll();
        CString fetchSql;
        fetchSql.Format("select top %d * from smslist where state=0 and (sendtime is null or datediff(second,sendtime,getdate())>=0) order by id",m_numbers);
        _RecordsetPtr rs;
        if(db.Get_Record((char*)(LPCTSTR)fetchSql,(IDispatch**)&rs))
        {
            if(!rs->GetadoEOF()&&!rs->GetadoBOF())
            {
                while(!rs->GetadoEOF())
                {
                    SMSObject* obj=new SMSObject;
                    m_tempsmslist.AddTail(obj);
                    obj->id=rs->GetCollect(L"id");
                    GENADOCSTR(obj->mobile,rs->GetCollect(L"mobile"));
                    GENADOCSTR(obj->msg,rs->GetCollect(L"msg"));
                    GENADOCSTR(obj->scrid,rs->GetCollect(L"srcid"));
                    GENADOCSTR(obj->svcid,rs->GetCollect(L"svcid"));
                    // Collect the ids as ",id,id,..." for the update below.
                    CString tid;
                    tid.Format(",%ld",obj->id);
                    m_delstring=m_delstring + tid;
                    rs->MoveNext();
                }
                rs->Close();
                // Mark the fetched rows as claimed before handing them to the senders.
                m_delstring.TrimLeft(",");
                m_delstring="update smslist set state=1 where id in (" + m_delstring +")";
                db.ExecuteSql((char*)(LPCTSTR)m_delstring);
                // Move the whole batch into the shared queue, preserving order.
                m_datalock.Lock();
                while(!m_tempsmslist.IsEmpty())
                {
                    //InterlockedIncrement((LPLONG)&m_totalcounts);
                    m_smslist.AddTail(m_tempsmslist.RemoveHead());
                }
                m_datalock.Unlock();
            }else
            {
                rs->Close();
            }
        }
        Sleep(1);
    }
}
// Thread entry point that forwards incoming (MO) messages over HTTP.
//
// While m_run is set, takes one CMPP_Deliver_Body from m_molist (under
// m_molock). If mourl is non-empty, the message text is percent-encoded and
// posted to mourl together with the source, destination and receive time. A
// 200 response is logged as success, anything else as a failure. The body is
// deleted in every case. lp carries the thread index used in log lines.
//
// NOTE(review): all MO threads share the single global `http` object, and
// GetPageStatusCode()/FreeBuffer() are called after GetPage() with no lock in
// between -- confirm that CAmHttpSocket tolerates concurrent use, otherwise
// give each thread its own instance.
void HanldeMoThread(LPVOID lp)
{
    int index=(int)lp;
    while(m_run)
    {
        CMPP_Deliver_Body* body=NULL;
        m_molock.Lock();
        if(!m_molist.IsEmpty())
            body=(CMPP_Deliver_Body*)m_molist.RemoveHead();
        m_molock.Unlock();
        if(body)
        {
            if(mourl[0]!=0)
            {
                DWORD size=0;
                // Keep the encoded text in a named CString and pass it as
                // LPCTSTR: a CString object must not go through varargs for %s.
                const CString encodedMsg=ConvertToUrlString(body->pMsg);
                CString sobject;
                // NOTE(review): srcterminal/destterminal/recvtime are passed to %s
                // unchanged; presumably they are character arrays -- verify.
                sobject.Format("msg=%s&src=%s&dest=%s&recvtime=%s",(LPCTSTR)encodedMsg,body->srcterminal,body->destterminal,body->recvtime);
                char* szobject=(char*)(LPCTSTR)sobject;
                // szobject comes from a CString and is never NULL, so the
                // request is always sent with the post flag set and with the
                // string length.
                char* content=http.GetPage(mourl,size,true,szobject,strlen(szobject));
                int pagecode=0;
                if(content)
                {
                    pagecode=http.GetPageStatusCode();
                    http.FreeBuffer();
                }
                CString msg;
                if(content&&pagecode==200)
                    msg.Format("MO Thread %d:处理mo结果成功!",index);
                else
                    msg.Format("MO Thread %d: forwarding MO failed (HTTP status %d)",index,pagecode);
                ShowMsg(msg);
            }
            delete body;
        }
        Sleep(1);
    }
}
// Thread entry point that submits queued outgoing (MT) messages.
//
// While m_run is set, takes one SMSObject from m_smslist (under m_datalock),
// submits it through the operator and records the outcome in smslist:
//   - state 10004, 10005, 10006 or 20000: the row goes back to state=0 with
//     sendtime pushed 15 seconds into the future, so it is picked up again;
//   - any other state: the row is set to state=2 with the returned message id.
// The database update uses a pooled connection. lp is unused.
void HanldeMtThread(LPVOID lp)
{
    while(m_run)
    {
        SMSObject* body=NULL;
        m_datalock.Lock();
        if(!m_smslist.IsEmpty())
            body=(SMSObject*)m_smslist.RemoveHead();
        m_datalock.Unlock();
        if(body)
        {
            // Start from defined values so that a Submit which fails before
            // filling its outputs cannot feed indeterminate data into the SQL.
            char msgid[21]={0};
            unsigned long state=0;
            op->Submit(body->msg,body->mobile,body->scrid,body->svcid,msgid,&state);
            // Guarantee termination before msgid is formatted with %s.
            msgid[sizeof(msgid)-1]=0;
            CString sql;
            // state is unsigned long, hence %lu.
            if(state==10004||state==10005||state==10006||state==20000)
                sql.Format("update smslist set state=0,SendSeqId=%lu,sendtime=dateadd(second,15,getdate()) where id=%ld",state,body->id);
            else
                sql.Format("update smslist set state=2,Messageid='%s',SendSeqId=%lu where id=%ld",msgid,state,body->id);
            delete body;
            CAdoDB* db=GetDbFromPool();
            db->ExecuteSql((char*)(LPCTSTR)sql);
            // GetDbFromPool inspects the busy flags under m_dblock, so the
            // connection is released under the same lock.
            m_dblock.Lock();
            db->busy=false;
            m_dblock.Unlock();
        }
        Sleep(1);
    }
}
// Maps a delivery-report state code to its textual name.
//
// Each SM_* constant maps to the same word without the SM_ prefix; any code
// that is not listed yields "UNKNOWN".
CString CovertReportState(BYTE state)
{
    static const struct
    {
        int code;
        const char* label;
    } kStateNames[]=
    {
        { SM_DELIVERED,     "DELIVERED"     },
        { SM_EXPIRED,       "EXPIRED"       },
        { SM_DELETED,       "DELETED"       },
        { SM_UNDELIVERABLE, "UNDELIVERABLE" },
        { SM_ACCEPTED,      "ACCEPTED"      },
        { SM_UNKNOWN,       "UNKNOWN"       },
        { SM_REJECTED,      "REJECTED"      },
        { SM_WAITING,       "WAITING"       },
        { SM_FAILURE,       "FAILURE"       }
    };
    const int entries=sizeof(kStateNames)/sizeof(kStateNames[0]);
    for(int n=0;n<entries;++n)
    {
        if(kStateNames[n].code==(int)state)
            return kStateNames[n].label;
    }
    return "UNKNOWN";
}
// Thread entry point that persists delivery status reports.
//
// While m_run is set, takes one CMPP_Report_Body from m_reportlist (under
// m_reportlock) and writes its numeric state and state name to the smslist row
// whose Messageid matches the report's msgid, using a pooled connection.
// lp is unused.
void HanldeReportThread(LPVOID lp)
{
    while(m_run)
    {
        CMPP_Report_Body* body=NULL;
        m_reportlock.Lock();
        if(!m_reportlist.IsEmpty())
            body=(CMPP_Report_Body*)m_reportlist.RemoveHead();
        m_reportlock.Unlock();
        if(body)
        {
            // Keep the state name in a named CString and pass it as LPCTSTR:
            // a CString object must not go through varargs for %s.
            const CString stateText=CovertReportState(body->stat);
            CString sql;
            sql.Format("update smslist set SendResult=%u,statusReport='%s' where Messageid='%s'",body->stat,(LPCTSTR)stateText,body->msgid);
            delete body;
            CAdoDB* db=GetDbFromPool();
            db->ExecuteSql((char*)(LPCTSTR)sql);
            // GetDbFromPool inspects the busy flags under m_dblock, so the
            // connection is released under the same lock.
            m_dblock.Lock();
            db->busy=false;
            m_dblock.Unlock();
        }
        Sleep(1);
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -