⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cmppsocket_bcb.cpp

📁 CMPP3.0协议源码下载(VC/BCB)支持CMPP_CONNECT,CMPP_TERMINATE,CMPP_SUBMIT,CMPP_ACTIVATE,CMPP_DELIVER等几条常用指令
💻 CPP
📖 第 1 页 / 共 2 页
字号:
		//	拷贝数据到自己的内存区域,避免在临界区中发送数据
		memcpy( (void *)window, (void *)cmpp._window, sizeof( window));
		//	搜寻并修改到期时间
		for( i=0; i<nCMPP_WINDOW_SIZE; i++)
		{
			//	空位,跳过
			if( cmpp._window[i].head.cmdid == 0)
				continue;
			//	未到发送时间,跳过
			if( cmpp._window[i].t > time( NULL))
				continue;
			//	设置下一次发送的时间,当前时间+60秒
			cmpp._window[i].t += 60;
		}
		LeaveCriticalSection( &cmpp._csec_wnd);
		//	搜寻并发送到期的数据
		for( i=0; i<nCMPP_WINDOW_SIZE; i++)
		{
			//	空位,跳过
			if( window[i].head.cmdid == 0)
				continue;
			//	未到发送时间,跳过
			if( window[i].t > time( NULL))
				continue;
			//	发送
			nsize = ntohl( window[i].head.size);
			err = cmpp._send( (char *)&window[i], nsize);
			//	发送出错,结束线程
			if( err != nsize)
				return 0;
		}
		//	主线程请求退出
		if( cmpp._bexitting)
			break;
	}
	return 0;
}

DWORD	WINAPI	CcmppSocket::thread_recv( LPVOID pdata)
{
	int		err, i;
	long	prevcount;
	FD_SET	fdset;
	TIMEVAL	timeout;
	CcmppSocket		&cmpp = *( CcmppSocket *)pdata;
	CMPP_PACKAGE	pkg;
    memset((void *)&pkg,0,sizeof(pkg));

	FD_ZERO( &fdset);
	FD_SET( cmpp._soc, &fdset);
	//	轮询间隔1秒
	timeout.tv_sec = 1;
	timeout.tv_usec= 0;
    #ifdef _DEBUG
        _dbgeventlst.push_back("Thread Recieve : Thread Begin.");
    #endif

	for( ;;)
	{
        FD_ZERO( &fdset);
        FD_SET( cmpp._soc, &fdset);
		err = select(
			0,
			&fdset,
			NULL,
			NULL,
			&timeout);
		//	出错
		if( err == SOCKET_ERROR)
        {
            #ifdef _DEBUG
                sprintf(_dbgtemp,"Thread Recieve : Socket Wait for Data Error.Error Code=%d",WSAGetLastError());
                _dbgeventlst.push_back(_dbgtemp);
            #endif
			continue;//break;
        }
		//	超时
		if( err == 0)
			continue;
		//	先接收包头部分,以确定包的大小、类型
		err = cmpp._recv( (char *)&pkg.head, sizeof( pkg.head));
		if( err != sizeof( pkg.head))
        {
            #ifdef _DEBUG
                sprintf(_dbgtemp,"Thread Recieve : Recieve Header Return Code=%d Needed Length=%d Error=%d.",err,sizeof( pkg.head),WSAGetLastError());
                _dbgeventlst.push_back(_dbgtemp);
            #endif
            continue;//break;
        }
		//	接收包体
		err = cmpp._recv( pkg.data, ntohl( pkg.head.size )-sizeof( pkg.head));
		if( err == SOCKET_ERROR)
        {
            #ifdef _DEBUG
                sprintf(_dbgtemp,"Thread Recieve : Socket Wait for Data Error.Error Code=%d",WSAGetLastError());
                _dbgeventlst.push_back(_dbgtemp);
            #endif
			continue;//break;
        }
        switch (ntohl(pkg.head.cmdid))
        {
            case nCMPP_SUBMIT_RESP:
            {
                CMPP_SUBMIT_RESP & _smtrsp=*(CMPP_SUBMIT_RESP *)&pkg.data;
                if (err!=sizeof(CMPP_SUBMIT_RESP))
                {
                    #ifdef _DEBUG
                        sprintf(_dbgtemp,"Thread Recieve : Data Body Length Error.Length=%d",err);
                        _dbgeventlst.push_back(_dbgtemp);
                    #endif
                    break;
                }
                //	申请数据发送窗口的使用权
                EnterCriticalSection( &cmpp._csec_wnd);
                //	删除对应流水号的包
                for( i=0; i<nCMPP_WINDOW_SIZE; i++)
                {
                    if( cmpp._window[i].head.cmdid == 0)
                        continue;
                    if( cmpp._window[i].head.seqid == pkg.head.seqid)
                    {
                        cmpp._window[i].head.cmdid = 0;
                        break;
                    }
                }
                LeaveCriticalSection( &cmpp._csec_wnd);
                //	释放一个窗口格子
                ReleaseSemaphore(
                    cmpp._hsema_wnd,
                    1,
                    &prevcount);
                #ifdef _DEBUG
                    sprintf(_dbgtemp,"Thread Recieve : Recieve CMPP_SUBMIT_RESP CMD. SeqID=%d",ntohl(pkg.head.seqid));
                    _dbgeventlst.push_back(_dbgtemp);
                #endif
                break;
            }
            //2. CMPP_DELIVER
            case nCMPP_DELIVER:
            {
                CMPP_DELIVER & _dlv=*(CMPP_DELIVER *) &pkg.data;
                printf("\nRecieve Data.\n");
                memcpy((void *)_dlv.LinkID,(void *)(_dlv.msgcontent+_dlv.msglen),20);
                _dlv.msgcontent[_dlv.msglen]=0x0;
                _dlv.LinkID[20]='\0';
                printf("  Size: %d\n",ntohl(pkg.head.size));
                printf("  cmdid: %d\n",ntohl(pkg.head.cmdid));
                printf("  seqid: %d\n",ntohl(pkg.head.seqid));
                printf("  msgid: %LX\n",_dlv.msgid);
                printf("  destnumber: %s\n",_dlv.destnumber);
                printf("  serviceid : %s\n",_dlv.serviceid );
                printf("  tppid: %d\n",_dlv.tppid);
                printf("  tpudhi: %d\n",_dlv.tpudhi);
                printf("  msgfmt: %d\n",_dlv.msgfmt);
                printf("  srcnumber : %s\n",_dlv.srcnumber );
                printf("  srctype;: %d\n",_dlv.srctype);
                printf("  delivery: %d\n",_dlv.delivery);
                printf("  msglen;: %d\n",_dlv.msglen);
                printf("  msgcontent: %s\n",_dlv.msgcontent);
                printf("  LinkID    : %s\n",_dlv.LinkID    );
                #ifdef _DEBUG
                    sprintf(_dbgtemp,"Thread Recieve : Recieve CMPP_DELIVER CMD. SeqID=%d",ntohl(pkg.head.seqid));
                    _dbgeventlst.push_back(_dbgtemp);
                #endif

                break;
            }
            //3. CMPP_ACTIVE_TEST_RESP
            case nCMPP_ACTIVE_TEST_RESP:
            {
                #ifdef _DEBUG
                    sprintf(_dbgtemp,"Thread Recieve : Recieve CMPP_ACTIVE_TEST_RESP CMD. SeqID=%d",ntohl(pkg.head.seqid));
                    _dbgeventlst.push_back(_dbgtemp);
                #endif
                break;
            }
            default:
            {
                #ifdef _DEBUG
                    sprintf(_dbgtemp,"Thread Recieve : Recieve CMPP_ACTIVE_TEST_RESP CMD. CMD=%LX SeqID=%d",ntohl(pkg.head.cmdid),ntohl(pkg.head.seqid));
                    _dbgeventlst.push_back(_dbgtemp);
                #endif

            }
        }
		//	主线程请求退出
		if( cmpp._bexitting)
			break;
	}
    #ifdef _DEBUG
        _dbgeventlst.push_back("Thread Recieve : Thread End.");
    #endif

	return 0;

}

/******************************************************************************
*    数据接收线程,从端口向数据窗口填充数据
******************************************************************************/
/*
DWORD WINAPI CcmppSocket::thread_revwnd(LPVOID LParam)
{
/*
    int     err=0;
    char    buff=0;
    for(;;)
    {
        err=WaitForSingleObject(_hsema_rev,1000);
        if (err==WAIT_OBJECT_0)
        {
            err=_recv(&buff,1);
            if (err!=SOCKET_ERROR)
            {
                EnterCriticalSection(_csec_revwnd);
                _revbuff[_prevbuff++]=buff;
                LeaveCriticalSection(_csec_revwnd);
            }
        }
		if( cmpp._bexitting)
			break;
    }

}
*/
/******************************************************************************
*	为了保证线程及时正常退出,设定轮询间隔为1秒
*	CMPP2.0建议的链路检测间隔为180秒
*	为了兼容各个厂家的实现,设为30秒
******************************************************************************/
DWORD	WINAPI	CcmppSocket::thread_actv( LPVOID pdata)
{
	CcmppSocket &cmpp = *( CcmppSocket *)pdata;
	CMPP_HEAD	msg;
	int			c = 0,		//	计数
				n = 30;		//	等待的秒数
	for( ;;)
	{
		Sleep( 1000);
		//	主线程请求退出
		if( cmpp._bexitting)
			break;
		if( ++ c < n)
			continue;
		c = 0;
		//	获得数据发送权利,发送链路测试包
		msg.size = htonl( sizeof( msg));
		msg.cmdid= htonl( nCMPP_ACTIVE_TEST);
		msg.seqid= htonl( cmpp._getseqid());

		cmpp._send( (char *)&msg, sizeof( msg));
	}
	return 0;
}

char * CcmppSocket::_timestamp( char *buf)
{
	time_t tt;
	struct tm *now;

	time( &tt);
	now = localtime( &tt);
	sprintf( buf, "%02d%02d%02d%02d%02d",
		now->tm_mon + 1,
		now->tm_mday,
		now->tm_hour,
		now->tm_min,
		now->tm_sec);
	return buf;
}

__int64	CcmppSocket::_ntoh64( __int64 inval)
{
	__int64 outval = 0;
	for( int i=0; i<8; i++)
		outval = ( outval << 8) + ( ( inval >> ( i * 8)) & 255);

	return outval;
}

__int64	CcmppSocket::_hton64( __int64 inval)
{
	return _ntoh64( inval);
}

int	CcmppSocket::_getseqid()
{
	int id;
	EnterCriticalSection( &_csec_seq);
	id = ++_seqid;
	LeaveCriticalSection( &_csec_seq);
	return id;
}

int	CcmppSocket::_send( char *buf, int len)
{
	int err;

	EnterCriticalSection( &_csec_snd);
	err = send( _soc, buf, len, 0);
	LeaveCriticalSection( &_csec_snd);

	return err;
}

int	CcmppSocket::_recv( char *buf, int len)
{
	int err;

	EnterCriticalSection( &_csec_recv);
	err = recv( _soc, buf, len, 0);
    #ifdef _DEBUG
        //sprintf(_dbgtemp," Recieve Data=%d Needed Length=%d.",err);
        //_dbgeventlst.push_back(_dbgtemp);

        if (err!=SOCKET_ERROR)
        {
            memcpy( (void *)_revp, (void *)buf, err);
            _revp+=err;
            //memset(_revp,0xFF,16);
            //_revp+=16;
        }
    #endif
	LeaveCriticalSection( &_csec_recv);

	return err;
}

int main()
{
#ifdef _DEBUG
    _dbgeventlst.push_back("Begin Of Debug.");
#endif

	CcmppSocket cmpp;

	cmpp.init( "900001", "900001", "127.0.0.1");

	CMPP_SUBMIT msg;
	memset( (void *)&msg, 0, sizeof( msg));
	msg.desttotal = 1;
	strcpy( (char *)msg.destnumbers, "13808425257");
	msg.msglen = 10;
	strcpy( (char *)msg.msgcontent, "0123456789");
	strcpy( (char *)msg.reserve, "abcdefgh");
	strcpy( (char *)msg.msgsrc, "900001");
	msg.pkgnumber=1;
	msg.pkgtotal=1;
	msg.delivery=1;

	for( int i=0; i<10; i++)
    {
        Sleep(100);
		cmpp.Submit( msg);
    }
    printf("Press Return Continue...");
    getchar();
#ifdef _DEBUG
   FILE *_revstream;
   if ((_revstream = fopen("CMPP3REV.LOG", "wb")) != NULL)
   {
        fwrite((void *)_revdebug,_revp-_revdebug,1,_revstream);
        fclose(_revstream);
   }
   std::ofstream fileo;
   fileo.open("CMPP3EVENT.LOG");
   fileo << _dbgeventlst << std::endl;
   fileo.close();

#endif
	return 0;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -