📄 transbuf.cpp
字号:
m_ulCurrentQueueByteCount = 0;
/*
* It's possible that there are missing pre-seek packets that haven't
* been marked as lost yet...they will be marked as lost after the
* Insert(), so wait for the next incoming data packet before flushing
* the queue
*/
if (m_uFirstSequenceNumber != m_uSeekSequenceNumber)
{
return HXR_INCOMPLETE;
}
return HXR_OK;
}
/*
 * Reports how much data is currently held by this transport buffer,
 * taking both the packet queue and the prefetch cache into account.
 *
 *  llLowestTimestamp   out: front timestamp of the buffered data
 *  llHighestTimestamp  out: rear timestamp of the buffered data; when the
 *                      front timestamp is larger than the rear one by more
 *                      than MAX_TIMESTAMP_GAP (presumably a 32-bit
 *                      timestamp wrap) MAX_UINT32 is added to the rear value
 *  ulNumBytes          out: queue byte count + cache byte count
 *  bDone               out: m_bIsEnded
 *
 * The three numeric outputs stay 0 when there is no packet deque, a seek
 * is outstanding, or both the cache and the queue are empty.
 * Always returns HXR_OK.
 */
HX_RESULT
RTSPTransportBuffer::GetCurrentBuffering(INT64& llLowestTimestamp,
                                         INT64& llHighestTimestamp,
                                         UINT32& ulNumBytes,
                                         BOOL& bDone)
{
    llLowestTimestamp = 0;
    llHighestTimestamp = 0;
    ulNumBytes = 0;
    bDone = m_bIsEnded;

    /* Nothing meaningful to report without a deque or while seeking */
    if (!m_pPacketDeque || m_uSeekCount != 0 || m_bWaitingForSeekFlush)
    {
        return HXR_OK;
    }

    /* Neither the cache nor the queue holds any data */
    if (m_bCacheIsEmpty && m_bQueueIsEmpty)
    {
        return HXR_OK;
    }

    /*
     * The front timestamp comes from the cache whenever the cache holds
     * data, otherwise from the first queued packet.
     */
    UINT32 ulFirstTime = 0;
    if (!m_bCacheIsEmpty)
    {
        ulFirstTime = m_ulFrontTimeStampCached;
    }
    else
    {
        ClientPacket* pHead = (ClientPacket*)m_pPacketDeque->front();
        ulFirstTime = pHead->GetTime();
    }

    /*
     * The rear timestamp comes from the queue whenever the queue holds
     * data, otherwise from the cache.
     */
    UINT32 ulLastTime = 0;
    if (!m_bQueueIsEmpty)
    {
        ClientPacket* pTail = (ClientPacket*)m_pPacketDeque->back();
        ulLastTime = pTail->GetTime();
    }
    else
    {
        ulLastTime = m_ulRearTimeStampCached;
    }

    llLowestTimestamp = CAST_TO_INT64 ulFirstTime;

    if (ulFirstTime <= ulLastTime ||
        (ulFirstTime - ulLastTime) <= MAX_TIMESTAMP_GAP)
    {
        llHighestTimestamp = CAST_TO_INT64 ulLastTime;
    }
    else
    {
        /* Front is far ahead of rear: lift the rear value past MAX_UINT32 */
        llHighestTimestamp = CAST_TO_INT64 MAX_UINT32 + CAST_TO_INT64 ulLastTime;
    }

    ulNumBytes = m_ulCurrentQueueByteCount + m_ulCurrentCacheByteCount;

    return HXR_OK;
}
/*
 * Forwards a "source done" check for this stream to the owner, but only
 * once the stream has ended, the buffer is initialized, no seek is
 * outstanding and the reliable sequence number has reached the recorded
 * end value.
 */
void
RTSPTransportBuffer::CheckForSourceDone()
{
    if (!m_bIsEnded || !m_bIsInitialized)
    {
        return;
    }

    /* A seek is still in progress */
    if (m_uSeekCount != 0 || m_bWaitingForSeekFlush)
    {
        return;
    }

    /* Reliable packets are still outstanding */
    if (m_uEndReliableSeqNo != m_uReliableSeqNo)
    {
        return;
    }

    m_pOwner->CheckForSourceDone(m_uStreamNumber);
}
/*
 * Updates the buffer bookkeeping for a packet that is leaving the queue:
 * advances the first sequence number (wrapping to 0 at
 * m_wrapSequenceNumber), counts reliable and lost packets, records the
 * packet's timestamp and subtracts its size from the queue byte count.
 *
 *  pPacket  the packet being accounted for; must not be NULL
 */
void
RTSPTransportBuffer::UpdateStatsFromPacket(ClientPacket* pPacket)
{
    /* Step to the next expected sequence number, wrapping if needed */
    m_uFirstSequenceNumber++;
    if (m_uFirstSequenceNumber == m_wrapSequenceNumber)
    {
        m_uFirstSequenceNumber = 0;
    }

    if (pPacket->IsReliable())
    {
        m_uReliableSeqNo++;
    }

    if (pPacket->IsLostPacket())
    {
        m_uLost++;
    }

    m_uLastTimestamp = pPacket->GetTime();

    /* Clamp at zero instead of letting the byte count underflow */
    if (m_ulCurrentQueueByteCount > pPacket->GetByteCount())
    {
        m_ulCurrentQueueByteCount =
            m_ulCurrentQueueByteCount - pPacket->GetByteCount();
    }
    else
    {
        m_ulCurrentQueueByteCount = 0;
    }
}
void
RTSPTransportBuffer::SeekFlush()
{
if (m_bMulticast)
{
m_bMulticastReset = TRUE;
m_bMulticastReliableSeqNoSet = FALSE;
m_uSeekCount = 1;
Reset();
return;
}
/* We use this to re-initialize the first sequence number
* since we do not get this information in live pause case.
*/
m_bWaitingForLiveSeekFlush = TRUE;
/*
* If we're empty, there's nothing to flush
*/
if (m_bQueueIsEmpty)
{
return;
}
/*
* In the seek flush case there will be no initialization packet,
* so use the sequence number of the last packet in the buffer + 1
* as the beginning sequence number of the post-seek packets
*/
UINT32 uTailIndex = GetPacketIndex(m_uLastSequenceNumber);
ClientPacket* tempPacket = (ClientPacket*)(*m_pPacketDeque)[uTailIndex];
m_uSeekSequenceNumber = tempPacket->GetSequenceNumber() + 1;
if (m_uSeekSequenceNumber == m_wrapSequenceNumber)
{
m_uSeekSequenceNumber = 0;
}
m_bWaitingForSeekFlush = TRUE;
}
void
RTSPTransportBuffer::ReleasePackets()
{
/*
* If this is a live session try to send packets up to client
*/
if (m_bIsLive)
{
HX_RESULT hresult;
do
{
ClientPacket* pPacket = 0;
hresult = GetPacket(pPacket);
if (hresult == HXR_AT_END ||
hresult == HXR_NO_DATA ||
hresult == HXR_BUFFERING)
{
break;
}
IHXPacket* pIHXPacket = pPacket->GetPacket();
if (m_bPacketsStarted)
{
m_pOwner->packetReady(hresult,
m_uStreamNumber,
pIHXPacket);
}
HX_RELEASE(pIHXPacket);
HX_RELEASE(pPacket);
} while (hresult == HXR_OK);
}
}
/*
 * Sets the buffer duration and raises the maximum buffer duration when
 * the new value exceeds it.  The maximum is never lowered here.
 *
 *  uMilliseconds  new buffer duration (milliseconds, per the name)
 */
void
RTSPTransportBuffer::SetBufferDepth(UINT32 uMilliseconds)
{
    m_bufferDuration = uMilliseconds;

    if (uMilliseconds > m_maxBufferDuration)
    {
        m_maxBufferDuration = uMilliseconds;
    }
}
/*
 * Switches the buffer into prefetch mode.
 *
 * Only has an effect when both HELIX_FEATURE_FIFOCACHE and
 * HELIX_FEATURE_PREFETCH are defined.  In that case m_bPrefetch is set
 * and, if the owner's context exposes IHXCommonClassFactory, the current
 * FIFO cache is released and a new one is requested from the factory.
 * The result of CreateInstance() is not checked.
 */
void
RTSPTransportBuffer::EnterPrefetch(void)
{
#if defined(HELIX_FEATURE_FIFOCACHE) && defined(HELIX_FEATURE_PREFETCH)
    m_bPrefetch = TRUE;

    IUnknown* pContext = NULL;
    IHXCommonClassFactory* pFactory = NULL;

    m_pOwner->GetContext(pContext);

    if (pContext)
    {
        if (HXR_OK == pContext->QueryInterface(IID_IHXCommonClassFactory,
                                               (void**)&pFactory))
        {
            /* Replace any cache left over from an earlier prefetch */
            HX_RELEASE(m_pFIFOCache);
            pFactory->CreateInstance(CLSID_IHXFIFOCache,
                                     (void**)&m_pFIFOCache);
        }
    }

    HX_RELEASE(pFactory);
    HX_RELEASE(pContext);
#endif /* HELIX_FEATURE_FIFOCACHE && HELIX_FEATURE_PREFETCH */
}
/*
 * Clears the prefetch flag.  The FIFO cache itself is not touched here.
 */
void
RTSPTransportBuffer::LeavePrefetch(void)
{
    m_bPrefetch = FALSE;
}
/*
 * Moves every packet that GetPacketFromQueue() will currently release
 * into the FIFO cache, keeping the cached front/rear timestamps and the
 * cache byte count up to date.
 *
 * Only has an effect when both HELIX_FEATURE_FIFOCACHE and
 * HELIX_FEATURE_PREFETCH are defined and a FIFO cache exists.
 */
void
RTSPTransportBuffer::DoPrefetch(void)
{
#if defined(HELIX_FEATURE_FIFOCACHE) && defined(HELIX_FEATURE_PREFETCH)
    if (!m_pFIFOCache)
    {
        return;
    }

    ClientPacket* pQueued = NULL;

    while (HXR_OK == GetPacketFromQueue(pQueued) && pQueued)
    {
        /* Every cached packet becomes the new rear of the cache */
        m_ulRearTimeStampCached = pQueued->GetTime();

        if (m_bCacheIsEmpty)
        {
            /* First packet in the cache is also its front */
            m_bCacheIsEmpty = FALSE;
            m_ulFrontTimeStampCached = m_ulRearTimeStampCached;
        }

        m_pFIFOCache->Cache((IUnknown*)pQueued);
        m_ulCurrentCacheByteCount += pQueued->GetByteCount();

        /* Drop the reference handed to us by GetPacketFromQueue() */
        HX_RELEASE(pQueued);
    }
#endif /* HELIX_FEATURE_FIFOCACHE && HELIX_FEATURE_PREFETCH */
}
/*
 * Retrieves the next packet from the FIFO cache.
 *
 *  pPacket  out: the retrieved packet, or NULL when the cache yields
 *           nothing, no cache exists, or the prefetch features are
 *           compiled out
 *
 * On a successful retrieval the cache byte count is reduced by the
 * packet's size (clamped at 0).  When nothing is retrieved the cache is
 * flagged as empty.  Always returns HXR_OK.
 */
HX_RESULT
RTSPTransportBuffer::GetPacketFromCache(ClientPacket*& pPacket)
{
    pPacket = NULL;

#if defined(HELIX_FEATURE_FIFOCACHE) && defined(HELIX_FEATURE_PREFETCH)
    if (!m_pFIFOCache)
    {
        return HXR_OK;
    }

    m_pFIFOCache->Retrieve((IUnknown*&)pPacket);

    if (!pPacket)
    {
        /* no more cached packets left */
        HX_ASSERT(m_ulCurrentCacheByteCount == 0);
        m_bCacheIsEmpty = TRUE;
        return HXR_OK;
    }

    /* Account for the packet that just left the cache */
    if (m_ulCurrentCacheByteCount > pPacket->GetByteCount())
    {
        m_ulCurrentCacheByteCount =
            m_ulCurrentCacheByteCount - pPacket->GetByteCount();
    }
    else
    {
        m_ulCurrentCacheByteCount = 0;
    }
#endif /* HELIX_FEATURE_FIFOCACHE && HELIX_FEATURE_PREFETCH */

    return HXR_OK;
}
/*
 * Removes the front packet from the packet deque and hands it to the
 * caller, unless the buffer decides the packet should be held back.
 *
 * pPacket - out: the popped packet when HXR_OK is returned, NULL (0)
 *           for every other return value.
 *
 * Returns
 *   HXR_NO_DATA   - the queue is flagged empty, or the front packet is a
 *                   sanitization packet
 *   HXR_BUFFERING - the front packet is being held back (see the
 *                   condition below); m_status becomes TRANSBUF_FILLING
 *   HXR_OK        - the packet was popped, any matching entry was removed
 *                   from m_PendingPackets, and the statistics were updated
 */
HX_RESULT
RTSPTransportBuffer::GetPacketFromQueue(ClientPacket*& pPacket)
{
UINT32 ulTimeInQueue = 0;
ClientPacket* frontPacket = NULL;
ClientPacket* rearPacket = NULL;
pPacket = NULL;
if (m_bQueueIsEmpty)
{
return HXR_NO_DATA;
}
frontPacket = (ClientPacket*)m_pPacketDeque->front();
rearPacket = (ClientPacket*)m_pPacketDeque->back();
/*
* The transport buffer should NEVER send a sanitization packet to the
* core
*/
if (frontPacket->IsSanitizePacket())
{
return HXR_NO_DATA;
}
/*
* Compute the timestamp span covered by the queue (rear - front).  When
* the front timestamp is larger than the rear one by more than
* MAX_TIMESTAMP_GAP (presumably a 32-bit timestamp wrap), the span is
* computed in 64 bits with MAX_UINT32 added to the rear value.
*/
UINT32 ulFrontTimeStamp = frontPacket->GetTime();
UINT32 ulRearTimeStamp = rearPacket->GetTime();
if (ulFrontTimeStamp > ulRearTimeStamp &&
((ulFrontTimeStamp - ulRearTimeStamp) > MAX_TIMESTAMP_GAP))
{
ulTimeInQueue = INT64_TO_UINT32(CAST_TO_INT64 ulRearTimeStamp + MAX_UINT32 - CAST_TO_INT64 ulFrontTimeStamp);
}
else
{
ulTimeInQueue = ulRearTimeStamp - ulFrontTimeStamp;
}
/*
* TimeInBuffer is the difference between the refreshed m_PacketTime and
* the front packet's start time, i.e. how long that packet has been held.
*/
Timeval TimeInBuffer;
UpdateTime(&m_PacketTime);
TimeInBuffer = m_PacketTime.m_LastTime - frontPacket->GetStartTime();
/*
* If...
*
* 1) fast start is off or playback has already started AND
* the server is still sending packets AND
* the first packet is lost AND
* there are less than MAX_QUEUED_PACKETS AND
* there is not enough data in the buffer AND
* the first packet has not been in the buffer too long
* 2) there was a reliable packet lost before this one
*
* then return HXR_BUFFERING
*/
/*
* If we are still in a buffering state AND the resend depth
* is not set to zero (to minimize latency), do not deplete the
* network jitter buffer.
*/
UINT32 ulMinimumToBuffer = m_bufferDuration;
BOOL bPlaying = FALSE;
if (m_pOwner && m_pOwner->m_pPlayerState)
{
bPlaying = m_pOwner->m_pPlayerState->IsPlaying();
if (!bPlaying && ulMinimumToBuffer != 0 )
ulMinimumToBuffer += MIN_NETWORK_JITTER_MSECS;
}
// We only want to get packets as soon as possible for FastStart
// before starting playback. If already playing, getting lost packets
// faster than usual prevents resent packets from being processed.
//
// Note on precedence: && binds tighter than ||, so the condition below
// is (fast-start clause && lost-packet clause) || reliable-sequence
// mismatch. The reliable-sequence check therefore applies regardless of
// the fast-start and lost-packet clauses.
//
// The expected reliable sequence number is m_uReliableSeqNo plus
// IsReliable() of the front packet (presumably 0 or 1 - confirm against
// ClientPacket), truncated to 16 bits.
if ((!m_bFastStart || bPlaying) &&
(!m_bIsEnded &&
m_pPacketDeque->size() < MAX_QUEUED_PACKETS &&
frontPacket->IsLostPacket() &&
ulTimeInQueue < ulMinimumToBuffer &&
TimeInBuffer < Timeval((float)ulMinimumToBuffer / 1000.0)) ||
(frontPacket->GetReliableSeqNo() !=
(UINT16)(m_uReliableSeqNo + frontPacket->IsReliable())))
{
pPacket = 0;
m_status = TRANSBUF_FILLING;
return HXR_BUFFERING;
}
if (m_status != TRANSBUF_READY)
{
m_status = TRANSBUF_READY;
}
pPacket = (ClientPacket*)m_pPacketDeque->pop_front();
//Remove this packet if it is in our pending packet list
// The list is walked under m_pPendingLock; only the first entry whose
// sequence number matches is removed and deleted.
m_pPendingLock->Lock();
LISTPOSITION pos = m_PendingPackets.GetHeadPosition();
UINT32 ulSeqNum = pPacket->GetSequenceNumber();
while(pos)
{
PendingPacket* pPend = (PendingPacket*)m_PendingPackets.GetAt(pos);
if( pPend->m_ulSequenceNumber == ulSeqNum )
{
m_PendingPackets.RemoveAt(pos);
HX_DELETE( pPend );
break;
}
m_PendingPackets.GetNext(pos);
}
m_pPendingLock->Unlock();
/*
* The player has all the packets for the stream when first == end
* sequence number
*
* This comparison must happen before UpdateStatsFromPacket(), which
* advances m_uFirstSequenceNumber.
*/
if (m_uFirstSequenceNumber == m_uLastSequenceNumber)
{
m_bQueueIsEmpty = TRUE;
}
UpdateStatsFromPacket(pPacket);
// Remember the time at which this packet was released
m_LastPacketTime = m_PacketTime;
return HXR_OK;
}
/*
 * Fetches the scheduler from the owner and, if one is available, takes a
 * reference on it and initializes the statistics and packet time records.
 * Without a scheduler the time records are left untouched.
 */
void
RTSPTransportBuffer::InitTimer()
{
    m_pScheduler = m_pOwner->GetScheduler();

    if (!m_pScheduler)
    {
        return;
    }

    /* Hold our own reference for as long as we keep the pointer */
    m_pScheduler->AddRef();

    InitializeTime(&m_StatisticsTime);
    InitializeTime(&m_PacketTime);

    /* Start with "last packet time" equal to the freshly initialized time */
    m_LastPacketTime = m_PacketTime;
}
HX_RESULT
RTSPTransportBuffer::GetTransportBufferInfo(UINT32& ulLowestTimestamp,
UINT32& ulHighestTimestamp,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -