📄 rtp_bytestream.cpp
字号:
* We need to make sure we have some buffering. We'll buffer * about 2 seconds worth, then let the decode task know to go... */ if (m_buffering == 0) { uint32_t head_ts, tail_ts; if (m_head != NULL) { /* * Payload type the same. Make sure we have at least 2 seconds of * good data */ if (rtp_ready() == 0) { rtp_message(LOG_DEBUG, "Determined payload type, but rtp bytestream is not ready"); uint64_t calc; do { head_ts = m_head->rtp_pak_ts; tail_ts = m_tail->rtp_pak_ts; calc = (tail_ts - head_ts); calc *= 1000; calc /= m_timescale; if (calc > m_rtp_buffer_time) { rtp_packet *temp = m_head; m_head = m_head->rtp_next; m_tail->rtp_next = m_head; m_head->rtp_prev = m_tail; xfree((void *)temp); } } while (calc > m_rtp_buffer_time); return 0; } if (check_rtp_frame_complete_for_payload_type()) { head_ts = m_head->rtp_pak_ts; tail_ts = m_tail->rtp_pak_ts; if (head_ts > tail_ts && ((head_ts & (1 << 31)) == (tail_ts & (1 << 31)))) { return 0; } uint64_t calc; calc = tail_ts; calc -= head_ts; calc *= M_LLU; calc /= m_timescale; if (calc > m_rtp_buffer_time) { if (m_base_ts_set == false) { rtp_message(LOG_NOTICE, "Setting rtp seq and time from 1st pak"); set_rtp_base_ts(m_head->rtp_pak_ts); m_rtpinfo_set_from_pak = 1; } else { m_rtpinfo_set_from_pak = 0; } m_buffering = 1;#if 1 rtp_message(LOG_INFO, "%s buffering complete - seq %d head %u tail %u "LLD, m_name, m_head->rtp_pak_seq, head_ts, tail_ts, calc);#endif rtp_done_buffering(); } } } } else { if (decode_thread_waiting != 0) { /* * We're good with buffering - but the decode thread might have * caught up, and will be waiting. Post a message to kickstart * it */ if (check_rtp_frame_complete_for_payload_type()) { return (1); } } if (m_recvd_pak == 0) { if (m_recvd_pak_timeout == 0) { m_recvd_pak_timeout_time = get_time_of_day();#if 0 rtp_message(LOG_DEBUG, "%s Starting timeout at %llu", m_name, m_recvd_pak_timeout_time);#endif } else { uint64_t timeout; timeout = get_time_of_day() - m_recvd_pak_timeout_time; if (m_stream_ondemand && get_max_playtime() != 0.0) { uint64_t range_end = (uint64_t)(get_max_playtime() * 1000.0); if (m_last_realtime + timeout >= range_end) { rtp_message(LOG_DEBUG, "%s Timedout at range end - last "LLU" range end "LLU" "LLU, m_name, m_last_realtime, range_end, timeout); m_eof = 1; } } else { // broadcast - perhaps if we time out for a second or 2, we // should re-init rtp ? We definately need to put some timing // checks here. session_desc_t *sptr = m_fmt->media->parent; if (sptr->time_desc != NULL && sptr->time_desc->end_time != 0) { time_t this_time; this_time = time(NULL); if (this_time > sptr->time_desc->end_time && timeout >= M_LLU) { m_eof = 1; } } } } m_recvd_pak_timeout++; } else { m_recvd_pak = 0; m_recvd_pak_timeout = 0; } } return (m_buffering);}int CRtpByteStreamBase::check_rtp_frame_complete_for_payload_type (void){ return (m_head && m_tail->rtp_pak_m == 1);}uint64_t CRtpByteStreamBase::rtp_ts_to_msec (uint32_t rtp_ts, uint64_t uts, uint64_t &wrap_offset){ uint64_t timetick; uint64_t adjusted_rtp_ts; uint64_t adjusted_wc_rtp_ts; bool have_wrap = false; if (((m_ts & 0x80000000) == 0x80000000) && ((rtp_ts & 0x80000000) == 0)) { wrap_offset += (I_LLU << 32); have_wrap = true; } if (m_stream_ondemand) { adjusted_rtp_ts = wrap_offset; adjusted_rtp_ts += rtp_ts; adjusted_wc_rtp_ts = m_base_rtp_ts; if (adjusted_wc_rtp_ts > adjusted_rtp_ts) { timetick = adjusted_wc_rtp_ts - adjusted_rtp_ts; timetick *= M_LLU; timetick /= m_timescale; if (timetick > m_play_start_time) { timetick = 0; } else { timetick = m_play_start_time - timetick; } } else { timetick = adjusted_rtp_ts - adjusted_wc_rtp_ts; timetick *= M_LLU; timetick /= m_timescale; timetick += m_play_start_time; } } else { // We've got a broadcast scenario here... if (m_have_first_pak_ts == false) { // We haven't processed the first packet yet - we record // the data here. m_first_pak_rtp_ts = rtp_ts; m_first_pak_ts = uts; m_have_first_pak_ts = true; rtp_message(LOG_DEBUG, "%s first pak ts %u %llu", m_name, m_first_pak_rtp_ts, m_first_pak_ts); // if we have received RTCP, set the wallclock offset, which // triggers the syncronization effort. if (m_rtcp_received) { // calculate other stuff set_wallclock_offset(m_rtcp_ts, m_rtcp_rtp_ts); } } SDL_LockMutex(m_rtp_packet_mutex); // fairly simple calculation to calculate the timestamp // based on this rtp timestamp, the first pak rtp timestamp and // the first packet timestamp. int64_t adder; if (have_wrap) { adder = (int64_t)rtp_ts; adder += I_LLU << 32; adder -= (int64_t)m_first_pak_rtp_ts; // adjust once an hour, to keep errors low // we'll adjust the timestamp and rtp timestamp adder *= M_LLU; adder /= m_timescale; m_first_pak_ts += adder; m_first_pak_rtp_ts = rtp_ts;#ifdef DEBUG_RTP_BCAST rtp_message(LOG_DEBUG, "%s adjust for wrap - first pak ts is now %llu rtp %u", m_name, m_first_pak_ts, m_first_pak_rtp_ts);#endif } adder = (int64_t)rtp_ts; adder -= (int64_t) m_first_pak_rtp_ts; adder *= (int64_t)1000; adder /= (int64_t)m_timescale; timetick = m_first_pak_ts; timetick += adder; SDL_UnlockMutex(m_rtp_packet_mutex);#ifdef DEBUG_RTP_BCAST rtp_message(LOG_DEBUG, "%s ts %u base %u %llu tp %llu", m_name, rtp_ts, m_first_pak_rtp_ts, m_first_pak_ts, timetick);#endif } // record time m_last_realtime = timetick; return (timetick);}CRtpByteStream::CRtpByteStream(const char *name, format_list_t *fmt, unsigned int rtp_pt, int ondemand, uint64_t tickpersec, rtp_packet **head, rtp_packet **tail, int rtp_seq_set, uint16_t rtp_base_seq, int rtp_ts_set, uint32_t rtp_base_ts, int rtcp_received, uint32_t ntp_frac, uint32_t ntp_sec, uint32_t rtp_ts) : CRtpByteStreamBase(name, fmt, rtp_pt, ondemand, tickpersec, head, tail, rtp_seq_set, rtp_base_seq, rtp_ts_set, rtp_base_ts, rtcp_received, ntp_frac, ntp_sec, rtp_ts){ m_buffer = (uint8_t *)malloc(4096); m_buffer_len_max = 4096; m_bytes_used = m_buffer_len = 0;}CRtpByteStream::~CRtpByteStream (void){ free(m_buffer); m_buffer = NULL;}void CRtpByteStream::reset (void){ m_buffer_len = m_bytes_used = 0; CRtpByteStreamBase::reset();}uint64_t CRtpByteStream::start_next_frame (uint8_t **buffer, uint32_t *buflen, void **ud){ uint16_t seq = 0; uint32_t rtp_ts = 0; uint64_t timetick; uint64_t ts = 0; int first = 0; int finished = 0; rtp_packet *rpak; int32_t diff; diff = m_buffer_len - m_bytes_used; if (diff >= 2) { // Still bytes in the buffer... *buffer = m_buffer + m_bytes_used; *buflen = diff;#ifdef DEBUG_RTP_PAKS rtp_message(LOG_DEBUG, "%s Still left - %d bytes", m_name, *buflen);#endif#if 0 rtp_message(LOG_DEBUG, "%s start %02x %02x %02x %02x %02x", m_name, (*buffer)[0], (*buffer)[1], (*buffer)[2], (*buffer)[3], (*buffer)[4]);#endif return (m_last_realtime); } else { m_buffer_len = 0; while (finished == 0) { rpak = m_head; if (rpak == NULL) { // incomplete frame - bail on this player_error_message("%s - This should never happen - rtp bytestream" "is incomplete and active", m_name); player_error_message("Please report to mpeg4ip"); player_error_message("first %d seq %u ts %x blen %d", first, seq, rtp_ts, m_buffer_len); m_buffer_len = 0; m_bytes_used = 0; return 0; } remove_packet_rtp_queue(rpak, 0); if (first == 0) { seq = rpak->rtp_pak_seq + 1; ts = rpak->pd.rtp_pd_timestamp; rtp_ts = rpak->rtp_pak_ts; first = 1; } else { if ((seq != rpak->rtp_pak_seq) || (rtp_ts != rpak->rtp_pak_ts)) { if (seq != rpak->rtp_pak_seq) { rtp_message(LOG_INFO, "%s missing rtp sequence - should be %u is %u", m_name, seq, rpak->rtp_pak_seq); } else { rtp_message(LOG_INFO, "%s timestamp error - seq %u should be %x is %x", m_name, seq, rtp_ts, rpak->rtp_pak_ts); } m_buffer_len = 0; rtp_ts = rpak->rtp_pak_ts; } seq = rpak->rtp_pak_seq + 1; } uint8_t *from; uint32_t len; from = (uint8_t *)rpak->rtp_data + m_skip_on_advance_bytes; len = rpak->rtp_data_len - m_skip_on_advance_bytes; if ((m_buffer_len + len) >= m_buffer_len_max) { // realloc m_buffer_len_max = m_buffer_len + len + 1024; m_buffer = (uint8_t *)realloc(m_buffer, m_buffer_len_max); } memcpy(m_buffer + m_buffer_len, from, len); m_buffer_len += len; if (rpak->rtp_pak_m == 1) { finished = 1; } xfree(rpak); } m_bytes_used = 0; *buffer = m_buffer + m_bytes_used; *buflen = m_buffer_len - m_bytes_used;#if 0 rtp_message(LOG_DEBUG, "%s start %02x %02x %02x %02x %02x", m_name, (*buffer)[0], (*buffer)[1], (*buffer)[2], (*buffer)[3], (*buffer)[4]);#endif#ifdef DEBUG_RTP_PAKS rtp_message(LOG_DEBUG, "%s buffer len %d", m_name, m_buffer_len);#endif } timetick = rtp_ts_to_msec(rtp_ts, ts, m_wrap_offset); m_ts = rtp_ts; return (timetick);}int CRtpByteStream::skip_next_frame (uint64_t *pts, int *hasSyncFrame, uint8_t **buffer, uint32_t *buflen, void **ud){ uint64_t ts; *hasSyncFrame = -1; // we don't know if we have a sync frame m_buffer_len = m_bytes_used = 0; if (m_head == NULL) return 0; ts = m_head->rtp_pak_ts; do { remove_packet_rtp_queue(m_head, 1); } while (m_head != NULL && m_head->rtp_pak_ts == ts); if (m_head == NULL) return 0; init(); m_buffer_len = m_bytes_used = 0; ts = start_next_frame(buffer, buflen, ud); *pts = ts; return (1);}void CRtpByteStream::used_bytes_for_frame (uint32_t bytes){ m_bytes_used += bytes;#ifdef DEBUG_RTP_PAKS rtp_message(LOG_DEBUG, "%s Used %d bytes", m_name, bytes);#endif}int CRtpByteStream::have_no_data (void){ rtp_packet *temp, *first; first = temp = m_head; if (temp == NULL) return TRUE; do { if (temp->rtp_pak_m == 1) return FALSE; temp = temp->rtp_next; } while (temp != NULL && temp != first); return TRUE;}void CRtpByteStream::flush_rtp_packets (void){ CRtpByteStreamBase::flush_rtp_packets(); m_bytes_used = m_buffer_len = 0;}CAudioRtpByteStream::CAudioRtpByteStream (unsigned int rtp_pt, format_list_t *fmt, int ondemand, uint64_t tps, rtp_packet **head, rtp_packet **tail, int rtp_seq_set, uint16_t rtp_base_seq, int rtp_ts_set, uint32_t rtp_base_ts, int rtcp_received, uint32_t ntp_frac, uint32_t ntp_sec, uint32_t rtp_ts) : CRtpByteStream("audio", fmt, rtp_pt, ondemand, tps, head, tail, rtp_seq_set, rtp_base_seq, rtp_ts_set, rtp_base_ts, rtcp_received, ntp_frac, ntp_sec, rtp_ts){ init(); m_working_pak = NULL;}CAudioRtpByteStream::~CAudioRtpByteStream(void){}int CAudioRtpByteStream::have_no_data (void){ if (m_head == NULL) return TRUE; return FALSE;}int CAudioRtpByteStream::check_rtp_frame_complete_for_payload_type (void){ return m_head != NULL;}void CAudioRtpByteStream::reset (void){ if (m_working_pak != NULL) { xfree(m_working_pak); m_working_pak = NULL; } CRtpByteStream::reset();}uint64_t CAudioRtpByteStream::start_next_frame (uint8_t **buffer, uint32_t *buflen, void **ud){ uint32_t ts; int32_t diff; if (m_working_pak != NULL) { diff = m_working_pak->rtp_data_len - m_bytes_used; } else diff = 0; if (diff > 0) { // Still bytes in the buffer... *buffer = (uint8_t *)m_working_pak->rtp_data + m_bytes_used; *buflen = diff; ts = m_ts;#ifdef DEBUG_RTP_PAKS rtp_message(LOG_DEBUG, "%s Still left - %d bytes", m_name, *buflen);#endif return (m_last_realtime); } else { if (m_working_pak) xfree(m_working_pak); m_buffer_len = 0; m_bytes_used = m_skip_on_advance_bytes; m_working_pak = m_head; remove_packet_rtp_queue(m_working_pak, 0); if (m_have_first_pak_ts && m_seq_recvd != m_working_pak->rtp_pak_seq) { rtp_message(LOG_ERR, "%s missing seq should be %d got %d", m_name, m_seq_recvd, m_working_pak->rtp_pak_seq); } m_seq_recvd = m_working_pak->rtp_pak_seq + 1; *buffer = (uint8_t *)m_working_pak->rtp_data + m_bytes_used; *buflen = m_working_pak->rtp_data_len; ts = m_working_pak->rtp_pak_ts;#ifdef DEBUG_RTP_PAKS rtp_message(LOG_DEBUG, "%s buffer seq %d ts %x len %d", m_name, m_working_pak->rtp_pak_seq, m_working_pak->rtp_pak_ts, m_buffer_len);#endif } // We're going to have to handle wrap better... uint64_t retts = rtp_ts_to_msec(ts, m_working_pak->pd.rtp_pd_timestamp, m_wrap_offset); m_ts = ts; return retts;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -