📄 transbuf.cpp
字号:
/* ***** BEGIN LICENSE BLOCK ***** * Source last modified: $Id: transbuf.cpp,v 1.14.2.2 2004/07/09 02:04:41 hubbe Exp $ * * Portions Copyright (c) 1995-2004 RealNetworks, Inc. All Rights Reserved. * * The contents of this file, and the files included with this file, * are subject to the current version of the RealNetworks Public * Source License (the "RPSL") available at * http://www.helixcommunity.org/content/rpsl unless you have licensed * the file under the current version of the RealNetworks Community * Source License (the "RCSL") available at * http://www.helixcommunity.org/content/rcsl, in which case the RCSL * will apply. You may also obtain the license terms directly from * RealNetworks. You may not use this file except in compliance with * the RPSL or, if you have a valid RCSL with RealNetworks applicable * to this file, the RCSL. Please see the applicable RPSL or RCSL for * the rights, obligations and limitations governing use of the * contents of the file. * * Alternatively, the contents of this file may be used under the * terms of the GNU General Public License Version 2 or later (the * "GPL") in which case the provisions of the GPL are applicable * instead of those above. If you wish to allow use of your version of * this file only under the terms of the GPL, and not to allow others * to use your version of this file under the terms of either the RPSL * or RCSL, indicate your decision by deleting the provisions above * and replace them with the notice and other provisions required by * the GPL. If you do not delete the provisions above, a recipient may * use your version of this file under the terms of any one of the * RPSL, the RCSL or the GPL. * * This file is part of the Helix DNA Technology. RealNetworks is the * developer of the Original Code and owns the copyrights in the * portions it created. * * This file, and the files included with this file, is distributed * and made available on an 'AS IS' basis, WITHOUT WARRANTY OF ANY * KIND, EITHER EXPRESS OR IMPLIED, AND REALNETWORKS HEREBY DISCLAIMS * ALL SUCH WARRANTIES, INCLUDING WITHOUT LIMITATION, ANY WARRANTIES * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, QUIET * ENJOYMENT OR NON-INFRINGEMENT. * * Technology Compatibility Kit Test Suite(s) Location: * http://www.helixcommunity.org/content/tck * * Contributor(s): * * ***** END LICENSE BLOCK ***** */#include "debug.h"#include "hxcom.h"#include "hxtypes.h"#include "hxstring.h"#include "hxslist.h"#include "hxdeque.h"#include "hxbitset.h"#include "hxmap.h"#include "hxengin.h"#include "ihxpckts.h"#include "basepkt.h"#include "mimehead.h"#include "rtspmsg.h"#include "servrsnd.h"#include "transbuf.h"#include "rtspif.h"#include "rtsptran.h"#include "hxtick.h"#include "hxheap.h"#ifdef _DEBUG#undef HX_THIS_FILE static const char HX_THIS_FILE[] = __FILE__;#endif//These defines control when we stop waiting for an out of order//packet and just send a NAK for it. The two conditions are a timeout//and the number of packets, with higher sequence numbers, that come//after it. For the timeout we have choosen 500ms for now. This//number should be based off of the RTT but we currently do not have//that information.#define NAK_TIMEOUT 500#define REORDER_TOLERANCE 3//This is how often to check to see if we have exceeded our NAK_TIMEOUT//for the packets in our pending queue.#define NAK_CHECK_INTERVAL 100// Pending Packet MethodsPendingPacket::PendingPacket(UINT32 ulSeqNo, UINT32 arrivalTime) : m_ulSequenceNumber(ulSeqNo), m_ulNumPktsBehind(0), m_ulArrivalTime(arrivalTime){}PendingPacket::~PendingPacket(){};/* * Keep the deque under 16k */const UINT16 MAX_BITSET_SIZE = 384;const UINT16 MIN_NETWORK_JITTER_MSECS = 2000;const UINT16 MAX_QUEUED_PACKETS = 500;RTSPTransportBuffer::RTSPTransportBuffer( RTSPTransport* owner, UINT16 streamNumber, UINT32 bufferDuration, UINT32 maxBufferDuration, UINT32 growthRate, UINT32 wrapSequenceNumber) : m_pOwner(owner), m_uStreamNumber(streamNumber), m_bufferDuration(bufferDuration), m_maxBufferDuration(maxBufferDuration), m_growthRate(growthRate), m_wrapSequenceNumber(wrapSequenceNumber), m_status(TRANSBUF_INITIALIZING), m_bIsInitialized(FALSE), m_bWaitingForSeekFlush(FALSE), m_bWaitingForLiveSeekFlush(FALSE), m_bFlushHolding(FALSE), m_bIsEnded(FALSE), m_bQueueIsEmpty(TRUE), m_bCacheIsEmpty(TRUE), m_bStreamBegin(FALSE), m_bStreamDone(FALSE), m_bStreamDoneSent(FALSE), m_bSourceStopped(FALSE), m_bExpectedTSRangeSet(FALSE), m_uStartTimestamp(0), m_uEndTimestamp(0), m_ulEndDelayTolerance(0), m_bACKDone(FALSE), m_bPaused(FALSE), m_bPausedHack(FALSE), m_uReliableSeqNo(0), m_uEndReliableSeqNo(0), m_uFirstSequenceNumber(0), m_uLastSequenceNumber(0), m_uEndSequenceNumber(0), m_uSeekSequenceNumber(0), m_uSeekCount(0), m_uNormal(0), m_ulDuplicate(0), m_ulOutOfOrder(0), m_uLost(0), m_uLate(0), m_uResendRequested(0), m_uResendReceived(0), m_uByteCount(0), m_uLastByteCount(0), m_uAvgBandwidth(0), m_uCurBandwidth(0), m_ulLastLost30(0), m_ulLastTotal30(0), m_ulTSRollOver(0), m_bPacketsStarted(FALSE), m_ulIndex30(0), m_uLastTimestamp(0), m_ulCurrentQueueByteCount(0), m_ulCurrentCacheByteCount(0), m_bAtLeastOnePacketReceived(FALSE), m_bAtLeastOneResetHandled(FALSE), m_ulFirstTimestampReceived(0), m_ulLastTimestampReceived(0), m_ulBufferingStartTime(0), m_ulLastGrowTime(HX_GET_TICKCOUNT()), m_bMulticast(FALSE), m_bMulticastReset(TRUE), m_bIsLive(FALSE), m_bSparseStream(FALSE), m_pScheduler(NULL), m_bMulticastReliableSeqNoSet(FALSE), m_bPrefetch(FALSE), m_bFastStart(FALSE), m_pFIFOCache(NULL), m_ulFrontTimeStampCached(0), m_ulRearTimeStampCached(0), m_ulByteLimit(0) ,m_CallbackHandle(0) ,m_pCallBack(NULL){ InitTimer(); int j = 0; for (j = 0; j < 30; j++) { m_ulTotal30[j] = 0; m_ulLost30[j] = 0; } m_pPacketDeque = new HX_deque(INITIAL_DEQUE_SIZE); m_pCallBack = new RTSPTransportBufferCallback(this); m_pCallBack->AddRef();#ifdef THREADS_SUPPORTED HXMutex::MakeMutex(m_pPendingLock);#else HXMutex::MakeStubMutex(m_pPendingLock);#endif}RTSPTransportBuffer::~RTSPTransportBuffer(){ CHXSimpleList::Iterator i; ClientPacket* pPacket; //Clean up our pending packet que. m_pPendingLock->Lock(); while( !m_PendingPackets.IsEmpty() ) { PendingPacket* pPend = (PendingPacket*)m_PendingPackets.RemoveHead(); HX_DELETE(pPend); } //Get rid of any scheduler events... if (m_pScheduler && m_CallbackHandle) { m_pScheduler->Remove(m_CallbackHandle); } m_CallbackHandle = 0; if( m_pCallBack ) m_pCallBack->Clear(); HX_RELEASE( m_pCallBack ); m_pPendingLock->Unlock(); for (i = m_pHoldList.Begin(); i != m_pHoldList.End(); ++i) { pPacket = (ClientPacket*)(*i); HX_RELEASE(pPacket); } m_pHoldList.RemoveAll(); while(!m_pPacketDeque->empty()) { pPacket = (ClientPacket*)m_pPacketDeque->pop_front(); HX_RELEASE(pPacket); } HX_RELEASE(m_pScheduler); HX_DELETE(m_pPendingLock); HX_DELETE(m_pPacketDeque);#if defined(HELIX_FEATURE_FIFOCACHE) HX_RELEASE(m_pFIFOCache);#endif}voidRTSPTransportBuffer::Reset(){ m_status = TRANSBUF_INITIALIZING; if (m_bAtLeastOneResetHandled) { m_uSeekCount++; m_bIsEnded = FALSE; m_bStreamDone = FALSE; m_bStreamDoneSent = FALSE; m_bSourceStopped = FALSE; } else { m_bAtLeastOneResetHandled = TRUE; m_ulBufferingStartTime = HX_GET_TICKCOUNT(); } m_ulTSRollOver = 0; m_bAtLeastOnePacketReceived = FALSE; m_uLastTimestamp = 0; m_bExpectedTSRangeSet = FALSE; m_uStartTimestamp = 0; m_uEndTimestamp = 0; m_ulEndDelayTolerance = 0;}voidRTSPTransportBuffer::Grow(){ UINT32 ulCurrentTime = HX_GET_TICKCOUNT(); /* Check to not grow fast in case we get multiple late * packets at around the same time. */ if (CALCULATE_ELAPSED_TICKS(m_ulLastGrowTime, ulCurrentTime) >= m_growthRate) { m_ulLastGrowTime = HX_GET_TICKCOUNT(); if (m_bufferDuration + m_growthRate <= m_maxBufferDuration) { m_bufferDuration += m_growthRate; } }}HX_RESULTRTSPTransportBuffer::Init(UINT16 uSeqNo){ /* * The server side of an encoding session will initialize the scheduler * here */ if (!m_pScheduler) { InitTimer(); } HX_ASSERT(m_pScheduler != NULL); if (!m_bIsInitialized) { m_bIsInitialized = TRUE; m_uACKSequenceNumber = m_uFirstSequenceNumber = m_uLastSequenceNumber = uSeqNo; if (m_uSeekCount > 0) { return HXR_OK; } } else if (m_uSeekCount) { m_uSeekCount--; if (m_uSeekCount > 0) { return HXR_OK; } m_uSeekSequenceNumber = uSeqNo; m_bWaitingForSeekFlush = TRUE; } m_status = TRANSBUF_READY; /* * Now add any packets that arrived before initialization */ m_bStreamBegin = TRUE; CHXSimpleList::Iterator i; for (i = m_pHoldList.Begin(); i != m_pHoldList.End(); ++i) { ClientPacket* pPacket = (ClientPacket*)(*i); Add(pPacket); } m_pHoldList.RemoveAll(); m_bStreamBegin = FALSE; return HXR_OK;}HX_RESULTRTSPTransportBuffer::Add(ClientPacket* pPacket){ if (!m_pPacketDeque) { HX_RELEASE(pPacket); return HXR_FAIL; } else if (m_pPacketDeque->size() >= MAX_DEQUE_SIZE) { m_pOwner->HandleBufferError(); HX_RELEASE(pPacket); return HXR_FAIL; } else if (m_bStreamDone) { /* * If we have already returned the last packet, then don't bother * trying to add anymore */ HX_RELEASE(pPacket); return HXR_OK; } else if (!m_bIsInitialized || m_uSeekCount) { /* * Until the first sequence number is set, just hold the packets * as they arrive */#if defined(HELIX_FEATURE_TRANSPORT_MULTICAST) if (m_bMulticast && m_bMulticastReset) { //We are going to destroy all our place holders so clean up the //pending queue as well. m_pPendingLock->Lock(); while( !m_PendingPackets.IsEmpty() ) { PendingPacket* pPend = (PendingPacket*)m_PendingPackets.RemoveHead(); HX_DELETE(pPend); } //Get rid of any scheduler events... if (m_pScheduler && m_CallbackHandle) { m_pScheduler->Remove(m_CallbackHandle); } m_CallbackHandle = 0; if( m_pCallBack ) m_pCallBack->Clear(); HX_RELEASE( m_pCallBack ); m_pPendingLock->Unlock(); /* Destruct and recreate the packet queue */ /* XXXSMP ...but it works */ ClientPacket *pTmpPacket = NULL; while(!m_pPacketDeque->empty()) { pTmpPacket = (ClientPacket*)m_pPacketDeque->pop_front(); HX_RELEASE(pTmpPacket); } HX_DELETE(m_pPacketDeque); m_pPacketDeque = new HX_deque(INITIAL_DEQUE_SIZE); m_bMulticastReset = FALSE; m_bIsInitialized = FALSE; m_bWaitingForSeekFlush = FALSE; m_bWaitingForLiveSeekFlush = FALSE; m_bFlushHolding = FALSE; m_bIsEnded = FALSE; m_bQueueIsEmpty = TRUE; m_bCacheIsEmpty = TRUE; m_bStreamBegin = FALSE; m_bStreamDone = FALSE; m_bStreamDoneSent = FALSE; m_bSourceStopped = FALSE; m_bExpectedTSRangeSet = FALSE; m_uStartTimestamp = 0; m_uEndTimestamp = 0; m_ulEndDelayTolerance = 0; m_bACKDone = FALSE; m_bPaused = FALSE; m_bPausedHack = FALSE; m_uReliableSeqNo = 0; m_uEndReliableSeqNo = 0; m_uFirstSequenceNumber = 0; m_uLastSequenceNumber = 0; m_uEndSequenceNumber = 0; m_uSeekSequenceNumber = 0; m_uSeekCount = 0; m_bAtLeastOnePacketReceived = FALSE; m_bAtLeastOneResetHandled = FALSE; Init(pPacket->GetSequenceNumber()); Add(pPacket); return HXR_OK; }#endif /* HELIX_FEATURE_TRANSPORT_MULTICAST */ m_pHoldList.AddTail(pPacket); return HXR_OK; } // initialize the reliableSeqNo on the first // reliable multicast packet // SeqNo of reliable packets starts at 1 if (m_bMulticast && !m_bMulticastReliableSeqNoSet &&
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -