📄 playcommon.cpp
字号:
subsession->sink->startPlaying(*(subsession->readSource()), subsessionAfterPlaying, subsession); // Also set a handler to be called if a RTCP "BYE" arrives // for this subsession: if (subsession->rtcpInstance() != NULL) { subsession->rtcpInstance()->setByeHandler(subsessionByeHandler, subsession); } madeProgress = True; } } if (!madeProgress) shutdown(); } } // Finally, start playing each subsession, to start the data flow: startPlayingStreams(); env->taskScheduler().doEventLoop(); // does not return return 0; // only to prevent compiler warning}void setupStreams() { MediaSubsessionIterator iter(*session); MediaSubsession *subsession; Boolean madeProgress = False; while ((subsession = iter.next()) != NULL) { if (subsession->clientPortNum() == 0) continue; // port # was not set if (!clientSetupSubsession(ourClient, subsession, streamUsingTCP)) { *env << "Failed to setup \"" << subsession->mediumName() << "/" << subsession->codecName() << "\" subsession: " << env->getResultMsg() << "\n"; } else { *env << "Setup \"" << subsession->mediumName() << "/" << subsession->codecName() << "\" subsession (client ports " << subsession->clientPortNum() << "-" << subsession->clientPortNum()+1 << ")\n"; madeProgress = True; } } if (!madeProgress) shutdown();}void startPlayingStreams() { if (!clientStartPlayingSession(ourClient, session)) { *env << "Failed to start playing session: " << env->getResultMsg() << "\n"; shutdown(); } else { *env << "Started playing session\n"; } if (qosMeasurementIntervalMS > 0) { // Begin periodic QOS measurements: beginQOSMeasurement(); } // Figure out how long to delay (if at all) before shutting down, or // repeating the playing Boolean timerIsBeingUsed = False; double totalEndTime = endTime; if (endTime == 0) endTime = session->playEndTime(); // use SDP end time if (endTime > 0) { double const maxDelayTime = (double)( ((unsigned)0x7FFFFFFF)/1000000.0 ); if (endTime > maxDelayTime) { *env << "Warning: specified end time " << endTime << " exceeds maximum " << maxDelayTime << "; will not do a delayed shutdown\n"; endTime = 0.0; } else { timerIsBeingUsed = True; totalEndTime = endTime + endTimeSlop; int uSecsToDelay = (int)(totalEndTime*1000000.0); sessionTimerTask = env->taskScheduler().scheduleDelayedTask( uSecsToDelay, (TaskFunc*)sessionTimerHandler, (void*)NULL); } } char const* actionString = createReceivers? "Receiving streamed data":"Data is being streamed"; if (timerIsBeingUsed) { *env << actionString << " (for up to " << totalEndTime << " seconds)...\n"; } else {#ifdef USE_SIGNALS pid_t ourPid = getpid(); *env << actionString << " (signal with \"kill -HUP " << (int)ourPid << "\" or \"kill -USR1 " << (int)ourPid << "\" to terminate)...\n";#else *env << actionString << "...\n";#endif } // Watch for incoming packets (if desired): checkForPacketArrival(NULL); checkInterPacketGaps(NULL);}void tearDownStreams() { if (session == NULL) return; clientTearDownSession(ourClient, session);}void closeMediaSinks() { Medium::close(qtOut); Medium::close(aviOut); if (session == NULL) return; MediaSubsessionIterator iter(*session); MediaSubsession* subsession; while ((subsession = iter.next()) != NULL) { Medium::close(subsession->sink); subsession->sink = NULL; }}void subsessionAfterPlaying(void* clientData) { // Begin by closing this media subsession: MediaSubsession* subsession = (MediaSubsession*)clientData; Medium::close(subsession->sink); subsession->sink = NULL; // Next, check whether *all* subsessions have now been closed: MediaSession& session = subsession->parentSession(); MediaSubsessionIterator iter(session); while ((subsession = iter.next()) != NULL) { if (subsession->sink != NULL) return; // this subsession is still active } // All subsessions have now been closed sessionAfterPlaying();}void subsessionByeHandler(void* clientData) { struct timeval timeNow; gettimeofday(&timeNow, NULL); unsigned secsDiff = timeNow.tv_sec - startTime.tv_sec; MediaSubsession* subsession = (MediaSubsession*)clientData; *env << "Received RTCP \"BYE\" on \"" << subsession->mediumName() << "/" << subsession->codecName() << "\" subsession (after " << secsDiff << " seconds)\n"; // Act now as if the subsession had closed: subsessionAfterPlaying(subsession);}void sessionAfterPlaying(void* /*clientData*/) { if (!playContinuously) { shutdown(0); } else { // We've been asked to play the stream(s) over again: startPlayingStreams(); }}void sessionTimerHandler(void* /*clientData*/) { sessionTimerTask = NULL; sessionAfterPlaying();}class qosMeasurementRecord {public: qosMeasurementRecord(struct timeval const& startTime, RTPSource* src) : fSource(src), fNext(NULL), kbits_per_second_min(1e20), kbits_per_second_max(0), kBytesTotal(0.0), packet_loss_fraction_min(1.0), packet_loss_fraction_max(0.0), totNumPacketsReceived(0), totNumPacketsExpected(0) { measurementEndTime = measurementStartTime = startTime;#ifdef SUPPORT_REAL_RTSP if (session->isRealNetworksRDT) { // hack for RealMedia sessions (RDT, not RTP) RealRDTSource* rdt = (RealRDTSource*)src; kBytesTotal = rdt->totNumKBytesReceived(); totNumPacketsReceived = rdt->totNumPacketsReceived(); totNumPacketsExpected = totNumPacketsReceived; // because we use TCP return; }#endif RTPReceptionStatsDB::Iterator statsIter(src->receptionStatsDB()); // Assume that there's only one SSRC source (usually the case): RTPReceptionStats* stats = statsIter.next(True); if (stats != NULL) { kBytesTotal = stats->totNumKBytesReceived(); totNumPacketsReceived = stats->totNumPacketsReceived(); totNumPacketsExpected = stats->totNumPacketsExpected(); } } virtual ~qosMeasurementRecord() { delete fNext; } void periodicQOSMeasurement(struct timeval const& timeNow);public: RTPSource* fSource; qosMeasurementRecord* fNext;public: struct timeval measurementStartTime, measurementEndTime; double kbits_per_second_min, kbits_per_second_max; double kBytesTotal; double packet_loss_fraction_min, packet_loss_fraction_max; unsigned totNumPacketsReceived, totNumPacketsExpected;};static qosMeasurementRecord* qosRecordHead = NULL;static void periodicQOSMeasurement(void* clientData); // forwardstatic unsigned nextQOSMeasurementUSecs;static void scheduleNextQOSMeasurement() { nextQOSMeasurementUSecs += qosMeasurementIntervalMS*1000; struct timeval timeNow; gettimeofday(&timeNow, NULL); unsigned timeNowUSecs = timeNow.tv_sec*1000000 + timeNow.tv_usec; unsigned usecsToDelay = nextQOSMeasurementUSecs < timeNowUSecs ? 0 : nextQOSMeasurementUSecs - timeNowUSecs; qosMeasurementTimerTask = env->taskScheduler().scheduleDelayedTask( usecsToDelay, (TaskFunc*)periodicQOSMeasurement, (void*)NULL);}static void periodicQOSMeasurement(void* /*clientData*/) { struct timeval timeNow; gettimeofday(&timeNow, NULL); for (qosMeasurementRecord* qosRecord = qosRecordHead; qosRecord != NULL; qosRecord = qosRecord->fNext) { qosRecord->periodicQOSMeasurement(timeNow); } // Do this again later: scheduleNextQOSMeasurement();}void qosMeasurementRecord::periodicQOSMeasurement(struct timeval const& timeNow) { unsigned secsDiff = timeNow.tv_sec - measurementEndTime.tv_sec; int usecsDiff = timeNow.tv_usec - measurementEndTime.tv_usec; double timeDiff = secsDiff + usecsDiff/1000000.0; measurementEndTime = timeNow;#ifdef SUPPORT_REAL_RTSP if (session->isRealNetworksRDT) { // hack for RealMedia sessions (RDT, not RTP) RealRDTSource* rdt = (RealRDTSource*)fSource; double kBytesTotalNow = rdt->totNumKBytesReceived(); double kBytesDeltaNow = kBytesTotalNow - kBytesTotal; kBytesTotal = kBytesTotalNow; double kbpsNow = timeDiff == 0.0 ? 0.0 : 8*kBytesDeltaNow/timeDiff; if (kbpsNow < 0.0) kbpsNow = 0.0; // in case of roundoff error if (kbpsNow < kbits_per_second_min) kbits_per_second_min = kbpsNow; if (kbpsNow > kbits_per_second_max) kbits_per_second_max = kbpsNow; totNumPacketsReceived = rdt->totNumPacketsReceived(); totNumPacketsExpected = totNumPacketsReceived; // because we use TCP packet_loss_fraction_min = packet_loss_fraction_max = 0.0; // ditto return; }#endif RTPReceptionStatsDB::Iterator statsIter(fSource->receptionStatsDB()); // Assume that there's only one SSRC source (usually the case): RTPReceptionStats* stats = statsIter.next(True); if (stats != NULL) { double kBytesTotalNow = stats->totNumKBytesReceived(); double kBytesDeltaNow = kBytesTotalNow - kBytesTotal; kBytesTotal = kBytesTotalNow; double kbpsNow = timeDiff == 0.0 ? 0.0 : 8*kBytesDeltaNow/timeDiff; if (kbpsNow < 0.0) kbpsNow = 0.0; // in case of roundoff error if (kbpsNow < kbits_per_second_min) kbits_per_second_min = kbpsNow; if (kbpsNow > kbits_per_second_max) kbits_per_second_max = kbpsNow; unsigned totReceivedNow = stats->totNumPacketsReceived(); unsigned totExpectedNow = stats->totNumPacketsExpected(); unsigned deltaReceivedNow = totReceivedNow - totNumPacketsReceived; unsigned deltaExpectedNow = totExpectedNow - totNumPacketsExpected; totNumPacketsReceived = totReceivedNow; totNumPacketsExpected = totExpectedNow; double lossFractionNow = deltaExpectedNow == 0 ? 0.0 : 1.0 - deltaReceivedNow/(double)deltaExpectedNow; //if (lossFractionNow < 0.0) lossFractionNow = 0.0; //reordering can cause if (lossFractionNow < packet_loss_fraction_min) { packet_loss_fraction_min = lossFractionNow; } if (lossFractionNow > packet_loss_fraction_max) { packet_loss_fraction_max = lossFractionNow; } }}void beginQOSMeasurement() { // Set up a measurement record for each active subsession: struct timeval startTime; gettimeofday(&startTime, NULL); nextQOSMeasurementUSecs = startTime.tv_sec*1000000 + startTime.tv_usec; qosMeasurementRecord* qosRecordTail = NULL; MediaSubsessionIterator iter(*session); MediaSubsession* subsession; while ((subsession = iter.next()) != NULL) { RTPSource* src = subsession->rtpSource();#ifdef SUPPORT_REAL_RTSP if (session->isRealNetworksRDT) src = (RTPSource*)(subsession->readSource()); // hack#endif if (src == NULL) continue; qosMeasurementRecord* qosRecord = new qosMeasurementRecord(startTime, src); if (qosRecordHead == NULL) qosRecordHead = qosRecord; if (qosRecordTail != NULL) qosRecordTail->fNext = qosRecord; qosRecordTail = qosRecord; } // Then schedule the first of the periodic measurements: scheduleNextQOSMeasurement();}void printQOSData(int exitCode) { if (exitCode != 0 && statusCode == 0) statusCode = 2; *env << "begin_QOS_statistics\n"; *env << "server_availability\t" << (statusCode == 1 ? 0 : 100) << "\n"; *env << "stream_availability\t" << (statusCode == 0 ? 100 : 0) << "\n"; // Print out stats for each active subsession: qosMeasurementRecord* curQOSRecord = qosRecordHead; if (session != NULL) { MediaSubsessionIterator iter(*session); MediaSubsession* subsession; while ((subsession = iter.next()) != NULL) { RTPSource* src = subsession->rtpSource();#ifdef SUPPORT_REAL_RTSP if (session->isRealNetworksRDT) src = (RTPSource*)(subsession->readSource()); // hack#endif if (src == NULL) continue; *env << "subsession\t" << subsession->mediumName() << "/" << subsession->codecName() << "\n"; unsigned numPacketsReceived = 0, numPacketsExpected = 0; if (curQOSRecord != NULL) { numPacketsReceived = curQOSRecord->totNumPacketsReceived; numPacketsExpected = curQOSRecord->totNumPacketsExpected; } *env << "num_packets_received\t" << numPacketsReceived << "\n"; *env << "num_packets_lost\t" << numPacketsExpected - numPacketsReceived << "\n"; if (curQOSRecord != NULL) { unsigned secsDiff = curQOSRecord->measurementEndTime.tv_sec - curQOSRecord->measurementStartTime.tv_sec; int usecsDiff = curQOSRecord->measurementEndTime.tv_usec - curQOSRecord->measurementStartTime.tv_usec; double measurementTime = secsDiff + usecsDiff/1000000.0; *env << "elapsed_measurement_time\t" << measurementTime << "\n";
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -