📄 demux_rtp.cpp
字号:
if (demuxer->stream->eof) return 0; // source stream has closed down // Before using this packet, check to make sure that its presentation // time is not far behind the other stream (if any). If it is, // then we discard this packet, and get another instead. (The rest of // MPlayer doesn't always do a good job of synchronizing when the // audio and video streams get this far apart.) // (We don't do this when streaming over TCP, because then the audio and // video streams are interleaved.) // (Also, if the stream is *excessively* far behind, then we allow // the packet, because in this case it probably means that there was // an error in the source's timestamp synchronization.) const float ptsBehindThreshold = 1.0; // seconds const float ptsBehindLimit = 60.0; // seconds if (ptsBehind < ptsBehindThreshold || ptsBehind > ptsBehindLimit || rtspStreamOverTCP) { // packet's OK ds_add_packet(ds, dp); break; } #ifdef DEBUG_PRINT_DISCARDED_PACKETS RTPState* rtpState = (RTPState*)(demuxer->priv); ReadBufferQueue* bufferQueue = ds == demuxer->video ? rtpState->videoBufferQueue : rtpState->audioBufferQueue; fprintf(stderr, "Discarding %s packet (%fs behind)\n", bufferQueue->tag(), ptsBehind);#endif free_demux_packet(dp); // give back this packet, and get another one } return 1;}Boolean awaitRTPPacket(demuxer_t* demuxer, demux_stream_t* ds, unsigned char*& packetData, unsigned& packetDataLen, float& pts) { // Similar to "demux_rtp_fill_buffer()", except that the "demux_packet" // is not delivered to the "demux_stream". float ptsBehind; demux_packet_t* dp = getBuffer(demuxer, ds, True, ptsBehind); // blocking if (dp == NULL) return False; packetData = dp->buffer; packetDataLen = dp->len; pts = dp->pts; return True;}Boolean insertRTPData(demuxer_t* demuxer, demux_stream_t* ds, unsigned char* data, unsigned dataLen) { // Begin by finding the buffer queue that we want to add data to. // (Get this from the RTP state, which we stored in // the demuxer's 'priv' field) RTPState* rtpState = (RTPState*)(demuxer->priv); ReadBufferQueue* bufferQueue = NULL; if (ds == demuxer->video) { bufferQueue = rtpState->videoBufferQueue; } else if (ds == demuxer->audio) { bufferQueue = rtpState->audioBufferQueue; } else { fprintf(stderr, "(demux_rtp)insertRTPData: internal error: unknown stream\n"); return False; } if (data == NULL || dataLen == 0) return False; demux_packet_t* dp = new_demux_packet(dataLen); if (dp == NULL) return False; // Copy our data into the buffer, and save it: memmove(dp->buffer, data, dataLen); dp->len = dataLen; dp->pts = 0; bufferQueue->savePendingBuffer(dp); return True;}static void teardownRTSPorSIPSession(RTPState* rtpState); // forwardextern "C" void demux_close_rtp(demuxer_t* demuxer) { // Reclaim all RTP-related state: // Get the RTP state that was stored in the demuxer's 'priv' field: RTPState* rtpState = (RTPState*)(demuxer->priv); if (rtpState == NULL) return; teardownRTSPorSIPSession(rtpState); UsageEnvironment* env = NULL; TaskScheduler* scheduler = NULL; if (rtpState->mediaSession != NULL) { env = &(rtpState->mediaSession->envir()); scheduler = &(env->taskScheduler()); } Medium::close(rtpState->mediaSession); Medium::close(rtpState->rtspClient); Medium::close(rtpState->sipClient); delete rtpState->audioBufferQueue; delete rtpState->videoBufferQueue; delete rtpState->sdpDescription; delete rtpState; env->reclaim(); delete scheduler;}////////// Extra routines that help implement the above interface functions:#define MAX_RTP_FRAME_SIZE 50000 // >= the largest conceivable frame composed from one or more RTP packetsstatic void afterReading(void* clientData, unsigned frameSize, unsigned /*numTruncatedBytes*/, struct timeval presentationTime, unsigned /*durationInMicroseconds*/) { if (frameSize >= MAX_RTP_FRAME_SIZE) { fprintf(stderr, "Saw an input frame too large (>=%d). Increase MAX_RTP_FRAME_SIZE in \"demux_rtp.cpp\".\n", MAX_RTP_FRAME_SIZE); } ReadBufferQueue* bufferQueue = (ReadBufferQueue*)clientData; demuxer_t* demuxer = bufferQueue->ourDemuxer(); RTPState* rtpState = (RTPState*)(demuxer->priv); if (frameSize > 0) demuxer->stream->eof = 0; demux_packet_t* dp = bufferQueue->dp; dp->len = frameSize; // Set the packet's presentation time stamp, depending on whether or // not our RTP source's timestamps have been synchronized yet: Boolean hasBeenSynchronized = bufferQueue->rtpSource()->hasBeenSynchronizedUsingRTCP(); if (hasBeenSynchronized) { if (verbose > 0 && !bufferQueue->prevPacketWasSynchronized) { fprintf(stderr, "%s stream has been synchronized using RTCP \n", bufferQueue->tag()); } struct timeval* fst = &(rtpState->firstSyncTime); // abbrev if (fst->tv_sec == 0 && fst->tv_usec == 0) { *fst = presentationTime; } // For the "pts" field, use the time differential from the first // synchronized time, rather than absolute time, in order to avoid // round-off errors when converting to a float: dp->pts = presentationTime.tv_sec - fst->tv_sec + (presentationTime.tv_usec - fst->tv_usec)/1000000.0; bufferQueue->prevPacketPTS = dp->pts; } else { if (verbose > 0 && bufferQueue->prevPacketWasSynchronized) { fprintf(stderr, "%s stream is no longer RTCP-synchronized \n", bufferQueue->tag()); } // use the previous packet's "pts" once again: dp->pts = bufferQueue->prevPacketPTS; } bufferQueue->prevPacketWasSynchronized = hasBeenSynchronized; dp->pos = demuxer->filepos; demuxer->filepos += frameSize; // Signal any pending 'doEventLoop()' call on this queue: bufferQueue->blockingFlag = ~0;}static void onSourceClosure(void* clientData) { ReadBufferQueue* bufferQueue = (ReadBufferQueue*)clientData; demuxer_t* demuxer = bufferQueue->ourDemuxer(); demuxer->stream->eof = 1; // Signal any pending 'doEventLoop()' call on this queue: bufferQueue->blockingFlag = ~0;}static demux_packet_t* getBuffer(demuxer_t* demuxer, demux_stream_t* ds, Boolean mustGetNewData, float& ptsBehind) { // Begin by finding the buffer queue that we want to read from: // (Get this from the RTP state, which we stored in // the demuxer's 'priv' field) RTPState* rtpState = (RTPState*)(demuxer->priv); ReadBufferQueue* bufferQueue = NULL; if (ds == demuxer->video) { bufferQueue = rtpState->videoBufferQueue; } else if (ds == demuxer->audio) { bufferQueue = rtpState->audioBufferQueue; } else { fprintf(stderr, "(demux_rtp)getBuffer: internal error: unknown stream\n"); return NULL; } if (bufferQueue == NULL || bufferQueue->readSource() == NULL) { fprintf(stderr, "(demux_rtp)getBuffer failed: no appropriate RTP subsession has been set up\n"); return NULL; } demux_packet_t* dp; if (!mustGetNewData) { // Check whether we have a previously-saved buffer that we can use: dp = bufferQueue->getPendingBuffer(); if (dp != NULL) { ptsBehind = 0.0; // so that we always accept this data return dp; } } // Allocate a new packet buffer, and arrange to read into it: dp = new_demux_packet(MAX_RTP_FRAME_SIZE); bufferQueue->dp = dp; if (dp == NULL) return NULL; // Schedule the read operation: bufferQueue->blockingFlag = 0; bufferQueue->readSource()->getNextFrame(dp->buffer, MAX_RTP_FRAME_SIZE, afterReading, bufferQueue, onSourceClosure, bufferQueue); // Block ourselves until data becomes available: TaskScheduler& scheduler = bufferQueue->readSource()->envir().taskScheduler(); scheduler.doEventLoop(&bufferQueue->blockingFlag); // Set the "ptsBehind" result parameter: if (bufferQueue->prevPacketPTS != 0.0 && bufferQueue->prevPacketWasSynchronized && *(bufferQueue->otherQueue) != NULL && (*(bufferQueue->otherQueue))->prevPacketPTS != 0.0 && (*(bufferQueue->otherQueue))->prevPacketWasSynchronized) { ptsBehind = (*(bufferQueue->otherQueue))->prevPacketPTS - bufferQueue->prevPacketPTS; } else { ptsBehind = 0.0; } if (mustGetNewData) { // Save this buffer for future reads: bufferQueue->savePendingBuffer(dp); } return dp;}static void teardownRTSPorSIPSession(RTPState* rtpState) { MediaSession* mediaSession = rtpState->mediaSession; if (mediaSession == NULL) return; if (rtpState->rtspClient != NULL) { MediaSubsessionIterator iter(*mediaSession); MediaSubsession* subsession; while ((subsession = iter.next()) != NULL) { rtpState->rtspClient->teardownMediaSubsession(*subsession); } } else if (rtpState->sipClient != NULL) { rtpState->sipClient->sendBYE(); }}////////// "ReadBuffer" and "ReadBufferQueue" implementation:ReadBufferQueue::ReadBufferQueue(MediaSubsession* subsession, demuxer_t* demuxer, char const* tag) : prevPacketWasSynchronized(False), prevPacketPTS(0.0), otherQueue(NULL), dp(NULL), pendingDPHead(NULL), pendingDPTail(NULL), fReadSource(subsession == NULL ? NULL : subsession->readSource()), fRTPSource(subsession == NULL ? NULL : subsession->rtpSource()), fOurDemuxer(demuxer), fTag(strdup(tag)) {} ReadBufferQueue::~ReadBufferQueue() { delete fTag; // Free any pending buffers (that never got delivered): demux_packet_t* dp = pendingDPHead; while (dp != NULL) { demux_packet_t* dpNext = dp->next; dp->next = NULL; free_demux_packet(dp); dp = dpNext; }}void ReadBufferQueue::savePendingBuffer(demux_packet_t* dp) { // Keep this buffer around, until MPlayer asks for it later: if (pendingDPTail == NULL) { pendingDPHead = pendingDPTail = dp; } else { pendingDPTail->next = dp; pendingDPTail = dp; } dp->next = NULL;}demux_packet_t* ReadBufferQueue::getPendingBuffer() { demux_packet_t* dp = pendingDPHead; if (dp != NULL) { pendingDPHead = dp->next; if (pendingDPHead == NULL) pendingDPTail = NULL; dp->next = NULL; } return dp;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -