📄 cmediacontroller.cpp
字号:
//
// CMediaController.cpp
//
/*-----------------------------------------------------*\
HQ Tech, Make Technology Easy!
For more information, please visit http://hqtech.nease.net.
/*-----------------------------------------------------*/
#include <streams.h>
#include "CFilterNetReceiver.h"
#include "CMediaController.h"
#include "defines.h"
///////////////////////////////////////////////////////////////////////////////
// Constructs a controller bound to the given network-receiver filter.
// Chooses the buffer sizes, allocates the main media buffer, clears all
// bookkeeping state and registers this controller with the stream socket.
CMediaController::CMediaController(CFilterNetReceiver * inFilter)
{
    mReceiveFilter = inFilter;
    ASSERT(mReceiveFilter);

    // ---- Tunable sizes (adjust to actual requirements) ----
    // Capacity of the main media buffer.
    mBufferSize = 10 * 1024 * 1024;
    // When the amount of unread data drops to this size or below, the
    // unread data is moved to the front of the buffer to make room for
    // incoming data.
    mMinWorkingSize = 20 * 1024;
    // Amount of data that must be received before the filter is reported
    // as ready for downstream connection checking.
    mCheckingSize = 30 * 1024;

    // ---- Main buffer and its cursors ----
    mMediaBuffer   = new BYTE[mBufferSize];
    mReadingOffset = 0;
    mWritingOffset = 0;
    mAvailabeSize  = 0;

    // ---- Extra (connection-check) buffer: not allocated yet ----
    mExtraBuffer        = NULL;
    mExtraBufferSize    = 0;
    mExtraCheckOffset   = 0;
    mExtraReadingOffset = 0;
    mExtraWritingOffset = 0;
    mReadFromExtraBuffer = FALSE;

    // ---- Stream description ----
    mStreamType = 0;
    mTotalSize  = 0;

    // ---- State flags ----
    mIsEOS              = FALSE;
    mCheckingFinished   = FALSE;
    mIsReadyToBeChecked = FALSE;
    mIsWaitingToAddData = FALSE;
    mIsWaitingToRead    = FALSE;

    mStrmSocket.SetController(this);

#ifdef _QOS_
    mMonitoring = FALSE;
    mQosThrd    = NULL;
#endif
#ifdef _DEBUG_IO
    fp = fopen("C:\\netrecv.txt", "w");
#endif
}
// Releases everything owned by the controller.
CMediaController::~CMediaController()
{
#ifdef _QOS_
    // Stop the monitoring thread BEFORE freeing the buffers: the thread
    // reads the buffer offsets and may call MakeSpaceForAddData(), which
    // memmoves inside mMediaBuffer. Freeing first would let it touch
    // released memory.
    StopQosMonitoring();
#endif

    // delete[] on a null pointer is a no-op, so no explicit checks needed.
    delete[] mMediaBuffer;
    mMediaBuffer = NULL;

    delete[] mExtraBuffer;
    mExtraBuffer = NULL;

#ifdef _DEBUG_IO
    // fopen() in the constructor may have failed; fclose(NULL) is undefined.
    if (fp)
    {
        fclose(fp);
        fp = NULL;
    }
#endif
}
// Prepares the controller for streaming: clears the end-of-stream and
// waiting flags, marks connection checking as finished, rewinds both read
// cursors and (when built with _QOS_) starts the monitoring thread.
void CMediaController::StartStreaming(void)
{
    // Flags are updated before the buffer lock is taken.
    mIsEOS               = FALSE;
    mCheckingFinished    = TRUE;
    mReadFromExtraBuffer = FALSE;
    mIsWaitingToRead     = FALSE;
    mIsWaitingToAddData  = FALSE;

    // The lock is held until the function returns.
    CAutoLock bufferGuard(&mSyncBuffer);
    mReadingOffset      = 0;
    mExtraReadingOffset = 0;

#ifdef _QOS_
    StartQosMonitoring();
#endif
}
// Called when the filter graph becomes inactive.
// Stops the QoS monitoring thread (when built with _QOS_) and resets the
// main buffer cursors and the available-size counter.
void CMediaController::StopStreaming(void)
{
#ifdef _QOS_
    // Stop the monitor BEFORE taking mSyncBuffer. StopQosMonitoring() waits
    // (INFINITE) for the monitoring thread, and that thread may itself be
    // blocked on mSyncBuffer inside MakeSpaceForAddData(). Waiting for it
    // while holding the lock would deadlock.
    StopQosMonitoring();
#endif

    // All buffer bookkeeping is reset under the buffer lock.
    CAutoLock lck(&mSyncBuffer);
    mAvailabeSize  = 0;
    mWritingOffset = 0;
    mReadingOffset = 0;
}
// Interface methods
void CMediaController::SetStreamType(long inType)
{
mStreamType = inType;
}
// Hands the given socket over to the internal stream socket object.
void CMediaController::SetStreamSocket(SOCKET inSocket)
{
    this->mStrmSocket.Attach(inSocket);
}
void CMediaController::SetSize(long inTotalSize, long inCheckOffset)
{
mTotalSize = inTotalSize;
mExtraCheckOffset = inCheckOffset;
if (mExtraCheckOffset > 0)
{
mExtraBufferSize = inTotalSize - inCheckOffset;
// Allocate extra buffer for checking
if (mExtraBuffer)
{
delete[] mExtraBuffer;
mExtraBuffer = NULL;
}
mExtraBuffer = new BYTE[mExtraBufferSize];
}
// Adjust some params
if (mStreamType == FT_MPEG2)
{
mCheckingSize = 3 * 1024 * 1024;
}
if (mCheckingSize > mTotalSize)
{
mCheckingSize = mTotalSize;
}
}
// Asks the stream socket to start receiving and forwards its result.
BOOL CMediaController::StartReceiving(void)
{
    const BOOL started = mStrmSocket.StartReceiving();
    return started;
}
void CMediaController::StopReceiving(void)
{
mStrmSocket.StopReceiving();
}
// Reports whether enough data has been buffered for connection checking.
// The flag is raised by AddData() once the write offset reaches
// mCheckingSize while checking has not finished.
BOOL CMediaController::IsReadToBeChecked(void)
{
    const BOOL ready = mIsReadyToBeChecked;
    return ready;
}
// Buffer operations
// Appends received bytes to the controller's buffers.
//
// While the graph is stopped and an extra checking region is configured,
// incoming bytes first fill the extra buffer; anything left over (and all
// later data) goes to the main media buffer. If the main buffer has no room
// the call blocks, polling every 2 ms and compacting the buffer when
// allowed, until space is available or the filter starts flushing.
//
//   inData   - bytes to append.
//   inLength - number of bytes at inData.
//
// Returns TRUE when the data was stored; FALSE when the arguments are
// invalid, the block can never fit in the main buffer, or the filter began
// flushing while waiting for space.
BOOL CMediaController::AddData(char * inData, long inLength)
{
    // Reject arguments that would make the memcpy calls below misbehave
    // (a negative length becomes a huge size_t).
    if (inLength < 0 || (inData == NULL && inLength > 0))
    {
        return FALSE;
    }

    // If pin connection need extra checking bytes,
    // store data first into the extra buffer
    // Attention: check bytes only received when graph is stopped
    if (mReceiveFilter->IsStopped() && mExtraCheckOffset > 0 &&
        mExtraWritingOffset < mExtraBufferSize)
    {
        long space = mExtraBufferSize - mExtraWritingOffset;
        if (space >= inLength)
        {
            memcpy(mExtraBuffer + mExtraWritingOffset, inData, inLength);
            mExtraWritingOffset += inLength;
            return TRUE;
        }
        else
        {
            // Fill the remainder of the extra buffer, then route the rest
            // of the block through the normal path.
            memcpy(mExtraBuffer + mExtraWritingOffset, inData, space);
            mExtraWritingOffset += space;
            return AddData(inData + space, inLength - space);
        }
    }
#ifdef _DEBUG_IO
    fprintf(fp, "AddData: %10d\n", inLength);
#endif

    // A block larger than the whole buffer can never fit, no matter how
    // much is consumed; fail instead of spinning forever in the loop below.
    if (inLength > mBufferSize)
    {
        return FALSE;
    }

    // If have no enough space to hold new data, we wait!
    while (mBufferSize - mWritingOffset < inLength)
    {
        mIsWaitingToAddData = TRUE;
        if (mReceiveFilter->IsFlushing())
        {
            mIsWaitingToAddData = FALSE;
            return FALSE;
        }
#ifdef _DEBUG_IO
        fprintf(fp, "Waiting for space to hold coming data!\n");
#endif
        Sleep(2);
        {
            // Make space for incoming data
            // If still in connection checking process, we shouldn't memmove!
            CAutoLock lck(&mSyncBuffer);
            long validLength = mWritingOffset - mReadingOffset;
            if (mCheckingFinished)
            {
                // Compact only when little unread data is left or the
                // reader is itself stalled waiting for more data.
                if (validLength <= mMinWorkingSize || mIsWaitingToRead)
                {
                    memmove(mMediaBuffer, mMediaBuffer + mReadingOffset,
                            validLength);
                    mReadingOffset = 0;
                    mWritingOffset = validLength;
                    mAvailabeSize  = validLength;
                }
            }
        }
    }
    mIsWaitingToAddData = FALSE;

    CAutoLock lck(&mSyncBuffer);
    // Put the incoming data to the end of the buffer
    memcpy(mMediaBuffer + mWritingOffset, inData, inLength);
    mWritingOffset += inLength;
    mAvailabeSize  += inLength;

    // Determine byte-checking feature when pin connection
    if (!mCheckingFinished && !mIsReadyToBeChecked &&
        mWritingOffset >= mCheckingSize)
    {
        mIsReadyToBeChecked = TRUE;
    }
    return TRUE;
}
// Compacts the main buffer under the buffer lock: moves the unread bytes to
// the start of the buffer and rebases the offsets. Does nothing while
// connection checking is still in progress.
void CMediaController::MakeSpaceForAddData(void)
{
    CAutoLock bufferGuard(&mSyncBuffer);

    if (!mCheckingFinished)
    {
        return;
    }

    const long unread = mWritingOffset - mReadingOffset;
    memmove(mMediaBuffer, mMediaBuffer + mReadingOffset, unread);
    mReadingOffset = 0;
    mWritingOffset = unread;
    mAvailabeSize  = unread;
}
void CMediaController::EndOfStream(void)
{
mIsEOS = TRUE;
#ifdef _DEBUG_IO
fprintf(fp, "\nStream finished!\n");
#endif
}
// Delegating output stream pin's methods
// Only when connection checking, we let caller "SetPointer".
// Repositions the read cursor(s) to stream position llPos.
//
// The main read offset is only moved while connection checking is still in
// progress and the filter is stopped, and only for positions inside the
// main buffer. Independently of that, a position at or beyond the extra
// check offset selects the extra buffer for the next Read().
// Always returns NOERROR.
HRESULT CMediaController::SetPointer(LONGLONG llPos)
{
#ifdef _DEBUG_IO
    fprintf(fp, "SetPointer: %10d\n", (long)llPos);
#endif

    // During connection attempts this filter may be tried against several
    // downstream filters, so the buffered data is re-read from the requested
    // position.
    const bool stillChecking =
        !mCheckingFinished && mReceiveFilter->IsStopped();
    if (stillChecking)
    {
        const bool insideMainBuffer = (llPos >= 0) && (llPos < mBufferSize);
        if (insideMainBuffer)
        {
            CAutoLock bufferGuard(&mSyncBuffer);
            mReadingOffset       = long(llPos);
            mReadFromExtraBuffer = FALSE;
        }
    }

    // Positions in the trailing check region are served from the extra
    // buffer on the next read.
    const bool inExtraRegion =
        (mExtraCheckOffset > 0) && (llPos >= mExtraCheckOffset);
    if (inExtraRegion)
    {
        mExtraReadingOffset  = long(llPos - mExtraCheckOffset);
        mReadFromExtraBuffer = TRUE;
    }

    return NOERROR;
}
// Copies up to dwBytesToRead bytes from the current read position into
// pbBuffer.
//
//   pbBuffer      - destination buffer (at least dwBytesToRead bytes).
//   dwBytesToRead - number of bytes requested.
//   bAlign        - not used by this implementation.
//   pdwBytesRead  - receives the number of bytes actually copied.
//
// If the previous SetPointer() selected the extra buffer (and the filter is
// stopped), the read is served from the extra buffer and is limited to the
// bytes actually stored there. Otherwise the call blocks, polling every
// 2 ms, until the main buffer holds dwBytesToRead unread bytes; if the
// filter starts flushing or the stream ends first, 0 bytes are returned.
// Always returns NOERROR.
//
// NOTE(review): when the stream ends with fewer than dwBytesToRead unread
// bytes left, those bytes are not delivered (0 is reported) - confirm this
// is acceptable for the downstream parser.
HRESULT CMediaController::Read(PBYTE pbBuffer, DWORD dwBytesToRead,
                               BOOL bAlign, LPDWORD pdwBytesRead)
{
    if (mExtraCheckOffset > 0 && mReceiveFilter->IsStopped() &&
        mReadFromExtraBuffer)
    {
        mReadFromExtraBuffer = FALSE;

        // End of the valid data in the extra buffer: what has been written,
        // never more than the buffer's capacity.
        long extraEnd = mExtraWritingOffset;
        if (extraEnd > mExtraBufferSize)
        {
            extraEnd = mExtraBufferSize;
        }

        // Clamp the copy so it can never run past the stored data.
        DWORD bytesToCopy = 0;
        if (mExtraBuffer != NULL && mExtraReadingOffset >= 0 &&
            mExtraReadingOffset < extraEnd)
        {
            const DWORD remaining =
                static_cast<DWORD>(extraEnd - mExtraReadingOffset);
            bytesToCopy = (dwBytesToRead < remaining) ? dwBytesToRead
                                                      : remaining;
            memcpy(pbBuffer, mExtraBuffer + mExtraReadingOffset, bytesToCopy);
            mExtraReadingOffset += static_cast<long>(bytesToCopy);
        }
        *pdwBytesRead = bytesToCopy;
        return NOERROR;
    }

    // If no enough data to be read out, wait here!
    for (;;)
    {
        // Keep the difference signed: if the read offset is ahead of the
        // write offset the result is negative, and comparing it directly
        // against the unsigned request would wrap to a huge value and skip
        // the wait.
        const long unread = mWritingOffset - mReadingOffset;
        if (unread >= 0 && static_cast<DWORD>(unread) >= dwBytesToRead)
        {
            break;
        }
#ifdef _DEBUG_IO
        fprintf(fp, "Waiting for enough data!!\n");
#endif
        mIsWaitingToRead = TRUE;
        if (mReceiveFilter->IsFlushing() || mIsEOS)
        {
            mIsWaitingToRead = FALSE;
            *pdwBytesRead = 0;
            return NOERROR;
        }
        Sleep(2);
    }
    mIsWaitingToRead = FALSE;

    CAutoLock lck(&mSyncBuffer);
    // Read data from the buffer, and update the data pointer
    memcpy(pbBuffer, mMediaBuffer + mReadingOffset, dwBytesToRead);
    mReadingOffset += dwBytesToRead;
    *pdwBytesRead = dwBytesToRead;
#ifdef _DEBUG_IO
    fprintf(fp, "To read: %10d Actual read: %10d\n",
            (long)dwBytesToRead, (long)*pdwBytesRead);
#endif
    return NOERROR;
}
// Returns the total stream size. When pSizeAvailable is non-null it also
// receives the controller's current available-size counter.
LONGLONG CMediaController::Size(LONGLONG *pSizeAvailable)
{
    if (pSizeAvailable != NULL)
    {
        *pSizeAvailable = mAvailabeSize;
    }

    const LONGLONG total = mTotalSize;
    return total;
}
///////////////////////////////////////////////////////////////////////
#ifdef _QOS_
// Starts the QoS monitoring thread (MonitoringThrd) for this controller.
// Returns TRUE when a monitoring thread is running on return, FALSE when
// the thread could not be created.
BOOL CMediaController::StartQosMonitoring(void)
{
    // Already running: creating another thread would overwrite (and leak)
    // the existing handle and leave a thread that is never waited for.
    if (mQosThrd != NULL)
    {
        return TRUE;
    }

    mMonitoring = TRUE;
    DWORD threadID = 0;
    mQosThrd = CreateThread(NULL, 0, MonitoringThrd,
                            this, 0, &threadID);
    if (mQosThrd == NULL)
    {
        // No thread was started, so do not leave the flag raised.
        mMonitoring = FALSE;
        return FALSE;
    }
    return TRUE;
}
void CMediaController::StopQosMonitoring(void)
{
if (mQosThrd != NULL)
{
mMonitoring = FALSE;
WaitForSingleObject(mQosThrd, INFINITE);
mQosThrd = NULL;
}
}
// Thread procedure of the QoS monitor. pParam is the owning
// CMediaController. Every 300 ms, while mMonitoring is set, it compares the
// amount of unread data in the main buffer against two thresholds and
// pauses or runs the filter graph accordingly. Returns 1 on exit.
DWORD WINAPI CMediaController::MonitoringThrd(void * pParam)
{
    CMediaController * self = (CMediaController *) pParam;

    while (self->mMonitoring)
    {
        const long unread = self->mWritingOffset - self->mReadingOffset;

        // Low-water mark: fewer unread bytes than mCheckingSize and the
        // stream has not ended -> pause the graph and wait for more data.
        if (unread < self->mCheckingSize && !self->mIsEOS)
        {
            self->mReceiveFilter->PauseGraph();
            DbgLog((LOG_TRACE, 0, TEXT("PauseGraph...")));

            // While the graph is paused nothing is read out, so free up
            // room ourselves if the writer is stalled waiting for space.
            if (self->mIsWaitingToAddData)
            {
                self->MakeSpaceForAddData();
            }
        }

        // High-water mark: unread data exceeds 80% of the buffer, or the
        // stream has ended -> (re)start the graph.
        if (unread * 10 > self->mBufferSize * 8 || self->mIsEOS)
        {
            self->mReceiveFilter->RunGraph();
            DbgLog((LOG_TRACE, 0, TEXT("RunGraph~~~")));
        }

        Sleep(300);
    }

    return 1;
}
#endif // _QOS_
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -