📄 prioritizedrtpstreamselector.cpp
字号:
/**********This library is free software; you can redistribute it and/or modify it underthe terms of the GNU Lesser General Public License as published by theFree Software Foundation; either version 2.1 of the License, or (at youroption) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)This library is distributed in the hope that it will be useful, but WITHOUTANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESSFOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License formore details.You should have received a copy of the GNU Lesser General Public Licensealong with this library; if not, write to the Free Software Foundation, Inc.,59 Temple Place, Suite 330, Boston, MA 02111-1307 USA**********/// "liveMedia"// Copyright (c) 1996-2004 Live Networks, Inc. All rights reserved.// Select from multiple, prioritized RTP streams, based on RTP sequence// number, producing a single output stream// Implementation#include "PrioritizedRTPStreamSelector.hh"#include "GroupsockHelper.hh"#include <string.h>#include <stdlib.h>////////// PrioritizedInputStreamDescriptor //////////// A data structure used to describe each input stream:class PrioritizedInputStreamDescriptor {public: PrioritizedInputStreamDescriptor(PrioritizedRTPStreamSelector* ourSelector, PrioritizedInputStreamDescriptor* next, unsigned priority, RTPSource* inputStream, RTCPInstance* inputStreamRTCP); virtual ~PrioritizedInputStreamDescriptor(); PrioritizedInputStreamDescriptor*& next() { return fNext; } unsigned priority() const { return fPriority; } RTPSource* rtpStream() const { return fRTPStream; } unsigned char*& buffer() { return fBuffer; } unsigned bufferSize() const { return fBufferSize; } void afterGettingFrame1(unsigned frameSize); void onSourceClosure1();private: PrioritizedRTPStreamSelector* fOurSelector; PrioritizedInputStreamDescriptor* fNext; unsigned fPriority; RTPSource* fRTPStream; RTCPInstance* fRTCPStream; unsigned char* fBuffer; // where to put the next frame from this stream static unsigned const fBufferSize; unsigned fBufferBytesUsed;};static void afterGettingFrame(void* clientData, unsigned frameSize, unsigned numTruncatedBytes, struct timeval presentationTime, unsigned durationInMicroseconds);static void onSourceClosure(void* clientData);////////// PacketWarehouse //////////class WarehousedPacketDescriptor; // forwardclass PacketWarehouse {public: PacketWarehouse(unsigned seqNumStagger); virtual ~PacketWarehouse(); Boolean isFull(); void addNewFrame(unsigned priority, unsigned short rtpSeqNo, unsigned char* buffer, unsigned frameSize); unsigned char* dequeueFrame(unsigned& resultFrameSize, unsigned& uSecondsToDefer); Boolean fLastActionWasIncoming;private: WarehousedPacketDescriptor* fPacketDescriptors; Boolean fHaveReceivedFrames; unsigned short fMinSeqNumStored, fMaxSeqNumStored; unsigned const fMinSpanForDelivery, fMaxSpanForDelivery, fNumDescriptors; struct timeval fLastArrivalTime; unsigned short fLastRTPSeqNo; unsigned fInterArrivalAveGap; // in microseconds};////////// PrioritizedRTPStreamSelector implementation //////////PrioritizedRTPStreamSelector::PrioritizedRTPStreamSelector(UsageEnvironment& env, unsigned seqNumStagger) : FramedSource(env), fNextInputStreamPriority(0), fInputStreams(NULL), fAmCurrentlyReading(False), fNeedAFrame(False) { fWarehouse = new PacketWarehouse(seqNumStagger);}PrioritizedRTPStreamSelector::~PrioritizedRTPStreamSelector() { delete fWarehouse; while (fInputStreams != NULL) { PrioritizedInputStreamDescriptor* inputStream = fInputStreams; fInputStreams = inputStream->next(); delete inputStream; }}PrioritizedRTPStreamSelector* PrioritizedRTPStreamSelector::createNew(UsageEnvironment& env, unsigned seqNumStagger) { return new PrioritizedRTPStreamSelector(env, seqNumStagger);}unsigned PrioritizedRTPStreamSelector::addInputRTPStream(RTPSource* inputStream, RTCPInstance* inputStreamRTCP) { fInputStreams = new PrioritizedInputStreamDescriptor(this, fInputStreams, fNextInputStreamPriority, inputStream, inputStreamRTCP); return fNextInputStreamPriority++;}void PrioritizedRTPStreamSelector::removeInputRTPStream(unsigned priority) { for (PrioritizedInputStreamDescriptor*& inputStream = fInputStreams; inputStream != NULL; inputStream = inputStream->next()) { if (inputStream->priority() == priority) { PrioritizedInputStreamDescriptor* toDelete = inputStream; inputStream->next() = toDelete->next(); delete toDelete; break; } }}Boolean PrioritizedRTPStreamSelector::lookupByName(UsageEnvironment& env, char const* sourceName, PrioritizedRTPStreamSelector*& resultSelector) { resultSelector = NULL; // unless we succeed FramedSource* source; if (!FramedSource::lookupByName(env, sourceName, source)) return False; if (!source->isPrioritizedRTPStreamSelector()) { env.setResultMsg(sourceName, " is not a Prioritized RTP Stream Selector"); return False; } resultSelector = (PrioritizedRTPStreamSelector*)source; return True;}Boolean PrioritizedRTPStreamSelector::isPrioritizedRTPStreamSelector() const { return True;}void PrioritizedRTPStreamSelector::doGetNextFrame() { // Begin the process of reading frames from our sources into the // frame 'warehouse' (unless this is already ongoing): startReadingProcess(); // (Try to) give ourselves a frame from our warehouse: unsigned uSecondsToDefer; if (deliverFrameToClient(uSecondsToDefer)) { fNeedAFrame = False; // Complete the delivery. If we were told to delay before doing // this, then schedule this as a delayed task: if (uSecondsToDefer > 0) { nextTask() = envir().taskScheduler().scheduleDelayedTask((int)uSecondsToDefer, completeDelivery, this); } else { completeDelivery(this); } } else { fNeedAFrame = True; }}void PrioritizedRTPStreamSelector::startReadingProcess() { if (fAmCurrentlyReading) return; // already ongoing if (fWarehouse->isFull()) return; // no room now for any more // Run through each input stream, requesting a new frame from it // (unless a previous read on this frame is still in progress). // When a frame arrives on one of the input streams, it will get // stored in our 'warehouse', for later delivery to us. for (PrioritizedInputStreamDescriptor* inputStream = fInputStreams; inputStream != NULL; inputStream = inputStream->next()) { RTPSource* rtpStream = inputStream->rtpStream(); if (!rtpStream->isCurrentlyAwaitingData()) { // Read a frame into this stream's descriptor's buffer: fAmCurrentlyReading = True; rtpStream->getNextFrame(inputStream->buffer(), inputStream->bufferSize(), afterGettingFrame, inputStream, onSourceClosure, inputStream); } }}void PrioritizedRTPStreamSelector::handleNewIncomingFrame(unsigned priority, unsigned short rtpSeqNo, unsigned char* buffer, unsigned frameSize) { // Begin by adding this new frame to the warehouse: fWarehouse->addNewFrame(priority, rtpSeqNo, buffer, frameSize); fWarehouse->fLastActionWasIncoming = True; // Try again to deliver a frame for our client (if he still wants one): if (fNeedAFrame) { doGetNextFrame(); } // Continue the reading process: fAmCurrentlyReading = False; startReadingProcess();}Boolean PrioritizedRTPStreamSelector::deliverFrameToClient(unsigned& uSecondsToDefer) { unsigned char* buffer = fWarehouse->dequeueFrame(fFrameSize, uSecondsToDefer); if (buffer != NULL) { // A frame was available if (fFrameSize > fMaxSize) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -