📄 transbuf.cpp
字号:
m_ulLastTimestampReceived = m_ulFirstTimestampReceived;
}
else
{
m_ulLastTimestampReceived = uTimestamp;
}
if (m_ulLastTimestampReceived > uTimestamp &&
((m_ulLastTimestampReceived - uTimestamp) > MAX_TIMESTAMP_GAP))
{
m_ulTSRollOver++;
}
}
// prefetch if we have received enough data(0x200)
// NOTE: we only cache half of the packets(0x100) in order to
// give enough time to recover lost packets
if (m_bPrefetch)
{
DoPrefetch();
}
CheckForSourceDone();
return HXR_OK;
}
HX_RESULT
RTSPTransportBuffer::GetPacket(ClientPacket*& pPacket)
{
HX_RESULT rc = HXR_OK;
pPacket = NULL;
if (m_bStreamDone)
{
if (!m_bStreamDoneSent)
{
m_bStreamDoneSent = TRUE;
m_pOwner->streamDone(m_uStreamNumber);
}
return HXR_AT_END;
}
else if (!m_bIsInitialized || m_uSeekCount || m_bWaitingForSeekFlush ||
(m_bPaused && !m_bIsEnded))
{
return HXR_NO_DATA;
}
if (!m_bCacheIsEmpty)
{
GetPacketFromCache(pPacket);
}
if (!pPacket)
{
rc = GetPacketFromQueue(pPacket);
}
if (m_bQueueIsEmpty && m_bCacheIsEmpty)
{
if (m_bIsEnded)
{
m_bStreamDone = TRUE;
}
else
{
// Check if projected packet time-stamp is past expected
// end-time stamp, if yes we no longer expect packets.
// If m_uLastTimestamp is 0, we haven't read any packets
// and should not try considering source completion.
if (m_bExpectedTSRangeSet && (m_uLastTimestamp != 0))
{
ULONG32 ulCurrentDuration = m_uLastTimestamp -
m_uStartTimestamp;
ULONG32 ulExpectedDuration = m_uEndTimestamp -
m_uStartTimestamp;
// For content longer than 24 days, do not use time-range based
// stream stoppage logic as it becomes more difficult to distinguish
// between post-end and pre-start timestamps
if (((LONG32) ulExpectedDuration) > 0)
{
UpdateTime(&m_PacketTime);
ULONG32 ulLastPacketTime =
m_LastPacketTime.m_LastTime.tv_sec * 1000 +
m_LastPacketTime.m_LastTime.tv_usec / 1000;
ULONG32 ulCurrentTime =
m_PacketTime.m_LastTime.tv_sec * 1000 +
m_PacketTime.m_LastTime.tv_usec / 1000;
ULONG32 ulTransportWaitTime = ulCurrentTime -
ulLastPacketTime;
if ((((LONG32) ulCurrentDuration) >= 0) &&
((ulCurrentDuration + ulTransportWaitTime) >=
(ulExpectedDuration + m_ulEndDelayTolerance)) &&
(ulTransportWaitTime >= m_ulEndDelayTolerance))
{
m_bSourceStopped = TRUE;
}
}
}
if (m_bSourceStopped)
{
m_bIsEnded = TRUE;
m_bStreamDone = TRUE;
m_bStreamDoneSent = TRUE;
m_pOwner->streamDone(m_uStreamNumber);
}
}
}
return rc;
}
HX_RESULT
RTSPTransportBuffer::StartPackets()
{
ASSERT(!m_bPacketsStarted);
m_bPacketsStarted = TRUE;
return HXR_OK;
}
HX_RESULT
RTSPTransportBuffer::StopPackets()
{
ASSERT(m_bPacketsStarted);
m_bPacketsStarted = FALSE;
return HXR_OK;
}
HX_RESULT
RTSPTransportBuffer::GetStatus
(
UINT16& uStatusCode,
UINT16& ulPercentDone
)
{
#if 0
uStatusCode = HX_STATUS_READY;
ulPercentDone = 100;
if (m_bIsEnded)
{
return HXR_OK;
}
/* ignore multicasted sparsed streams(i.e. events)
* it is OK to not be initialized if we are dealing with
* sparse streams over multicast. This is because
* in multicast, the transport gets initialialized on
* receiving the first packet. We do not want to hold
* the entire presenation if we never receive a packet
* for this sparse stream.
*/
else if ((!m_bIsInitialized || m_uSeekCount) &&
(!m_bMulticast || !m_bSparseStream))
{
uStatusCode = HX_STATUS_BUFFERING;
ulPercentDone = 0;
return HXR_OK;
}
UINT32 ulCurrentBuffering = 0;
INT64 llActualLastTimestampReceived = 0;
if (m_bAtLeastOnePacketReceived)
{
llActualLastTimestampReceived = CAST_TO_INT64 m_ulTSRollOver * CAST_TO_INT64 MAX_UINT32 +
CAST_TO_INT64 m_ulLastTimestampReceived;
// FileFormats may send packets with out of order timestamps
// if the stream has been continuesly playing for 49 days
// we will set llCurrentBufferingInMs to MAX_UINT32
if (llActualLastTimestampReceived > CAST_TO_INT64 m_ulFirstTimestampReceived)
{
if (llActualLastTimestampReceived - CAST_TO_INT64 m_ulFirstTimestampReceived > MAX_UINT32)
{
ulCurrentBuffering = MAX_UINT32;
}
else
{
ulCurrentBuffering = INT64_TO_UINT32(llActualLastTimestampReceived -
m_ulFirstTimestampReceived);
}
}
}
UINT32 ulElapsedBufferingTime =
CALCULATE_ELAPSED_TICKS(m_ulBufferingStartTime,
HX_GET_TICKCOUNT());
UINT32 ulPauseTime = m_PacketTime.m_PauseTime.tv_sec*1000 +
m_PacketTime.m_PauseTime.tv_usec/1000;
if (ulPauseTime > 0 && ulElapsedBufferingTime > ulPauseTime)
{
ulElapsedBufferingTime -= ulPauseTime;
}
/*
* If the buffer duration = 0, then there is no network jitter to worry
* about
*/
UINT32 ulMinimumToBuffer =
m_bufferDuration + (m_bufferDuration ? MIN_NETWORK_JITTER_MSECS : 0);
if (m_status == TRANSBUF_FILLING ||
(ulElapsedBufferingTime < ulMinimumToBuffer &&
ulCurrentBuffering < ulMinimumToBuffer))
{
uStatusCode = HX_STATUS_BUFFERING;
UINT32 ulHighVal = ulCurrentBuffering > ulElapsedBufferingTime ?
ulCurrentBuffering : ulElapsedBufferingTime;
if (ulHighVal < ulMinimumToBuffer)
{
ulPercentDone = HX_SAFEUINT16(ulHighVal*100/ulMinimumToBuffer);
}
else // Waiting for a reliable packet
{
ulPercentDone = 99;
}
}
return HXR_OK;
#else
return HXR_NOTIMPL;
#endif
}
HX_RESULT
RTSPTransportBuffer::SetupForACKPacket
(
UINT16& uSeqNo,
CHXBitset& pBitset,
UINT16& uBitCount,
BOOL& didACK,
BOOL& bLostHigh,
BOOL& bNeedAnotherACK
)
{
if (m_bACKDone || !m_bIsInitialized)
{
return HXR_NO_DATA;
}
UINT16 uLastSequenceNumber = m_uLastSequenceNumber;
BOOL bAllACK = FALSE;
/*
* The start and end indexes must be INT32 or the loop will not
* terminate properly
*/
ClientPacket* pPacket = 0;
INT32 iPacketIndex = GetPacketIndex(uLastSequenceNumber);
INT32 iStartIndex = GetACKIndex(uLastSequenceNumber);
INT32 iEndIndex = 0;
/*
* 1) If the start index > MAX_DEQUE_SIZE then we have ACKed all the
* current packets
* 2) If iPacketIndex = 0 AND
* A) the queue is empty, then we have never entered a packet into
* the queue
* B) the only packet is a sanitization packet, then we sanitized
* for a late packet
*/
if (iStartIndex > MAX_DEQUE_SIZE)
{
return HXR_NO_DATA;
}
else if (iPacketIndex == 0)
{
if (!m_bQueueIsEmpty)
{
pPacket = (ClientPacket*)(*m_pPacketDeque)[0];
if (!pPacket->IsSanitizePacket())
{
goto SanitizeContinue;
}
}
return HXR_NO_DATA;
}
SanitizeContinue:
INT32 i;
/*
* If we can't fit all the ACK/NAKs in one ACK packet, then start
* ACK/NAKing from the beginning of the transport buffer
*/
if (iStartIndex > MAX_BITSET_SIZE)
{
/*
* Carefully set uLastSequenceNumber
*/
uLastSequenceNumber = m_uACKSequenceNumber;
for (i = 0; i < MAX_BITSET_SIZE; i++)
{
uLastSequenceNumber++;
if (uLastSequenceNumber == m_wrapSequenceNumber)
{
uLastSequenceNumber = 0;
}
}
/*
* Reset the indexes with the last packet we will ACK/NAK
*/
iPacketIndex = GetPacketIndex(uLastSequenceNumber);
iStartIndex = MAX_BITSET_SIZE;
/*
* Since the number of packets > the amount we can ACK, we may need
* another ACK packet to fully clean up the ACK wait list. However,
* if we run into a NAK, abort the back-to-back ACK because we
* would just repeat the information going out in this ACK packet
*/
bNeedAnotherACK = TRUE;
}
/*
* We may have released more packets than can fit in an ACK packet or
* the queue may be empty
*/
if (iPacketIndex > MAX_DEQUE_SIZE)
{
bAllACK = TRUE;
iPacketIndex = 0;
}
ASSERT(m_bQueueIsEmpty ? bAllACK : TRUE);
UINT32 uLastNAKSequenceNumber = 0;
BOOL bNAKFound = FALSE;
uBitCount = HX_SAFEUINT16(iStartIndex);
bLostHigh = FALSE;
/*
* We loop iStartIndex+1 times because we also need to set uSeqNo
*/
for (i = iStartIndex; i >= iEndIndex; i--)
{
/*
* We may have released this packet already
*/
if (iPacketIndex < 0)
{
HX_ASSERT(i < iStartIndex);
pBitset.set((iStartIndex - 1) - i);
continue;
}
else if (iPacketIndex == 0)
{
/*
* We may have released all the packets before ACKing them
*/
if (bAllACK)
{
iPacketIndex--;
uSeqNo = uLastSequenceNumber;
didACK = TRUE;
continue;
}
}
pPacket = (ClientPacket*)(*m_pPacketDeque)[iPacketIndex--];
/*
* If the last packet is not valid, flag it for a NAK
*/
if (i == iStartIndex)
{
if (pPacket->IsLostPacket())
{
bLostHigh = TRUE;
bNeedAnotherACK = FALSE;
if (!pPacket->IsResendRequested())
{
pPacket->SetResendRequested();
m_uResendRequested++;
}
}
uSeqNo = pPacket->GetSequenceNumber();
didACK = TRUE;
continue;
}
else if (pPacket->IsLostPacket())
{
bNAKFound = TRUE;
bNeedAnotherACK = FALSE;
uLastNAKSequenceNumber = pPacket->GetSequenceNumber();
pBitset.set((iStartIndex - 1) - i);
pBitset.clear((iStartIndex - 1) - i);
if (!pPacket->IsResendRequested())
{
pPacket->SetResendRequested();
m_uResendRequested++;
}
}
else
{
pBitset.set((iStartIndex - 1) - i);
}
}
/*
* Bump the ACK counter
*/
INT32 iACKCount;
if (bNAKFound)
{
iACKCount = GetACKIndex((UINT16) uLastNAKSequenceNumber);
}
else
{
iACKCount = GetACKIndex((UINT16) uLastSequenceNumber) + 1;
}
/*
* Carefully bump m_uACKSequenceNumber
*/
UINT16 uTestSequenceNumber = (UINT16)(m_uACKSequenceNumber + iACKCount);
if (m_bIsEnded ||
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -