📄 transbuf.cpp
字号:
uTestSequenceNumber < m_uACKSequenceNumber ||
uTestSequenceNumber >= m_wrapSequenceNumber)
{
for (i = 0; i < iACKCount; i++)
{
if (m_bIsEnded && m_uACKSequenceNumber == m_uEndSequenceNumber)
{
m_bACKDone = TRUE;
break;
}
m_uACKSequenceNumber++;
if (m_uACKSequenceNumber == m_wrapSequenceNumber)
{
m_uACKSequenceNumber = 0;
}
}
}
else
{
m_uACKSequenceNumber = uTestSequenceNumber;
}
return HXR_OK;
}
UINT32
RTSPTransportBuffer::GetIndex(UINT32 uBaseSequenceNumber, UINT16 uSeqNo)
{
INT32 index = uSeqNo - uBaseSequenceNumber;
if(index < 0)
{
index = m_wrapSequenceNumber - uBaseSequenceNumber + uSeqNo;
}
return (UINT32)index;
}
void
RTSPTransportBuffer::SetEndPacket
(
UINT16 uSeqNo,
UINT16 uReliableSeqNo,
BOOL bPacketSent,
UINT32 uTimestamp
)
{
if (m_bIsEnded)
{
return;
}
//We have just received the last packet. Since we are getting no
//more packets, make sure we go through the pending packets list
//and send NAKs for each packet we have not recieved or got out
//of order.
m_pPendingLock->Lock();
while(!m_PendingPackets.IsEmpty())
{
PendingPacket* pPend = (PendingPacket*)m_PendingPackets.RemoveHead();
UINT32 tempIndex = GetPacketIndex((UINT16)pPend->m_ulSequenceNumber);
//Send a NAK and increment resend requested count.
m_pOwner->sendNAKPacket(m_uStreamNumber,
(UINT16)pPend->m_ulSequenceNumber,
(UINT16)pPend->m_ulSequenceNumber);
if( tempIndex<m_pPacketDeque->size())
((ClientPacket*)(*m_pPacketDeque)[tempIndex])->SetResendRequested();
m_uResendRequested++;
//Clean up.
HX_DELETE(pPend);
}
//We also don't need to call func anymore for this object.
if (m_pScheduler && m_CallbackHandle)
{
m_pScheduler->Remove(m_CallbackHandle);
}
m_CallbackHandle = 0;
if( m_pCallBack )
m_pCallBack->Clear();
HX_RELEASE( m_pCallBack );
m_pPendingLock->Unlock();
m_bIsEnded = TRUE;
m_uEndSequenceNumber = uSeqNo;
UINT32 uEndIndex = GetPacketIndex(m_uEndSequenceNumber);
// XXX HP we have too many empty queue determination
// i.e. m_bQueueIsEmpty
// uEndIndex > MAX_DEQUEUE_SIZE
// m_pPacketDeque->empty() == TRUE
// m_pPacketDeque->size() == 0
if (!bPacketSent || (uEndIndex > MAX_DEQUE_SIZE && m_bCacheIsEmpty))
{
/*
* Either no packets were sent or the end packet has come
* after the last packet has been released, so just send
* stream done notification
*/
m_bStreamDone = TRUE;
m_bStreamDoneSent = TRUE;
m_pOwner->streamDone(m_uStreamNumber);
return;
}
/*
* Since the buffer duration restriction is now lifted, the player
* can get all the packets in the buffer. That means all the packets
* must exist, so fill in the end of the queue with temporary "lost"
* packets
*/
ClientPacket* pPacket = new ClientPacket(uSeqNo,
uReliableSeqNo,
uTimestamp,
0,
0,
0,
GetTime(),
FALSE);
pPacket->AddRef();
Add(pPacket);
m_uEndReliableSeqNo = uReliableSeqNo;
CheckForSourceDone();
}
void
RTSPTransportBuffer::InformSourceStopped
(
void
)
{
m_bSourceStopped = TRUE;
}
void
RTSPTransportBuffer::InformTimestampRange
(
UINT32 ulStartTimestamp,
UINT32 ulEndTimestamp,
UINT32 ulEndDelayTolerance
)
{
m_uStartTimestamp = ulStartTimestamp;
m_uEndTimestamp = ulEndTimestamp;
m_ulEndDelayTolerance = ulEndDelayTolerance;
m_bExpectedTSRangeSet = TRUE;
}
HX_RESULT
RTSPTransportBuffer::UpdateStatistics
(
ULONG32& ulNormal,
ULONG32& ulLost,
ULONG32& ulLate,
ULONG32& ulResendRequested,
ULONG32& ulResendReceived,
ULONG32& ulAvgBandwidth,
ULONG32& ulCurBandwidth,
ULONG32& ulTotal30,
ULONG32& ulLost30,
ULONG32& ulDuplicate,
ULONG32& ulOutOfOrder
)
{
if (!m_bIsInitialized)
{
return HXR_NO_DATA;
}
ulNormal = m_uNormal;
ulLost = m_uLost;
ulLate = m_uLate;
ulResendRequested = m_uResendRequested;
ulResendReceived = m_uResendReceived;
ulLost30 = m_ulLastLost30;
ulTotal30 = m_ulLastTotal30;
ulAvgBandwidth = m_uAvgBandwidth;
ulCurBandwidth = m_uCurBandwidth;
ulDuplicate = m_ulDuplicate;
ulOutOfOrder = m_ulOutOfOrder;
if (m_bIsEnded)
{
ulAvgBandwidth = m_uAvgBandwidth = 0;
ulCurBandwidth = m_uCurBandwidth = 0;
return HXR_OK;
}
if (m_bPaused || m_bPausedHack)
{
/*
* This hack is needed because the server may send out an
* extra packet when the stream is paused, and this unsettles
* the bandwidth statistics when the stream is resumed
*/
if (!m_bPaused && m_bPausedHack)
{
m_bPausedHack = FALSE;
}
return HXR_OK;
}
// caculate the lost/total packets during the last 30 seconds
m_ulLost30[m_ulIndex30 % 30] = m_uLost;
m_ulTotal30[m_ulIndex30 % 30] = m_uNormal + m_uLost + m_uLate + m_uResendReceived;
ulLost30 = m_ulLost30[m_ulIndex30 % 30] -
m_ulLost30[(m_ulIndex30 + 1) % 30];
ulTotal30 = m_ulTotal30[m_ulIndex30 % 30] -
m_ulTotal30[(m_ulIndex30 + 1) % 30];
m_ulLastLost30 = ulLost30;
m_ulLastTotal30 = ulTotal30;
m_ulIndex30++;
HXTimeval lTime = m_pScheduler->GetCurrentSchedulerTime();
Timeval now((INT32)lTime.tv_sec, (INT32)lTime.tv_usec);
/*
* Must adjust m_StartTime and m_LastTime for the amount of time the
* client has been paused
*/
Timeval TotalTime = now - AdjustedStartTime(&m_StatisticsTime);
Timeval TimeSlice = now - AdjustedLastTime(&m_StatisticsTime);
UpdateTime(&m_StatisticsTime);
if (TotalTime <= 0.0 || TimeSlice <= 0.0)
{
/*
* This should not happen
*/
return HXR_UNEXPECTED;
}
double uTotalSeconds = TotalTime.tv_sec + (TotalTime.tv_usec / 1000000.0);
double uRecentSeconds = TimeSlice.tv_sec + (TimeSlice.tv_usec / 1000000.0);
INT64 uBitCount = m_uByteCount * 8;
INT64 uRecentBitCount = (m_uByteCount - m_uLastByteCount) * 8;
m_uAvgBandwidth = INT64_TO_UINT32(uBitCount / uTotalSeconds);
m_uCurBandwidth = INT64_TO_UINT32(uRecentBitCount / uRecentSeconds);
ulAvgBandwidth = m_uAvgBandwidth;
ulCurBandwidth = m_uCurBandwidth;
m_uLastByteCount = m_uByteCount;
return HXR_OK;
}
void
RTSPTransportBuffer::InitializeTime(BufferTimer* Timer)
{
HXTimeval lTime = m_pScheduler->GetCurrentSchedulerTime();
Timer->m_StartTime = Timeval((INT32)lTime.tv_sec, (INT32)lTime.tv_usec);
Timer->m_PreviousTime = Timeval((INT32)lTime.tv_sec, (INT32)lTime.tv_usec);
Timer->m_LastTime = Timeval((INT32)lTime.tv_sec, (INT32)lTime.tv_usec);
Timer->m_PauseTime = Timeval(0.0);
}
void
RTSPTransportBuffer::UpdateTime(BufferTimer* Timer)
{
HXTimeval lTime = m_pScheduler->GetCurrentSchedulerTime();
Timeval now((INT32)lTime.tv_sec, (INT32)lTime.tv_usec);
Timer->m_LastTime += now - Timer->m_PreviousTime;
Timer->m_PreviousTime = now;
}
Timeval
RTSPTransportBuffer::GetTime(BufferTimer* Timer)
{
return Timer->m_LastTime;
}
Timeval
RTSPTransportBuffer::GetTime()
{
/*
* Use m_PacketTime for GetTime() references
*/
UpdateTime(&m_PacketTime);
return GetTime(&m_PacketTime);
}
Timeval
RTSPTransportBuffer::AdjustedStartTime(BufferTimer* Timer)
{
return Timer->m_StartTime + Timer->m_PauseTime;
}
Timeval
RTSPTransportBuffer::AdjustedLastTime(BufferTimer* Timer)
{
return Timer->m_LastTime + Timer->m_PauseTime;
}
void
RTSPTransportBuffer::SetMulticast()
{
m_bMulticast = TRUE;
m_bSparseStream = m_pOwner->isSparseStream(m_uStreamNumber);
}
void
RTSPTransportBuffer::Pause()
{
HXTimeval lTime = m_pScheduler->GetCurrentSchedulerTime();
Timeval now((INT32)lTime.tv_sec, (INT32)lTime.tv_usec);
m_bPaused = TRUE;
m_StatisticsTime.m_LastTime += now - m_StatisticsTime.m_PreviousTime;
m_StatisticsTime.m_PreviousTime = now;
m_PacketTime.m_LastTime += now - m_PacketTime.m_PreviousTime;
m_PacketTime.m_PreviousTime = now;
}
void
RTSPTransportBuffer::Resume()
{
if (m_bPaused)
{
HXTimeval lTime = m_pScheduler->GetCurrentSchedulerTime();
Timeval now((INT32)lTime.tv_sec, (INT32)lTime.tv_usec);
m_bPaused = FALSE;
m_bPausedHack = TRUE;
m_StatisticsTime.m_PauseTime += now - m_StatisticsTime.m_PreviousTime;
m_StatisticsTime.m_PreviousTime = now;
m_PacketTime.m_PauseTime += now - m_PacketTime.m_PreviousTime;
m_PacketTime.m_PreviousTime = now;
m_ulBufferingStartTime = HX_GET_TICKCOUNT();
m_uLastByteCount = m_uByteCount;
}
}
void
RTSPTransportBuffer::SanitizePacketQueue()
{
m_uLastSequenceNumber = m_uFirstSequenceNumber;
/*
* Put a temporary "lost" packet at the head of the queue to make
* it sane. The timestamp must be set to that of the last packet
* removed from the buffer. This prevents the early releasing of a
* true lost packet
*/
ClientPacket* pPacket = new ClientPacket(m_uFirstSequenceNumber,
m_uReliableSeqNo,
m_uLastTimestamp,
0,
0,
0,
GetTime(),
TRUE);
pPacket->AddRef();
m_pPacketDeque->push_back(pPacket);
}
HX_RESULT
RTSPTransportBuffer::Flush()
{
ClientPacket* pPacket;
//We are flushing all the packets. Empty out pending list.
m_pPendingLock->Lock();
while( !m_PendingPackets.IsEmpty() )
{
PendingPacket* pPend = (PendingPacket*)m_PendingPackets.RemoveHead();
HX_DELETE(pPend);
}
//Get rid of any scheduler events...
if (m_pScheduler && m_CallbackHandle)
{
m_pScheduler->Remove(m_CallbackHandle);
}
m_CallbackHandle = 0;
if( m_pCallBack )
m_pCallBack->Clear();
HX_RELEASE( m_pCallBack );
m_pPendingLock->Unlock();
while(!m_pPacketDeque->empty())
{
pPacket = (ClientPacket*)m_pPacketDeque->front();
if (pPacket)
{
/*
* Check to see that we are not waiting for a missing pre-seek
* reliable packet
*/
if (m_uReliableSeqNo !=
pPacket->GetReliableSeqNo() - pPacket->IsReliable())
{
return HXR_INCOMPLETE;
}
UINT32 uSeekIndex = GetSeekIndex(pPacket->GetSequenceNumber());
if (uSeekIndex == 0)
{
m_uLastTimestamp = pPacket->GetTime();
return HXR_OK;
}
pPacket = (ClientPacket*)m_pPacketDeque->pop_front();
IHXPacket* pIHXPacket = pPacket->GetPacket();
m_pOwner->packetReady(HXR_OK,
m_uStreamNumber,
pIHXPacket);
if (pIHXPacket)
{
pIHXPacket->Release();
}
UpdateStatsFromPacket(pPacket);
HX_RELEASE(pPacket);
}
}
/*
* XXXGH...Do I really need to do this?
* InitializeTime(&m_PacketTime);
*/
m_bQueueIsEmpty = TRUE;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -