📄 transbuf.cpp
字号:
} // prefetch if we have received enough data(0x200) // NOTE: we only cache half of the packets(0x100) in order to // give enough time to recover lost packets if (m_bPrefetch) { DoPrefetch(); } CheckForSourceDone(); return HXR_OK;}HX_RESULTRTSPTransportBuffer::GetPacket(ClientPacket*& pPacket){ HX_RESULT rc = HXR_OK; pPacket = NULL; if (m_bStreamDone) { if (!m_bStreamDoneSent) { m_bStreamDoneSent = TRUE; m_pOwner->streamDone(m_uStreamNumber); } return HXR_AT_END; } else if (!m_bIsInitialized || m_uSeekCount || m_bWaitingForSeekFlush || (m_bPaused && !m_bIsEnded)) { return HXR_NO_DATA; } if (!m_bCacheIsEmpty) { GetPacketFromCache(pPacket); } if (!pPacket) { rc = GetPacketFromQueue(pPacket); } if (m_bQueueIsEmpty && m_bCacheIsEmpty) { if (m_bIsEnded) { m_bStreamDone = TRUE; } else { // Check if projected packet time-stamp is past expected // end-time stamp, if yes we no longer expect packets. // If m_uLastTimestamp is 0, we haven't read any packets // and should not try considering source completion. if (m_bExpectedTSRangeSet && (m_uLastTimestamp != 0)) { ULONG32 ulCurrentDuration = m_uLastTimestamp - m_uStartTimestamp; ULONG32 ulExpectedDuration = m_uEndTimestamp - m_uStartTimestamp; // For content longer than 24 days, do not use time-range based // stream stoppage logic as it becomes more difficult to distinguish // between post-end and pre-start timestamps if (((LONG32) ulExpectedDuration) > 0) { UpdateTime(&m_PacketTime); ULONG32 ulLastPacketTime = m_LastPacketTime.m_LastTime.tv_sec * 1000 + m_LastPacketTime.m_LastTime.tv_usec / 1000; ULONG32 ulCurrentTime = m_PacketTime.m_LastTime.tv_sec * 1000 + m_PacketTime.m_LastTime.tv_usec / 1000; ULONG32 ulTransportWaitTime = ulCurrentTime - ulLastPacketTime; if ((((LONG32) ulCurrentDuration) >= 0) && ((ulCurrentDuration + ulTransportWaitTime) >= (ulExpectedDuration + m_ulEndDelayTolerance)) && (ulTransportWaitTime >= m_ulEndDelayTolerance)) { m_bSourceStopped = TRUE; } } } if (m_bSourceStopped) { m_bIsEnded = TRUE; m_bStreamDone = TRUE; m_bStreamDoneSent = TRUE; m_pOwner->streamDone(m_uStreamNumber); } } } return rc;}HX_RESULTRTSPTransportBuffer::StartPackets(){ ASSERT(!m_bPacketsStarted); m_bPacketsStarted = TRUE; return HXR_OK;}HX_RESULTRTSPTransportBuffer::StopPackets(){ ASSERT(m_bPacketsStarted); m_bPacketsStarted = FALSE; return HXR_OK;}HX_RESULTRTSPTransportBuffer::GetStatus( UINT16& uStatusCode, UINT16& ulPercentDone){#if 0 uStatusCode = HX_STATUS_READY; ulPercentDone = 100; if (m_bIsEnded) { return HXR_OK; } /* ignore multicasted sparsed streams(i.e. events) * it is OK to not be initialized if we are dealing with * sparse streams over multicast. This is because * in multicast, the transport gets initialialized on * receiving the first packet. We do not want to hold * the entire presenation if we never receive a packet * for this sparse stream. */ else if ((!m_bIsInitialized || m_uSeekCount) && (!m_bMulticast || !m_bSparseStream)) { uStatusCode = HX_STATUS_BUFFERING; ulPercentDone = 0; return HXR_OK; } UINT32 ulCurrentBuffering = 0; INT64 llActualLastTimestampReceived = 0; if (m_bAtLeastOnePacketReceived) { llActualLastTimestampReceived = CAST_TO_INT64 m_ulTSRollOver * CAST_TO_INT64 MAX_UINT32 + CAST_TO_INT64 m_ulLastTimestampReceived; // FileFormats may send packets with out of order timestamps // if the stream has been continuesly playing for 49 days // we will set llCurrentBufferingInMs to MAX_UINT32 if (llActualLastTimestampReceived > CAST_TO_INT64 m_ulFirstTimestampReceived) { if (llActualLastTimestampReceived - CAST_TO_INT64 m_ulFirstTimestampReceived > MAX_UINT32) { ulCurrentBuffering = MAX_UINT32; } else { ulCurrentBuffering = INT64_TO_UINT32(llActualLastTimestampReceived - m_ulFirstTimestampReceived); } } } UINT32 ulElapsedBufferingTime = CALCULATE_ELAPSED_TICKS(m_ulBufferingStartTime, HX_GET_TICKCOUNT()); UINT32 ulPauseTime = m_PacketTime.m_PauseTime.tv_sec*1000 + m_PacketTime.m_PauseTime.tv_usec/1000; if (ulPauseTime > 0 && ulElapsedBufferingTime > ulPauseTime) { ulElapsedBufferingTime -= ulPauseTime; } /* * If the buffer duration = 0, then there is no network jitter to worry * about */ UINT32 ulMinimumToBuffer = m_bufferDuration + (m_bufferDuration ? MIN_NETWORK_JITTER_MSECS : 0); if (m_status == TRANSBUF_FILLING || (ulElapsedBufferingTime < ulMinimumToBuffer && ulCurrentBuffering < ulMinimumToBuffer)) { uStatusCode = HX_STATUS_BUFFERING; UINT32 ulHighVal = ulCurrentBuffering > ulElapsedBufferingTime ? ulCurrentBuffering : ulElapsedBufferingTime; if (ulHighVal < ulMinimumToBuffer) { ulPercentDone = HX_SAFEUINT16(ulHighVal*100/ulMinimumToBuffer); } else // Waiting for a reliable packet { ulPercentDone = 99; } } return HXR_OK;#else return HXR_NOTIMPL;#endif}HX_RESULTRTSPTransportBuffer::SetupForACKPacket( UINT16& uSeqNo, CHXBitset& pBitset, UINT16& uBitCount, BOOL& didACK, BOOL& bLostHigh, BOOL& bNeedAnotherACK){ if (m_bACKDone || !m_bIsInitialized) { return HXR_NO_DATA; } UINT16 uLastSequenceNumber = m_uLastSequenceNumber; BOOL bAllACK = FALSE; /* * The start and end indexes must be INT32 or the loop will not * terminate properly */ ClientPacket* pPacket = 0; INT32 iPacketIndex = GetPacketIndex(uLastSequenceNumber); INT32 iStartIndex = GetACKIndex(uLastSequenceNumber); INT32 iEndIndex = 0; /* * 1) If the start index > MAX_DEQUE_SIZE then we have ACKed all the * current packets * 2) If iPacketIndex = 0 AND * A) the queue is empty, then we have never entered a packet into * the queue * B) the only packet is a sanitization packet, then we sanitized * for a late packet */ if (iStartIndex > MAX_DEQUE_SIZE) { return HXR_NO_DATA; } else if (iPacketIndex == 0) { if (!m_bQueueIsEmpty) { pPacket = (ClientPacket*)(*m_pPacketDeque)[0]; if (!pPacket->IsSanitizePacket()) { goto SanitizeContinue; } } return HXR_NO_DATA; }SanitizeContinue: INT32 i; /* * If we can't fit all the ACK/NAKs in one ACK packet, then start * ACK/NAKing from the beginning of the transport buffer */ if (iStartIndex > MAX_BITSET_SIZE) { /* * Carefully set uLastSequenceNumber */ uLastSequenceNumber = m_uACKSequenceNumber; for (i = 0; i < MAX_BITSET_SIZE; i++) { uLastSequenceNumber++; if (uLastSequenceNumber == m_wrapSequenceNumber) { uLastSequenceNumber = 0; } } /* * Reset the indexes with the last packet we will ACK/NAK */ iPacketIndex = GetPacketIndex(uLastSequenceNumber); iStartIndex = MAX_BITSET_SIZE; /* * Since the number of packets > the amount we can ACK, we may need * another ACK packet to fully clean up the ACK wait list. However, * if we run into a NAK, abort the back-to-back ACK because we * would just repeat the information going out in this ACK packet */ bNeedAnotherACK = TRUE; } /* * We may have released more packets than can fit in an ACK packet or * the queue may be empty */ if (iPacketIndex > MAX_DEQUE_SIZE) { bAllACK = TRUE; iPacketIndex = 0; } ASSERT(m_bQueueIsEmpty ? bAllACK : TRUE); UINT32 uLastNAKSequenceNumber = 0; BOOL bNAKFound = FALSE; uBitCount = HX_SAFEUINT16(iStartIndex); bLostHigh = FALSE; /* * We loop iStartIndex+1 times because we also need to set uSeqNo */ for (i = iStartIndex; i >= iEndIndex; i--) { /* * We may have released this packet already */ if (iPacketIndex < 0) { HX_ASSERT(i < iStartIndex); pBitset.set((iStartIndex - 1) - i); continue; } else if (iPacketIndex == 0) { /* * We may have released all the packets before ACKing them */ if (bAllACK) { iPacketIndex--; uSeqNo = uLastSequenceNumber; didACK = TRUE; continue; } } pPacket = (ClientPacket*)(*m_pPacketDeque)[iPacketIndex--]; /* * If the last packet is not valid, flag it for a NAK */ if (i == iStartIndex) { if (pPacket->IsLostPacket()) { bLostHigh = TRUE; bNeedAnotherACK = FALSE; if (!pPacket->IsResendRequested()) { pPacket->SetResendRequested(); m_uResendRequested++; } } uSeqNo = pPacket->GetSequenceNumber(); didACK = TRUE; continue; } else if (pPacket->IsLostPacket()) { bNAKFound = TRUE; bNeedAnotherACK = FALSE; uLastNAKSequenceNumber = pPacket->GetSequenceNumber(); pBitset.set((iStartIndex - 1) - i); pBitset.clear((iStartIndex - 1) - i); if (!pPacket->IsResendRequested()) { pPacket->SetResendRequested(); m_uResendRequested++; } } else { pBitset.set((iStartIndex - 1) - i); } } /* * Bump the ACK counter */ INT32 iACKCount; if (bNAKFound) { iACKCount = GetACKIndex((UINT16) uLastNAKSequenceNumber); } else { iACKCount = GetACKIndex((UINT16) uLastSequenceNumber) + 1; } /* * Carefully bump m_uACKSequenceNumber */ UINT16 uTestSequenceNumber = (UINT16)(m_uACKSequenceNumber + iACKCount); if (m_bIsEnded || uTestSequenceNumber < m_uACKSequenceNumber || uTestSequenceNumber >= m_wrapSequenceNumber) { for (i = 0; i < iACKCount; i++) { if (m_bIsEnded && m_uACKSequenceNumber == m_uEndSequenceNumber) { m_bACKDone = TRUE; break; } m_uACKSequenceNumber++; if (m_uACKSequenceNumber == m_wrapSequenceNumber) { m_uACKSequenceNumber = 0; } } } else { m_uACKSequenceNumber = uTestSequenceNumber; } return HXR_OK;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -