📄 demux_rtp.cpp
字号:
}static void teardownRTSPorSIPSession(RTPState* rtpState); // forwardextern "C" void demux_close_rtp(demuxer_t* demuxer) { // Reclaim all RTP-related state: // Get the RTP state that was stored in the demuxer's 'priv' field: RTPState* rtpState = (RTPState*)(demuxer->priv); if (rtpState == NULL) return; teardownRTSPorSIPSession(rtpState); UsageEnvironment* env = NULL; TaskScheduler* scheduler = NULL; if (rtpState->mediaSession != NULL) { env = &(rtpState->mediaSession->envir()); scheduler = &(env->taskScheduler()); } Medium::close(rtpState->mediaSession); Medium::close(rtpState->rtspClient); Medium::close(rtpState->sipClient); delete rtpState->audioBufferQueue; delete rtpState->videoBufferQueue; delete rtpState->sdpDescription; delete rtpState; env->reclaim(); delete scheduler;}////////// Extra routines that help implement the above interface functions:#define MAX_RTP_FRAME_SIZE 50000 // >= the largest conceivable frame composed from one or more RTP packetsstatic void afterReading(void* clientData, unsigned frameSize, unsigned /*numTruncatedBytes*/, struct timeval presentationTime, unsigned /*durationInMicroseconds*/) { int headersize = 0; if (frameSize >= MAX_RTP_FRAME_SIZE) { fprintf(stderr, "Saw an input frame too large (>=%d). Increase MAX_RTP_FRAME_SIZE in \"demux_rtp.cpp\".\n", MAX_RTP_FRAME_SIZE); } ReadBufferQueue* bufferQueue = (ReadBufferQueue*)clientData; demuxer_t* demuxer = bufferQueue->ourDemuxer(); RTPState* rtpState = (RTPState*)(demuxer->priv); if (frameSize > 0) demuxer->stream->eof = 0; demux_packet_t* dp = bufferQueue->dp; if (bufferQueue->readSource()->isAMRAudioSource()) headersize = 1; else if (bufferQueue == rtpState->videoBufferQueue && ((sh_video_t*)demuxer->video->sh)->format == mmioFOURCC('H','2','6','4')) { dp->buffer[0]=0x00; dp->buffer[1]=0x00; dp->buffer[2]=0x01; headersize = 3; } resize_demux_packet(dp, frameSize + headersize); // Set the packet's presentation time stamp, depending on whether or // not our RTP source's timestamps have been synchronized yet: Boolean hasBeenSynchronized = bufferQueue->rtpSource()->hasBeenSynchronizedUsingRTCP(); if (hasBeenSynchronized) { if (verbose > 0 && !bufferQueue->prevPacketWasSynchronized) { fprintf(stderr, "%s stream has been synchronized using RTCP \n", bufferQueue->tag()); } struct timeval* fst = &(rtpState->firstSyncTime); // abbrev if (fst->tv_sec == 0 && fst->tv_usec == 0) { *fst = presentationTime; } // For the "pts" field, use the time differential from the first // synchronized time, rather than absolute time, in order to avoid // round-off errors when converting to a float: dp->pts = presentationTime.tv_sec - fst->tv_sec + (presentationTime.tv_usec - fst->tv_usec)/1000000.0; bufferQueue->prevPacketPTS = dp->pts; } else { if (verbose > 0 && bufferQueue->prevPacketWasSynchronized) { fprintf(stderr, "%s stream is no longer RTCP-synchronized \n", bufferQueue->tag()); } // use the previous packet's "pts" once again: dp->pts = bufferQueue->prevPacketPTS; } bufferQueue->prevPacketWasSynchronized = hasBeenSynchronized; dp->pos = demuxer->filepos; demuxer->filepos += frameSize + headersize; // Signal any pending 'doEventLoop()' call on this queue: bufferQueue->blockingFlag = ~0;}static void onSourceClosure(void* clientData) { ReadBufferQueue* bufferQueue = (ReadBufferQueue*)clientData; demuxer_t* demuxer = bufferQueue->ourDemuxer(); demuxer->stream->eof = 1; // Signal any pending 'doEventLoop()' call on this queue: bufferQueue->blockingFlag = ~0;}static demux_packet_t* getBuffer(demuxer_t* demuxer, demux_stream_t* ds, Boolean mustGetNewData, float& ptsBehind) { // Begin by finding the buffer queue that we want to read from: // (Get this from the RTP state, which we stored in // the demuxer's 'priv' field) RTPState* rtpState = (RTPState*)(demuxer->priv); ReadBufferQueue* bufferQueue = NULL; int headersize = 0; TaskToken task; if (demuxer->stream->eof) return NULL; if (ds == demuxer->video) { bufferQueue = rtpState->videoBufferQueue; if (((sh_video_t*)ds->sh)->format == mmioFOURCC('H','2','6','4')) headersize = 3; } else if (ds == demuxer->audio) { bufferQueue = rtpState->audioBufferQueue; if (bufferQueue->readSource()->isAMRAudioSource()) headersize = 1; } else { fprintf(stderr, "(demux_rtp)getBuffer: internal error: unknown stream\n"); return NULL; } if (bufferQueue == NULL || bufferQueue->readSource() == NULL) { fprintf(stderr, "(demux_rtp)getBuffer failed: no appropriate RTP subsession has been set up\n"); return NULL; } demux_packet_t* dp = NULL; if (!mustGetNewData) { // Check whether we have a previously-saved buffer that we can use: dp = bufferQueue->getPendingBuffer(); if (dp != NULL) { ptsBehind = 0.0; // so that we always accept this data return dp; } } // Allocate a new packet buffer, and arrange to read into it: if (!bufferQueue->nextpacket) { dp = new_demux_packet(MAX_RTP_FRAME_SIZE); bufferQueue->dp = dp; if (dp == NULL) return NULL; }#ifdef USE_LIBAVCODEC extern AVCodecParserContext * h264parserctx; int consumed, poutbuf_size = 1; const uint8_t *poutbuf = NULL; float lastpts = 0.0; do { if (!bufferQueue->nextpacket) {#endif // Schedule the read operation: bufferQueue->blockingFlag = 0; bufferQueue->readSource()->getNextFrame(&dp->buffer[headersize], MAX_RTP_FRAME_SIZE - headersize, afterReading, bufferQueue, onSourceClosure, bufferQueue); // Block ourselves until data becomes available: TaskScheduler& scheduler = bufferQueue->readSource()->envir().taskScheduler(); int delay = 10000000; if (bufferQueue->prevPacketPTS * 1.05 > rtpState->mediaSession->playEndTime()) delay /= 10; task = scheduler.scheduleDelayedTask(delay, onSourceClosure, bufferQueue); scheduler.doEventLoop(&bufferQueue->blockingFlag); scheduler.unscheduleDelayedTask(task); if (demuxer->stream->eof) { free_demux_packet(dp); return NULL; } if (headersize == 1) // amr dp->buffer[0] = ((AMRAudioSource*)bufferQueue->readSource())->lastFrameHeader();#ifdef USE_LIBAVCODEC } else { bufferQueue->dp = dp = bufferQueue->nextpacket; bufferQueue->nextpacket = NULL; } if (headersize == 3 && h264parserctx) { // h264 consumed = h264parserctx->parser->parser_parse(h264parserctx, NULL, &poutbuf, &poutbuf_size, dp->buffer, dp->len); if (!consumed && !poutbuf_size) return NULL; if (!poutbuf_size) { lastpts=dp->pts; free_demux_packet(dp); bufferQueue->dp = dp = new_demux_packet(MAX_RTP_FRAME_SIZE); } else { bufferQueue->nextpacket = dp; bufferQueue->dp = dp = new_demux_packet(poutbuf_size); memcpy(dp->buffer, poutbuf, poutbuf_size); dp->pts=lastpts; } } } while (!poutbuf_size);#endif // Set the "ptsBehind" result parameter: if (bufferQueue->prevPacketPTS != 0.0 && bufferQueue->prevPacketWasSynchronized && *(bufferQueue->otherQueue) != NULL && (*(bufferQueue->otherQueue))->prevPacketPTS != 0.0 && (*(bufferQueue->otherQueue))->prevPacketWasSynchronized) { ptsBehind = (*(bufferQueue->otherQueue))->prevPacketPTS - bufferQueue->prevPacketPTS; } else { ptsBehind = 0.0; } if (mustGetNewData) { // Save this buffer for future reads: bufferQueue->savePendingBuffer(dp); } return dp;}static void teardownRTSPorSIPSession(RTPState* rtpState) { MediaSession* mediaSession = rtpState->mediaSession; if (mediaSession == NULL) return; if (rtpState->rtspClient != NULL) { rtpState->rtspClient->teardownMediaSession(*mediaSession); } else if (rtpState->sipClient != NULL) { rtpState->sipClient->sendBYE(); }}////////// "ReadBuffer" and "ReadBufferQueue" implementation:ReadBufferQueue::ReadBufferQueue(MediaSubsession* subsession, demuxer_t* demuxer, char const* tag) : prevPacketWasSynchronized(False), prevPacketPTS(0.0), otherQueue(NULL), dp(NULL), nextpacket(NULL), pendingDPHead(NULL), pendingDPTail(NULL), fReadSource(subsession == NULL ? NULL : subsession->readSource()), fRTPSource(subsession == NULL ? NULL : subsession->rtpSource()), fOurDemuxer(demuxer), fTag(strdup(tag)) {} ReadBufferQueue::~ReadBufferQueue() { delete fTag; // Free any pending buffers (that never got delivered): demux_packet_t* dp = pendingDPHead; while (dp != NULL) { demux_packet_t* dpNext = dp->next; dp->next = NULL; free_demux_packet(dp); dp = dpNext; }}void ReadBufferQueue::savePendingBuffer(demux_packet_t* dp) { // Keep this buffer around, until MPlayer asks for it later: if (pendingDPTail == NULL) { pendingDPHead = pendingDPTail = dp; } else { pendingDPTail->next = dp; pendingDPTail = dp; } dp->next = NULL;}demux_packet_t* ReadBufferQueue::getPendingBuffer() { demux_packet_t* dp = pendingDPHead; if (dp != NULL) { pendingDPHead = dp->next; if (pendingDPHead == NULL) pendingDPTail = NULL; dp->next = NULL; } return dp;}static int demux_rtp_control(struct demuxer_st *demuxer, int cmd, void *arg) { double endpts = ((RTPState*)demuxer->priv)->mediaSession->playEndTime(); switch(cmd) { case DEMUXER_CTRL_GET_TIME_LENGTH: if (endpts <= 0) return DEMUXER_CTRL_DONTKNOW; *((double *)arg) = endpts; return DEMUXER_CTRL_OK; case DEMUXER_CTRL_GET_PERCENT_POS: if (endpts <= 0) return DEMUXER_CTRL_DONTKNOW; *((int *)arg) = (int)(((RTPState*)demuxer->priv)->videoBufferQueue->prevPacketPTS*100/endpts); return DEMUXER_CTRL_OK; default: return DEMUXER_CTRL_NOTIMPL; }}demuxer_desc_t demuxer_desc_rtp = { "LIVE555 RTP demuxer", "rtp", "", "Ross Finlayson", "requires LIVE555 Streaming Media library", DEMUXER_TYPE_RTP, 0, // no autodetect NULL, demux_rtp_fill_buffer, demux_open_rtp, demux_close_rtp, NULL, demux_rtp_control};
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -