📄 rtp_bytestream.cpp
字号:
calc /= m_timescale; if (calc >= m_rtp_buffer_time) { if (m_base_ts_set == false) { rtp_message(LOG_NOTICE, "%s - Setting rtp seq and time from 1st pak", m_name); set_rtp_base_ts(m_head->rtp_pak_ts); m_rtpinfo_set_from_pak = 1; } else { m_rtpinfo_set_from_pak = 0; } m_buffering = 1;#if 1 rtp_message(LOG_INFO, "%s buffering complete - head seq %d %u tail seq %d %u "D64, m_name, m_head->rtp_pak_seq, head_ts, m_tail->rtp_pak_seq, tail_ts, calc);#endif m_next_seq = m_head->rtp_pak_seq - 1; } } } } return m_buffering;} /* * recv_callback - callback for when bytestream is active - basically, * put things on the queue */int CRtpByteStreamBase::recv_callback (struct rtp *session, rtp_event *e){ switch (e->type) { case RX_RTP: rtp_packet *rpak; rpak = (rtp_packet *)e->data; if (rpak->rtp_data_len == 0) { xfree(rpak); } else { // need to add lock/unlock of mutex here if (m_have_recv_last_ts) { int32_t diff = rpak->rtp_pak_ts - m_recv_last_ts; int32_t ts, nts; ts = m_timescale * 2; nts = 0 - ts; if (diff > ts || diff < nts) { rtp_message(LOG_INFO, "%s - rtp timestamp diff %d last %u now %u", m_name, diff, m_recv_last_ts, rpak->rtp_pak_ts); flush_rtp_packets(); reset(); } } m_have_recv_last_ts = true; m_recv_last_ts = rpak->rtp_pak_ts; if (m_buffering == 0) { rpak->pd.rtp_pd_timestamp = get_time_of_day(); rpak->pd.rtp_pd_have_timestamp = 1; } if (SDL_mutexP(m_rtp_packet_mutex) == -1) { rtp_message(LOG_CRIT, "SDL Lock mutex failure in rtp bytestream recv"); break; } add_rtp_packet_to_queue(rpak, &m_head, &m_tail, m_name); if (SDL_mutexV(m_rtp_packet_mutex) == -1) { rtp_message(LOG_CRIT, "SDL Lock mutex failure in rtp bytestream recv"); break; } m_recvd_pak = true; check_buffering(); } break; case RX_SR: rtcp_sr *srpak; srpak = (rtcp_sr *)e->data; if (rtp_my_ssrc(session) != e->ssrc) { //rtp_message(LOG_DEBUG, "%s received rtcp", m_name); calculate_wallclock_offset_from_rtcp(srpak->ntp_frac, srpak->ntp_sec, srpak->rtp_ts); } break; case RX_APP: free(e->data); break; default:#if 0 rtp_message(LOG_DEBUG, "Thread %u - Callback from rtp with %d %p", SDL_ThreadID(),e->type, e->rtp_data);#endif break; } return m_buffering;}/* * synchronize is used to adjust a video broadcasts time based * on an audio broadcasts time. * We now start the audio and video just based on the Unix time of the * first packet. Then we use this to adjust when both sides have rtcp * packets. * It will also work if we never get in RTCP - this routine won't be * called - but our sync will be off. */void CRtpByteStreamBase::synchronize (rtcp_sync_t *sync){ // need to recalculate m_first_pak_ts here uint64_t adjust_first_pak_ts; int64_t adjust_first_pak; int64_t audio_diff, our_diff; if (sync == NULL) { if (!m_have_sync_info) return; sync = &m_sync_info; } else { if (m_rtcp_received == false) m_sync_info = *sync; } m_have_sync_info = true; if (m_psptr != NULL) return; if (m_rtcp_received == false) return; if (m_have_first_pak_ts == false) return; // First calculation - use the first packet's timestamp to calculate // what the timestamp value would be at the RTCP's RTP timestamp value // adjust_first_pak is amount we need to add to the first_packet's timestamp // We do this for the data we got for the audio stream adjust_first_pak = sync->rtcp_rtp_ts; adjust_first_pak -= sync->first_pak_rtp_ts; adjust_first_pak *= 1000; adjust_first_pak /= (int64_t)sync->timescale; adjust_first_pak_ts = sync->first_pak_ts; adjust_first_pak_ts += adjust_first_pak; // Now, we compute the difference between that value and what the RTCP // says the timestamp should be audio_diff = adjust_first_pak_ts; audio_diff -= sync->rtcp_ts;#ifdef DEBUG_RTP_WCLOCK rtp_message(LOG_DEBUG, "%s - audio rtcp rtp ts %u first pak %u", m_name, sync->rtcp_rtp_ts, sync->first_pak_rtp_ts); rtp_message(LOG_DEBUG, "%s - audio rtcp ts "U64" first pak "U64, m_name, sync->rtcp_ts, sync->first_pak_ts); rtp_message(LOG_DEBUG, "%s - adjusted first pak "D64" ts "U64, m_name, adjust_first_pak, sync->timescale); rtp_message(LOG_DEBUG, "%s - diff "D64, m_name, audio_diff);#endif // Now, we do the same calculation for the numbers for our timestamps - // find the timestamp by adjusting the first packet's timestamp to the // timestamp based on the current RTCP RTP timestamp; adjust_first_pak = m_rtcp_rtp_ts; adjust_first_pak -= m_first_pak_rtp_ts; adjust_first_pak *= 1000; adjust_first_pak /= (int64_t)m_timescale; adjust_first_pak_ts = m_first_pak_ts; adjust_first_pak_ts += adjust_first_pak; our_diff = adjust_first_pak_ts; our_diff -= m_rtcp_ts;#ifdef DEBUG_RTP_WCLOCK rtp_message(LOG_DEBUG, "%s - our rtcp rtp ts %u first pak %u", m_name, m_rtcp_rtp_ts, m_first_pak_rtp_ts); rtp_message(LOG_DEBUG, "%s - our rtcp ts "U64" first pak "U64, m_name, m_rtcp_ts, m_first_pak_ts); rtp_message(LOG_DEBUG, "%s - adjusted first pak "D64" ts "U64, m_name, adjust_first_pak, m_timescale); rtp_message(LOG_DEBUG, "%s - diff "D64, m_name, our_diff); rtp_message(LOG_INFO, "%s adjusting first pak ts by "D64, m_name, audio_diff - our_diff);#endif // Now, we very simply add the difference between the 2 to get // what the equivalent start time would be. Note that these values // for the first packet are not fixed - they change over time to avoid // wrap issues. m_first_pak_ts += audio_diff - our_diff;}void CRtpByteStreamBase::remove_packet_rtp_queue (rtp_packet *pak, int free){ if (pak == NULL) return; SDL_LockMutex(m_rtp_packet_mutex); if ((m_head == pak) && (m_head->rtp_next == NULL || m_head->rtp_next == m_head)) { m_head = NULL; m_tail = NULL; } else { pak->rtp_next->rtp_prev = pak->rtp_prev; pak->rtp_prev->rtp_next = pak->rtp_next; if (m_head == pak) { m_head = pak->rtp_next; } if (m_tail == pak) { m_tail = pak->rtp_prev; } } if (pak->rtp_data_len < 0) { // restore the packet data length pak->rtp_data_len = 0 - pak->rtp_data_len; } if (free == 1) { xfree(pak); } SDL_UnlockMutex(m_rtp_packet_mutex);}void CRtpByteStreamBase::flush_rtp_packets (void){ rtp_message(LOG_DEBUG, "%s flushed", m_name); while (m_head != NULL) { remove_packet_rtp_queue(m_head, 1); } m_buffering = 0; m_recvd_pak = false; m_recvd_pak_timeout = false;}void CRtpByteStreamBase::pause(void){ reset();}/* * rtp_periodic - called from the player media rtp task. This basically just * checks for the end of the range. */void CRtpByteStreamBase::rtp_periodic (void){ if (m_buffering != 0) { if (m_recvd_pak == false) { if (m_recvd_pak_timeout == false) { m_recvd_pak_timeout_time = get_time_of_day();#if 0 rtp_message(LOG_DEBUG, "%s Starting timeout at "U64, m_name, m_recvd_pak_timeout_time);#endif } else { uint64_t timeout; if (m_eof == 0) { timeout = get_time_of_day() - m_recvd_pak_timeout_time; if (m_stream_ondemand && get_max_playtime() != 0.0) { uint64_t range_end = (uint64_t)(get_max_playtime() * 1000.0); if (m_last_realtime + timeout >= range_end) { rtp_message(LOG_DEBUG, "%s Timedout at range end - last "U64" range end "U64" "U64, m_name, m_last_realtime, range_end, timeout); m_eof = 1; } } else { // broadcast - perhaps if we time out for a second or 2, we // should re-init rtp ? We definately need to put some timing // checks here. session_desc_t *sptr = m_fmt->media->parent; if (sptr->time_desc != NULL && sptr->time_desc->end_time != 0) { time_t this_time; this_time = time(NULL); if (this_time > sptr->time_desc->end_time && timeout >= TO_U64(1000)) { m_eof = 1; } } } } } m_recvd_pak_timeout = true; } else { m_recvd_pak = false; m_recvd_pak_timeout = false; } }}bool CRtpByteStreamBase::check_rtp_frame_complete_for_payload_type (void){ return (m_head && m_tail->rtp_pak_m == 1);}bool CRtpByteStreamBase::check_seq (uint16_t seq){ if (seq != m_next_seq) { rtp_message(LOG_INFO, "%s - rtp sequence is %u should be %u", m_name, seq, m_next_seq); return false; } return true;}void CRtpByteStreamBase::set_last_seq (uint16_t seq){ m_next_seq = seq + 1;}uint64_t CRtpByteStreamBase::rtp_ts_to_msec (uint32_t rtp_ts, uint64_t uts, uint64_t &wrap_offset){ uint64_t timetick; uint64_t adjusted_rtp_ts; uint64_t adjusted_wc_rtp_ts; bool have_wrap = false; uint32_t this_mask, last_mask; last_mask = m_last_rtp_ts & (1U << 31); this_mask = rtp_ts & (1U << 31); if (last_mask != this_mask) { if (this_mask == 0) { wrap_offset += (TO_U64(1) << 32); have_wrap = true; rtp_message(LOG_DEBUG, "%s - have wrap %x new %x", m_name, m_last_rtp_ts, rtp_ts); } else { // need to do something here } } if (m_stream_ondemand) { adjusted_rtp_ts = wrap_offset; adjusted_rtp_ts += rtp_ts; adjusted_wc_rtp_ts = m_base_rtp_ts; if (adjusted_wc_rtp_ts > adjusted_rtp_ts) { timetick = adjusted_wc_rtp_ts - adjusted_rtp_ts; timetick *= TO_U64(1000); timetick /= m_timescale; if (timetick > m_play_start_time) { timetick = 0; } else { timetick = m_play_start_time - timetick; } } else { timetick = adjusted_rtp_ts - adjusted_wc_rtp_ts; timetick *= TO_U64(1000); timetick /= m_timescale; timetick += m_play_start_time; } } else { // We've got a broadcast scenario here... if (m_have_first_pak_ts == false) { // We haven't processed the first packet yet - we record // the data here. m_first_pak_rtp_ts = rtp_ts; m_first_pak_ts = uts; m_have_first_pak_ts = true; rtp_message(LOG_DEBUG, "%s first pak ts %u "U64, m_name, m_first_pak_rtp_ts, m_first_pak_ts); // if we have received RTCP, set the wallclock offset, which // triggers the synchronization effort. if (m_rtcp_received) { // calculate other stuff //rtp_message(LOG_DEBUG, "%s rtp_ts_to_msec calling wallclock", m_name); set_wallclock_offset(m_rtcp_ts, m_rtcp_rtp_ts); } } SDL_LockMutex(m_rtp_packet_mutex); // fairly simple calculation to calculate the timestamp // based on this rtp timestamp, the first pak rtp timestamp and // the first packet timestamp. int32_t adder; int64_t ts_adder; if (have_wrap) { adder = rtp_ts - m_first_pak_rtp_ts; // adjust once an hour, to keep errors low // we'll adjust the timestamp and rtp timestamp ts_adder = (int64_t)adder; ts_adder *= TO_D64(1000); ts_adder /= (int64_t)m_timescale; m_first_pak_ts += ts_adder; m_first_pak_rtp_ts = rtp_ts;#ifdef DEBUG_RTP_BCAST rtp_message(LOG_DEBUG, "%s adjust for wrap - first pak ts is now "U64" rtp %u", m_name, m_first_pak_ts, m_first_pak_rtp_ts);#endif } // adder could be negative here, based on the RTCP we receive adder = rtp_ts - m_first_pak_rtp_ts; ts_adder = (int64_t)adder; ts_adder *= TO_D64(1000); ts_adder /= (int64_t)m_timescale; timetick = m_first_pak_ts; timetick += ts_adder; SDL_UnlockMutex(m_rtp_packet_mutex);#ifdef DEBUG_RTP_BCAST rtp_message(LOG_DEBUG, "%s ts %x base %x "U64" tp "U64" adder %d "D64,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -