📄 ni_streamserver.cpp
字号:
// NI_StreamServer.cpp: implementation of the NI_StreamServer class.
//
//////////////////////////////////////////////////////////////////////
#include "stdafx.h"
#include "LiveController.h"
#include "NI_StreamServer.h"
#ifdef _DEBUG
#undef THIS_FILE
static char THIS_FILE[]=__FILE__;
#define new DEBUG_NEW
#endif
// Size, in bytes, of the buffer allocated for _szRecvBuf in the constructor.
// The same buffer is zeroed and reused to build outgoing packets.
#define RECV_BUF_SIZE 2048
// select() timeouts expressed in microseconds; the constructor splits them
// into tv_sec / tv_usec for _Timeout_Recv and _Timeout_Send.
#define TIME_OUT_RECV_WAIT 10000 // 10ms
#define TIME_OUT_SEND_WAIT 500000 // 500ms
// Seconds that must elapse (compared against CTimeSpan::GetTotalSeconds in
// _DoNewSendTask) before the pending packet is considered for a re-send.
#define TIME_OUT_SEND 3 // 3s
// Once a pending packet has been tried this many times and has timed out
// again, _DoNewSendTask empties the pool and posts NI_CONNECT_TIMEOUT.
#define SEND_TRY_TIMES 2
// Values for m_nRunningState. The visible code only stores WORKING and CLOSED
// (in ThreadFunc) and the literal 0 (in the constructor).
#define THREAD_STATE_PREINIT 0
// NOTE(review): THREAD_STATE_WAITING is not referenced in the visible code.
#define THREAD_STATE_WAITING 1
#define THREAD_STATE_WORKING 2
#define THREAD_STATE_CLOSED 3
// NOTE(review): THREAD_TO_CLOSE is not referenced in the visible code.
#define THREAD_TO_CLOSE 999
// Task kinds handed to m_SendDataPool.PutNewData and compared against
// pCell->nMsg in _SendTask to pick the packet builder.
#define NI_SS_SEND_DEV_CTRL 1
#define NI_SS_SEND_STREAM_CTRL 2
//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////
/**
 * Constructs an idle object: no socket, no worker thread.
 *
 * Creates the manual-reset stop event, prepares the select() timeouts,
 * fills the constant part of the outgoing protocol header and allocates
 * the datagram buffer.
 *
 * @param hWnd  Window that receives WM_NI_RECV_MSG notifications.
 */
NI_StreamServer::NI_StreamServer( HWND hWnd )
{
    m_nRunningState = THREAD_STATE_PREINIT;
    m_hwndNotify    = hWnd;
    m_Socket        = INVALID_SOCKET;
    // Previously left uninitialised until StartThread() ran.
    m_hThreadID     = NULL;
    // Manual-reset, initially non-signalled: stays set once StopThread() fires.
    m_hStopEvent    = CreateEvent( NULL, TRUE, FALSE, NULL );
    _nPSerial       = 1;

    // Start from well-defined addresses so that InitSocket() never binds to
    // stack/heap garbage when it is reached before SetAddress() filled them.
    memset( &m_addrLocal, 0, sizeof(m_addrLocal) );
    memset( &m_addrRemote, 0, sizeof(m_addrRemote) );

    // TIME_OUT_* are microsecond totals; split them for struct timeval.
    _Timeout_Recv.tv_sec  = TIME_OUT_RECV_WAIT / 1000000;
    _Timeout_Recv.tv_usec = TIME_OUT_RECV_WAIT % 1000000;
    _Timeout_Send.tv_sec  = TIME_OUT_SEND_WAIT / 1000000;
    _Timeout_Send.tv_usec = TIME_OUT_SEND_WAIT % 1000000;

    // The header is copied verbatim into every outgoing datagram, so clear
    // it first: fields not assigned below must not carry random bytes.
    memset( &_mpHead, 0, sizeof(_mpHead) );
    _mpHead.cPType_H    = 20;
    _mpHead.cPType_L    = 8;
    _mpHead.cPVersion_H = 1;
    _mpHead.cPVersion_L = 1;
    _mpHead.cEncrpType  = 0; // 0 --> no encryption
    _mpHead.cEncrpValue = 0;

    _szRecvBuf = new char[RECV_BUF_SIZE];
}
/**
 * Stops the worker (if one is running) and releases the socket, the stop
 * event and the datagram buffer.
 *
 * The original implementation waited for THREAD_STATE_CLOSED without ever
 * signalling the stop event, so destruction blocked forever when the worker
 * had never been started or when StopThread() had not been called. It also
 * closed the socket before the worker had stopped using it.
 *
 * NOTE(review): the handle stored in m_hThreadID by StartThread() is not
 * closed anywhere in the visible code.
 */
NI_StreamServer::~NI_StreamServer()
{
    // Ask the worker loop to leave; harmless when it is not running.
    if ( m_hStopEvent )
    {
        SetEvent( m_hStopEvent );
    }

    // Wait only for a worker that actually entered its loop. A state of
    // PREINIT means ThreadFunc never ran, so there is nothing to wait for.
    while ( m_nRunningState != THREAD_STATE_PREINIT &&
            m_nRunningState != THREAD_STATE_CLOSED )
    {
        Sleep(20);
    }

    // Safe to close now: the worker no longer touches the socket.
    if ( m_Socket != INVALID_SOCKET )
    {
        closesocket( m_Socket );
        m_Socket = INVALID_SOCKET;
    }

    if ( m_hStopEvent )
    {
        CloseHandle( m_hStopEvent );
        m_hStopEvent = NULL;
    }

    if ( _szRecvBuf )
    {
        delete[] _szRecvBuf;
        _szRecvBuf = NULL;
    }
}
/**
 * Stores the local and remote UDP endpoints and (re)creates the socket.
 *
 * The local endpoint uses g_strInternalCtrlIP with port 0; the remote
 * endpoint uses the supplied address and port.
 *
 * @param serve_ip    Dotted-decimal IPv4 address of the remote peer.
 * @param serve_port  Remote UDP port (host byte order).
 * @return TRUE when the socket was created and bound; FALSE when serve_ip
 *         is NULL or not a usable IPv4 address, or when InitSocket() fails.
 *         On an address error no member is modified.
 */
BOOL NI_StreamServer::SetAddress( LPCTSTR serve_ip, unsigned short serve_port )
{
    // Validate before touching any state.
    if ( serve_ip == NULL )
    {
        return FALSE;
    }
    const unsigned long ulRemoteAddr = inet_addr( serve_ip );
    if ( ulRemoteAddr == INADDR_NONE )
    {
        // inet_addr reports a malformed string as INADDR_NONE, which would
        // otherwise be stored silently as 255.255.255.255.
        return FALSE;
    }

    // Local endpoint: fixed interface, port 0.
    memset( &m_addrLocal, 0, sizeof(m_addrLocal) );
    m_addrLocal.sin_family      = AF_INET;
    m_addrLocal.sin_addr.s_addr = inet_addr( (LPCTSTR)g_strInternalCtrlIP );
    m_addrLocal.sin_port        = htons(0);

    // Remote endpoint: the peer all packets are sent to.
    memset( &m_addrRemote, 0, sizeof(m_addrRemote) );
    m_addrRemote.sin_family      = AF_INET;
    m_addrRemote.sin_addr.s_addr = ulRemoteAddr;
    m_addrRemote.sin_port        = htons( serve_port );

    return InitSocket();
}
/**
 * Closes any existing socket, then creates a UDP socket and binds it to
 * m_addrLocal.
 *
 * @return TRUE when the socket is created and bound. On failure FALSE is
 *         returned and m_Socket is INVALID_SOCKET; previously a failed
 *         bind() left an open, unbound socket behind.
 */
BOOL NI_StreamServer::InitSocket()
{
    // Drop a socket left over from an earlier call.
    if ( m_Socket != INVALID_SOCKET )
    {
        closesocket( m_Socket );
        m_Socket = INVALID_SOCKET;
    }

    // create socket
    m_Socket = socket( PF_INET, SOCK_DGRAM, 0 );
    if ( m_Socket == INVALID_SOCKET )
    {
        return FALSE;
    }

    // bind socket
    if ( bind( m_Socket, (struct sockaddr *)&m_addrLocal, sizeof(m_addrLocal) ) != 0 )
    {
        // Do not keep a half-initialised socket around after reporting failure.
        closesocket( m_Socket );
        m_Socket = INVALID_SOCKET;
        return FALSE;
    }
    return TRUE;
}
/**
 * Launches the worker thread running ThreadFunc with this object as its
 * argument and stores the thread handle in m_hThreadID.
 *
 * @return TRUE when CreateThread returned a handle, FALSE otherwise.
 */
BOOL NI_StreamServer::StartThread()
{
    DWORD dwWorkerTid = 0;   // required by CreateThread, not used afterwards
    m_hThreadID = CreateThread( NULL, 0, ThreadFunc, this, 0, &dwWorkerTid );
    return ( m_hThreadID != NULL ) ? TRUE : FALSE;
}
void NI_StreamServer::StopThread()
{
// Set stop event to thread
SetEvent( m_hStopEvent );
shutdown( m_Socket, 0x00 );
}
/**
 * Non-blocking poll of the stop event.
 *
 * @return TRUE when m_hStopEvent is signalled, FALSE in every other case
 *         (including a failed wait).
 */
BOOL NI_StreamServer::_CheckThreadStop()
{
    // Zero timeout: only inspect the event state, never block.
    const DWORD dwWaitResult = WaitForSingleObject( m_hStopEvent, 0 );
    return ( dwWaitResult == WAIT_OBJECT_0 ) ? TRUE : FALSE;
}
//////////////////////////////////////////////////////////////////////////
/**
 * Waits up to _Timeout_Recv for one datagram, validates its protocol header
 * and dispatches it by primary packet type:
 *   0 -> _OnRecvAck        (never acknowledged)
 *   1 -> _OnRecvWorkState  (acknowledged when the handler returns TRUE)
 * Other primary types are ignored.
 *
 * Fixes over the original:
 *  - the length test compared a signed int with sizeof(), so SOCKET_ERROR
 *    (-1) was converted to a huge unsigned value and passed the check;
 *  - a datagram must now carry the header plus the full primary-type WORD;
 *  - the primary type is read with memcpy instead of a pointer cast.
 *
 * NOTE(review): the handlers read further payload fields without knowing
 * the received length; they rely on the buffer having been zero-filled.
 *
 * @return The handler's non-zero result when a packet was handled and
 *         acknowledged, FALSE otherwise (timeout, error, bad packet, ack).
 */
BOOL NI_StreamServer::_TrytoReceive()
{
    FD_ZERO( &_fdRead );
    FD_SET( m_Socket, &_fdRead );
    if ( select( 0, &_fdRead, NULL, NULL, &_Timeout_Recv ) <= 0 )
    {
        // select error or timeout
        return FALSE;
    }

    // receive the message
    struct sockaddr addrSrc;
    int nAddrLen = sizeof(addrSrc);
    memset( _szRecvBuf, 0, RECV_BUF_SIZE );
    const int nRecvLen = recvfrom( m_Socket, _szRecvBuf, RECV_BUF_SIZE,
                                   0, &addrSrc, &nAddrLen );

    // Signed comparison on purpose: rejects SOCKET_ERROR as well as packets
    // too short to hold the header and the primary packet type.
    ZGMP_HEAD mpHead;
    const int nMinPacketLen = (int)( sizeof(mpHead) + sizeof(WORD) );
    if ( nRecvLen < nMinPacketLen )
    {
        return FALSE;
    }

    // retrieve the protocol head
    const char *pBufIndex = _szRecvBuf;
    memcpy( &mpHead, pBufIndex, sizeof(mpHead) );
    pBufIndex += sizeof(mpHead);
    if ( mpHead.cPType_H != 20 || mpHead.cPType_L != 9 ||
         mpHead.cPVersion_H != 1 || mpHead.cPVersion_L != 1 )
    {
        // protocol not match
        return FALSE;
    }

    // Get primary packet-type
    WORD nPrimaryType = 0;
    memcpy( &nPrimaryType, pBufIndex, sizeof(WORD) );
    pBufIndex += sizeof(WORD);

    BOOL bNeedAck = FALSE;
    if ( nPrimaryType == 0 )
    {
        // An ack is never answered with another ack.
        _OnRecvAck( const_cast<char*>(pBufIndex) );
    }
    else if ( nPrimaryType == 1 ) // Recv Work State
    {
        bNeedAck = _OnRecvWorkState( const_cast<char*>(pBufIndex) );
    }
    // Other messages......

    // Send ack of packet
    if ( bNeedAck )
    {
        _SendAckofPacket( mpHead.nPSerial );
        return bNeedAck;
    }
    return FALSE;
}
/**
 * Handles an acknowledgement packet.
 *
 * The payload starts with a WORD secondary type (not used here) followed by
 * the int serial being acknowledged. When that serial equals the serial of
 * the cell at the front of m_SendDataPool, the cell is released and removed.
 *
 * @param pBufIndex  Payload position right after the primary packet type.
 * @return Always TRUE.
 */
BOOL NI_StreamServer::_OnRecvAck( char *pBufIndex )
{
    // Skip the secondary packet-type; only the serial matters for an ack.
    const char *pSerialField = pBufIndex + sizeof(WORD);
    const int nAckedSerial = *(const int*)pSerialField;

    // Only the cell currently at the head of the pool can be confirmed.
    SD_DATA_CELL *pHeadCell = m_SendDataPool.GetNewDataRef();
    if ( pHeadCell != NULL && pHeadCell->nSendSerial == nAckedSerial )
    {
        ReleasePoolCell( pHeadCell );       // release the cell data
        m_SendDataPool.RemoveNewData();     // remove the cell
    }
    return TRUE;
}
/**
 * Decodes a work-state packet into a heap-allocated NI_WMsg and posts it to
 * the notification window as WM_NI_RECV_MSG / NI_WORKSTATE. The message
 * pointer travels in lParam.
 *
 * Payload layout read here: WORD secondary type, DEVICE_ID_LEN bytes of
 * device id, DWORD param1, DWORD param2.
 *
 * Fixes over the original:
 *  - the device-id buffer is terminated explicitly before ReleaseBuffer(),
 *    which otherwise measures the string by scanning for a terminator that
 *    a full-length id does not contain;
 *  - the pointer is passed as LPARAM instead of being cast to DWORD;
 *  - when PostMessage fails the message is freed instead of leaked and FALSE
 *    is returned, so the packet is not acknowledged.
 *
 * NOTE(review): the received length is not available here, so the fields
 * are read without a bounds check against the datagram size.
 *
 * @param pBufIndex  Payload position right after the primary packet type.
 * @return TRUE when the message was queued to the window, FALSE otherwise.
 */
BOOL NI_StreamServer::_OnRecvWorkState( char *pBufIndex )
{
    NI_WMsg *pNewMsg = new NI_WMsg();

    // Get secondary packet-type
    WORD nSecondaryType = 0;
    memcpy( &nSecondaryType, pBufIndex, sizeof(WORD) );
    pBufIndex += sizeof(WORD);
    pNewMsg->m_nMsgType = (int)nSecondaryType;

    // Get DevID
    LPTSTR szBuf = pNewMsg->m_strParam.GetBuffer( DEVICE_ID_LEN );
    memcpy( szBuf, pBufIndex, DEVICE_ID_LEN );
    // GetBuffer(n) provides room for n characters plus the terminator.
    szBuf[DEVICE_ID_LEN] = 0;
    pNewMsg->m_strParam.ReleaseBuffer();
    pBufIndex += DEVICE_ID_LEN;

    // Get Param1
    DWORD dwValue = 0;
    memcpy( &dwValue, pBufIndex, sizeof(DWORD) );
    pNewMsg->m_dwParam1 = dwValue;
    pBufIndex += sizeof(DWORD);

    // Get Param2
    memcpy( &dwValue, pBufIndex, sizeof(DWORD) );
    pNewMsg->m_dwParam2 = dwValue;

    if ( !PostMessage( m_hwndNotify, WM_NI_RECV_MSG, NI_WORKSTATE, (LPARAM)pNewMsg ) )
    {
        // Nobody will ever receive (and free) the message.
        delete pNewMsg;
        return FALSE;
    }
    return TRUE;
}
//////////////////////////////////////////////////////////////////////////
/**
 * Queues a device-control task carrying a single parameter.
 *
 * @param nOpt  Operation code; _SendDevCtrl later sends it as the packet's
 *              secondary type.
 * @return The result of m_SendDataPool.PutNewData.
 */
BOOL NI_StreamServer::SendDevCtrl( int nOpt )
{
    DWORD *pOptAsParam = (DWORD*)&nOpt;   // the pool takes its parameters as DWORDs
    return m_SendDataPool.PutNewData( NI_SS_SEND_DEV_CTRL, 1, pOptAsParam, 0 );
}
/**
 * Queues a stream-control task.
 *
 * The operation code is always included. nScheID is appended only when it
 * is non-zero, and nPlayID only when both nScheID and nPlayID are non-zero.
 *
 * @param nOpt     Operation code (first parameter).
 * @param nScheID  Optional second parameter; 0 means "not supplied".
 * @param nPlayID  Optional third parameter; 0 means "not supplied".
 * @return The result of m_SendDataPool.PutNewData.
 */
BOOL NI_StreamServer::SendStreamCtrl( int nOpt, DWORD nScheID, DWORD nPlayID )
{
    DWORD dwParams[3];
    int nUsed = 0;

    dwParams[nUsed++] = nOpt;
    if ( nScheID != 0 )
    {
        dwParams[nUsed++] = nScheID;
        // A play id is only meaningful together with a schedule id.
        if ( nPlayID != 0 )
        {
            dwParams[nUsed++] = nPlayID;
        }
    }

    // Put task to thread
    return m_SendDataPool.PutNewData( NI_SS_SEND_STREAM_CTRL, nUsed, dwParams );
}
/**
 * Builds and sends an acknowledgement for a received packet.
 *
 * Packet layout: protocol head (with a fresh serial), WORD primary type 0,
 * WORD secondary type 0, int serial being acknowledged. The packet is built
 * in _szRecvBuf, overwriting whatever was received into it.
 *
 * Fix over the original: the result of sendto() is checked, so TRUE now
 * means the datagram was actually handed to the network stack.
 *
 * @param nSerial  Serial number of the packet being acknowledged.
 * @return TRUE when the ack was sent; FALSE when the socket did not become
 *         writable within _Timeout_Send or sendto() failed.
 */
BOOL NI_StreamServer::_SendAckofPacket( int nSerial )
{
    // fit the head
    char *pBufIndex = _szRecvBuf;
    memset( pBufIndex, 0, RECV_BUF_SIZE );
    _mpHead.nPSerial = _nPSerial++;
    memcpy( pBufIndex, &_mpHead, sizeof(_mpHead) );
    pBufIndex += sizeof(_mpHead);

    // fit primary-type of packet
    const WORD nPrimaryType = 0;
    memcpy( pBufIndex, &nPrimaryType, sizeof(WORD) );
    pBufIndex += sizeof(WORD);

    // fit secondary-type of packet
    const WORD nSecondaryType = 0;
    memcpy( pBufIndex, &nSecondaryType, sizeof(WORD) );
    pBufIndex += sizeof(WORD);

    // fit the ack-Serial
    memcpy( pBufIndex, &nSerial, sizeof(int) );
    pBufIndex += sizeof(int);

    // Wait until the socket is writable, then send.
    FD_ZERO( &_fdWrite );
    FD_SET( m_Socket, &_fdWrite );
    if ( select( 0, NULL, &_fdWrite, NULL, &_Timeout_Send ) <= 0 )
    {
        return FALSE;
    }

    const int nSent = sendto( m_Socket, _szRecvBuf, (int)(pBufIndex - _szRecvBuf), 0,
                              (struct sockaddr*)&m_addrRemote, sizeof(m_addrRemote) );
    return ( nSent != SOCKET_ERROR ) ? TRUE : FALSE;
}
/**
 * Processes the cell at the front of the send pool, sending at most one
 * packet per call.
 *
 *  - Never sent before (nLastSendTime == 0): send it now.
 *  - Sent less than TIME_OUT_SEND seconds ago: do nothing.
 *  - Timed out with fewer than SEND_TRY_TIMES attempts: send it again.
 *  - Timed out with the attempts used up: empty the pool and post
 *    NI_CONNECT / NI_CONNECT_TIMEOUT to the notification window.
 *
 * @return The result of _SendTask when a send was attempted, else FALSE.
 */
BOOL NI_StreamServer::_DoNewSendTask()
{
    SD_DATA_CELL *pPending = m_SendDataPool.GetNewDataRef();
    if ( !pPending )
    {
        return FALSE;
    }

    // the first time to send
    if ( pPending->nLastSendTime == 0 )
    {
        return _SendTask( pPending );
    }

    // Check re-Send
    CTime tNow = CTime::GetCurrentTime();
    int nElapsedSec = ((CTimeSpan)(tNow - pPending->nLastSendTime)).GetTotalSeconds();
    if ( nElapsedSec < TIME_OUT_SEND )
    {
        // Still waiting for the ack of the previous attempt.
        return FALSE;
    }

    if ( pPending->nTryedTimes < SEND_TRY_TIMES )
    {
        // Resend
        return _SendTask( pPending );
    }

    // Fail to connect to Prof
    EmptyProfPool( &m_SendDataPool );
    PostMessage( m_hwndNotify, WM_NI_RECV_MSG, NI_CONNECT, NI_CONNECT_TIMEOUT );
    return FALSE;
}
/**
 * Sends the packet described by a pool cell and records the attempt.
 *
 * The cell's nMsg selects the builder (_SendDevCtrl or _SendStreamCtrl);
 * any other value sends nothing. In every case the cell's nLastSendTime is
 * set to the current time and nTryedTimes is incremented.
 *
 * @param pCell  Pool cell to send; must not be NULL.
 * @return The builder's result, or FALSE for an unknown nMsg.
 */
BOOL NI_StreamServer::_SendTask( SD_DATA_CELL *pCell )
{
    ASSERT(pCell);

    BOOL bSent = FALSE;
    if ( pCell->nMsg == NI_SS_SEND_DEV_CTRL )
    {
        TRACE("Send Dev Ctrl\n");
        bSent = _SendDevCtrl( pCell );
    }
    else if ( pCell->nMsg == NI_SS_SEND_STREAM_CTRL )
    {
        TRACE("Send Stream Ctrl\n");
        bSent = _SendStreamCtrl( pCell );
    }

    // Book-keeping used by _DoNewSendTask for the re-send decision.
    pCell->nLastSendTime = CTime::GetCurrentTime();
    pCell->nTryedTimes++;
    return bSent;
}
/**
 * Builds and sends a device-control packet for the given pool cell.
 *
 * Packet layout: protocol head (with a fresh serial), WORD primary type 1,
 * WORD secondary type taken from the cell's first parameter. The serial is
 * stored in pCell->nSendSerial so a later ack can be matched against it.
 * The packet is built in _szRecvBuf.
 *
 * Fix over the original: the result of sendto() is checked, so TRUE now
 * means the datagram was actually handed to the network stack.
 *
 * @param pCell  Pool cell describing the task; needs at least one parameter.
 * @return TRUE when the packet was sent; FALSE when the cell has no
 *         parameter, the socket did not become writable within
 *         _Timeout_Send, or sendto() failed.
 */
BOOL NI_StreamServer::_SendDevCtrl( SD_DATA_CELL *pCell )
{
    // fit the head
    char *pBufIndex = _szRecvBuf;
    memset( pBufIndex, 0, RECV_BUF_SIZE );
    _mpHead.nPSerial = _nPSerial++;
    memcpy( pBufIndex, &_mpHead, sizeof(_mpHead) );
    pBufIndex += sizeof(_mpHead);
    pCell->nSendSerial = _mpHead.nPSerial;

    // fit primary-type of packet
    const WORD nPrimaryType = 1;
    memcpy( pBufIndex, &nPrimaryType, sizeof(WORD) );
    pBufIndex += sizeof(WORD);

    // fit secondary-type of packet
    if ( pCell->nParamCount < 1 )
    {
        return FALSE;
    }
    const WORD nSecondaryType = (WORD)(pCell->dwParams[0]);
    memcpy( pBufIndex, &nSecondaryType, sizeof(WORD) );
    pBufIndex += sizeof(WORD);

    // Wait until the socket is writable, then send.
    FD_ZERO( &_fdWrite );
    FD_SET( m_Socket, &_fdWrite );
    if ( select( 0, NULL, &_fdWrite, NULL, &_Timeout_Send ) <= 0 )
    {
        return FALSE;
    }

    const int nSent = sendto( m_Socket, _szRecvBuf, (int)(pBufIndex - _szRecvBuf), 0,
                              (struct sockaddr*)&m_addrRemote, sizeof(m_addrRemote) );
    return ( nSent != SOCKET_ERROR ) ? TRUE : FALSE;
}
/**
 * Builds and sends a stream-control packet for the given pool cell.
 *
 * Packet layout: protocol head (with a fresh serial), WORD primary type 2,
 * WORD secondary type taken from the cell's first parameter, then every
 * remaining parameter as a DWORD. The serial is stored in
 * pCell->nSendSerial so a later ack can be matched against it. The packet
 * is built in _szRecvBuf.
 *
 * Fixes over the original:
 *  - the result of sendto() is checked, so TRUE means the datagram was
 *    actually handed to the network stack;
 *  - the parameter list is rejected when it would not fit into the
 *    RECV_BUF_SIZE buffer instead of being copied past its end.
 *
 * @param pCell  Pool cell describing the task; needs at least one parameter.
 * @return TRUE when the packet was sent; FALSE when the parameter count is
 *         unusable, the socket did not become writable within
 *         _Timeout_Send, or sendto() failed.
 */
BOOL NI_StreamServer::_SendStreamCtrl( SD_DATA_CELL *pCell )
{
    // fit the head
    char *pBufIndex = _szRecvBuf;
    memset( pBufIndex, 0, RECV_BUF_SIZE );
    _mpHead.nPSerial = _nPSerial++;
    memcpy( pBufIndex, &_mpHead, sizeof(_mpHead) );
    pBufIndex += sizeof(_mpHead);
    pCell->nSendSerial = _mpHead.nPSerial;

    // fit primary-type of packet
    const WORD nPrimaryType = 2;
    memcpy( pBufIndex, &nPrimaryType, sizeof(WORD) );
    pBufIndex += sizeof(WORD);

    // fit secondary-type of packet
    if ( pCell->nParamCount < 1 )
    {
        return FALSE;
    }
    // Room left after head, primary type and secondary type, in DWORDs.
    const size_t nFixedLen  = sizeof(_mpHead) + 2 * sizeof(WORD);
    const size_t nMaxExtras = ( RECV_BUF_SIZE - nFixedLen ) / sizeof(DWORD);
    if ( (size_t)(pCell->nParamCount - 1) > nMaxExtras )
    {
        return FALSE;
    }
    const WORD nSecondaryType = (WORD)(pCell->dwParams[0]);
    memcpy( pBufIndex, &nSecondaryType, sizeof(WORD) );
    pBufIndex += sizeof(WORD);

    // Parameters after the first one follow as raw DWORDs.
    for ( int i = 1; i < pCell->nParamCount; ++i )
    {
        memcpy( pBufIndex, &(pCell->dwParams[i]), sizeof(DWORD) );
        pBufIndex += sizeof(DWORD);
    }

    // Wait until the socket is writable, then send.
    FD_ZERO( &_fdWrite );
    FD_SET( m_Socket, &_fdWrite );
    if ( select( 0, NULL, &_fdWrite, NULL, &_Timeout_Send ) <= 0 )
    {
        return FALSE;
    }

    const int nSent = sendto( m_Socket, _szRecvBuf, (int)(pBufIndex - _szRecvBuf), 0,
                              (struct sockaddr*)&m_addrRemote, sizeof(m_addrRemote) );
    return ( nSent != SOCKET_ERROR ) ? TRUE : FALSE;
}
/**
 * Drains a send pool: releases and removes the front cell repeatedly until
 * GetNewDataRef() reports that no cell is left.
 *
 * @param pPool  Pool to empty.
 */
void NI_StreamServer::EmptyProfPool( CSendDataPool *pPool )
{
    for ( SD_DATA_CELL *pFront = pPool->GetNewDataRef();
          pFront != NULL;
          pFront = pPool->GetNewDataRef() )
    {
        ReleasePoolCell( pFront );
        pPool->RemoveNewData();
    }
}
/**
 * Hook called before a cell is removed from the send pool. It currently
 * only asserts that the cell pointer is valid; nothing is freed here.
 *
 * @param pCell  Cell about to be removed; must not be NULL.
 */
void NI_StreamServer::ReleasePoolCell( SD_DATA_CELL *pCell )
{
    ASSERT( pCell != NULL );
}
//////////////////////////////////////////////////////////////////////////
// ThreadFunc
/**
 * Worker thread entry point.
 *
 * Marks the object as THREAD_STATE_WORKING, then loops until the stop event
 * is signalled. Each round first tries to receive a packet; when one was
 * handled the loop restarts immediately. Otherwise at most one send task is
 * processed, and the thread sleeps 20 ms when nothing was sent. On exit the
 * state becomes THREAD_STATE_CLOSED.
 *
 * @param lpParam  The NI_StreamServer instance passed by StartThread().
 * @return Always 0.
 */
DWORD NI_StreamServer::ThreadFunc( LPVOID lpParam )
{
    NI_StreamServer *pSelf = (NI_StreamServer*)lpParam;
    pSelf->m_nRunningState = THREAD_STATE_WORKING;

    // Leave as soon as a stop has been requested.
    while ( pSelf->_CheckThreadStop() != TRUE )
    {
        // Incoming packets have priority over outgoing tasks.
        if ( pSelf->_TrytoReceive() == TRUE )
        {
            continue;
        }

        // Send one task every time at most; idle briefly when nothing went out.
        if ( pSelf->_DoNewSendTask() == FALSE )
        {
            Sleep(20);
        }
    }

    pSelf->m_nRunningState = THREAD_STATE_CLOSED;
    return 0;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -