📄 prioritizedrtpstreamselector.cpp
字号:
fNumTruncatedBytes = fFrameSize - fMaxSize; fFrameSize = fMaxSize; } memmove(fTo, buffer, fFrameSize); delete[] buffer; fWarehouse->fLastActionWasIncoming = False; return True; } // No frame was available. return False;}void PrioritizedRTPStreamSelector::completeDelivery(void* clientData) { PrioritizedRTPStreamSelector* selector = (PrioritizedRTPStreamSelector*)clientData; // Call our own 'after getting' function. Because we're not a 'leaf' // source, we can call this directly, without risking infinite recursion. FramedSource::afterGetting(selector);}//////// PrioritizedInputStreamDescriptor implementation ////////unsigned const PrioritizedInputStreamDescriptor::fBufferSize = 4000;PrioritizedInputStreamDescriptor::PrioritizedInputStreamDescriptor(PrioritizedRTPStreamSelector* ourSelector, PrioritizedInputStreamDescriptor* next, unsigned priority, RTPSource* inputStream, RTCPInstance* inputStreamRTCP) : fOurSelector(ourSelector), fNext(next), fPriority(priority), fRTPStream(inputStream), fRTCPStream(inputStreamRTCP) { fBuffer = new unsigned char[fBufferSize]; fBufferBytesUsed = 0;}PrioritizedInputStreamDescriptor::~PrioritizedInputStreamDescriptor() { delete[] fBuffer;}static void afterGettingFrame(void* clientData, unsigned frameSize, unsigned /*numTruncatedBytes*/, struct timeval /*presentationTime*/, unsigned /*durationInMicroseconds*/) { PrioritizedInputStreamDescriptor* inputStream = (PrioritizedInputStreamDescriptor*)clientData; inputStream->afterGettingFrame1(frameSize);}void PrioritizedInputStreamDescriptor::afterGettingFrame1(unsigned frameSize) { unsigned short rtpSeqNo = rtpStream()->curPacketRTPSeqNum(); // Deliver this frame to our selector: fOurSelector->handleNewIncomingFrame(fPriority, rtpSeqNo, fBuffer, frameSize);}static void onSourceClosure(void* clientData) { PrioritizedInputStreamDescriptor* inputStream = (PrioritizedInputStreamDescriptor*)clientData; inputStream->onSourceClosure1();}void PrioritizedInputStreamDescriptor::onSourceClosure1() { fOurSelector->removeInputRTPStream(fPriority);}////////// WarehousedPacketDescriptor //////////class WarehousedPacketDescriptor {public: WarehousedPacketDescriptor() : buffer(NULL) {} // Don't define a destructor; for some reason it causes a crash ##### unsigned priority; unsigned frameSize; unsigned char* buffer;};////////// PacketWarehouse implementation PacketWarehouse::PacketWarehouse(unsigned seqNumStagger) : fLastActionWasIncoming(False), fHaveReceivedFrames(False), fMinSeqNumStored(0), fMaxSeqNumStored(0), fMinSpanForDelivery((unsigned)(1.5*seqNumStagger)), fMaxSpanForDelivery(3*seqNumStagger), fNumDescriptors(4*seqNumStagger), fInterArrivalAveGap(0) { fPacketDescriptors = new WarehousedPacketDescriptor[fNumDescriptors]; if (fPacketDescriptors == NULL) {#ifdef DEBUG fprintf(stderr, "PacketWarehouse failed to allocate %d descriptors; seqNumStagger too large!\n", fNumDescriptors);#endif exit(1); } // Initially, set "fLastArrivalTime" to the current time: gettimeofday(&fLastArrivalTime, NULL);}PacketWarehouse::~PacketWarehouse() { // Delete each descriptor's buffer (if any), then delete the descriptors: for (unsigned i = 0; i < fNumDescriptors; ++i) { delete[] fPacketDescriptors[i].buffer; } delete[] fPacketDescriptors;}Boolean PacketWarehouse::isFull() { int currentSpan = fMaxSeqNumStored - fMinSeqNumStored; if (currentSpan < 0) currentSpan += 65536; return (unsigned)currentSpan >= fNumDescriptors;}void PacketWarehouse::addNewFrame(unsigned priority, unsigned short rtpSeqNo, unsigned char* buffer, unsigned frameSize) { if (!fHaveReceivedFrames) { // This is our first frame; set up initial parameters: // (But note hack: We want the first frame to have priority 0, so that // receivers' decoders are happy, by seeing the 'best' data initially) if (priority != 0) return; fMinSeqNumStored = fMaxSeqNumStored = rtpSeqNo; fHaveReceivedFrames = True; } else { // Update our record of the maximum sequence number stored if (seqNumLT(fMaxSeqNumStored, rtpSeqNo)) { fMaxSeqNumStored = rtpSeqNo; } else if (seqNumLT(rtpSeqNo, fMinSeqNumStored)) { return; // ignore this packet; it's too old for us } } if (isFull()) { // We've gotten way ahead of ourselves, probably due to a very large // set of consecutive lost packets. To recover, reset our min/max // sequence numbers (effectively emptying the warehouse). Hopefully, // this should be an unusual occurrence: fMinSeqNumStored = fMaxSeqNumStored = rtpSeqNo; } // Check whether a frame with this sequence number has already been seen WarehousedPacketDescriptor& desc = fPacketDescriptors[rtpSeqNo%fNumDescriptors]; if (desc.buffer != NULL) { // We already have a frame. If it's priority is higher than that of // this new frame, then we continue to use it: if (desc.priority < priority) return; // lower than existing priority // Otherwise, use the new frame instead, so get rid of the existing one: delete[] desc.buffer; } // Record this new frame: desc.buffer = new unsigned char[frameSize]; if (desc.buffer == NULL) {#ifdef DEBUG fprintf(stderr, "PacketWarehouse::addNewFrame failed to allocate %d-byte buffer!\n", frameSize);#endif exit(1); } memmove(desc.buffer, buffer, frameSize); desc.frameSize = frameSize; desc.priority = priority; struct timeval timeNow; gettimeofday(&timeNow, NULL); if (rtpSeqNo == (fLastRTPSeqNo+1)%65536) { // We've received consecutive packets, so update the estimate of // the average inter-arrival gap: unsigned lastGap // in microseconds = (timeNow.tv_sec - fLastArrivalTime.tv_sec)*1000000 + (timeNow.tv_usec - fLastArrivalTime.tv_usec); fInterArrivalAveGap = (9*fInterArrivalAveGap + lastGap)/10; // weighted } fLastArrivalTime = timeNow; fLastRTPSeqNo = rtpSeqNo;}unsigned char* PacketWarehouse::dequeueFrame(unsigned& resultFrameSize, unsigned& uSecondsToDefer) { uSecondsToDefer = 0; // by default // Don't return anything if we don't yet have enough packets to // cover our desired sequence number span int currentSpan = fMaxSeqNumStored - fMinSeqNumStored; if (currentSpan < 0) currentSpan += 65536; if (currentSpan < (int)fMinSpanForDelivery) { return NULL; // we're not ready } // Now, if our stored packet range is less than the desired maximum, // return a frame, but delay this unless no incoming frames have // arrived since the last time a frame was delivered to our client. // This causes the buffer to empty only if the flow of incoming // frames stops. if (currentSpan < (int)fMaxSpanForDelivery) { if (fLastActionWasIncoming) { uSecondsToDefer = (unsigned)(1.5*fInterArrivalAveGap); } } // Now, return a frame: unsigned char* resultBuffer = NULL; do { if (currentSpan < (int)fMinSpanForDelivery) break; WarehousedPacketDescriptor& desc = fPacketDescriptors[fMinSeqNumStored%fNumDescriptors]; resultBuffer = desc.buffer; resultFrameSize = desc.frameSize; desc.buffer = NULL;#ifdef DEBUG if (resultBuffer == NULL) fprintf(stderr, "No packet for seq num %d - skipping\n", fMinSeqNumStored); //##### else if (desc.priority == 2) fprintf(stderr, "Using priority %d frame for seq num %d\n", desc.priority, fMinSeqNumStored);//######endif fMinSeqNumStored = (fMinSeqNumStored+1)%65536; --currentSpan; } while (resultBuffer == NULL); // skip over missing packets return resultBuffer;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -