📄 cmppsocket_vc.cpp
字号:
// 拷贝数据到自己的内存区域,避免在临界区中发送数据
memcpy( (void *)window, (void *)cmpp._window, sizeof( window));
// 搜寻并修改到期时间
for( i=0; i<nCMPP_WINDOW_SIZE; i++)
{
// 空位,跳过
if( cmpp._window[i].head.cmdid == 0)
continue;
// 未到发送时间,跳过
if( cmpp._window[i].t > time( NULL))
continue;
// 设置下一次发送的时间,当前时间+60秒
cmpp._window[i].t += 60;
}
LeaveCriticalSection( &cmpp._csec_wnd);
// 搜寻并发送到期的数据
for( i=0; i<nCMPP_WINDOW_SIZE; i++)
{
// 空位,跳过
if( window[i].head.cmdid == 0)
continue;
// 未到发送时间,跳过
if( window[i].t > time( NULL))
continue;
// 发送
nsize = ntohl( window[i].head.size);
err = cmpp._send( (char *)&window[i], nsize);
// 发送出错,结束线程
if( err != nsize)
return 0;
}
// 主线程请求退出
if( cmpp._bexitting)
break;
}
return 0;
}
DWORD WINAPI CcmppSocket::thread_recv( LPVOID pdata)
{
int err, i;
long prevcount;
FD_SET fdset;
TIMEVAL timeout;
CcmppSocket &cmpp = *( CcmppSocket *)pdata;
CMPP_PACKAGE pkg;
memset((void *)&pkg,0,sizeof(pkg));
FD_ZERO( &fdset);
FD_SET( cmpp._soc, &fdset);
// 轮询间隔1秒
timeout.tv_sec = 1;
timeout.tv_usec= 0;
#ifdef _DEBUG
_dbgeventlst.push_back("Thread Recieve : Thread Begin.");
#endif
for( ;;)
{
FD_ZERO( &fdset);
FD_SET( cmpp._soc, &fdset);
err = select(
0,
&fdset,
NULL,
NULL,
&timeout);
// 出错
if( err == SOCKET_ERROR)
{
#ifdef _DEBUG
sprintf(_dbgtemp,"Thread Recieve : Socket Wait for Data Error.Error Code=%d",WSAGetLastError());
_dbgeventlst.push_back(_dbgtemp);
#endif
continue;//break;
}
// 超时
if( err == 0)
continue;
// 先接收包头部分,以确定包的大小、类型
err = cmpp._recv( (char *)&pkg.head, sizeof( pkg.head));
if( err != sizeof( pkg.head))
{
#ifdef _DEBUG
sprintf(_dbgtemp,"Thread Recieve : Recieve Header Return Code=%d Needed Length=%d Error=%d.",err,sizeof( pkg.head),WSAGetLastError());
_dbgeventlst.push_back(_dbgtemp);
#endif
continue;//break;
}
// 接收包体
err = cmpp._recv( pkg.data, ntohl( pkg.head.size )-sizeof( pkg.head));
if( err == SOCKET_ERROR)
{
#ifdef _DEBUG
sprintf(_dbgtemp,"Thread Recieve : Socket Wait for Data Error.Error Code=%d",WSAGetLastError());
_dbgeventlst.push_back(_dbgtemp);
#endif
continue;//break;
}
switch (ntohl(pkg.head.cmdid))
{
case nCMPP_SUBMIT_RESP:
{
CMPP_SUBMIT_RESP & _smtrsp=*(CMPP_SUBMIT_RESP *)&pkg.data;
if (err!=sizeof(CMPP_SUBMIT_RESP))
{
#ifdef _DEBUG
sprintf(_dbgtemp,"Thread Recieve : Data Body Length Error.Length=%d",err);
_dbgeventlst.push_back(_dbgtemp);
#endif
break;
}
// 申请数据发送窗口的使用权
EnterCriticalSection( &cmpp._csec_wnd);
// 删除对应流水号的包
for( i=0; i<nCMPP_WINDOW_SIZE; i++)
{
if( cmpp._window[i].head.cmdid == 0)
continue;
if( cmpp._window[i].head.seqid == pkg.head.seqid)
{
cmpp._window[i].head.cmdid = 0;
break;
}
}
LeaveCriticalSection( &cmpp._csec_wnd);
// 释放一个窗口格子
ReleaseSemaphore(
cmpp._hsema_wnd,
1,
&prevcount);
#ifdef _DEBUG
sprintf(_dbgtemp,"Thread Recieve : Recieve CMPP_SUBMIT_RESP CMD. SeqID=%d",ntohl(pkg.head.seqid));
_dbgeventlst.push_back(_dbgtemp);
#endif
break;
}
//2. CMPP_DELIVER
case nCMPP_DELIVER:
{
CMPP_DELIVER & _dlv=*(CMPP_DELIVER *) &pkg.data;
printf("\nRecieve Data.\n");
memcpy((void *)_dlv.LinkID,(void *)(_dlv.msgcontent+_dlv.msglen),20);
_dlv.msgcontent[_dlv.msglen]=0x0;
_dlv.LinkID[20]='\0';
printf(" Size: %d\n",ntohl(pkg.head.size));
printf(" cmdid: %d\n",ntohl(pkg.head.cmdid));
printf(" seqid: %d\n",ntohl(pkg.head.seqid));
printf(" msgid: %LX\n",_dlv.msgid);
printf(" destnumber: %s\n",_dlv.destnumber);
printf(" serviceid : %s\n",_dlv.serviceid );
printf(" tppid: %d\n",_dlv.tppid);
printf(" tpudhi: %d\n",_dlv.tpudhi);
printf(" msgfmt: %d\n",_dlv.msgfmt);
printf(" srcnumber : %s\n",_dlv.srcnumber );
printf(" srctype;: %d\n",_dlv.srctype);
printf(" delivery: %d\n",_dlv.delivery);
printf(" msglen;: %d\n",_dlv.msglen);
printf(" msgcontent: %s\n",_dlv.msgcontent);
printf(" LinkID : %s\n",_dlv.LinkID );
#ifdef _DEBUG
sprintf(_dbgtemp,"Thread Recieve : Recieve CMPP_DELIVER CMD. SeqID=%d",ntohl(pkg.head.seqid));
_dbgeventlst.push_back(_dbgtemp);
#endif
break;
}
//3. CMPP_ACTIVE_TEST_RESP
case nCMPP_ACTIVE_TEST_RESP:
{
#ifdef _DEBUG
sprintf(_dbgtemp,"Thread Recieve : Recieve CMPP_ACTIVE_TEST_RESP CMD. SeqID=%d",ntohl(pkg.head.seqid));
_dbgeventlst.push_back(_dbgtemp);
#endif
break;
}
default:
{
#ifdef _DEBUG
sprintf(_dbgtemp,"Thread Recieve : Recieve CMPP_ACTIVE_TEST_RESP CMD. CMD=%LX SeqID=%d",ntohl(pkg.head.cmdid),ntohl(pkg.head.seqid));
_dbgeventlst.push_back(_dbgtemp);
#endif
}
}
// 主线程请求退出
if( cmpp._bexitting)
break;
}
#ifdef _DEBUG
_dbgeventlst.push_back("Thread Recieve : Thread End.");
#endif
return 0;
}
/******************************************************************************
* 数据接收线程,从端口向数据窗口填充数据
******************************************************************************/
/*
DWORD WINAPI CcmppSocket::thread_revwnd(LPVOID LParam)
{
/*
int err=0;
char buff=0;
for(;;)
{
err=WaitForSingleObject(_hsema_rev,1000);
if (err==WAIT_OBJECT_0)
{
err=_recv(&buff,1);
if (err!=SOCKET_ERROR)
{
EnterCriticalSection(_csec_revwnd);
_revbuff[_prevbuff++]=buff;
LeaveCriticalSection(_csec_revwnd);
}
}
if( cmpp._bexitting)
break;
}
}
*/
/******************************************************************************
* 为了保证线程及时正常退出,设定轮询间隔为1秒
* CMPP3.0建议的链路检测间隔为180秒
* 为了兼容各个厂家的实现,设为30秒
******************************************************************************/
DWORD WINAPI CcmppSocket::thread_actv( LPVOID pdata)
{
CcmppSocket &cmpp = *( CcmppSocket *)pdata;
CMPP_HEAD msg;
int c = 0, // 计数
n = 30; // 等待的秒数
for( ;;)
{
Sleep( 1000);
// 主线程请求退出
if( cmpp._bexitting)
break;
if( ++ c < n)
continue;
c = 0;
// 获得数据发送权利,发送链路测试包
msg.size = htonl( sizeof( msg));
msg.cmdid= htonl( nCMPP_ACTIVE_TEST);
msg.seqid= htonl( cmpp._getseqid());
cmpp._send( (char *)&msg, sizeof( msg));
}
return 0;
}
char * CcmppSocket::_timestamp( char *buf)
{
time_t tt;
struct tm *now;
time( &tt);
now = localtime( &tt);
sprintf( buf, "%02d%02d%02d%02d%02d",
now->tm_mon + 1,
now->tm_mday,
now->tm_hour,
now->tm_min,
now->tm_sec);
return buf;
}
__int64 CcmppSocket::_ntoh64( __int64 inval)
{
__int64 outval = 0;
for( int i=0; i<8; i++)
outval = ( outval << 8) + ( ( inval >> ( i * 8)) & 255);
return outval;
}
__int64 CcmppSocket::_hton64( __int64 inval)
{
return _ntoh64( inval);
}
int CcmppSocket::_getseqid()
{
int id;
EnterCriticalSection( &_csec_seq);
id = ++_seqid;
LeaveCriticalSection( &_csec_seq);
return id;
}
int CcmppSocket::_send( char *buf, int len)
{
int err;
EnterCriticalSection( &_csec_snd);
err = send( _soc, buf, len, 0);
LeaveCriticalSection( &_csec_snd);
return err;
}
int CcmppSocket::_recv( char *buf, int len)
{
int err;
EnterCriticalSection( &_csec_recv);
err = recv( _soc, buf, len, 0);
#ifdef _DEBUG
//sprintf(_dbgtemp," Recieve Data=%d Needed Length=%d.",err);
//_dbgeventlst.push_back(_dbgtemp);
if (err!=SOCKET_ERROR)
{
memcpy( (void *)_revp, (void *)buf, err);
_revp+=err;
//memset(_revp,0xFF,16);
//_revp+=16;
}
#endif
LeaveCriticalSection( &_csec_recv);
return err;
}
int main()
{
#ifdef _DEBUG
_dbgeventlst.push_back("Begin Of Debug.");
#endif
CcmppSocket cmpp;
cmpp.init( "900001", "900001", "127.0.0.1");
CMPP_SUBMIT msg;
memset( (void *)&msg, 0, sizeof( msg));
msg.desttotal = 1;
strcpy( (char *)msg.destnumbers, "13808425257");
msg.msglen = 10;
strcpy( (char *)msg.msgcontent, "0123456789");
strcpy( (char *)msg.reserve, "abcdefgh");
strcpy( (char *)msg.msgsrc, "900001");
msg.pkgnumber=1;
msg.pkgtotal=1;
msg.delivery=1;
for( int i=0; i<10; i++)
{
Sleep(100);
cmpp.Submit( msg);
}
printf("Press Return Continue...");
getchar();
#ifdef _DEBUG
FILE *_revstream;
if ((_revstream = fopen("CMPP3REV.LOG", "wb")) != NULL)
{
fwrite((void *)_revdebug,_revp-_revdebug,1,_revstream);
fclose(_revstream);
}
std::ofstream fileo;
fileo.open("CMPP3EVENT.LOG");
fileo << _dbgeventlst << std::endl;
fileo.close();
#endif
return 0;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -