📄 transbuf.cpp
字号:
pPacket && pPacket->IsReliable()) { m_uReliableSeqNo = pPacket->GetReliableSeqNo() - 1; m_bMulticastReliableSeqNoSet = TRUE; } /* * Save away the sequence number because Insert() may delete the * ClientPacket */ UINT16 uSequenceNumber = pPacket->GetSequenceNumber(); HX_RESULT result = HXR_OK; /* We insert it AFTER flushing the packets in live flush case */ if (!m_bWaitingForLiveSeekFlush) { result = Insert(pPacket); } if (HXR_OK != result) { return result; } if (m_bWaitingForSeekFlush) { UINT32 uPacketSeekIndex = GetSeekIndex(uSequenceNumber); /* * If we have already tried to flush or this packet belongs * after the seek, then Flush() */ if (m_bFlushHolding || uPacketSeekIndex < MAX_DEQUE_SIZE) { /* * This routine will clear out the old packets and will reset * the packet queue with the information provided by the seek */ if (HXR_OK == Flush()) { if (m_bFlushHolding) { m_bFlushHolding = FALSE; } m_bWaitingForSeekFlush = FALSE; // Queue should not be empty at this point, since seek index // was contained within the queue prior to flush HX_ASSERT(!m_bQueueIsEmpty); } else { m_bFlushHolding = TRUE; } } } if (m_bWaitingForLiveSeekFlush) { HX_ASSERT(!m_bWaitingForSeekFlush); m_bWaitingForLiveSeekFlush = FALSE; m_uACKSequenceNumber = m_uFirstSequenceNumber = m_uLastSequenceNumber = uSequenceNumber; result = Insert(pPacket); } return HXR_OK;}HX_RESULTRTSPTransportBuffer::Insert(ClientPacket* pPacket){ ClientPacket* tempPacket = NULL; if (m_bQueueIsEmpty) { SanitizePacketQueue(); m_bQueueIsEmpty = FALSE; } UINT32 uTailIndex = GetPacketIndex(m_uLastSequenceNumber); if (uTailIndex >= MAX_DEQUE_SIZE) { /* * Somebody had better be getting packets from this buffer */ HX_RELEASE(pPacket); return HXR_FAIL; } UINT32 uTimestamp = pPacket->GetTime(); m_uByteCount += pPacket->GetByteCount(); m_ulCurrentQueueByteCount += pPacket->GetByteCount();; UINT16 uSequenceNumber = pPacket->GetSequenceNumber(); UINT32 uPacketIndex = GetPacketIndex(uSequenceNumber); // Send NAK iff at least REORDER_TOLERANCE packets with higher // seqno's than the lost packet arrive. increases robustness if // reordering occurs. There is a trade off between loss detection // accuracy and the time of the retransmission window being made // here. Loss detection becomes inaccurate when we count reordered // packets as lost. But we can't determine if packets are // reordered without waiting for subsequent pkts to arrive. // Something to consider is determining whether to NAK early or // not based on the avg. Time for the current index to be // retrieved by the higher level transpor object (avg time in // queue) and the estimated RTT. m_pPendingLock->Lock(); LISTPOSITION pos = m_PendingPackets.GetHeadPosition(); int nCount = m_PendingPackets.GetCount(); for( int nTmp=0 ; pos && nTmp<nCount ; nTmp++ ) { BOOL bDeleted=FALSE; PendingPacket* pPend = (PendingPacket*)m_PendingPackets.GetAt(pos); if(uSequenceNumber > pPend->m_ulSequenceNumber) { pPend->m_ulNumPktsBehind++; if( pPend->m_ulNumPktsBehind>REORDER_TOLERANCE ) { UINT32 tempIndex = GetPacketIndex((UINT16)pPend->m_ulSequenceNumber); //Send a NAK and increment resend requested count. m_pOwner->sendNAKPacket( m_uStreamNumber, (UINT16)pPend->m_ulSequenceNumber, (UINT16)pPend->m_ulSequenceNumber); if( tempIndex<m_pPacketDeque->size()) ((ClientPacket*)(*m_pPacketDeque)[tempIndex])->SetResendRequested(); m_uResendRequested++; //Remove this packet from our pending list pos = m_PendingPackets.RemoveAt(pos); HX_DELETE(pPend); bDeleted=TRUE; } } else if( uSequenceNumber==pPend->m_ulSequenceNumber ) { //This packet arrived, remove it from the pending list. pos = m_PendingPackets.RemoveAt(pos); HX_DELETE(pPend); bDeleted=TRUE; m_ulOutOfOrder++; } //If we deleted, RemoveAt() updated the pos. if(!bDeleted) { m_PendingPackets.GetNext(pos); } } m_pPendingLock->Unlock(); if (uPacketIndex == uTailIndex + 1) { /* * If the only packet in the queue is the sanitize packet, then we * have lost a packet */ tempPacket = (ClientPacket*)(*m_pPacketDeque)[uTailIndex]; if (tempPacket->IsSanitizePacket()) {//{FILE* f1 = ::fopen("c:\\loss.txt", "a+"); ::fprintf(f1, "this: %p Lost the sanitized packet uPacketIndex == uTailIndex + 1\n", this);::fclose(f1);} goto HandleLostPacket; } /* * Packet has arrived in order so put it in the queue */ if (OverByteLimit()) { ConvertToDroppedPkt(pPacket); } m_pPacketDeque->push_back(pPacket); m_uLastSequenceNumber++; if (m_uLastSequenceNumber == m_wrapSequenceNumber) { m_uLastSequenceNumber = 0; } m_uNormal++; } else if (uPacketIndex <= uTailIndex) { /* * Check to see that the packet queue is in a sane state */ if (uPacketIndex >= m_pPacketDeque->size()) { ASSERT(0); HX_RELEASE(pPacket); return HXR_UNEXPECTED; } /* * This is a valid out of order packet that belongs somewhere * in the queue */ tempPacket = (ClientPacket*)(*m_pPacketDeque)[uPacketIndex]; if (tempPacket->IsLostPacket()) { if (tempPacket->IsResendRequested()) { m_uResendReceived++; } else { m_uNormal++; } /* * This was a place holder packet, so replace it with the * valid packet */ if (OverByteLimit()) { ConvertToDroppedPkt(pPacket); } (*m_pPacketDeque)[uPacketIndex] = pPacket; HX_RELEASE(tempPacket); } else { // could be actually duplicate (rare) OR // because of resends for out-of-order packets (more likely) m_ulDuplicate++; /* * We've received a duplicate packet so get rid of it */ HX_RELEASE(pPacket); } } else if (uPacketIndex > MAX_DEQUE_SIZE) { //XXXGH...don't count late packets because they've already been // been accounted for as lost packets // m_uLate++; /* * If the stream is not being reset and this packet is either * too early or too late to be placed in the queue, then Grow(). * If the stream just starting or ending then don't bother growing * because packet funkiness may occur */ if (!m_bStreamBegin && !m_bIsEnded) { Grow(); } HX_RELEASE(pPacket); } else { /* * Check to see that the packet queue is in a sane state */ if (uTailIndex >= m_pPacketDeque->size()) { ASSERT(0); HX_RELEASE(pPacket); return HXR_UNEXPECTED; } /* * This is a valid out of order packet that belongs somewhere * after the last packet in the queue, so fill in any missing * packets */ tempPacket = (ClientPacket*)(*m_pPacketDeque)[uTailIndex];HandleLostPacket: /* * Use the reliable count from the incoming packet to keep track * of lost reliable packets and use the the timestamp from the * incoming packet to give the transport a little longer to recover * missing packets */ UINT16 uReliableSeqNo = pPacket->GetReliableSeqNo(); /* * If the last packet in the queue is the sanitize packet, then * it must get replaced with a proper lost packet */ UINT16 uSeqNo; UINT32 uFillIndex; if (tempPacket->IsSanitizePacket()) { uSeqNo = tempPacket->GetSequenceNumber(); uFillIndex = uTailIndex; tempPacket = (ClientPacket*)m_pPacketDeque->pop_front(); HX_RELEASE(tempPacket); } else { uSeqNo = tempPacket->GetSequenceNumber() + 1; uFillIndex = uTailIndex + 1; } if (uSeqNo == m_wrapSequenceNumber) { uSeqNo = 0; } /* * Fill in lost packets from the end of the queue to this packet */ UINT32 i; //For each missing packet sequence number, make a dummy packet //and stick it in the queue as a place holder. If we are //looking at a 'large' gap in sequence numbers here then there //is a good chance we are looking at real loss and not //out-of-order packets. In that case, lets forget about the //out-of-order work and send an immediate NAK here instead of //waiting for the out of order limit and sending indivudual //NAKs for each packet. This will lessen the server and //network load as well (no NAK spam). BOOL bUseOOPQueue = TRUE; INT32 nNumToFill = uPacketIndex-uFillIndex; UINT16 uStartNAKSeqNumber = uSeqNo; //If we are missing more then REORDER_TOLERANCE packets then //this has to be real loss and not out of order packets. In //this case we just want to add these packets to the Packets //queue, mark them as resend-requested and then send a single //NAK for the bunch. if( nNumToFill >= REORDER_TOLERANCE ) bUseOOPQueue = FALSE; for( i=uFillIndex; i<uPacketIndex; i++ ) { //Add a new filler packet.. tempPacket = new ClientPacket(uSeqNo++, uReliableSeqNo, uTimestamp, 0, 0, 0, GetTime(), FALSE); tempPacket->AddRef(); m_pPacketDeque->push_back(tempPacket); //Don't add to OOP queue if we think this is real loss. if( bUseOOPQueue ) { //Track this place holder packet by inserting it into our //pending packet list. m_pPendingLock->Lock(); PendingPacket* pPend = new PendingPacket(uSeqNo-1, HX_GET_TICKCOUNT()); if( pPend ) m_PendingPackets.AddTail( pPend ); //If this is the first packet found out of order start our callback. if(m_pScheduler) { if( !m_pCallBack) { m_pCallBack = new RTSPTransportBufferCallback(this); m_pCallBack->AddRef(); } m_CallbackHandle = m_pScheduler->RelativeEnter(m_pCallBack, NAK_CHECK_INTERVAL); } m_pPendingLock->Unlock(); } else { //Mark it, we will NAK for all packets outside of loop. tempPacket->SetResendRequested(); m_uResendRequested++; } } if( !bUseOOPQueue ) { //send out the NAK right now... m_pOwner->sendNAKPacket( m_uStreamNumber, uStartNAKSeqNumber, uSeqNo-1); } if (OverByteLimit()) { ConvertToDroppedPkt(pPacket); } m_pPacketDeque->push_back(pPacket); /* * Carefully bump m_uLastSequenceNumber */ UINT16 uTestSequenceNumber = (UINT16)(m_uLastSequenceNumber + (uPacketIndex - uTailIndex)); if (uTestSequenceNumber < m_uLastSequenceNumber || uTestSequenceNumber >= m_wrapSequenceNumber) { for (i = 0; i < uPacketIndex - uTailIndex; i++) { m_uLastSequenceNumber++; if (m_uLastSequenceNumber == m_wrapSequenceNumber) { m_uLastSequenceNumber = 0; } } } else { m_uLastSequenceNumber = uTestSequenceNumber; } /* * We did receive a valid packet */ m_uNormal++; } if (m_uSeekCount == 0 && pPacket && !pPacket->IsLostPacket()) { if (!m_bAtLeastOnePacketReceived) { m_bAtLeastOnePacketReceived = TRUE; m_ulFirstTimestampReceived = uTimestamp; m_ulLastTimestampReceived = m_ulFirstTimestampReceived; } else { m_ulLastTimestampReceived = uTimestamp; } if (m_ulLastTimestampReceived > uTimestamp && ((m_ulLastTimestampReceived - uTimestamp) > MAX_TIMESTAMP_GAP)) { m_ulTSRollOver++; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -