📄 rtp_bytestream.cpp
字号:
/*
* The contents of this file are subject to the Mozilla Public
* License Version 1.1 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of
* the License at http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS
* IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or
* implied. See the License for the specific language governing
* rights and limitations under the License.
*
* The Original Code is MPEG4IP.
*
* The Initial Developer of the Original Code is Cisco Systems Inc.
* Portions created by Cisco Systems Inc. are
* Copyright (C) Cisco Systems Inc. 2001. All Rights Reserved.
*
* Contributor(s):
* Bill May wmay@cisco.com
*/
#include "systems.h"
#include <rtp/rtp.h>
#include <rtp/memory.h>
#include <sdp/sdp.h> // for NTP_TO_UNIX_TIME
#include "rtp_bytestream.h"
#include "our_config_file.h"
//#define DEBUG_RTP_PAKS 1
//#define DEBUG_RTP_BCAST 1
#ifdef _WIN32
DEFINE_MESSAGE_MACRO(rtp_message, "rtpbyst")
#else
#define rtp_message(loglevel, fmt...) message(loglevel, "rtpbyst", fmt)
#endif
/*
 * add_rtp_packet_to_queue() - link an rtp packet into a circular, doubly
 * linked list of packets ordered by rtp sequence number (head is the
 * oldest packet, tail the newest; the sequence numbers may wrap between
 * them).
 * This is used both by the bytestream, and by the player_media when trying
 * to determine which rtp payload type is being used.
 *
 * pak  - packet to insert.  If it cannot be inserted (duplicate sequence
 *        number, or no slot found) it is released with xfree() here and
 *        must not be touched by the caller afterwards.
 * head - in/out: first packet of the queue, NULL when the queue is empty.
 * tail - in/out: last packet of the queue.
 *
 * Returns 1 when the packet was linked into the queue, 0 when it was
 * dropped.
 */
int add_rtp_packet_to_queue (rtp_packet *pak,
                             rtp_packet **head,
                             rtp_packet **tail)
{
  rtp_packet *q;
  int inserted = TRUE;
  int16_t diff;
#ifdef DEBUG_RTP_PAKS
  rtp_message(LOG_DEBUG, "CBThread %u - m %u pt %u seq %u ts %u len %d",
              SDL_ThreadID(),
              pak->rtp_pak_m, pak->rtp_pak_pt, pak->rtp_pak_seq,
              pak->rtp_pak_ts, pak->rtp_data_len);
#endif

  if (*head == NULL) {
    // Empty queue.  Link the packet to itself so the list is circular from
    // the very first element; otherwise rtp_next/rtp_prev would keep
    // whatever values the packet arrived with.
    *head = *tail = pak;
    pak->rtp_next = pak;
    pak->rtp_prev = pak;
  } else if (*head == *tail) {
    // Exactly one packet queued.
    if (pak->rtp_pak_seq == (*head)->rtp_pak_seq) {
      rtp_message(LOG_ERR, "Duplicate RTP sequence number %d received",
                  pak->rtp_pak_seq);
      inserted = FALSE;
    } else {
      // Two element ring: each packet is both next and prev of the other.
      (*head)->rtp_next = pak;
      (*head)->rtp_prev = pak;
      pak->rtp_next = pak->rtp_prev = *head;
      // The signed 16 bit difference decides which one is older, so a
      // sequence number wrap between the two packets is handled.
      diff = pak->rtp_pak_seq - (*head)->rtp_pak_seq;
      if (diff > 0) {
        *tail = pak;
      } else {
        *head = pak;
      }
    }
  } else if ((((*head)->rtp_pak_seq < (*tail)->rtp_pak_seq) &&
              ((pak->rtp_pak_seq > (*tail)->rtp_pak_seq) ||
               (pak->rtp_pak_seq < (*head)->rtp_pak_seq))) ||
             (((*head)->rtp_pak_seq > (*tail)->rtp_pak_seq) &&
              ((pak->rtp_pak_seq > (*tail)->rtp_pak_seq &&
                pak->rtp_pak_seq < (*head)->rtp_pak_seq)))) {
    // The packet lies outside the range currently covered by the queue:
    // insert between tail and head.
    // Maybe move head - probably move tail.
    (*tail)->rtp_next = pak;
    pak->rtp_prev = *tail;
    (*head)->rtp_prev = pak;
    pak->rtp_next = *head;

    diff = (*head)->rtp_pak_seq - pak->rtp_pak_seq;
    if (diff > 0 && diff < 4) {
      // between tail and head, and really close to the head - move the
      // head pointer
      *head = pak;
    } else {
      // otherwise, just insert at end
      *tail = pak;
    }
  } else {
    // insert in middle
    // Loop through until we find where it should fit
    q = *head;
    do {
      // check for duplicates
      if (pak->rtp_pak_seq == q->rtp_pak_seq ||
          pak->rtp_pak_seq == q->rtp_next->rtp_pak_seq) {
        // dup seq number
        inserted = FALSE;
        break;
      }
      /*
       * Okay - this is disgusting, but works.  The first part (before the
       * or) will see if pak->rtp_pak_seq is between q and q->rtp_next,
       * assuming that the sequence number for q and q->rtp_next are
       * ascending.
       *
       * 2nd part of the if is the converse case (q->rtp_next->rtp_pak_seq
       * is smaller than q->rtp_pak_seq).  In that case, we need to make
       * sure that pak->rtp_pak_seq is either larger than q->rtp_pak_seq
       * or less than q->rtp_next->rtp_pak_seq
       */
      if (((q->rtp_next->rtp_pak_seq > q->rtp_pak_seq) &&
           (q->rtp_pak_seq < pak->rtp_pak_seq &&
            pak->rtp_pak_seq < q->rtp_next->rtp_pak_seq)) ||
          ((q->rtp_next->rtp_pak_seq < q->rtp_pak_seq) &&
           (pak->rtp_pak_seq < q->rtp_next->rtp_pak_seq ||
            pak->rtp_pak_seq > q->rtp_pak_seq))) {
        // Splice pak in between q and q->rtp_next.
        q->rtp_next->rtp_prev = pak;
        pak->rtp_next = q->rtp_next;
        q->rtp_next = pak;
        pak->rtp_prev = q;
        break;
      }
      q = q->rtp_next;
    } while (q != *tail);

    // Both break statements above leave q short of the tail, so reaching
    // the tail means the walk ended without finding a slot.
    if (q == *tail) {
      inserted = FALSE;
      rtp_message(LOG_ERR, "Couldn't insert %u between %u and %u",
                  pak->rtp_pak_seq,
                  (*head)->rtp_pak_seq,
                  (*tail)->rtp_pak_seq);
    }
  }

  if (inserted == FALSE) {
    rtp_message(LOG_ERR, "Couldn't insert pak");
    rtp_message(LOG_DEBUG, "pak seq %u", pak->rtp_pak_seq);
    if (*head != NULL) {
      rtp_message(LOG_DEBUG, "head seq %u, tail seq %u",
                  (*head)->rtp_pak_seq,
                  (*tail)->rtp_pak_seq);
    }
    // The queue never took the packet, so it is released here.
    xfree(pak);
    return (0);
  }
  return (1);
}
/*
 * CRtpByteStreamBase constructor.
 *
 * name          - passed on to the COurInByteStream base class.
 * fmt           - format entry stored in m_fmt.
 * rtp_pt        - rtp payload type stored in m_rtp_pt.
 * ondemand      - non-zero for an on-demand stream; stored in
 *                 m_stream_ondemand and used to pick the default buffer time.
 * tps           - rtp timestamp ticks per second.
 * head, tail    - packet queue collected so far.  The queue is taken over
 *                 by this object and the caller's pointers are set to NULL.
 * rtp_seq_set,
 * rtp_base_seq  - whether a base sequence number is known, and its value.
 * rtp_ts_set,
 * rtp_base_ts   - whether a base rtp timestamp is known, and its value.
 * rtcp_received - non-zero if the following three values are valid.
 * ntp_frac, ntp_sec, rtp_ts - NTP time / rtp timestamp pair handed to
 *                 calculate_wallclock_offset_from_rtcp().
 */
CRtpByteStreamBase::CRtpByteStreamBase(const char *name,
                                       format_list_t *fmt,
                                       unsigned int rtp_pt,
                                       int ondemand,
                                       uint64_t tps,
                                       rtp_packet **head,
                                       rtp_packet **tail,
                                       int rtp_seq_set,
                                       uint16_t rtp_base_seq,
                                       int rtp_ts_set,
                                       uint32_t rtp_base_ts,
                                       int rtcp_received,
                                       uint32_t ntp_frac,
                                       uint32_t ntp_sec,
                                       uint32_t rtp_ts) :
  COurInByteStream(name)
{
  m_fmt = fmt;
  // Take ownership of the caller's queue.
  m_head = *head;
  *head = NULL;
  m_tail = *tail;
  *tail = NULL;
  m_rtp_base_ts_set = rtp_ts_set;
  m_rtp_base_ts = rtp_base_ts;
  m_rtp_base_seq_set = rtp_seq_set;
  m_rtp_base_seq = rtp_base_seq;
  m_rtp_pt = rtp_pt;

  // A configured buffer time wins; otherwise fall back to 2 (on-demand) or
  // 5 (broadcast) times M_LLU.
  uint64_t temp;
  temp = config.get_config_value(CONFIG_RTP_BUFFER_TIME_MSEC);
  if (temp > 0) {
    m_rtp_buffer_time = temp;
  } else {
    m_rtp_buffer_time = (ondemand ? 2 : 5) * M_LLU;
  }

  m_rtptime_tickpersec = tps;
  // init() copies m_skip_on_advance_bytes into m_offset_in_pak, so the
  // member has to hold a defined value before init() runs.
  m_skip_on_advance_bytes = 0;
  init();

  m_ts = 0;
  m_total = 0;
  m_stream_ondemand = ondemand;
  m_wallclock_offset_set = 0;
  // The mutex must exist before calculate_wallclock_offset_from_rtcp()
  // below, which ends up locking it in set_wallclock_offset().
  m_rtp_packet_mutex = SDL_CreateMutex();
  m_buffering = 0;
  m_eof = 0;
  if (rtcp_received) {
    calculate_wallclock_offset_from_rtcp(ntp_frac, ntp_sec, rtp_ts);
  }
}
/*
 * Destructor - release every packet still queued, then the mutex that
 * guards the queue.
 */
CRtpByteStreamBase::~CRtpByteStreamBase (void)
{
  // Packets first: flushing goes through remove_packet_rtp_queue(), which
  // locks the mutex, so the mutex has to outlive the flush.
  flush_rtp_packets();

  if (m_rtp_packet_mutex != NULL) {
    SDL_DestroyMutex(m_rtp_packet_mutex);
    m_rtp_packet_mutex = NULL;
  }
}
/*
 * init - reset the read position state: clear the end-of-file flag and the
 * wrap offset, and start reading each packet after the bytes that are
 * skipped on advance.
 */
void CRtpByteStreamBase::init (void)
{
  m_eof = 0;
  m_offset_in_pak = m_skip_on_advance_bytes;
  m_wrap_offset = 0;
}
/*
 * set_wallclock_offset - record a wallclock time together with the rtp
 * timestamp it corresponds to.
 *
 * wclock - wallclock value to store in m_wallclock_offset.
 * rtp_ts - rtp timestamp that matches wclock.
 *
 * All values are written while holding m_rtp_packet_mutex, and
 * m_wallclock_offset_set is raised last, so the flag is never visible
 * before the values it announces.
 */
void CRtpByteStreamBase::set_wallclock_offset (uint64_t wclock,
                                               uint32_t rtp_ts)
{
  SDL_LockMutex(m_rtp_packet_mutex);
  m_wallclock_offset = wclock;
  m_wallclock_rtp_ts = rtp_ts;
  m_wallclock_offset_wrap = m_wrap_offset;
  // Current timestamp is in the upper half of the 32 bit range while the
  // new one is in the lower half: treat the new timestamp as belonging to
  // the next wrap of the 32 bit rtp clock.
  if (((m_ts & 0x80000000) == 0x80000000) &&
      ((rtp_ts & 0x80000000) == 0)) {
    m_wallclock_offset_wrap += (I_LLU << 32);
  }
  m_wallclock_offset_set = 1;
  SDL_UnlockMutex(m_rtp_packet_mutex);
}
/*
 * calculate_wallclock_offset_from_rtcp
 * Turn the NTP timestamp reported by an rtcp message into a single
 * wallclock value and hand it, together with the matching rtp timestamp,
 * to set_wallclock_offset().
 *
 * ntp_frac - fractional part of the NTP timestamp (units of 1/2^32 second).
 * ntp_sec  - seconds part of the NTP timestamp.
 * rtp_ts   - rtp timestamp that corresponds to the NTP time.
 *
 * Both parts are scaled by M_LLU (presumably milliseconds - confirm
 * against the definition of M_LLU), after NTP_TO_UNIX_TIME has been
 * subtracted from the seconds.
 */
void
CRtpByteStreamBase::calculate_wallclock_offset_from_rtcp (uint32_t ntp_frac,
                                                          uint32_t ntp_sec,
                                                          uint32_t rtp_ts)
{
  // Fraction of a second: frac * M_LLU / 2^32, done in 64 bits.
  uint64_t wallclock = ntp_frac;
  wallclock *= M_LLU;
  wallclock /= (I_LLU << 32);

  // Whole seconds, rebased with NTP_TO_UNIX_TIME.
  uint64_t whole_secs = ntp_sec;
  whole_secs -= NTP_TO_UNIX_TIME;
  whole_secs *= M_LLU;

  wallclock += whole_secs;
#ifdef DEBUG_RTP_BCAST
  rtp_message(LOG_DEBUG, "%s RTCP data - sec %u frac %u value %llu ts %d",
              m_name, ntp_sec, ntp_frac, wallclock, rtp_ts);
#endif
  set_wallclock_offset(wallclock, rtp_ts);
}
/*
 * recv_callback - callback for when bytestream is active - basically,
 * put things on the queue
 *
 * session - rtp session the event belongs to (not used here).
 * e       - event from the rtp library.
 *             RX_RTP: e->data is an rtp packet.  Ownership of the packet
 *                     is taken here: it is either queued or released with
 *                     xfree().
 *             RX_SR:  e->data is an rtcp sender report, used to update the
 *                     wallclock offset.
 *             Any other event type is ignored.
 */
void CRtpByteStreamBase::recv_callback (struct rtp *session, rtp_event *e)
{
  switch (e->type) {
  case RX_RTP: {
    rtp_packet *rpak = (rtp_packet *)e->data;
    if (rpak->rtp_data_len == 0) {
      // Nothing to play in an empty packet.
      xfree(rpak);
      break;
    }
    if (SDL_mutexP(m_rtp_packet_mutex) == -1) {
      rtp_message(LOG_CRIT, "SDL Lock mutex failure in rtp bytestream recv");
      // The packet was never queued, so it has to be released here or it
      // would be leaked.
      xfree(rpak);
      return;
    }
    // add_rtp_packet_to_queue() releases the packet itself if it cannot
    // be inserted.
    add_rtp_packet_to_queue(rpak, &m_head, &m_tail);
    if (SDL_mutexV(m_rtp_packet_mutex) == -1) {
      rtp_message(LOG_CRIT,
                  "SDL Unlock mutex failure in rtp bytestream recv");
      return;
    }
    m_recvd_pak = 1;
    break;
  }
  case RX_SR: {
    rtcp_sr *srpak = (rtcp_sr *)e->data;
    calculate_wallclock_offset_from_rtcp(srpak->ntp_frac,
                                         srpak->ntp_sec,
                                         srpak->rtp_ts);
    break;
  }
  default:
#if 0
    rtp_message(LOG_DEBUG, "Thread %u - Callback from rtp with %d %p",
                SDL_ThreadID(),e->type, e->rtp_data);
#endif
    break;
  }
}
/*
 * remove_packet_rtp_queue - unlink a packet from the packet queue while
 * holding the queue mutex.
 *
 * pak  - packet to unlink; expected to be on the queue.
 * free - when 1, the packet is released with xfree() after unlinking;
 *        any other value leaves the packet allocated for the caller.
 *
 * A negative rtp_data_len is turned back into its positive value before
 * the packet is released or handed back.
 */
void CRtpByteStreamBase::remove_packet_rtp_queue (rtp_packet *pak,
                                                  int free)
{
  SDL_LockMutex(m_rtp_packet_mutex);

  // The packet is the only one queued when it is the head and the head
  // links to nothing or only to itself.
  const bool only_packet =
    (m_head == pak) &&
    (m_head->rtp_next == NULL || m_head->rtp_next == m_head);

  if (!only_packet) {
    rtp_packet *following = pak->rtp_next;
    rtp_packet *preceding = pak->rtp_prev;

    // Close the ring around the departing packet.
    following->rtp_prev = preceding;
    preceding->rtp_next = following;

    if (m_head == pak) {
      m_head = following;
    }
    if (m_tail == pak) {
      m_tail = preceding;
    }
  } else {
    m_tail = NULL;
    m_head = NULL;
  }

  if (pak->rtp_data_len < 0) {
    // restore the packet data length
    pak->rtp_data_len = -pak->rtp_data_len;
  }

  if (free == 1) {
    xfree(pak);
  }
  SDL_UnlockMutex(m_rtp_packet_mutex);
}
void CRtpByteStreamBase::flush_rtp_packets (void)
{
while (m_head != NULL) {
remove_packet_rtp_queue(m_head, 1);
}
m_buffering = 0;
}
/*
* recv_task - called from the player media rtp task - make sure
* we have 2 seconds of buffering, then go...
*/
int CRtpByteStreamBase::recv_task (int decode_thread_waiting)
{
/*
* We need to make sure we have some buffering. We'll buffer
* about 2 seconds worth, then let the decode task know to go...
*/
if (m_buffering == 0) {
uint32_t head_ts, tail_ts;
if (m_head != NULL) {
/*
* Payload type the same. Make sure we have at least 2 seconds of
* good data
*/
if (rtp_ready() == 0) {
rtp_message(LOG_DEBUG,
"Determined payload type, but rtp bytestream is not ready");
uint64_t calc;
do {
head_ts = m_head->rtp_pak_ts;
tail_ts = m_tail->rtp_pak_ts;
calc = (tail_ts - head_ts);
calc *= 1000;
calc /= m_rtptime_tickpersec;
if (calc > m_rtp_buffer_time) {
rtp_packet *temp = m_head;
m_head = m_head->rtp_next;
m_tail->rtp_next = m_head;
m_head->rtp_prev = m_tail;
xfree((void *)temp);
}
} while (calc > m_rtp_buffer_time);
return 0;
}
if (check_rtp_frame_complete_for_payload_type()) {
head_ts = m_head->rtp_pak_ts;
tail_ts = m_tail->rtp_pak_ts;
if (head_ts > tail_ts &&
((head_ts & (1 << 31)) == (tail_ts & (1 << 31)))) {
return 0;
}
uint64_t calc;
calc = tail_ts;
calc -= head_ts;
calc *= M_LLU;
calc /= m_rtptime_tickpersec;
if (calc > m_rtp_buffer_time) {
if (m_rtp_base_ts_set == 0) {
rtp_message(LOG_NOTICE, "Setting rtp seq and time from 1st pak");
m_rtp_base_ts = m_head->rtp_pak_ts;
m_rtp_base_ts_set = 1;
m_rtpinfo_set_from_pak = 1;
} else {
m_rtpinfo_set_from_pak = 0;
if (m_rtp_base_seq_set != 0 &&
m_rtp_base_seq == m_head->rtp_pak_seq &&
m_rtp_base_ts != m_head->rtp_pak_ts) {
rtp_message(LOG_NOTICE, "%s - rtp ts doesn't match RTPInfo %d",
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -