📄 demux_rtp.cpp
字号:
////////// Routines (with C-linkage) that interface between "MPlayer"////////// and the "LIVE.COM Streaming Media" libraries:extern "C" {// on MinGW, we must include windows.h before the things it conflicts#ifdef __MINGW32__ // with. they are each protected from#include <windows.h> // windows.h, but not the other way around.#endif#include "demux_rtp.h"#include "stheader.h"}#include "demux_rtp_internal.h"#include "BasicUsageEnvironment.hh"#include "liveMedia.hh"#include "GroupsockHelper.hh"#include <unistd.h>extern "C" stream_t* stream_open_sdp(int fd, off_t fileSize, int* file_format) { *file_format = DEMUXER_TYPE_RTP; stream_t* newStream = NULL; do { char* sdpDescription = (char*)malloc(fileSize+1); if (sdpDescription == NULL) break; ssize_t numBytesRead = read(fd, sdpDescription, fileSize); if (numBytesRead != fileSize) break; sdpDescription[fileSize] = '\0'; // to be safe newStream = (stream_t*)calloc(sizeof (stream_t), 1); if (newStream == NULL) break; // Store the SDP description in the 'priv' field, for later use: newStream->priv = sdpDescription; } while (0); return newStream;}extern "C" int _rtsp_streaming_seek(int /*fd*/, off_t /*pos*/, streaming_ctrl_t* /*streaming_ctrl*/) { return -1; // For now, we don't handle RTSP stream seeking}extern "C" int rtsp_streaming_start(stream_t* stream) { stream->streaming_ctrl->streaming_seek = _rtsp_streaming_seek; return 0;}// A data structure representing input data for each stream:class ReadBufferQueue {public: ReadBufferQueue(MediaSubsession* subsession, demuxer_t* demuxer, char const* tag); virtual ~ReadBufferQueue(); FramedSource* readSource() const { return fReadSource; } RTPSource* rtpSource() const { return fRTPSource; } demuxer_t* ourDemuxer() const { return fOurDemuxer; } char const* tag() const { return fTag; } char blockingFlag; // used to implement synchronous reads // For A/V synchronization: Boolean prevPacketWasSynchronized; float prevPacketPTS; ReadBufferQueue** otherQueue; // The 'queue' actually consists of just a single "demux_packet_t" // (because the underlying OS does the actual queueing/buffering): demux_packet_t* dp; // However, we sometimes inspect buffers before delivering them. // For this, we maintain a queue of pending buffers: void savePendingBuffer(demux_packet_t* dp); demux_packet_t* getPendingBuffer();private: demux_packet_t* pendingDPHead; demux_packet_t* pendingDPTail; FramedSource* fReadSource; RTPSource* fRTPSource; demuxer_t* fOurDemuxer; char const* fTag; // used for debugging};// A structure of RTP-specific state, kept so that we can cleanly// reclaim it:typedef struct RTPState { char const* sdpDescription; RTSPClient* rtspClient; SIPClient* sipClient; MediaSession* mediaSession; ReadBufferQueue* audioBufferQueue; ReadBufferQueue* videoBufferQueue; unsigned flags; struct timeval firstSyncTime;};extern "C" char* network_username;extern "C" char* network_password;static char* openURL_rtsp(RTSPClient* client, char const* url) { // If we were given a user name (and optional password), then use them: if (network_username != NULL) { char const* password = network_password == NULL ? "" : network_password; return client->describeWithPassword(url, network_username, password); } else { return client->describeURL(url); }}static char* openURL_sip(SIPClient* client, char const* url) { // If we were given a user name (and optional password), then use them: if (network_username != NULL) { char const* password = network_password == NULL ? "" : network_password; return client->inviteWithPassword(url, network_username, password); } else { return client->invite(url); }}int rtspStreamOverTCP = 0; extern "C" int audio_id, video_id, dvdsub_id;extern "C" demuxer_t* demux_open_rtp(demuxer_t* demuxer) { Boolean success = False; do { TaskScheduler* scheduler = BasicTaskScheduler::createNew(); if (scheduler == NULL) break; UsageEnvironment* env = BasicUsageEnvironment::createNew(*scheduler); if (env == NULL) break; RTSPClient* rtspClient = NULL; SIPClient* sipClient = NULL; if (demuxer == NULL || demuxer->stream == NULL) break; // shouldn't happen demuxer->stream->eof = 0; // just in case // Look at the stream's 'priv' field to see if we were initiated // via a SDP description: char* sdpDescription = (char*)(demuxer->stream->priv); if (sdpDescription == NULL) { // We weren't given a SDP description directly, so assume that // we were given a RTSP or SIP URL: char const* protocol = demuxer->stream->streaming_ctrl->url->protocol; char const* url = demuxer->stream->streaming_ctrl->url->url; extern int verbose; if (strcmp(protocol, "rtsp") == 0) { rtspClient = RTSPClient::createNew(*env, verbose, "MPlayer"); if (rtspClient == NULL) { fprintf(stderr, "Failed to create RTSP client: %s\n", env->getResultMsg()); break; } sdpDescription = openURL_rtsp(rtspClient, url); } else { // SIP unsigned char desiredAudioType = 0; // PCMU (use 3 for GSM) sipClient = SIPClient::createNew(*env, desiredAudioType, NULL, verbose, "MPlayer"); if (sipClient == NULL) { fprintf(stderr, "Failed to create SIP client: %s\n", env->getResultMsg()); break; } sipClient->setClientStartPortNum(8000); sdpDescription = openURL_sip(sipClient, url); } if (sdpDescription == NULL) { fprintf(stderr, "Failed to get a SDP description from URL \"%s\": %s\n", url, env->getResultMsg()); break; } } // Now that we have a SDP description, create a MediaSession from it: MediaSession* mediaSession = MediaSession::createNew(*env, sdpDescription); if (mediaSession == NULL) break; // Create a 'RTPState' structure containing the state that we just created, // and store it in the demuxer's 'priv' field, for future reference: RTPState* rtpState = new RTPState; rtpState->sdpDescription = sdpDescription; rtpState->rtspClient = rtspClient; rtpState->sipClient = sipClient; rtpState->mediaSession = mediaSession; rtpState->audioBufferQueue = rtpState->videoBufferQueue = NULL; rtpState->flags = 0; rtpState->firstSyncTime.tv_sec = rtpState->firstSyncTime.tv_usec = 0; demuxer->priv = rtpState; // Create RTP receivers (sources) for each subsession: MediaSubsessionIterator iter(*mediaSession); MediaSubsession* subsession; unsigned desiredReceiveBufferSize; while ((subsession = iter.next()) != NULL) { // Ignore any subsession that's not audio or video: if (strcmp(subsession->mediumName(), "audio") == 0) { desiredReceiveBufferSize = 100000; } else if (strcmp(subsession->mediumName(), "video") == 0) { desiredReceiveBufferSize = 2000000; } else { continue; } if (!subsession->initiate()) { fprintf(stderr, "Failed to initiate \"%s/%s\" RTP subsession: %s\n", subsession->mediumName(), subsession->codecName(), env->getResultMsg()); } else { fprintf(stderr, "Initiated \"%s/%s\" RTP subsession\n", subsession->mediumName(), subsession->codecName()); // Set the OS's socket receive buffer sufficiently large to avoid // incoming packets getting dropped between successive reads from this // subsession's demuxer. Depending on the bitrate(s) that you expect, // you may wish to tweak the "desiredReceiveBufferSize" values above. int rtpSocketNum = subsession->rtpSource()->RTPgs()->socketNum(); int receiveBufferSize = increaseReceiveBufferTo(*env, rtpSocketNum, desiredReceiveBufferSize); if (verbose > 0) { fprintf(stderr, "Increased %s socket receive buffer to %d bytes \n", subsession->mediumName(), receiveBufferSize); } if (rtspClient != NULL) { // Issue a RTSP "SETUP" command on the chosen subsession: if (!rtspClient->setupMediaSubsession(*subsession, False, rtspStreamOverTCP)) break; } } } if (rtspClient != NULL) { // Issue a RTSP aggregate "PLAY" command on the whole session: if (!rtspClient->playMediaSession(*mediaSession)) break; } else if (sipClient != NULL) { sipClient->sendACK(); // to start the stream flowing } // Now that the session is ready to be read, do additional // MPlayer codec-specific initialization on each subsession: iter.reset(); while ((subsession = iter.next()) != NULL) { if (subsession->readSource() == NULL) continue; // not reading this unsigned flags = 0; if (strcmp(subsession->mediumName(), "audio") == 0) { rtpState->audioBufferQueue = new ReadBufferQueue(subsession, demuxer, "audio"); rtpState->audioBufferQueue->otherQueue = &(rtpState->videoBufferQueue); rtpCodecInitialize_audio(demuxer, subsession, flags); } else if (strcmp(subsession->mediumName(), "video") == 0) { rtpState->videoBufferQueue = new ReadBufferQueue(subsession, demuxer, "video"); rtpState->videoBufferQueue->otherQueue = &(rtpState->audioBufferQueue); rtpCodecInitialize_video(demuxer, subsession, flags); } rtpState->flags |= flags; } success = True; } while (0); if (!success) return NULL; // an error occurred // Hack: If audio and video are demuxed together on a single RTP stream, // then create a new "demuxer_t" structure to allow the higher-level // code to recognize this: if (demux_is_multiplexed_rtp_stream(demuxer)) { stream_t* s = new_ds_stream(demuxer->video); demuxer_t* od = demux_open(s, DEMUXER_TYPE_UNKNOWN, audio_id, video_id, dvdsub_id, NULL); demuxer = new_demuxers_demuxer(od, od, od); } return demuxer;}extern "C" int demux_is_mpeg_rtp_stream(demuxer_t* demuxer) { // Get the RTP state that was stored in the demuxer's 'priv' field: RTPState* rtpState = (RTPState*)(demuxer->priv); return (rtpState->flags&RTPSTATE_IS_MPEG12_VIDEO) != 0;}extern "C" int demux_is_multiplexed_rtp_stream(demuxer_t* demuxer) { // Get the RTP state that was stored in the demuxer's 'priv' field: RTPState* rtpState = (RTPState*)(demuxer->priv); return (rtpState->flags&RTPSTATE_IS_MULTIPLEXED) != 0;}static demux_packet_t* getBuffer(demuxer_t* demuxer, demux_stream_t* ds, Boolean mustGetNewData, float& ptsBehind); // forwardextern "C" int demux_rtp_fill_buffer(demuxer_t* demuxer, demux_stream_t* ds) { // Get a filled-in "demux_packet" from the RTP source, and deliver it. // Note that this is called as a synchronous read operation, so it needs // to block in the (hopefully infrequent) case where no packet is // immediately available. while (1) { float ptsBehind; demux_packet_t* dp = getBuffer(demuxer, ds, False, ptsBehind); // blocking if (dp == NULL) return 0;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -