⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 playcommon.cpp

📁 linux下的rtsp/rtp/rtcp协议栈源代码; c++写的
💻 CPP
📖 第 1 页 / 共 3 页
字号:
}void subsessionAfterPlaying(void* clientData) {  // Begin by closing this media subsession:  MediaSubsession* subsession = (MediaSubsession*)clientData;  Medium::close(subsession->sink);  subsession->sink = NULL;  // Next, check whether *all* subsessions have now been closed:  MediaSession& session = subsession->parentSession();  MediaSubsessionIterator iter(session);  while ((subsession = iter.next()) != NULL) {    if (subsession->sink != NULL) return; // this subsession is still active  }  // All subsessions have now been closed  sessionAfterPlaying();}void subsessionByeHandler(void* clientData) {  struct timeval timeNow;  gettimeofday(&timeNow, NULL);  unsigned secsDiff = timeNow.tv_sec - startTime.tv_sec;  MediaSubsession* subsession = (MediaSubsession*)clientData;  *env << "Received RTCP \"BYE\" on \"" << subsession->mediumName()	<< "/" << subsession->codecName()	<< "\" subsession (after " << secsDiff	<< " seconds)\n";  // Act now as if the subsession had closed:  subsessionAfterPlaying(subsession);}void sessionAfterPlaying(void* /*clientData*/) {  if (!playContinuously) {    shutdown(0);  } else {    // We've been asked to play the stream(s) over again:    startPlayingStreams();  }}void sessionTimerHandler(void* /*clientData*/) {  sessionTimerTask = NULL;  sessionAfterPlaying();}class qosMeasurementRecord {public:  qosMeasurementRecord(struct timeval const& startTime, RTPSource* src)    : fSource(src), fNext(NULL),      kbits_per_second_min(1e20), kbits_per_second_max(0),      kBytesTotal(0.0),      packet_loss_fraction_min(1.0), packet_loss_fraction_max(0.0),      totNumPacketsReceived(0), totNumPacketsExpected(0) {    measurementEndTime = measurementStartTime = startTime;#ifdef SUPPORT_REAL_RTSP    if (session->isRealNetworksRDT) { // hack for RealMedia sessions (RDT, not RTP)      RealRDTSource* rdt = (RealRDTSource*)src;      kBytesTotal = rdt->totNumKBytesReceived();      totNumPacketsReceived = rdt->totNumPacketsReceived();      totNumPacketsExpected = totNumPacketsReceived; // because we use TCP      return;    }#endif    RTPReceptionStatsDB::Iterator statsIter(src->receptionStatsDB());    // Assume that there's only one SSRC source (usually the case):    RTPReceptionStats* stats = statsIter.next(True);    if (stats != NULL) {      kBytesTotal = stats->totNumKBytesReceived();      totNumPacketsReceived = stats->totNumPacketsReceived();      totNumPacketsExpected = stats->totNumPacketsExpected();    }  }  virtual ~qosMeasurementRecord() { delete fNext; }      void periodicQOSMeasurement(struct timeval const& timeNow);public:  RTPSource* fSource;  qosMeasurementRecord* fNext;public:  struct timeval measurementStartTime, measurementEndTime;  double kbits_per_second_min, kbits_per_second_max;  double kBytesTotal;  double packet_loss_fraction_min, packet_loss_fraction_max;  unsigned totNumPacketsReceived, totNumPacketsExpected;};static qosMeasurementRecord* qosRecordHead = NULL;static void periodicQOSMeasurement(void* clientData); // forwardstatic unsigned nextQOSMeasurementUSecs;static void scheduleNextQOSMeasurement() {  nextQOSMeasurementUSecs += qosMeasurementIntervalMS*1000;  struct timeval timeNow;  gettimeofday(&timeNow, NULL);  unsigned timeNowUSecs = timeNow.tv_sec*1000000 + timeNow.tv_usec;  unsigned usecsToDelay = nextQOSMeasurementUSecs < timeNowUSecs ? 0    : nextQOSMeasurementUSecs - timeNowUSecs;  qosMeasurementTimerTask = env->taskScheduler().scheduleDelayedTask(     usecsToDelay, (TaskFunc*)periodicQOSMeasurement, (void*)NULL);}static void periodicQOSMeasurement(void* /*clientData*/) {  struct timeval timeNow;  gettimeofday(&timeNow, NULL);  for (qosMeasurementRecord* qosRecord = qosRecordHead;       qosRecord != NULL; qosRecord = qosRecord->fNext) {    qosRecord->periodicQOSMeasurement(timeNow);  }  // Do this again later:  scheduleNextQOSMeasurement();}void qosMeasurementRecord::periodicQOSMeasurement(struct timeval const& timeNow) {  unsigned secsDiff = timeNow.tv_sec - measurementEndTime.tv_sec;  int usecsDiff = timeNow.tv_usec - measurementEndTime.tv_usec;  double timeDiff = secsDiff + usecsDiff/1000000.0;  measurementEndTime = timeNow;#ifdef SUPPORT_REAL_RTSP  if (session->isRealNetworksRDT) { // hack for RealMedia sessions (RDT, not RTP)    RealRDTSource* rdt = (RealRDTSource*)fSource;    double kBytesTotalNow = rdt->totNumKBytesReceived();    double kBytesDeltaNow = kBytesTotalNow - kBytesTotal;    kBytesTotal = kBytesTotalNow;    double kbpsNow = timeDiff == 0.0 ? 0.0 : 8*kBytesDeltaNow/timeDiff;    if (kbpsNow < 0.0) kbpsNow = 0.0; // in case of roundoff error    if (kbpsNow < kbits_per_second_min) kbits_per_second_min = kbpsNow;    if (kbpsNow > kbits_per_second_max) kbits_per_second_max = kbpsNow;    totNumPacketsReceived = rdt->totNumPacketsReceived();    totNumPacketsExpected = totNumPacketsReceived; // because we use TCP    packet_loss_fraction_min = packet_loss_fraction_max = 0.0; // ditto    return;  }#endif  RTPReceptionStatsDB::Iterator statsIter(fSource->receptionStatsDB());  // Assume that there's only one SSRC source (usually the case):  RTPReceptionStats* stats = statsIter.next(True);  if (stats != NULL) {    double kBytesTotalNow = stats->totNumKBytesReceived();    double kBytesDeltaNow = kBytesTotalNow - kBytesTotal;    kBytesTotal = kBytesTotalNow;    double kbpsNow = timeDiff == 0.0 ? 0.0 : 8*kBytesDeltaNow/timeDiff;    if (kbpsNow < 0.0) kbpsNow = 0.0; // in case of roundoff error    if (kbpsNow < kbits_per_second_min) kbits_per_second_min = kbpsNow;    if (kbpsNow > kbits_per_second_max) kbits_per_second_max = kbpsNow;    unsigned totReceivedNow = stats->totNumPacketsReceived();    unsigned totExpectedNow = stats->totNumPacketsExpected();    unsigned deltaReceivedNow = totReceivedNow - totNumPacketsReceived;    unsigned deltaExpectedNow = totExpectedNow - totNumPacketsExpected;    totNumPacketsReceived = totReceivedNow;    totNumPacketsExpected = totExpectedNow;    double lossFractionNow = deltaExpectedNow == 0 ? 0.0      : 1.0 - deltaReceivedNow/(double)deltaExpectedNow;    //if (lossFractionNow < 0.0) lossFractionNow = 0.0; //reordering can cause    if (lossFractionNow < packet_loss_fraction_min) {      packet_loss_fraction_min = lossFractionNow;    }    if (lossFractionNow > packet_loss_fraction_max) {      packet_loss_fraction_max = lossFractionNow;    }  }}void beginQOSMeasurement() {  // Set up a measurement record for each active subsession:  struct timeval startTime;  gettimeofday(&startTime, NULL);  nextQOSMeasurementUSecs = startTime.tv_sec*1000000 + startTime.tv_usec;  qosMeasurementRecord* qosRecordTail = NULL;  MediaSubsessionIterator iter(*session);  MediaSubsession* subsession;  while ((subsession = iter.next()) != NULL) {    RTPSource* src = subsession->rtpSource();#ifdef SUPPORT_REAL_RTSP    if (session->isRealNetworksRDT) src = (RTPSource*)(subsession->readSource()); // hack#endif    if (src == NULL) continue;    qosMeasurementRecord* qosRecord      = new qosMeasurementRecord(startTime, src);    if (qosRecordHead == NULL) qosRecordHead = qosRecord;    if (qosRecordTail != NULL) qosRecordTail->fNext = qosRecord;    qosRecordTail  = qosRecord;  }  // Then schedule the first of the periodic measurements:  scheduleNextQOSMeasurement();}void printQOSData(int exitCode) {  if (exitCode != 0 && statusCode == 0) statusCode = 2;  *env << "begin_QOS_statistics\n";  *env << "server_availability\t" << (statusCode == 1 ? 0 : 100) << "\n";  *env << "stream_availability\t" << (statusCode == 0 ? 100 : 0) << "\n";  // Print out stats for each active subsession:  qosMeasurementRecord* curQOSRecord = qosRecordHead;  if (session != NULL) {    MediaSubsessionIterator iter(*session);    MediaSubsession* subsession;    while ((subsession = iter.next()) != NULL) {      RTPSource* src = subsession->rtpSource();#ifdef SUPPORT_REAL_RTSP      if (session->isRealNetworksRDT) src = (RTPSource*)(subsession->readSource()); // hack#endif      if (src == NULL) continue;      *env << "subsession\t" << subsession->mediumName()	   << "/" << subsession->codecName() << "\n";            unsigned numPacketsReceived = 0, numPacketsExpected = 0;            if (curQOSRecord != NULL) {	numPacketsReceived = curQOSRecord->totNumPacketsReceived;	numPacketsExpected = curQOSRecord->totNumPacketsExpected;      }      *env << "num_packets_received\t" << numPacketsReceived << "\n";      *env << "num_packets_lost\t" << numPacketsExpected - numPacketsReceived << "\n";            if (curQOSRecord != NULL) {	unsigned secsDiff = curQOSRecord->measurementEndTime.tv_sec	  - curQOSRecord->measurementStartTime.tv_sec;	int usecsDiff = curQOSRecord->measurementEndTime.tv_usec	  - curQOSRecord->measurementStartTime.tv_usec;	double measurementTime = secsDiff + usecsDiff/1000000.0;	*env << "elapsed_measurement_time\t" << measurementTime << "\n";		*env << "kBytes_received_total\t" << curQOSRecord->kBytesTotal << "\n";		*env << "measurement_sampling_interval_ms\t" << qosMeasurementIntervalMS << "\n";		if (curQOSRecord->kbits_per_second_max == 0) {	  // special case: we didn't receive any data:	  *env <<	    "kbits_per_second_min\tunavailable\n"	    "kbits_per_second_ave\tunavailable\n"	    "kbits_per_second_max\tunavailable\n";	} else {	  *env << "kbits_per_second_min\t" << curQOSRecord->kbits_per_second_min << "\n";	  *env << "kbits_per_second_ave\t"	       << (measurementTime == 0.0 ? 0.0 : 8*curQOSRecord->kBytesTotal/measurementTime) << "\n";	  *env << "kbits_per_second_max\t" << curQOSRecord->kbits_per_second_max << "\n";	}		*env << "packet_loss_percentage_min\t" << 100*curQOSRecord->packet_loss_fraction_min << "\n";	double packetLossFraction = numPacketsExpected == 0 ? 1.0	  : 1.0 - numPacketsReceived/(double)numPacketsExpected;	if (packetLossFraction < 0.0) packetLossFraction = 0.0;	*env << "packet_loss_percentage_ave\t" << 100*packetLossFraction << "\n";	*env << "packet_loss_percentage_max\t"	     << (packetLossFraction == 1.0 ? 100.0 : 100*curQOSRecord->packet_loss_fraction_max) << "\n";	#ifdef SUPPORT_REAL_RTSP	if (session->isRealNetworksRDT) {	  RealRDTSource* rdt = (RealRDTSource*)src;	  *env << "inter_packet_gap_ms_min\t" << rdt->minInterPacketGapUS()/1000.0 << "\n";	  struct timeval totalGaps = rdt->totalInterPacketGaps();	  double totalGapsMS = totalGaps.tv_sec*1000.0 + totalGaps.tv_usec/1000.0;	  unsigned totNumPacketsReceived = rdt->totNumPacketsReceived();	  *env << "inter_packet_gap_ms_ave\t"	       << (totNumPacketsReceived == 0 ? 0.0 : totalGapsMS/totNumPacketsReceived) << "\n";	  *env << "inter_packet_gap_ms_max\t" << rdt->maxInterPacketGapUS()/1000.0 << "\n";	} else {#endif	  RTPReceptionStatsDB::Iterator statsIter(src->receptionStatsDB());	  // Assume that there's only one SSRC source (usually the case):	  RTPReceptionStats* stats = statsIter.next(True);	  if (stats != NULL) {	    *env << "inter_packet_gap_ms_min\t" << stats->minInterPacketGapUS()/1000.0 << "\n";	    struct timeval totalGaps = stats->totalInterPacketGaps();	    double totalGapsMS = totalGaps.tv_sec*1000.0 + totalGaps.tv_usec/1000.0;	    unsigned totNumPacketsReceived = stats->totNumPacketsReceived();	    *env << "inter_packet_gap_ms_ave\t"		 << (totNumPacketsReceived == 0 ? 0.0 : totalGapsMS/totNumPacketsReceived) << "\n";	    *env << "inter_packet_gap_ms_max\t" << stats->maxInterPacketGapUS()/1000.0 << "\n";	  }#ifdef SUPPORT_REAL_RTSP	}#endif		curQOSRecord = curQOSRecord->fNext;      }    }  }      *env << "end_QOS_statistics\n";  delete qosRecordHead;}void shutdown(int exitCode) {  if (env != NULL) {    env->taskScheduler().unscheduleDelayedTask(sessionTimerTask);    env->taskScheduler().unscheduleDelayedTask(arrivalCheckTimerTask);    env->taskScheduler().unscheduleDelayedTask(interPacketGapCheckTimerTask);    env->taskScheduler().unscheduleDelayedTask(qosMeasurementTimerTask);  }  if (qosMeasurementIntervalMS > 0) {    printQOSData(exitCode);  }  // Close our output files:  closeMediaSinks();  // Teardown, then shutdown, any outstanding RTP/RTCP subsessions  tearDownStreams();  Medium::close(session);  // Finally, shut down our client:  Medium::close(ourClient);  // Adios...  exit(exitCode);}void signalHandlerShutdown(int /*sig*/) {  *env << "Got shutdown signal\n";  shutdown(0);}void checkForPacketArrival(void* /*clientData*/) {  if (!notifyOnPacketArrival) return; // we're not checking   // Check each subsession, to see whether it has received data packets:  unsigned numSubsessionsChecked = 0;  unsigned numSubsessionsWithReceivedData = 0;  unsigned numSubsessionsThatHaveBeenSynced = 0;  MediaSubsessionIterator iter(*session);  MediaSubsession* subsession;  while ((subsession = iter.next()) != NULL) {    RTPSource* src = subsession->rtpSource();    if (src == NULL) continue;    ++numSubsessionsChecked;    if (src->receptionStatsDB().numActiveSourcesSinceLastReset() > 0) {      // At least one data packet has arrived      ++numSubsessionsWithReceivedData;    }    if (src->hasBeenSynchronizedUsingRTCP()) {      ++numSubsessionsThatHaveBeenSynced;    }  }  unsigned numSubsessionsToCheck = numSubsessionsChecked;  // Special case for "QuickTimeFileSink"s and "AVIFileSink"s:  // They might not use all of the input sources:  if (qtOut != NULL) {    numSubsessionsToCheck = qtOut->numActiveSubsessions();  } else if (aviOut != NULL) {    numSubsessionsToCheck = aviOut->numActiveSubsessions();  }  Boolean notifyTheUser;  if (!syncStreams) {    notifyTheUser = numSubsessionsWithReceivedData > 0; // easy case  } else {    notifyTheUser = numSubsessionsWithReceivedData >= numSubsessionsToCheck      && numSubsessionsThatHaveBeenSynced == numSubsessionsChecked;    // Note: A subsession with no active sources is considered to be synced  }  if (notifyTheUser) {    struct timeval timeNow;    gettimeofday(&timeNow, NULL);	char timestampStr[100];	sprintf(timestampStr, "%ld%03ld", timeNow.tv_sec, timeNow.tv_usec/1000);    *env << (syncStreams ? "Synchronized d" : "D")		<< "ata packets have begun arriving [" << timestampStr << "]\007\n";    return;  }  // No luck, so reschedule this check again, after a delay:  int uSecsToDelay = 100000; // 100 ms  arrivalCheckTimerTask    = env->taskScheduler().scheduleDelayedTask(uSecsToDelay,			       (TaskFunc*)checkForPacketArrival, NULL);}void checkInterPacketGaps(void* /*clientData*/) {  if (interPacketGapMaxTime == 0) return; // we're not checking   // Check each subsession, counting up how many packets have been received:  unsigned newTotNumPacketsReceived = 0;  MediaSubsessionIterator iter(*session);  MediaSubsession* subsession;  while ((subsession = iter.next()) != NULL) {    RTPSource* src = subsession->rtpSource();    if (src == NULL) continue;    newTotNumPacketsReceived += src->receptionStatsDB().totNumPacketsReceived();  }  if (newTotNumPacketsReceived == totNumPacketsReceived) {    // No additional packets have been received since the last time we    // checked, so end this stream:    *env << "Closing session, because we stopped receiving packets.\n";    interPacketGapCheckTimerTask = NULL;    sessionAfterPlaying();  } else {    totNumPacketsReceived = newTotNumPacketsReceived;    // Check again, after the specified delay:     interPacketGapCheckTimerTask      = env->taskScheduler().scheduleDelayedTask(interPacketGapMaxTime*1000000,				 (TaskFunc*)checkInterPacketGaps, NULL);  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -