⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ni_streamserver.cpp

📁 视频播放控制器程序
💻 CPP
字号:
// NI_StreamServer.cpp: implementation of the NI_StreamServer class.
//
//////////////////////////////////////////////////////////////////////

#include "stdafx.h"
#include "LiveController.h"
#include "NI_StreamServer.h"

#ifdef _DEBUG
#undef THIS_FILE
static char THIS_FILE[]=__FILE__;
#define new DEBUG_NEW
#endif

// Size in bytes of the buffer allocated for _szRecvBuf. The same buffer is
// used both to receive datagrams and to assemble outgoing packets.
#define RECV_BUF_SIZE	2048
// select() timeouts expressed in microseconds; the constructor splits them
// into the tv_sec / tv_usec fields of _Timeout_Recv and _Timeout_Send.
#define TIME_OUT_RECV_WAIT	10000	// 10ms
#define TIME_OUT_SEND_WAIT	500000	// 500ms
// Seconds that must elapse since the last transmission of a pending task
// before _DoNewSendTask considers re-sending it.
#define TIME_OUT_SEND	3 // 3s
// Once a pending task has been sent this many times and has timed out again,
// the send pool is emptied and a connect-timeout notification is posted.
#define SEND_TRY_TIMES	2

// Values stored in m_nRunningState. In the code visible in this file only
// PREINIT (as the literal 0 in the constructor), WORKING and CLOSED are
// assigned; WAITING and THREAD_TO_CLOSE are not referenced here.
#define THREAD_STATE_PREINIT	0
#define THREAD_STATE_WAITING	1
#define THREAD_STATE_WORKING	2
#define THREAD_STATE_CLOSED		3
#define THREAD_TO_CLOSE	999

// Task identifiers placed into m_SendDataPool by SendDevCtrl / SendStreamCtrl
// and dispatched on by _SendTask.
#define NI_SS_SEND_DEV_CTRL		1
#define NI_SS_SEND_STREAM_CTRL	2
//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////
// Constructs an idle object: no socket is open and no worker thread exists.
//   hWnd - window that receives the WM_NI_RECV_MSG notifications posted by
//          the worker thread.
NI_StreamServer::NI_StreamServer( HWND hWnd )
{
	m_nRunningState = THREAD_STATE_PREINIT;
	m_hwndNotify = hWnd;
	m_Socket = INVALID_SOCKET;
	// No worker yet. The destructor uses this handle to decide whether it has
	// anything to wait for, so it must never be left uninitialised.
	m_hThreadID = NULL;
	// Manual-reset event, initially non-signalled; polled by _CheckThreadStop.
	m_hStopEvent = CreateEvent( NULL, TRUE, FALSE, NULL );
	ASSERT( m_hStopEvent != NULL );
	_nPSerial = 1;

	// Convert the microsecond constants into timeval form for select().
	_Timeout_Recv.tv_sec  = TIME_OUT_RECV_WAIT/1000000;
	_Timeout_Recv.tv_usec = TIME_OUT_RECV_WAIT%1000000;
	_Timeout_Send.tv_sec  = TIME_OUT_SEND_WAIT/1000000;
	_Timeout_Send.tv_usec = TIME_OUT_SEND_WAIT%1000000;

	// The header is memcpy'd verbatim into every outgoing packet, so clear
	// the whole structure first: no uninitialised field or padding byte may
	// end up on the wire.
	memset( &_mpHead, 0, sizeof(_mpHead) );
	_mpHead.cPType_H = 20;
	_mpHead.cPType_L = 8;
	_mpHead.cPVersion_H = 1;
	_mpHead.cPVersion_L = 1;
	_mpHead.cEncrpType = 0; // 0 --> no encrp
	_mpHead.cEncrpValue = 0;

	// Addresses are filled in later by SetAddress(); start from a known state.
	memset( &m_addrLocal, 0, sizeof(m_addrLocal) );
	memset( &m_addrRemote, 0, sizeof(m_addrRemote) );

	_szRecvBuf = new char[RECV_BUF_SIZE];
}

// Stops the worker thread (if one was started), then releases the socket,
// the stop event and the packet buffer.
NI_StreamServer::~NI_StreamServer()
{
	// Request the worker to exit even if the caller never called StopThread().
	if ( m_hStopEvent )
	{
		SetEvent( m_hStopEvent );
	}

	// Wait on the thread handle itself rather than polling m_nRunningState:
	// this returns immediately when no thread was ever started and cannot
	// miss a thread that has been created but has not begun running yet.
	if ( m_hThreadID )
	{
		WaitForSingleObject( m_hThreadID, INFINITE );
		CloseHandle( m_hThreadID );
		m_hThreadID = NULL;
	}

	// The worker is gone, so nothing else is using the socket or the buffer.
	if ( m_Socket != INVALID_SOCKET )
	{
		closesocket( m_Socket );
		m_Socket = INVALID_SOCKET;
	}

	if ( m_hStopEvent )
	{
		CloseHandle(m_hStopEvent);
		m_hStopEvent = NULL;
	}
	if ( _szRecvBuf )
	{
		delete[] _szRecvBuf;
		_szRecvBuf = NULL;
	}
}

// Stores the local and remote UDP endpoints and (re)creates the bound socket.
//   serve_ip   - dotted-decimal IPv4 address of the remote peer.
//   serve_port - remote UDP port, host byte order.
// The local endpoint is g_strInternalCtrlIP with port 0 passed to bind().
// Returns FALSE when serve_ip is NULL or not a valid dotted-decimal address,
// or when InitSocket() fails; the stored addresses are left untouched if the
// remote address is rejected.
BOOL NI_StreamServer::SetAddress( LPCTSTR serve_ip, unsigned short serve_port )
{
	// Validate before touching any state. inet_addr() reports a malformed
	// string as INADDR_NONE, which must not be stored as a destination.
	if ( serve_ip == NULL )
	{
		return FALSE;
	}
	unsigned long ulRemoteAddr = inet_addr(serve_ip);
	if ( ulRemoteAddr == INADDR_NONE )
	{
		return FALSE;
	}

	// Local endpoint. Zero first so the sin_zero padding is well defined.
	memset( &m_addrLocal, 0, sizeof(m_addrLocal) );
	m_addrLocal.sin_family = AF_INET;
	m_addrLocal.sin_addr.s_addr = inet_addr((LPCTSTR)g_strInternalCtrlIP);
	m_addrLocal.sin_port = htons(0);

	// Remote endpoint used by every sendto() in this class.
	memset( &m_addrRemote, 0, sizeof(m_addrRemote) );
	m_addrRemote.sin_family = AF_INET;
	m_addrRemote.sin_addr.s_addr = ulRemoteAddr;
	m_addrRemote.sin_port = htons(serve_port);

	return InitSocket();
}

// Closes any existing socket, then creates a UDP socket and binds it to
// m_addrLocal.
// Returns TRUE on success. On any failure m_Socket is left as
// INVALID_SOCKET, so no half-initialised socket is kept around.
BOOL NI_StreamServer::InitSocket()
{
	if ( m_Socket != INVALID_SOCKET ) {
		closesocket(m_Socket);
		m_Socket = INVALID_SOCKET;
	}
	// create socket
	m_Socket = socket(PF_INET, SOCK_DGRAM, 0);
	if ( m_Socket == INVALID_SOCKET )
	{
		return FALSE;
	}
	// bind socket
	if ( bind( m_Socket, (struct sockaddr *)&m_addrLocal, sizeof(m_addrLocal)) != 0 )
	{
		// Do not leak the descriptor or leave an unbound socket that the
		// rest of the class would treat as usable.
		closesocket(m_Socket);
		m_Socket = INVALID_SOCKET;
		return FALSE;
	}
	return TRUE;
}

// Creates the worker thread running ThreadFunc with this object as its
// argument and stores the handle in m_hThreadID.
// Returns TRUE when the thread was created, FALSE otherwise.
BOOL NI_StreamServer::StartThread()
{
	DWORD dwUnusedThreadId = 0;	// required by CreateThread, not kept
	m_hThreadID = CreateThread( NULL, 0, ThreadFunc, this, 0, &dwUnusedThreadId );
	return ( m_hThreadID != NULL ) ? TRUE : FALSE;
}

void NI_StreamServer::StopThread()
{
	// Set stop event to thread
	SetEvent( m_hStopEvent );
	shutdown( m_Socket, 0x00 );
}

// Polls the stop event without blocking.
// Returns TRUE when the event is signalled, FALSE in every other case.
BOOL NI_StreamServer::_CheckThreadStop()
{
	const DWORD dwWaitResult = WaitForSingleObject( m_hStopEvent, 0 );
	return ( dwWaitResult == WAIT_OBJECT_0 ) ? TRUE : FALSE;
}

//////////////////////////////////////////////////////////////////////////
// Waits up to _Timeout_Recv for one datagram, validates its protocol head
// and dispatches it by primary packet type:
//   0 - acknowledgement, handled by _OnRecvAck, never answered;
//   1 - work state, handled by _OnRecvWorkState.
// When the handler returns TRUE an acknowledgement carrying the packet's
// serial number is sent back.
// Returns TRUE only when a packet was handled and acknowledged in this way;
// FALSE on timeout, socket error, short or foreign packets, and for ACKs.
BOOL NI_StreamServer::_TrytoReceive()
{
	FD_ZERO( &_fdRead );
	FD_SET( m_Socket, &_fdRead );
	int nSelRet = select( 0, &_fdRead, NULL, NULL, &_Timeout_Recv );
	if ( nSelRet <= 0 )	{
		// select error or timeout
		return FALSE;
	}

	// receive the message
	ZGMP_HEAD mpHead;
	struct sockaddr addrSrc;
	int nAddrLen = sizeof(addrSrc);
	memset( _szRecvBuf, 0, RECV_BUF_SIZE );
	int nRecvLen = recvfrom( m_Socket, _szRecvBuf, RECV_BUF_SIZE,
							 0, &addrSrc, &nAddrLen );

	// Compare as signed values: recvfrom returns SOCKET_ERROR (-1) on failure,
	// and an unsigned comparison against sizeof would see that as a huge
	// length. The packet must hold at least the head and the primary type.
	const int nMinPacketLen = (int)( sizeof(mpHead) + sizeof(WORD) );
	if ( nRecvLen < nMinPacketLen ) {
		// recv error or truncated packet
		return FALSE;
	}

	char *pBufIndex = _szRecvBuf;
	// retrieve the protocol head
	memcpy( &mpHead, pBufIndex, sizeof(mpHead) );
	pBufIndex = pBufIndex + sizeof(mpHead);
	if ( mpHead.cPType_H != 20 || mpHead.cPType_L!= 9 ||
		 mpHead.cPVersion_H != 1 || mpHead.cPVersion_L != 1 )
	{
		// protocol not match
		return FALSE;
	}

	// Get primary packet-type (memcpy avoids an unaligned WORD access)
	WORD nPrimaryType = 0;
	memcpy( &nPrimaryType, pBufIndex, sizeof(WORD) );
	pBufIndex = pBufIndex + sizeof(WORD);

	BOOL bNeedAck = FALSE;
	if ( nPrimaryType == 0 )
	{
		_OnRecvAck( pBufIndex );
		bNeedAck = FALSE; // an ACK is never acknowledged itself
	}
	else
	if ( nPrimaryType == 1 ) // Recv Work State
	{
		bNeedAck = _OnRecvWorkState( pBufIndex );
	}
	// Other messages......

	// Send ack of packet
	if ( bNeedAck ) {
		_SendAckofPacket( mpHead.nPSerial );
		return bNeedAck;
	}

	return FALSE;
}

// Handles an acknowledgement packet.
//   pBufIndex - points just past the primary packet type; the payload is a
//               WORD secondary type followed by the int serial being acked.
// If the serial matches the task at the head of m_SendDataPool, that task is
// released and removed from the pool. Always returns TRUE.
BOOL NI_StreamServer::_OnRecvAck( char *pBufIndex )
{
	// The secondary type is not used for ACKs; step over it to the serial.
	const char *pSerialField = pBufIndex + sizeof(WORD);
	const int nAckedSerial = *(const int*)pSerialField;

	SD_DATA_CELL *pPending = m_SendDataPool.GetNewDataRef();
	if ( pPending == NULL || pPending->nSendSerial != nAckedSerial )
	{
		// Nothing is waiting, or this ACK is for some other packet.
		return TRUE;
	}

	ReleasePoolCell( pPending );	// release the cell data
	m_SendDataPool.RemoveNewData();	// remove the cell
	return TRUE;
}

// Decodes a work-state packet into a heap-allocated NI_WMsg and posts it to
// the notify window as WM_NI_RECV_MSG / NI_WORKSTATE.
//   pBufIndex - points just past the primary packet type. Payload layout:
//               WORD secondary type, DEVICE_ID_LEN bytes of device id,
//               DWORD param1, DWORD param2.
// Returns TRUE when the message was posted (the caller then acknowledges the
// packet), FALSE when posting failed; in that case the message is freed here.
// NOTE(review): the receiver of WM_NI_RECV_MSG presumably deletes the NI_WMsg
// passed in lParam - confirm against the window procedure.
BOOL NI_StreamServer::_OnRecvWorkState( char *pBufIndex )
{
	NI_WMsg *pNewMsg = new NI_WMsg();

	// Get secondary packet-type (memcpy avoids unaligned access)
	WORD nSecondaryType = 0;
	memcpy( &nSecondaryType, pBufIndex, sizeof(WORD) );
	pBufIndex = pBufIndex + sizeof(WORD);
	pNewMsg->m_nMsgType = (int)nSecondaryType;

	// Get DevID
	LPTSTR szBuf = pNewMsg->m_strParam.GetBuffer(DEVICE_ID_LEN);
	memcpy( szBuf, pBufIndex, DEVICE_ID_LEN );
	// GetBuffer(n) provides room for n characters plus a terminator. Write
	// the terminator explicitly, because ReleaseBuffer() measures the string
	// up to the first NUL and the id may fill all DEVICE_ID_LEN bytes.
	szBuf[DEVICE_ID_LEN] = 0;
	pNewMsg->m_strParam.ReleaseBuffer();
	pBufIndex = pBufIndex + DEVICE_ID_LEN;

	// Get Param1
	memcpy( &pNewMsg->m_dwParam1, pBufIndex, sizeof(DWORD) );
	pBufIndex = pBufIndex + sizeof(DWORD);

	// Get Param2
	memcpy( &pNewMsg->m_dwParam2, pBufIndex, sizeof(DWORD) );
	pBufIndex = pBufIndex + sizeof(DWORD);

	// LPARAM is pointer-sized; a DWORD cast would truncate on 64-bit builds.
	if ( !PostMessage( m_hwndNotify, WM_NI_RECV_MSG, NI_WORKSTATE, (LPARAM)pNewMsg ) )
	{
		// Nobody will ever receive the pointer, so free it here and report
		// failure; the packet is then left unacknowledged.
		delete pNewMsg;
		return FALSE;
	}
	return TRUE;
}

//////////////////////////////////////////////////////////////////////////
// Queues a device-control task carrying the single parameter nOpt.
// Returns whatever m_SendDataPool.PutNewData reports.
BOOL NI_StreamServer::SendDevCtrl( int nOpt )
{
	// The pool takes its parameters as a DWORD array; nOpt is the only element.
	DWORD *pSingleParam = (DWORD*)&nOpt;
	return m_SendDataPool.PutNewData( NI_SS_SEND_DEV_CTRL, 1, pSingleParam, 0 );
}

// Queues a stream-control task.
//   nOpt    - always sent as the first parameter.
//   nScheID - appended as second parameter only when non-zero.
//   nPlayID - appended as third parameter only when nScheID and nPlayID are
//             both non-zero.
// Returns whatever m_SendDataPool.PutNewData reports.
BOOL NI_StreamServer::SendStreamCtrl( int nOpt, DWORD nScheID, DWORD nPlayID )
{
	DWORD dwParams[3];
	int nParamCount = 0;

	dwParams[nParamCount++] = nOpt;
	if ( nScheID != 0 )
	{
		dwParams[nParamCount++] = nScheID;
		if ( nPlayID != 0 )
		{
			dwParams[nParamCount++] = nPlayID;
		}
	}

	// Hand the task over to the worker thread through the pool.
	return m_SendDataPool.PutNewData( NI_SS_SEND_STREAM_CTRL, nParamCount, dwParams );
}

// Builds and sends an acknowledgement for a received packet.
//   nSerial - serial number of the packet being acknowledged.
// Packet layout: protocol head (with a fresh outgoing serial), WORD primary
// type 0, WORD secondary type 0, int acknowledged serial.
// The packet is assembled in _szRecvBuf, overwriting the received datagram.
// Returns TRUE only if the socket became writable within _Timeout_Send and
// sendto() succeeded.
BOOL NI_StreamServer::_SendAckofPacket( int nSerial )
{
	// fit the head
	char *pBufIndex = _szRecvBuf;
	memset( pBufIndex, 0, RECV_BUF_SIZE );
	_mpHead.nPSerial = _nPSerial++;
	memcpy( pBufIndex, &_mpHead, sizeof(_mpHead) );
	pBufIndex = pBufIndex + sizeof(_mpHead);

	// fit primary-type of packet
	WORD nPrimaryType = 0;
	memcpy( pBufIndex, &nPrimaryType, sizeof(WORD) );
	pBufIndex = pBufIndex + sizeof(WORD);

	// fit secondary-type of packet
	WORD nSecondaryType = 0;
	memcpy( pBufIndex, &nSecondaryType, sizeof(WORD) );
	pBufIndex = pBufIndex + sizeof(WORD);

	// fit the ack-Serial
	memcpy( pBufIndex, &nSerial, sizeof(int) );
	pBufIndex = pBufIndex + sizeof(int);

	// Then to send
	FD_ZERO( &_fdWrite );
	FD_SET( m_Socket, &_fdWrite );
	int nSelRet = select( 0, NULL, &_fdWrite, NULL, &_Timeout_Send );
	if ( nSelRet <= 0 )
	{
		// select error or socket not writable in time
		return FALSE;
	}

	// Report a failed sendto() instead of claiming success.
	int nSent = sendto( m_Socket, _szRecvBuf, (int)(pBufIndex-_szRecvBuf), 0,
						(struct sockaddr*)&m_addrRemote, sizeof(m_addrRemote) );
	return ( nSent != SOCKET_ERROR ) ? TRUE : FALSE;
}

// Processes the task at the head of m_SendDataPool, if there is one.
//  - A task that was never sent is sent immediately.
//  - A task last sent at least TIME_OUT_SEND seconds ago is re-sent, unless
//    it has already been tried SEND_TRY_TIMES times; then the pool is emptied
//    and NI_CONNECT / NI_CONNECT_TIMEOUT is posted to the notify window.
// Returns the result of _SendTask when a send was attempted, FALSE otherwise.
BOOL NI_StreamServer::_DoNewSendTask()
{
	SD_DATA_CELL *pPending = m_SendDataPool.GetNewDataRef();
	if ( pPending == NULL )
	{
		return FALSE;
	}

	// First transmission: no timing check needed.
	if ( pPending->nLastSendTime == 0 )
	{
		return _SendTask( pPending );
	}

	// Already sent once; see whether the re-send interval has elapsed.
	CTime tmNow = CTime::GetCurrentTime();
	CTimeSpan tsSinceLastSend = (CTimeSpan)(tmNow - pPending->nLastSendTime);
	int nElapsedSeconds = tsSinceLastSend.GetTotalSeconds();
	if ( nElapsedSeconds < TIME_OUT_SEND )
	{
		return FALSE;
	}

	if ( pPending->nTryedTimes < SEND_TRY_TIMES )
	{
		// Resend
		return _SendTask( pPending );
	}

	// Fail to connect to Prof
	EmptyProfPool( &m_SendDataPool );
	PostMessage( m_hwndNotify, WM_NI_RECV_MSG, NI_CONNECT, NI_CONNECT_TIMEOUT );
	return FALSE;
}

// Sends one queued task using the sender that matches its nMsg value, then
// stamps the cell with the current time and increments its attempt counter.
// The bookkeeping happens regardless of whether the send succeeded.
// Returns the sender's result, or FALSE for an unrecognised nMsg.
BOOL NI_StreamServer::_SendTask( SD_DATA_CELL *pCell )
{
	ASSERT(pCell);

	BOOL bSent;
	switch ( pCell->nMsg )
	{
	case NI_SS_SEND_DEV_CTRL:
		TRACE("Send Dev Ctrl\n");
		bSent = _SendDevCtrl( pCell );
		break;

	case NI_SS_SEND_STREAM_CTRL:
		TRACE("Send Stream Ctrl\n");
		bSent = _SendStreamCtrl( pCell );
		break;

	default:
		bSent = FALSE;
		break;
	}

	pCell->nTryedTimes++;
	pCell->nLastSendTime = CTime::GetCurrentTime();
	return bSent;
}

// Builds and sends a device-control packet for the given pool cell.
// Packet layout: protocol head (with a fresh outgoing serial), WORD primary
// type 1, WORD secondary type taken from pCell->dwParams[0].
// The serial used is recorded in pCell->nSendSerial so that a later ACK can
// be matched against it.
// Returns TRUE only if the cell has at least one parameter, the socket became
// writable within _Timeout_Send and sendto() succeeded.
BOOL NI_StreamServer::_SendDevCtrl( SD_DATA_CELL *pCell )
{
	// fit the head
	char *pBufIndex = _szRecvBuf;
	memset( pBufIndex, 0, RECV_BUF_SIZE );
	_mpHead.nPSerial = _nPSerial++;
	memcpy( pBufIndex, &_mpHead, sizeof(_mpHead) );
	pBufIndex = pBufIndex + sizeof(_mpHead);
	pCell->nSendSerial = _mpHead.nPSerial;

	// fit primary-type of packet
	WORD nPrimaryType = 1;
	memcpy( pBufIndex, &nPrimaryType, sizeof(WORD) );
	pBufIndex = pBufIndex + sizeof(WORD);

	// fit secondary-type of packet
	if ( pCell->nParamCount < 1 ) {
		return FALSE;
	}
	WORD nSecondaryType = (WORD)(pCell->dwParams[0]);
	memcpy( pBufIndex, &nSecondaryType, sizeof(WORD) );
	pBufIndex = pBufIndex + sizeof(WORD);

	// Then to send
	FD_ZERO( &_fdWrite );
	FD_SET( m_Socket, &_fdWrite );
	int nSelRet = select( 0, NULL, &_fdWrite, NULL, &_Timeout_Send );
	if ( nSelRet <= 0 )
	{
		// select error or socket not writable in time
		return FALSE;
	}

	// Report a failed sendto() instead of claiming success.
	int nSent = sendto( m_Socket, _szRecvBuf, (int)(pBufIndex-_szRecvBuf), 0,
						(struct sockaddr*)&m_addrRemote, sizeof(m_addrRemote) );
	return ( nSent != SOCKET_ERROR ) ? TRUE : FALSE;
}

// Builds and sends a stream-control packet for the given pool cell.
// Packet layout: protocol head (with a fresh outgoing serial), WORD primary
// type 2, WORD secondary type taken from pCell->dwParams[0], then the
// remaining parameters dwParams[1..nParamCount-1] as DWORDs.
// The serial used is recorded in pCell->nSendSerial so that a later ACK can
// be matched against it.
// Returns TRUE only if the cell has at least one parameter, the packet fits
// in the RECV_BUF_SIZE buffer, the socket became writable within
// _Timeout_Send and sendto() succeeded.
BOOL NI_StreamServer::_SendStreamCtrl( SD_DATA_CELL *pCell )
{
	// fit the head
	char *pBufIndex = _szRecvBuf;
	memset( pBufIndex, 0, RECV_BUF_SIZE );
	_mpHead.nPSerial = _nPSerial++;
	memcpy( pBufIndex, &_mpHead, sizeof(_mpHead) );
	pBufIndex = pBufIndex + sizeof(_mpHead);
	pCell->nSendSerial = _mpHead.nPSerial;

	// fit primary-type of packet
	WORD nPrimaryType = 2;
	memcpy( pBufIndex, &nPrimaryType, sizeof(WORD) );
	pBufIndex = pBufIndex + sizeof(WORD);

	// fit secondary-type of packet
	if ( pCell->nParamCount < 1 ) {
		return FALSE;
	}
	WORD nSecondaryType = (WORD)(pCell->dwParams[0]);
	memcpy( pBufIndex, &nSecondaryType, sizeof(WORD) );
	pBufIndex = pBufIndex + sizeof(WORD);

	// Refuse a parameter list that would run past the end of the buffer.
	const int nExtraParams = pCell->nParamCount - 1;
	const int nBytesLeft = RECV_BUF_SIZE - (int)(pBufIndex - _szRecvBuf);
	if ( nExtraParams > nBytesLeft / (int)sizeof(DWORD) ) {
		return FALSE;
	}

	// fit the remaining parameters
	for ( int i=0; i<nExtraParams; i++ )
	{
		memcpy( pBufIndex, &(pCell->dwParams[i+1]), sizeof(DWORD) );
		pBufIndex = pBufIndex + sizeof(DWORD);
	}

	// Then to send
	FD_ZERO( &_fdWrite );
	FD_SET( m_Socket, &_fdWrite );
	int nSelRet = select( 0, NULL, &_fdWrite, NULL, &_Timeout_Send );
	if ( nSelRet <= 0 )
	{
		// select error or socket not writable in time
		return FALSE;
	}

	// Report a failed sendto() instead of claiming success.
	int nSent = sendto( m_Socket, _szRecvBuf, (int)(pBufIndex-_szRecvBuf), 0,
						(struct sockaddr*)&m_addrRemote, sizeof(m_addrRemote) );
	return ( nSent != SOCKET_ERROR ) ? TRUE : FALSE;
}

// Drains pPool: each cell returned by GetNewDataRef is passed to
// ReleasePoolCell and then removed, until the pool reports no more cells.
void NI_StreamServer::EmptyProfPool( CSendDataPool *pPool )
{
	for ( SD_DATA_CELL *pHead = pPool->GetNewDataRef();
		  pHead != NULL;
		  pHead = pPool->GetNewDataRef() )
	{
		ReleasePoolCell( pHead );
		pPool->RemoveNewData();
	}
}

// Hook called before a cell is removed from the send pool.
// It currently only asserts that a cell was supplied; nothing is freed here.
void NI_StreamServer::ReleasePoolCell( SD_DATA_CELL *pCell )
{
	ASSERT( pCell );
}

//////////////////////////////////////////////////////////////////////////
// ThreadFunc
// Worker thread entry point.
//   lpParam - the NI_StreamServer instance to serve.
// Marks the instance as WORKING, then loops until the stop event is seen:
// each pass first tries to receive a packet, and only when nothing was
// handled does it try one send task, sleeping 20 ms if there was nothing to
// send. Marks the instance as CLOSED just before returning 0.
DWORD NI_StreamServer::ThreadFunc( LPVOID lpParam )
{
	NI_StreamServer *pSelf = (NI_StreamServer*)lpParam;
	pSelf->m_nRunningState = THREAD_STATE_WORKING;

	for (;;)
	{
		// Leave the loop as soon as a stop has been requested.
		if ( pSelf->_CheckThreadStop() == TRUE )
		{
			break;
		}

		// Receiving has priority: after a handled packet, poll again at once.
		if ( pSelf->_TrytoReceive() == TRUE )
		{
			continue;
		}

		// At most one send task per pass; idle briefly when none was sent.
		if ( pSelf->_DoNewSendTask() == FALSE )
		{
			Sleep(20);
		}
	}

	pSelf->m_nRunningState = THREAD_STATE_CLOSED;
	return 0;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -