📄 transbuf.cpp
字号:
pPacket->IsReliable())
{
m_uReliableSeqNo = pPacket->GetReliableSeqNo() - 1;
m_bMulticastReliableSeqNoSet = TRUE;
}
/*
* Save away the sequence number because Insert() may delete the
* ClientPacket
*/
UINT16 uSequenceNumber = pPacket->GetSequenceNumber();
HX_RESULT result = HXR_OK;
/* We insert it AFTER flushing the packets in live flush case */
if (!m_bWaitingForLiveSeekFlush)
{
result = Insert(pPacket);
}
if (HXR_OK != result)
{
return result;
}
if (m_bWaitingForSeekFlush)
{
UINT32 uPacketSeekIndex = GetSeekIndex(uSequenceNumber);
/*
* If we have already tried to flush or this packet belongs
* after the seek, then Flush()
*/
if (m_bFlushHolding || uPacketSeekIndex < MAX_DEQUE_SIZE)
{
/*
* This routine will clear out the old packets and will reset
* the packet queue with the information provided by the seek
*/
if (HXR_OK == Flush())
{
if (m_bFlushHolding)
{
m_bFlushHolding = FALSE;
}
m_bWaitingForSeekFlush = FALSE;
// Queue should not be empty at this point, since seek index
// was contained within the queue prior to flush
HX_ASSERT(!m_bQueueIsEmpty);
}
else
{
m_bFlushHolding = TRUE;
}
}
}
if (m_bWaitingForLiveSeekFlush)
{
HX_ASSERT(!m_bWaitingForSeekFlush);
m_bWaitingForLiveSeekFlush = FALSE;
m_uACKSequenceNumber =
m_uFirstSequenceNumber =
m_uLastSequenceNumber = uSequenceNumber;
result = Insert(pPacket);
}
return HXR_OK;
}
HX_RESULT
RTSPTransportBuffer::Insert(ClientPacket* pPacket)
{
ClientPacket* tempPacket = NULL;
if (m_bQueueIsEmpty)
{
SanitizePacketQueue();
m_bQueueIsEmpty = FALSE;
}
UINT32 uTailIndex = GetPacketIndex(m_uLastSequenceNumber);
if (uTailIndex >= MAX_DEQUE_SIZE)
{
/*
* Somebody had better be getting packets from this buffer
*/
HX_RELEASE(pPacket);
return HXR_FAIL;
}
UINT32 uTimestamp = pPacket->GetTime();
m_uByteCount += pPacket->GetByteCount();
m_ulCurrentQueueByteCount += pPacket->GetByteCount();;
UINT16 uSequenceNumber = pPacket->GetSequenceNumber();
UINT32 uPacketIndex = GetPacketIndex(uSequenceNumber);
// Send NAK iff at least REORDER_TOLERANCE packets with higher
// seqno's than the lost packet arrive. This increases robustness if
// reordering occurs. There is a trade-off between loss detection
// accuracy and the time of the retransmission window being made
// here. Loss detection becomes inaccurate when we count reordered
// packets as lost. But we can't determine if packets are
// reordered without waiting for subsequent pkts to arrive.
// Something to consider is determining whether to NAK early or
// not based on the avg. time for the current index to be
// retrieved by the higher level transport object (avg time in
// queue) and the estimated RTT.
m_pPendingLock->Lock();
LISTPOSITION pos = m_PendingPackets.GetHeadPosition();
int nCount = m_PendingPackets.GetCount();
for( int nTmp=0 ; pos && nTmp<nCount ; nTmp++ )
{
BOOL bDeleted=FALSE;
PendingPacket* pPend = (PendingPacket*)m_PendingPackets.GetAt(pos);
if(uSequenceNumber > pPend->m_ulSequenceNumber)
{
pPend->m_ulNumPktsBehind++;
if( pPend->m_ulNumPktsBehind>REORDER_TOLERANCE )
{
UINT32 tempIndex = GetPacketIndex((UINT16)pPend->m_ulSequenceNumber);
//Send a NAK and increment resend requested count.
m_pOwner->sendNAKPacket( m_uStreamNumber,
(UINT16)pPend->m_ulSequenceNumber,
(UINT16)pPend->m_ulSequenceNumber);
if( tempIndex<m_pPacketDeque->size())
((ClientPacket*)(*m_pPacketDeque)[tempIndex])->SetResendRequested();
m_uResendRequested++;
//Remove this packet from our pending list
pos = m_PendingPackets.RemoveAt(pos);
HX_DELETE(pPend);
bDeleted=TRUE;
}
}
else if( uSequenceNumber==pPend->m_ulSequenceNumber )
{
//This packet arrived, remove it from the pending list.
pos = m_PendingPackets.RemoveAt(pos);
HX_DELETE(pPend);
bDeleted=TRUE;
m_ulOutOfOrder++;
}
//If we deleted, RemoveAt() updated the pos.
if(!bDeleted)
{
m_PendingPackets.GetNext(pos);
}
}
m_pPendingLock->Unlock();
if (uPacketIndex == uTailIndex + 1)
{
/*
* If the only packet in the queue is the sanitize packet, then we
* have lost a packet
*/
tempPacket = (ClientPacket*)(*m_pPacketDeque)[uTailIndex];
if (tempPacket->IsSanitizePacket())
{
//{FILE* f1 = ::fopen("c:\\loss.txt", "a+"); ::fprintf(f1, "this: %p Lost the sanitized packet uPacketIndex == uTailIndex + 1\n", this);::fclose(f1);}
goto HandleLostPacket;
}
/*
* Packet has arrived in order so put it in the queue
*/
if (OverByteLimit())
{
ConvertToDroppedPkt(pPacket);
}
m_pPacketDeque->push_back(pPacket);
m_uLastSequenceNumber++;
if (m_uLastSequenceNumber == m_wrapSequenceNumber)
{
m_uLastSequenceNumber = 0;
}
m_uNormal++;
}
else if (uPacketIndex <= uTailIndex)
{
/*
* Check to see that the packet queue is in a sane state
*/
if (uPacketIndex >= m_pPacketDeque->size())
{
ASSERT(0);
HX_RELEASE(pPacket);
return HXR_UNEXPECTED;
}
/*
* This is a valid out of order packet that belongs somewhere
* in the queue
*/
tempPacket = (ClientPacket*)(*m_pPacketDeque)[uPacketIndex];
if (tempPacket->IsLostPacket())
{
if (tempPacket->IsResendRequested())
{
m_uResendReceived++;
}
else
{
m_uNormal++;
}
/*
* This was a place holder packet, so replace it with the
* valid packet
*/
if (OverByteLimit())
{
ConvertToDroppedPkt(pPacket);
}
(*m_pPacketDeque)[uPacketIndex] = pPacket;
HX_RELEASE(tempPacket);
}
else
{
// could be actually duplicate (rare) OR
// because of resends for out-of-order packets (more likely)
m_ulDuplicate++;
/*
* We've received a duplicate packet so get rid of it
*/
HX_RELEASE(pPacket);
}
}
else if (uPacketIndex > MAX_DEQUE_SIZE)
{
//XXXGH...don't count late packets because they've already
// been accounted for as lost packets
// m_uLate++;
/*
* If the stream is not being reset and this packet is either
* too early or too late to be placed in the queue, then Grow().
* If the stream is just starting or ending then don't bother growing
* because packet funkiness may occur
*/
if (!m_bStreamBegin && !m_bIsEnded)
{
Grow();
}
HX_RELEASE(pPacket);
}
else
{
/*
* Check to see that the packet queue is in a sane state
*/
if (uTailIndex >= m_pPacketDeque->size())
{
ASSERT(0);
HX_RELEASE(pPacket);
return HXR_UNEXPECTED;
}
/*
* This is a valid out of order packet that belongs somewhere
* after the last packet in the queue, so fill in any missing
* packets
*/
tempPacket = (ClientPacket*)(*m_pPacketDeque)[uTailIndex];
HandleLostPacket:
/*
* Use the reliable count from the incoming packet to keep track
* of lost reliable packets and use the timestamp from the
* incoming packet to give the transport a little longer to recover
* missing packets
*/
UINT16 uReliableSeqNo = pPacket->GetReliableSeqNo();
/*
* If the last packet in the queue is the sanitize packet, then
* it must get replaced with a proper lost packet
*/
UINT16 uSeqNo;
UINT32 uFillIndex;
if (tempPacket->IsSanitizePacket())
{
uSeqNo = tempPacket->GetSequenceNumber();
uFillIndex = uTailIndex;
tempPacket = (ClientPacket*)m_pPacketDeque->pop_front();
HX_RELEASE(tempPacket);
}
else
{
uSeqNo = tempPacket->GetSequenceNumber() + 1;
uFillIndex = uTailIndex + 1;
}
if (uSeqNo == m_wrapSequenceNumber)
{
uSeqNo = 0;
}
/*
* Fill in lost packets from the end of the queue to this packet
*/
UINT32 i;
//For each missing packet sequence number, make a dummy packet
//and stick it in the queue as a place holder. If we are
//looking at a 'large' gap in sequence numbers here then there
//is a good chance we are looking at real loss and not
//out-of-order packets. In that case, let's forget about the
//out-of-order work and send an immediate NAK here instead of
//waiting for the out of order limit and sending individual
//NAKs for each packet. This will lessen the server and
//network load as well (no NAK spam).
BOOL bUseOOPQueue = TRUE;
INT32 nNumToFill = uPacketIndex-uFillIndex;
UINT16 uStartNAKSeqNumber = uSeqNo;
//If we are missing REORDER_TOLERANCE or more packets then
//this has to be real loss and not out of order packets. In
//this case we just want to add these packets to the Packets
//queue, mark them as resend-requested and then send a single
//NAK for the bunch.
if( nNumToFill >= REORDER_TOLERANCE )
bUseOOPQueue = FALSE;
for( i=uFillIndex; i<uPacketIndex; i++ )
{
//Add a new filler packet..
tempPacket = new ClientPacket(uSeqNo++,
uReliableSeqNo,
uTimestamp,
0,
0,
0,
GetTime(),
FALSE);
tempPacket->AddRef();
m_pPacketDeque->push_back(tempPacket);
//Don't add to OOP queue if we think this is real loss.
if( bUseOOPQueue )
{
//Track this place holder packet by inserting it into our
//pending packet list.
m_pPendingLock->Lock();
PendingPacket* pPend = new PendingPacket(uSeqNo-1, HX_GET_TICKCOUNT());
if( pPend )
m_PendingPackets.AddTail( pPend );
//If this is the first packet found out of order start our callback.
if(m_pScheduler)
{
if( !m_pCallBack)
{
m_pCallBack = new RTSPTransportBufferCallback(this);
m_pCallBack->AddRef();
}
m_CallbackHandle = m_pScheduler->RelativeEnter(m_pCallBack, NAK_CHECK_INTERVAL);
}
m_pPendingLock->Unlock();
}
else
{
//Mark it, we will NAK for all packets outside of loop.
tempPacket->SetResendRequested();
m_uResendRequested++;
}
}
if( !bUseOOPQueue )
{
//send out the NAK right now...
m_pOwner->sendNAKPacket( m_uStreamNumber,
uStartNAKSeqNumber,
uSeqNo-1);
}
if (OverByteLimit())
{
ConvertToDroppedPkt(pPacket);
}
m_pPacketDeque->push_back(pPacket);
/*
* Carefully bump m_uLastSequenceNumber
*/
UINT16 uTestSequenceNumber = (UINT16)(m_uLastSequenceNumber +
(uPacketIndex - uTailIndex));
if (uTestSequenceNumber < m_uLastSequenceNumber ||
uTestSequenceNumber >= m_wrapSequenceNumber)
{
for (i = 0; i < uPacketIndex - uTailIndex; i++)
{
m_uLastSequenceNumber++;
if (m_uLastSequenceNumber == m_wrapSequenceNumber)
{
m_uLastSequenceNumber = 0;
}
}
}
else
{
m_uLastSequenceNumber = uTestSequenceNumber;
}
/*
* We did receive a valid packet
*/
m_uNormal++;
}
if (m_uSeekCount == 0 && pPacket && !pPacket->IsLostPacket())
{
if (!m_bAtLeastOnePacketReceived)
{
m_bAtLeastOnePacketReceived = TRUE;
m_ulFirstTimestampReceived = uTimestamp;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -