📄 sgipapp.cpp
字号:
/*
* SGIPApp.cpp
*/
#include "sgip.h"
#include "SGIPApp.h"
#include "SGIPComm.h"
#include "SGIPPkg.h"
#include <iostream.h>
#define MAX_QUEUE_LEN 1000
//global
unsigned int g_nSeqNo = 0; //系列号
char g_cSrcID[11] = "";//节点编号
char g_cClientIP[16] = "";
int g_iLocalPort = 0;
char g_cServerIP[16] = "";
int g_iSrvPort = 0;
int g_iAcceptNum = 16;
char g_cRUserName[16] = "";
char g_cRUserPwd[16] = "";
char g_cLUserName[16] = "";
char g_cLUserPwd[16] = "";
char g_cSpNum[21] = "";
char g_cCorpID[7] = "";
static bool g_bWingateServerCreate = false;
static pthread_mutex_t g_WGSrvCreate_mutex;
// static int g_nListenfd = -1;
CSGIPApp::CSGIPApp()
{
tLastSendSubmitTime = 0;
bTermitnate = false;
nSubmitTimes = 0;
m_Listenfd = -1;
bInitConf = false;
//add liaomch 2003-08-14
m_nSendSock = -1;
bSendFinish = false;
pthread_mutex_init(&m_DeliverVec_mutex , NULL);
pthread_mutex_init(&m_ReportVec_mutex , NULL);
pthread_mutex_init(&m_AcceptSock_mutex , NULL);
if (!g_bWingateServerCreate)
pthread_mutex_init(&g_WGSrvCreate_mutex, NULL);
m_pLog = new CMobilLog();
}
CSGIPApp::~CSGIPApp()
{
if (m_pLog != NULL)
delete m_pLog;
if (m_Listenfd != -1)
close(m_Listenfd);
vector<SGIP_DELIVER_PACKAGE * > ::iterator it;
SGIP_DELIVER_PACKAGE * pBody = NULL;
pthread_mutex_lock(&m_DeliverVec_mutex);
for(it = m_pDeliver.begin();it != m_pDeliver.end(); it++)
{
if((*it) == NULL)
{
continue;
}
pBody = *it;
delete (char *)pBody;
}
m_pDeliver.clear();
pthread_mutex_unlock(&m_DeliverVec_mutex);
vector<SGIP_REPORT_PACKAGE * > ::iterator it1;
SGIP_REPORT_PACKAGE * pBody1 = NULL;
pthread_mutex_lock(&m_ReportVec_mutex);
for(it1 = m_pReport.begin();it1 != m_pReport.end(); it1++)
{
if((*it1) == NULL)
{
continue;
}
pBody1 = *it1;
delete (char *)pBody1;
}
m_pReport.clear();
pthread_mutex_unlock(&m_ReportVec_mutex);
}
void CSGIPApp::GetValByName(const char *pName, char *pBuf, char *pVal)
{
if ((pName == NULL) || (pBuf == NULL) || (pVal == NULL))
return;
char *pTmp = NULL;
if ((pTmp = strstr(pBuf, pName)) == NULL)
return;
char *p = strstr(pTmp, "=");
if (p == NULL)
return;
p++;
while ((*p!=' ') && (*p!='\t') && (*p!='\n') && (*p!='\r'))
{
*pVal++ = *p++;
}
return;
}
bool CSGIPApp::initConfig(char *pFile = NULL)
{
int nHand = 0;
int nLen = 0;
char cBuf[1024] = "";
if (pFile != NULL)
{
if ((nHand = open(pFile, O_RDONLY)) == END_FAILED)
return false;
}
else
{
if ((nHand = open("./sgipc.ini", O_RDONLY)) == END_FAILED)
{
if ((nHand = open("../config/sgipc.ini", O_RDONLY)) == END_FAILED)
return false;
}
}
//lseek(nHand, 0L, SEEK_SET);
nLen = lseek(nHand, 0L, SEEK_END);
lseek(nHand, 0L, SEEK_SET);
if (nLen != read(nHand, &cBuf, nLen))
{
close(nHand);
return false;
}
else
close(nHand);
memset(g_cClientIP, 0, sizeof(g_cClientIP));
memset(g_cServerIP, 0, sizeof(g_cServerIP));
memset(g_cCorpID, 0, sizeof(g_cCorpID));
memset(g_cSpNum, 0, sizeof(g_cSpNum));
memset(g_cRUserName, 0, sizeof(g_cRUserName));
memset(g_cRUserPwd, 0, sizeof(g_cRUserPwd));
memset(g_cLUserName, 0, sizeof(g_cLUserName));
memset(g_cLUserPwd, 0, sizeof(g_cLUserPwd));
memset(g_cSrcID, 0, sizeof(g_cSrcID));
GetValByName((const char *)&"ClientIP", (char *)&cBuf, (char *)&g_cClientIP);
GetValByName((const char *)&"ServerIP", (char *)&cBuf, (char *)&g_cServerIP);
GetValByName((const char *)&"CorpID", (char *)&cBuf, (char *)&g_cCorpID);
GetValByName((const char *)&"SpNum", (char *)&cBuf, (char *)&g_cSpNum);
GetValByName((const char *)&"RUserName", (char *)&cBuf, (char *)&g_cRUserName);
GetValByName((const char *)&"RUserPwd", (char *)&cBuf, (char *)&g_cRUserPwd);
GetValByName((const char *)&"LUserName", (char *)&cBuf, (char *)&g_cLUserName);
GetValByName((const char *)&"LUserPwd", (char *)&cBuf, (char *)&g_cLUserPwd);
GetValByName((const char *)&"SourceID", (char *)&cBuf, (char *)&g_cSrcID);
char cTmp[8]="";
memset(&cTmp, 0, sizeof(cTmp));
GetValByName((const char *)&"ServerPort", (char *)&cBuf, (char *)&cTmp);
g_iSrvPort = atoi(cTmp);
memset(&cTmp, 0, sizeof(cTmp));
GetValByName((const char *)&"LocalPort", (char *)&cBuf, (char *)&cTmp);
g_iLocalPort = atoi(cTmp);
bInitConf = true;
return true;
}
//封装的对外接口:
bool CSGIPApp::SGIPInit(char *pConfFile = NULL)
{
//config:
if (!bInitConf)
{
if (initConfig(pConfFile) != true)
{
printf("CSGIPApp::Init(%s) faild!\n", pConfFile);
bInitConf = false;
return false;
}
}
#ifdef __DEBUG_API__STOP
printf("g_nSeqNo = %d\n"
"g_cSrcID = %s\n"
"g_cClientIP = %s\n"
"g_iLocalPort = %d\n"
"g_cServerIP = %s\n"
"g_iSrvPort = %d\n"
"g_iAcceptNum = %d\n"
"g_cRUserName = %s\n"
"g_cRUserPwd = %s\n"
"g_cSpNum = %s\n"
"g_cCorpID = %s\n",
g_nSeqNo,
g_cSrcID,
g_cClientIP,
g_iLocalPort,
g_cServerIP,
g_iSrvPort,
g_iAcceptNum,
g_cRUserName,
g_cRUserPwd,
g_cSpNum,
g_cCorpID);
#endif
//start server:
pthread_attr_t pAttr;
pthread_attr_init(&pAttr);
pthread_attr_setdetachstate(&pAttr, PTHREAD_CREATE_DETACHED);
pthread_t iThrRecv = 0;
int ret = pthread_create(&iThrRecv, &pAttr, (void *(*) (void *))GetAcceptProc, (void *)this);
if(ret != 0)
{
printf("accept succeed but new thread error!\n");
return false;
}
else
printf("create Listen thread ID:%ld succeed!\n", iThrRecv);
return true;
}
void CSGIPApp::SGIPFinal(void)
{
if (m_nSendSock > 0)
{
DisConnect(m_nSendSock);
m_nSendSock = -1;
}
return;
}
int CSGIPApp::SGIPDeliver(SGIP_DELIVER_BODY &sDeliver, char *pMsgid, int nTimeOut)
{
time_t tBase = time(NULL);
vector<SGIP_DELIVER_PACKAGE * > ::iterator it;
SGIP_DELIVER_PACKAGE * pBody = NULL;
while (!HadTimeOut(tBase, nTimeOut))
{
if (m_pDeliver.size() != 0)
{
#ifdef __DEBUG_API__
cout << "deliver list total: " << m_pDeliver.size() << endl;
#endif
pthread_mutex_lock(&m_DeliverVec_mutex);
for(it = m_pDeliver.begin();it != m_pDeliver.end(); it++)
{
if((*it) == NULL)
{
m_pDeliver.erase(it);
it--;
continue;
}
pBody = *it;
memcpy(&sDeliver, &pBody->Body, sizeof(SGIP_DELIVER_BODY));
memcpy(pMsgid, pBody->cMsgid, sizeof(pBody->cMsgid));
m_pDeliver.erase(it);
delete (char *)pBody;
break;
}
pthread_mutex_unlock(&m_DeliverVec_mutex);
char cLogBuf[50] = "";
memset(cLogBuf, 0, sizeof(cLogBuf));
sprintf(cLogBuf, "MO-DELIVER:%s\n", pMsgid);
m_pLog->add(cLogBuf);
return END_SUCCEED;
}
else
usleep(10);
}
return END_FAILED;
}
int CSGIPApp::SGIPReport(SGIP_REPORT_BODY &sReport, char *pMsgid, int nTimeOut)
{
time_t tBase = time(NULL);
vector<SGIP_REPORT_PACKAGE * > ::iterator it;
SGIP_REPORT_PACKAGE *pBody = NULL;
while (!HadTimeOut(tBase, nTimeOut))
{
if (m_pReport.size() != 0)
{
#ifdef __DEBUG_API__
cout << "Report list total: " << m_pReport.size() << endl;
#endif
CMobilLog m_log;
pthread_mutex_lock(&m_ReportVec_mutex);
for(it = m_pReport.begin();it != m_pReport.end(); it++)
{
if((*it) == NULL)
{
m_pReport.erase(it);
it--;
continue;
}
pBody = *it;
memcpy(&sReport, &pBody->Body, sizeof(SGIP_REPORT_BODY));
memcpy(pMsgid, &pBody->cMsgid, sizeof(pBody->cMsgid));
m_pReport.erase(it);
delete (char *)pBody;
break;
}
pthread_mutex_unlock(&m_ReportVec_mutex);
char cLogBuf[50] = "";
memset(cLogBuf, 0, sizeof(cLogBuf));
sprintf(cLogBuf, "MO-REPORT:%s\n", pMsgid);
m_pLog->add(cLogBuf);
return END_SUCCEED;
}
else
usleep(10);
}
return END_FAILED;
}
int CSGIPApp::SGIPSingleSend(SGIP_SUBMIT_BODY &sSubmit, char *pMsgid)
{
char cBuf[5120] = "";
//int nSockfd = -1;
if (m_nSendSock < 0)
{
if ((m_nSendSock = DoConnect((char *)g_cServerIP, g_iSrvPort)) == END_FAILED)
{
printf("SingleSend DoConnect faild!\n");
m_nSendSock = -1;
return END_FAILED;
}
else
{
printf("SingleSend DoConnect succeed!\n");
}
}
//发送
nSubmitTimes = 0;//同一个Submit已发送的次数
while (nSubmitTimes < 3)//nSubmitRetry)
{
nSubmitTimes++;
memset(&cBuf, 0, sizeof(cBuf));
int nBufLen = 0;
m_pkg->DeComposePackage(SGIP_SUBMIT, (SGIP_MSG_BODY &)sSubmit, (char *)cBuf, &nBufLen);
SGIP_HEAD *pMsgHead = (SGIP_HEAD *)cBuf;
sprintf(pMsgid, "%u%010d%d",
ntohl(pMsgHead->sSeqNo.iSrcID),
ntohl(pMsgHead->sSeqNo.iDate),
ntohl(pMsgHead->sSeqNo.iSeqno));
char cLogBuf[50] = "";
memset(cLogBuf, 0, sizeof(cLogBuf));
sprintf(cLogBuf, "MT-SUBMIT:%s\t", pMsgid);
m_pLog->add(cLogBuf);
while (m_nSendSock < 0)
{
m_nSendSock = DoConnect((char *)g_cServerIP, g_iSrvPort);
if (m_nSendSock < 0)
{
printf("====reconnect to the remote host error!=====\n");
usleep(1000);
}
}
int nRecvLen = 0;
if ( !SendToSMC(m_nSendSock, (const char *)&cBuf, (nBufLen)) )
{
//send err, socket error!
close(m_nSendSock);
m_nSendSock = -1;
usleep(1000);
m_pLog->add("... socket error!\n");
continue;
}
else
{
//end
printf("Submit %d times send succeed! recv response... ...\n", nSubmitTimes);
while (true)
{
nRecvLen = sizeof(cBuf);
if ((RecvFromSMC(m_nSendSock, (char *)cBuf, &nRecvLen, 60)) == END_SUCCEED)
{
unsigned int iMsgType = 0;
if ((iMsgType = m_pkg->GetPackageType((const char *)cBuf)) == SGIP_SUBMIT_RESP)
//
{
//recv the submit resp:
int iRespType = -1;
SGIP_SUBMITRESP_MSG *pMsg = (SGIP_SUBMITRESP_MSG *)cBuf;
iRespType = pMsg->Body.cResult;
//here must add the compress the MSGID
//between send submit and recv response
//....
if (iRespType == 0)
{
if (bSendFinish)
{
DisConnect(m_nSendSock);
m_nSendSock = -1;
}
m_pLog->add("...succeed!\n");
return END_SUCCEED;
}
else
{
printf("submit package error! error code = %d\n", iRespType);
//continue;
//break;
//send submit succeed, but recv resp error
if (bSendFinish)
{
DisConnect(m_nSendSock);
m_nSendSock = -1;
}
m_pLog->add("...resp error!\n");
return iRespType;
}
}
else if (iMsgType == SGIP_REPORT)
{
printf("recv a report package!\n");
SGIP_REPORT_PACKAGE *pBody = (SGIP_REPORT_PACKAGE *)new char[sizeof(SGIP_REPORT_PACKAGE)];
SGIP_REPORT_MSG *pMsg = NULL;
pMsg = (SGIP_REPORT_MSG *)cBuf;
SGIP_REPORT_MSG stReport;
memset(&stReport, 0, sizeof(stReport));
m_pkg->ComposePackage(SGIP_REPORT, (SGIP_MSG &)stReport, (const char *)cBuf);
memcpy(&pBody->Body, &stReport.Body, sizeof(stReport.Body));
sprintf(pBody->cMsgid, "%u%010d%d",
ntohl(stReport.Head.sSeqNo.iSrcID),
ntohl(stReport.Head.sSeqNo.iDate),
ntohl(stReport.Head.sSeqNo.iSeqno)
);
pthread_mutex_lock(&m_ReportVec_mutex);
int nTmpResult = -1;
if ((m_pReport.size() < MAX_QUEUE_LEN) &&
(m_pReport.size() < m_pReport.max_size()))
{
m_pReport.push_back(pBody);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -