📄 multiframedrtpsource.cpp
字号:
/**********This library is free software; you can redistribute it and/or modify it underthe terms of the GNU Lesser General Public License as published by theFree Software Foundation; either version 2.1 of the License, or (at youroption) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)This library is distributed in the hope that it will be useful, but WITHOUTANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESSFOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License formore details.You should have received a copy of the GNU Lesser General Public Licensealong with this library; if not, write to the Free Software Foundation, Inc.,51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA**********/// "liveMedia"// Copyright (c) 1996-2010 Live Networks, Inc. All rights reserved.// RTP source for a common kind of payload format: Those that pack multiple,// complete codec frames (as many as possible) into each RTP packet.// Implementation#include "MultiFramedRTPSource.hh"#include "GroupsockHelper.hh"#include <string.h>////////// ReorderingPacketBuffer definition //////////class ReorderingPacketBuffer {public: ReorderingPacketBuffer(BufferedPacketFactory* packetFactory); virtual ~ReorderingPacketBuffer(); void reset(); BufferedPacket* getFreePacket(MultiFramedRTPSource* ourSource); Boolean storePacket(BufferedPacket* bPacket); BufferedPacket* getNextCompletedPacket(Boolean& packetLossPreceded); void releaseUsedPacket(BufferedPacket* packet); void freePacket(BufferedPacket* packet) { if (packet != fSavedPacket) { delete packet; } else { fSavedPacketFree = True; } } Boolean isEmpty() const { return fHeadPacket == NULL; } void setThresholdTime(unsigned uSeconds) { fThresholdTime = uSeconds; }private: BufferedPacketFactory* fPacketFactory; unsigned fThresholdTime; // uSeconds Boolean fHaveSeenFirstPacket; // used to set initial "fNextExpectedSeqNo" unsigned short fNextExpectedSeqNo; BufferedPacket* fHeadPacket; BufferedPacket* fSavedPacket; // to avoid calling new/free in the common case Boolean fSavedPacketFree;};////////// MultiFramedRTPSource implementation //////////MultiFramedRTPSource::MultiFramedRTPSource(UsageEnvironment& env, Groupsock* RTPgs, unsigned char rtpPayloadFormat, unsigned rtpTimestampFrequency, BufferedPacketFactory* packetFactory) : RTPSource(env, RTPgs, rtpPayloadFormat, rtpTimestampFrequency) { reset(); fReorderingBuffer = new ReorderingPacketBuffer(packetFactory); // Try to use a big receive buffer for RTP: increaseReceiveBufferTo(env, RTPgs->socketNum(), 50*1024);}void MultiFramedRTPSource::reset() { fCurrentPacketBeginsFrame = True; // by default fCurrentPacketCompletesFrame = True; // by default fAreDoingNetworkReads = False; fPacketReadInProgress = NULL; fNeedDelivery = False; fPacketLossInFragmentedFrame = False;}MultiFramedRTPSource::~MultiFramedRTPSource() { fRTPInterface.stopNetworkReading(); delete fReorderingBuffer;}Boolean MultiFramedRTPSource::processSpecialHeader(BufferedPacket* /*packet*/, unsigned& resultSpecialHeaderSize) { // Default implementation: Assume no special header: resultSpecialHeaderSize = 0; return True;}Boolean MultiFramedRTPSource::packetIsUsableInJitterCalculation(unsigned char* /*packet*/, unsigned /*packetSize*/) { // Default implementation: return True;}void MultiFramedRTPSource::doStopGettingFrames() { fRTPInterface.stopNetworkReading(); fReorderingBuffer->reset(); reset();}void MultiFramedRTPSource::doGetNextFrame() { if (!fAreDoingNetworkReads) { // Turn on background read handling of incoming packets: fAreDoingNetworkReads = True; TaskScheduler::BackgroundHandlerProc* handler = (TaskScheduler::BackgroundHandlerProc*)&networkReadHandler; fRTPInterface.startNetworkReading(handler); } fSavedTo = fTo; fSavedMaxSize = fMaxSize; fFrameSize = 0; // for now fNeedDelivery = True; doGetNextFrame1();}void MultiFramedRTPSource::doGetNextFrame1() { while (fNeedDelivery) { // If we already have packet data available, then deliver it now. Boolean packetLossPrecededThis; BufferedPacket* nextPacket = fReorderingBuffer->getNextCompletedPacket(packetLossPrecededThis); if (nextPacket == NULL) break; fNeedDelivery = False; if (nextPacket->useCount() == 0) { // Before using the packet, check whether it has a special header // that needs to be processed: unsigned specialHeaderSize; if (!processSpecialHeader(nextPacket, specialHeaderSize)) { // Something's wrong with the header; reject the packet: fReorderingBuffer->releaseUsedPacket(nextPacket); fNeedDelivery = True; break; } nextPacket->skip(specialHeaderSize); } // Check whether we're part of a multi-packet frame, and whether // there was packet loss that would render this packet unusable: if (fCurrentPacketBeginsFrame) { if (packetLossPrecededThis || fPacketLossInFragmentedFrame) { // We didn't get all of the previous frame. // Forget any data that we used from it: fTo = fSavedTo; fMaxSize = fSavedMaxSize; fFrameSize = 0; } fPacketLossInFragmentedFrame = False; } else if (packetLossPrecededThis) { // We're in a multi-packet frame, with preceding packet loss fPacketLossInFragmentedFrame = True; } if (fPacketLossInFragmentedFrame) { // This packet is unusable; reject it: fReorderingBuffer->releaseUsedPacket(nextPacket); fNeedDelivery = True; break; } // The packet is usable. Deliver all or part of it to our caller: unsigned frameSize; nextPacket->use(fTo, fMaxSize, frameSize, fNumTruncatedBytes, fCurPacketRTPSeqNum, fCurPacketRTPTimestamp, fPresentationTime, fCurPacketHasBeenSynchronizedUsingRTCP, fCurPacketMarkerBit); fFrameSize += frameSize; if (!nextPacket->hasUsableData()) { // We're completely done with this packet now fReorderingBuffer->releaseUsedPacket(nextPacket); } if (fCurrentPacketCompletesFrame || fNumTruncatedBytes > 0) { // We have all the data that the client wants. if (fNumTruncatedBytes > 0) { envir() << "MultiFramedRTPSource::doGetNextFrame1(): The total received frame size exceeds the client's buffer size (" << fSavedMaxSize << "). " << fNumTruncatedBytes << " bytes of trailing data will be dropped!\n"; } // Call our own 'after getting' function, so that the downstream object can consume the data: if (fReorderingBuffer->isEmpty()) { // Common case optimization: There are no more queued incoming packets, so this code will not get // executed again without having first returned to the event loop. Call our 'after getting' function // directly, because there's no risk of a long chain of recursion (and thus stack overflow): afterGetting(this); } else { // Special case: Call our 'after getting' function via the event loop. nextTask() = envir().taskScheduler().scheduleDelayedTask(0, (TaskFunc*)FramedSource::afterGetting, this); } } else { // This packet contained fragmented data, and does not complete // the data that the client wants. Keep getting data: fTo += frameSize; fMaxSize -= frameSize; fNeedDelivery = True; } }}void MultiFramedRTPSource::setPacketReorderingThresholdTime(unsigned uSeconds) { fReorderingBuffer->setThresholdTime(uSeconds);}#define ADVANCE(n) do { bPacket->skip(n); } while (0)void MultiFramedRTPSource::networkReadHandler(MultiFramedRTPSource* source, int /*mask*/) { source->networkReadHandler1();}void MultiFramedRTPSource::networkReadHandler1() { BufferedPacket* bPacket = fPacketReadInProgress; if (bPacket == NULL) { // Normal case: Get a free BufferedPacket descriptor to hold the new network packet: bPacket = fReorderingBuffer->getFreePacket(this); } // Read the network packet, and perform sanity checks on the RTP header: Boolean readSuccess = False; do { Boolean packetReadWasIncomplete = fPacketReadInProgress != NULL; if (!bPacket->fillInData(fRTPInterface, packetReadWasIncomplete)) break; if (packetReadWasIncomplete) { // We need additional read(s) before we can process the incoming packet: fPacketReadInProgress = bPacket; return; } else { fPacketReadInProgress = NULL; }#ifdef TEST_LOSS setPacketReorderingThresholdTime(0); // don't wait for 'lost' packets to arrive out-of-order later if ((our_random()%10) == 0) break; // simulate 10% packet loss#endif // Check for the 12-byte RTP header: if (bPacket->dataSize() < 12) break; unsigned rtpHdr = ntohl(*(unsigned*)(bPacket->data())); ADVANCE(4); Boolean rtpMarkerBit = (rtpHdr&0x00800000) >> 23; unsigned rtpTimestamp = ntohl(*(unsigned*)(bPacket->data()));ADVANCE(4); unsigned rtpSSRC = ntohl(*(unsigned*)(bPacket->data())); ADVANCE(4); // Check the RTP version number (it should be 2): if ((rtpHdr&0xC0000000) != 0x80000000) break; // Skip over any CSRC identifiers in the header: unsigned cc = (rtpHdr>>24)&0xF; if (bPacket->dataSize() < cc) break; ADVANCE(cc*4); // Check for (& ignore) any RTP header extension if (rtpHdr&0x10000000) { if (bPacket->dataSize() < 4) break; unsigned extHdr = ntohl(*(unsigned*)(bPacket->data())); ADVANCE(4); unsigned remExtSize = 4*(extHdr&0xFFFF); if (bPacket->dataSize() < remExtSize) break; ADVANCE(remExtSize); } // Discard any padding bytes: if (rtpHdr&0x20000000) { if (bPacket->dataSize() == 0) break; unsigned numPaddingBytes = (unsigned)(bPacket->data())[bPacket->dataSize()-1]; if (bPacket->dataSize() < numPaddingBytes) break; bPacket->removePadding(numPaddingBytes); } // Check the Payload Type. if ((unsigned char)((rtpHdr&0x007F0000)>>16) != rtpPayloadFormat()) { break; } // The rest of the packet is the usable data. Record and save it:
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -