📄 rtp_bytestream.cpp
字号:
/*
 * The contents of this file are subject to the Mozilla Public
 * License Version 1.1 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of
 * the License at http://www.mozilla.org/MPL/
 *
 * Software distributed under the License is distributed on an "AS
 * IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * rights and limitations under the License.
 *
 * The Original Code is MPEG4IP.
 *
 * The Initial Developer of the Original Code is Cisco Systems Inc.
 * Portions created by Cisco Systems Inc. are
 * Copyright (C) Cisco Systems Inc. 2001.  All Rights Reserved.
 *
 * Contributor(s):
 *              Bill May        wmay@cisco.com
 */
#include "systems.h"
#include <rtp/rtp.h>
#include <rtp/memory.h>
#include <sdp/sdp.h> // for NTP_TO_UNIX_TIME
#include "rtp_bytestream.h"
#include "our_config_file.h"
//#define DEBUG_RTP_PAKS 1
//#define DEBUG_RTP_BCAST 1
//#define DEBUG_RTP_WCLOCK 1

#ifdef _WIN32
DEFINE_MESSAGE_MACRO(rtp_message, "rtpbyst")
#else
#define rtp_message(loglevel, fmt...) message(loglevel, "rtpbyst", fmt)
#endif

/*
 * add_rtp_packet_to_queue() - adds rtp packet to doubly linked lists -
 * this is used both by the bytestream, and by the player_media when trying
 * to determine which rtp payload type is being used.
 *
 * The queue is kept ordered by RTP sequence number (allowing for 16 bit
 * wrap).  Once it holds 2 or more packets, the rtp_next/rtp_prev links
 * form a ring (tail->rtp_next is head).  A lone packet's links are not
 * touched by this routine.
 *
 * Inputs:  pak  - packet to insert
 *          head - pointer to the queue head pointer (may be updated)
 *          tail - pointer to the queue tail pointer (may be updated)
 * Returns: 1 if the packet was queued.
 *          0 if it was not (duplicate sequence number or no slot found);
 *            in that case the packet has been released with xfree().
 */
int add_rtp_packet_to_queue (rtp_packet *pak,
                             rtp_packet **head,
                             rtp_packet **tail)
{
  rtp_packet *q;
  int inserted = TRUE;
  int16_t diff;
#ifdef DEBUG_RTP_PAKS
  rtp_message(LOG_DEBUG, "CBThread %u - m %u pt %u seq %u ts %u len %d",
              SDL_ThreadID(),
              pak->rtp_pak_m, pak->rtp_pak_pt, pak->rtp_pak_seq,
              pak->rtp_pak_ts, pak->rtp_data_len);
#endif

  if (*head == NULL) {
    // empty queue - packet becomes both head and tail
    *head = *tail = pak;
  } else if (*head == *tail) {
    // exactly 1 packet queued
    if (pak->rtp_pak_seq == (*head)->rtp_pak_seq) {
      rtp_message(LOG_ERR, "Duplicate RTP sequence number %d received",
                  pak->rtp_pak_seq);
      inserted = FALSE;
    } else {
      // link the 2 packets into a ring
      (*head)->rtp_next = pak;
      (*head)->rtp_prev = pak;
      pak->rtp_next = pak->rtp_prev = *head;
      // 16 bit signed difference takes care of sequence number wrap
      diff = pak->rtp_pak_seq - (*head)->rtp_pak_seq;
      if (diff > 0) {
        *tail = pak;
      } else {
        *head = pak;
      }
    }
  } else if ((((*head)->rtp_pak_seq < (*tail)->rtp_pak_seq) &&
              ((pak->rtp_pak_seq > (*tail)->rtp_pak_seq) ||
               (pak->rtp_pak_seq < (*head)->rtp_pak_seq))) ||
             (((*head)->rtp_pak_seq > (*tail)->rtp_pak_seq) &&
              ((pak->rtp_pak_seq > (*tail)->rtp_pak_seq &&
                pak->rtp_pak_seq < (*head)->rtp_pak_seq)))) {
    // insert between tail and head
    // Maybe move head - probably move tail.
    (*tail)->rtp_next = pak;
    pak->rtp_prev = *tail;
    (*head)->rtp_prev = pak;
    pak->rtp_next = *head;
    diff = (*head)->rtp_pak_seq - pak->rtp_pak_seq;
    if (diff > 0 && diff < 4) {
      // between tail and head, and really close to the head - move the
      // head pointer
      *head = pak;
    } else {
      // otherwise, just insert at end
      *tail = pak;
    }
  } else {
    // insert in middle
    // Loop through until we find where it should fit
    q = *head;
    do {
      // check for duplicates
      if (pak->rtp_pak_seq == q->rtp_pak_seq ||
          pak->rtp_pak_seq == q->rtp_next->rtp_pak_seq) {
        // dup seq number
        inserted = FALSE;
        break;
      }
      /*
       * Okay - this is disgusting, but works.  The first part (before the
       * or) will see if pak->rtp_pak_seq is between q and q->rtp_next,
       * assuming that the sequence number for q and q->rtp_next are ascending.
       *
       * 2nd part of the if is the converse case (q->rtp_next->rtp_pak_seq
       * is smaller than q->rtp_pak_seq).  In that case, we need to make
       * sure that pak->rtp_pak_seq is either larger than q->rtp_pak_seq
       * or less than q->rtp_next->rtp_pak_seq
       */
      if (((q->rtp_next->rtp_pak_seq > q->rtp_pak_seq) &&
           (q->rtp_pak_seq < pak->rtp_pak_seq &&
            pak->rtp_pak_seq < q->rtp_next->rtp_pak_seq)) ||
          ((q->rtp_next->rtp_pak_seq < q->rtp_pak_seq) &&
           (pak->rtp_pak_seq < q->rtp_next->rtp_pak_seq ||
            pak->rtp_pak_seq > q->rtp_pak_seq))) {
        q->rtp_next->rtp_prev = pak;
        pak->rtp_next = q->rtp_next;
        q->rtp_next = pak;
        pak->rtp_prev = q;
        break;
      }
      q = q->rtp_next;
    } while (q != *tail);
    // Both break statements above leave q != *tail, so reaching the tail
    // means we walked the whole list without finding a slot.
    if (q == *tail) {
      inserted = FALSE;
      rtp_message(LOG_ERR, "Couldn't insert %u between %u and %u",
                  pak->rtp_pak_seq,
                  (*head)->rtp_pak_seq,
                  (*tail)->rtp_pak_seq);
    }
  }

  if (inserted == FALSE) {
    rtp_message(LOG_ERR, "Couldn't insert pak");
    rtp_message(LOG_DEBUG, "pak seq %u", pak->rtp_pak_seq);
    if (*head != NULL) {
      rtp_message(LOG_DEBUG, "head seq %u, tail seq %u",
                  (*head)->rtp_pak_seq,
                  (*tail)->rtp_pak_seq);
    }
    // we own the packet at this point - release it
    xfree(pak);
    return (0);
  }
  return (1);
}

/*
 * CRtpByteStreamBase constructor.
 * Takes over the packet list passed in through head/tail (the caller's
 * pointers are set to NULL), records the base rtp timestamp and sequence
 * number, reads the buffer time from the configuration (falling back to
 * 2 * M_LLU when the configured value is 0), and creates the packet mutex.
 * If rtcp_received is set, the ntp/rtp timestamp values are used to
 * establish the wallclock offset right away.
 * rtp_seq_set and rtp_ts_set are not looked at here.
 */
CRtpByteStreamBase::CRtpByteStreamBase(const char *name,
                                       format_list_t *fmt,
                                       unsigned int rtp_pt,
                                       int ondemand,
                                       uint64_t tps,
                                       rtp_packet **head,
                                       rtp_packet **tail,
                                       int rtp_seq_set,
                                       uint16_t rtp_base_seq,
                                       int rtp_ts_set,
                                       uint32_t rtp_base_ts,
                                       int rtcp_received,
                                       uint32_t ntp_frac,
                                       uint32_t ntp_sec,
                                       uint32_t rtp_ts) :
  COurInByteStream(name)
{
  m_fmt = fmt;
  // take ownership of any packets already queued by the caller
  m_head = *head;
  *head = NULL;
  m_tail = *tail;
  *tail = NULL;
  set_rtp_base_ts(rtp_base_ts);
  set_rtp_base_seq(rtp_base_seq);
  m_have_first_pak_ts = false;
  m_rtp_pt = rtp_pt;
  uint64_t temp;
  temp = config.get_config_value(CONFIG_RTP_BUFFER_TIME_MSEC);
  if (temp > 0) {
    m_rtp_buffer_time = temp;
  } else {
    m_rtp_buffer_time = 2 * M_LLU;
  }

  m_timescale = tps;

  // init() copies m_skip_on_advance_bytes into m_offset_in_pak, so it
  // must have a value before init() is called.
  m_skip_on_advance_bytes = 0;
  init();

  m_ts = 0;
  m_total = 0;
  m_stream_ondemand = ondemand;
  m_rtcp_received = false;
  m_rtp_packet_mutex = SDL_CreateMutex();
  m_buffering = 0;
  m_eof = 0;
  m_psptr = NULL;
  m_have_sync_info = false;
  if (rtcp_received) {
    calculate_wallclock_offset_from_rtcp(ntp_frac, ntp_sec, rtp_ts);
  }
}

/*
 * Destructor - frees any packets still queued, then destroys the mutex.
 */
CRtpByteStreamBase::~CRtpByteStreamBase (void)
{
  flush_rtp_packets();
  if (m_rtp_packet_mutex) {
    SDL_DestroyMutex(m_rtp_packet_mutex);
    m_rtp_packet_mutex = NULL;
  }
}

// set_sync - this is for audio only - it will send messages to any
// video rtp bytestream to perform the synchronization
void CRtpByteStreamBase::set_sync (CPlayerSession *psptr)
{
  m_psptr = psptr;
}

/*
 * init - reset the read position: clears the wrap offset and eof flag,
 * and starts the in-packet offset at m_skip_on_advance_bytes.
 */
void CRtpByteStreamBase::init (void)
{
  m_wrap_offset = 0;
  m_offset_in_pak = m_skip_on_advance_bytes;
  m_eof = 0;
}

/*
 * set_wallclock_offset - record a (wallclock, rtp timestamp) pair, as
 * derived from an RTCP sender report.
 *
 * wclock - wallclock value matching rtp_ts (see
 *          calculate_wallclock_offset_from_rtcp for how it is derived)
 * rtp_ts - rtp timestamp from the report
 *
 * If we already have a pair and the stream is not on demand, the new pair
 * is only stored when it disagrees with the old one by 100 or more; smaller
 * differences are treated as noise.  When the first packet timestamp is
 * known, this also rebases it (when more than an hour of rtp time has
 * passed) and triggers synchronization.
 */
void CRtpByteStreamBase::set_wallclock_offset (uint64_t wclock,
                                               uint32_t rtp_ts)
{
  int32_t rtp_ts_diff;
  uint64_t wclock_calc;
  bool set = true;
  bool had_recvd_rtcp;

  if (m_rtcp_received == 1 &&
      m_stream_ondemand == 0) {
    // Work out what the wallclock should be for this rtp timestamp based
    // on the previously stored pair.
    rtp_ts_diff = rtp_ts;
    rtp_ts_diff -= m_rtcp_rtp_ts;
    // Do the scaling in signed 64 bit arithmetic - rtp_ts_diff can be
    // negative, and an unsigned multiply/divide would turn that into a
    // huge bogus offset.
    int64_t wclock_delta = (int64_t)rtp_ts_diff;
    wclock_delta *= (int64_t)M_LLU;
    wclock_delta /= (int64_t)m_timescale;
    wclock_calc = m_rtcp_ts + wclock_delta;
    if (wclock_calc != wclock) {
#ifdef DEBUG_RTP_WCLOCK
      rtp_message(LOG_DEBUG,
                  "%s - set wallclock - wclock should be " LLU " is " LLU,
                  m_name, wclock_calc, wclock);
#endif
      // don't change wclock offset if the difference is under 100 (in
      // either direction) - otherwise, it's annoying noise.
      // Explicit range check rather than abs(), which may pick the int
      // overload and truncate a 64 bit value.
      int64_t diff = wclock_calc - wclock;
      if (diff > -100 && diff < 100) {
        set = false;
      }
    }
  }
  had_recvd_rtcp = m_rtcp_received;
  m_rtcp_received = true;
  SDL_LockMutex(m_rtp_packet_mutex);
  if (set) {
    m_rtcp_ts = wclock;
    m_rtcp_rtp_ts = rtp_ts;
  }
  if (m_have_first_pak_ts) {
    int64_t diff;
    diff = (int64_t)rtp_ts;
    diff -= (int64_t)m_first_pak_rtp_ts;
    int64_t compare = 3600 * m_timescale;
#ifdef DEBUG_RTP_WCLOCK
    rtp_message(LOG_DEBUG, "%s - rtp ts %u rtcp %u %lld",
                m_name, m_first_pak_rtp_ts, rtp_ts, diff);
#endif
    if (diff > compare) {
      // adjust once an hour, to keep errors low
      // we'll adjust the timestamp and rtp timestamp
      diff *= M_LLU;
      diff /= m_timescale;
      m_first_pak_ts += diff;
      m_first_pak_rtp_ts = rtp_ts;
#ifdef DEBUG_RTP_WCLOCK
      rtp_message(LOG_DEBUG, "CHANGE %s - first pak ts is now %llu rtp %u",
                  m_name, m_first_pak_ts, m_first_pak_rtp_ts);
#endif
    }
    // We've received an RTCP - see if we need to synchronize
    // the video streams.
    if (m_psptr != NULL) {
      rtcp_sync_t sync;
      sync.first_pak_ts = m_first_pak_ts;
      sync.first_pak_rtp_ts = m_first_pak_rtp_ts;
      sync.rtcp_ts = m_rtcp_ts;
      sync.rtcp_rtp_ts = m_rtcp_rtp_ts;
      sync.timescale = m_timescale;
      m_psptr->syncronize_rtp_bytestreams(&sync);
    } else {
      // if this is our first rtcp, try to synchronize
      if (!had_recvd_rtcp) syncronize(NULL);
    }
  }

  SDL_UnlockMutex(m_rtp_packet_mutex);
}

/*
 * calculate_wallclock_offset_from_rtcp
 * Given the NTP timestamp and rtp timestamp from an rtcp message, convert
 * the NTP time to a wallclock value relative to the Unix epoch (scaled by
 * M_LLU units per second) and hand it to set_wallclock_offset().
 */
void
CRtpByteStreamBase::calculate_wallclock_offset_from_rtcp (uint32_t ntp_frac,
                                                          uint32_t ntp_sec,
                                                          uint32_t rtp_ts)
{
  // ntp_frac is a fraction of a second in units of 1/2^32
  uint64_t wclock;
  wclock = ntp_frac;
  wclock *= M_LLU;
  wclock /= (I_LLU << 32);
  // whole seconds, moved from the NTP epoch to the Unix epoch
  uint64_t offset;
  offset = ntp_sec;
  offset -= NTP_TO_UNIX_TIME;
  offset *= M_LLU;
  wclock += offset;
#ifdef DEBUG_RTP_WCLOCK
  rtp_message(LOG_DEBUG, "%s RTCP data - sec %u frac %u value %llu ts %u",
              m_name, ntp_sec, ntp_frac, wclock, rtp_ts);
#endif
  set_wallclock_offset(wclock, rtp_ts);
}

/*
 * recv_callback - callback for when bytestream is active - basically,
 * put things on the queue
 *
 * RX_RTP - empty packets are freed; others are queued under the packet
 *          mutex (and time stamped on arrival when m_buffering is 0).
 * RX_SR  - sender report values are used to update the wallclock offset.
 * RX_APP - the event data is freed.
 * Everything else is ignored.
 */
void CRtpByteStreamBase::recv_callback (struct rtp *session, rtp_event *e)
{
  switch (e->type) {
  case RX_RTP: {
    rtp_packet *rpak;
    rpak = (rtp_packet *)e->data;
    if (rpak->rtp_data_len == 0) {
      xfree(rpak);
    } else {
      // need to add lock/unlock of mutex here
      if (m_buffering == 0) {
        rpak->pd.rtp_pd_timestamp = get_time_of_day();
        rpak->pd.rtp_pd_have_timestamp = 1;
      }
      if (SDL_mutexP(m_rtp_packet_mutex) == -1) {
        rtp_message(LOG_CRIT, "SDL Lock mutex failure in rtp bytestream recv");
        return;
      }
      // note - this frees rpak itself if it can't be queued
      add_rtp_packet_to_queue(rpak, &m_head, &m_tail);
      if (SDL_mutexV(m_rtp_packet_mutex) == -1) {
        rtp_message(LOG_CRIT,
                    "SDL Unlock mutex failure in rtp bytestream recv");
        return;
      }
      m_recvd_pak = 1;
    }
    break;
  }
  case RX_SR: {
    rtcp_sr *srpak;
    srpak = (rtcp_sr *)e->data;
    calculate_wallclock_offset_from_rtcp(srpak->ntp_frac,
                                         srpak->ntp_sec,
                                         srpak->rtp_ts);
    break;
  }
  case RX_APP:
    free(e->data);
    break;
  default:
#if 0
    rtp_message(LOG_DEBUG, "Thread %u - Callback from rtp with %d %p",
                SDL_ThreadID(),e->type, e->rtp_data);
#endif
    break;
  }
}

/*
 * syncronize is used to adjust a video broadcasts time based
 * on an audio broadcasts time.
 * We now start the audio and video just based on the Unix time of the
 * first packet.  Then we use this to adjust when both sides have rtcp
 * packets.
 * It will also work if we never get in RTCP - this routine won't be
 * called - but our sync will be off.
 *
 * sync - timing information from the audio stream, or NULL to reuse the
 *        information saved from an earlier call (if any).
 */
void CRtpByteStreamBase::syncronize (rtcp_sync_t *sync)
{
  // need to recalculate m_first_pak_ts here
  uint64_t adjust_first_pak_ts;
  int64_t adjust_first_pak;
  int64_t audio_diff, our_diff;

  if (sync == NULL) {
    // no new data - fall back on what we saved earlier
    if (!m_have_sync_info) return;
    sync = &m_sync_info;
  } else {
    // save the audio info until we have our own rtcp to compare against
    if (m_rtcp_received == false)
      m_sync_info = *sync;
  }
  m_have_sync_info = true;

  // streams that drive the sync (m_psptr set) don't adjust themselves
  if (m_psptr != NULL) return;
  if (m_rtcp_received == false) return;
  if (m_have_first_pak_ts == false) return;

  // First calculation - use the first packet's timestamp to calculate
  // what the timestamp value would be at the RTCP's RTP timestamp value
  // adjust_first_pak is amount we need to add to the first_packet's timestamp
  // We do this for the data we got for the audio stream
  adjust_first_pak = sync->rtcp_rtp_ts;
  adjust_first_pak -= sync->first_pak_rtp_ts;
  adjust_first_pak *= 1000;
  adjust_first_pak /= (int64_t)sync->timescale;

  adjust_first_pak_ts = sync->first_pak_ts;
  adjust_first_pak_ts += adjust_first_pak;

  // Now, we compute the difference between that value and what the RTCP
  // says the timestamp should be
  audio_diff = adjust_first_pak_ts;
  audio_diff -= sync->rtcp_ts;

#ifdef DEBUG_RTP_WCLOCK
  rtp_message(LOG_DEBUG, "%s - audio rtcp rtp ts %u first pak %u",
              m_name, sync->rtcp_rtp_ts, sync->first_pak_rtp_ts);
  rtp_message(LOG_DEBUG, "%s - audio rtcp ts %llu first pak %llu",
              m_name, sync->rtcp_ts, sync->first_pak_ts);
  rtp_message(LOG_DEBUG, "%s - adjusted first pak %lld ts %llu",
              m_name, adjust_first_pak, sync->timescale);
  rtp_message(LOG_DEBUG, "%s - diff %lld", m_name, audio_diff);
#endif

  // Now, we do the same calculation for the numbers for our timestamps -
  // find the timestamp by adjusting the first packet's timestamp to the
  // timestamp based on the current RTCP RTP timestamp;
  adjust_first_pak = m_rtcp_rtp_ts;
  adjust_first_pak -= m_first_pak_rtp_ts;
  adjust_first_pak *= 1000;
  adjust_first_pak /= (int64_t)m_timescale;

  adjust_first_pak_ts = m_first_pak_ts;
  adjust_first_pak_ts += adjust_first_pak;
  our_diff = adjust_first_pak_ts;
  our_diff -= m_rtcp_ts;

#ifdef DEBUG_RTP_WCLOCK
  rtp_message(LOG_DEBUG, "%s - our rtcp rtp ts %u first pak %u",
              m_name, m_rtcp_rtp_ts, m_first_pak_rtp_ts);
  rtp_message(LOG_DEBUG, "%s - our rtcp ts %llu first pak %llu",
              m_name, m_rtcp_ts, m_first_pak_ts);
  rtp_message(LOG_DEBUG, "%s - adjusted first pak %lld ts %llu",
              m_name, adjust_first_pak, m_timescale);
  rtp_message(LOG_DEBUG, "%s - diff %lld", m_name, our_diff);
  rtp_message(LOG_INFO, "%s adjusting first pak ts by " LLD,
              m_name,
              audio_diff - our_diff);
#endif
  // Now, we very simply add the difference between the 2 to get
  // what the equivalent start time would be.  Note that these values
  // for the first packet are not fixed - they change over time to avoid
  // wrap issues.
  m_first_pak_ts += audio_diff - our_diff;
}

/*
 * remove_packet_rtp_queue - unlink pak from the packet queue while holding
 * the packet mutex.
 *
 * pak  - packet to remove (NULL is ignored)
 * free - when 1, the packet is released with xfree() after unlinking
 *
 * A negative rtp_data_len is flipped back to positive before the packet
 * is handed back or freed.
 */
void CRtpByteStreamBase::remove_packet_rtp_queue (rtp_packet *pak,
                                                  int free)
{
  if (pak == NULL) return;
  SDL_LockMutex(m_rtp_packet_mutex);
  if ((m_head == pak) &&
      (m_head->rtp_next == NULL || m_head->rtp_next == m_head)) {
    // only packet on the queue
    m_head = NULL;
    m_tail = NULL;
  } else {
    pak->rtp_next->rtp_prev = pak->rtp_prev;
    pak->rtp_prev->rtp_next = pak->rtp_next;
    if (m_head == pak) {
      m_head = pak->rtp_next;
    }
    if (m_tail == pak) {
      m_tail = pak->rtp_prev;
    }
  }
  if (pak->rtp_data_len < 0) {
    // restore the packet data length
    pak->rtp_data_len = 0 - pak->rtp_data_len;
  }
  if (free == 1) {
    xfree(pak);
  }
  SDL_UnlockMutex(m_rtp_packet_mutex);
}

/*
 * flush_rtp_packets - remove and free every queued packet, and clear the
 * buffering flag.
 */
void CRtpByteStreamBase::flush_rtp_packets (void)
{
  while (m_head != NULL) {
    remove_packet_rtp_queue(m_head, 1);
  }
  m_buffering = 0;
}

/*
 * pause - just resets the bytestream
 */
void CRtpByteStreamBase::pause(void)
{
  reset();
}

/*
 * recv_task - called from the player media rtp task - make sure
 * we have 2 seconds of buffering, then go...
 */
int CRtpByteStreamBase::recv_task (int decode_thread_waiting)
{
  /*
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -