📄 rtp_bytestream.cpp
字号:
m_name, m_head->rtp_pak_ts);
m_rtp_base_ts = m_head->rtp_pak_ts;
}
//
}
m_buffering = 1;
#if 1
rtp_message(LOG_INFO,
"%s buffering complete - seq %d head %u tail %u "LLD,
m_name, m_head->rtp_pak_seq,
head_ts, tail_ts, calc);
#endif
rtp_done_buffering();
}
}
}
} else {
if (decode_thread_waiting != 0) {
/*
* We're good with buffering - but the decode thread might have
* caught up, and will be waiting. Post a message to kickstart
* it
*/
if (check_rtp_frame_complete_for_payload_type()) {
return (1);
}
}
if (m_recvd_pak == 0) {
if (m_recvd_pak_timeout == 0) {
m_recvd_pak_timeout_time = get_time_of_day();
} else {
uint64_t timeout;
timeout = get_time_of_day() - m_recvd_pak_timeout_time;
if (m_stream_ondemand) {
uint64_t range_end = (uint64_t)(get_max_playtime() * 1000.0);
if (m_last_realtime + timeout >= range_end) {
rtp_message(LOG_DEBUG, "%s Timedout at range end", m_name);
m_eof = 1;
}
} else {
// broadcast - perhaps if we time out for a second or 2, we
// should re-init rtp ? We definitely need to put some timing
// checks here.
session_desc_t *sptr = m_fmt->media->parent;
if (sptr->time_desc != NULL &&
sptr->time_desc->end_time != 0) {
time_t this_time;
this_time = time(NULL);
if (this_time > sptr->time_desc->end_time &&
timeout >= M_LLU) {
m_eof = 1;
}
}
}
}
m_recvd_pak_timeout++;
} else {
m_recvd_pak = 0;
m_recvd_pak_timeout = 0;
}
}
return (m_buffering);
}
/*
 * check_rtp_frame_complete_for_payload_type - default "is a frame ready" test.
 *
 * Returns 1 when the packet queue is non-empty and the packet at the tail
 * has rtp_pak_m set to 1 (presumably the RTP marker bit - confirm against
 * the rtp_packet declaration); returns 0 otherwise.
 */
int CRtpByteStreamBase::check_rtp_frame_complete_for_payload_type (void)
{
  // The original guard tested only m_head and then dereferenced m_tail.
  // Test the pointer we actually dereference as well.
  if (m_head == NULL || m_tail == NULL) {
    return 0;
  }
  return (m_tail->rtp_pak_m == 1) ? 1 : 0;
}
/*
 * rtp_ts_to_msec - turn a 32 bit RTP timestamp into a 64 bit time value.
 *
 * ts          - RTP timestamp of the data being handed out.
 * wrap_offset - running 64 bit offset; bumped by 2^32 here whenever the
 *               previous timestamp (m_ts) had its top bit set and ts does
 *               not.
 *
 * On-demand streams are measured against m_rtp_base_ts and offset by
 * m_play_start_time (clamped at 0 when the result would go negative).
 * Other streams are measured against the wallclock RTP timestamp and
 * offset by m_wallclock_offset.  The tick difference is multiplied by
 * M_LLU and divided by m_rtptime_tickpersec (presumably ticks -> msec;
 * confirm the value of M_LLU).
 *
 * The result is stored in m_last_realtime and returned.
 */
uint64_t CRtpByteStreamBase::rtp_ts_to_msec (uint32_t ts,
                                             uint64_t &wrap_offset)
{
  uint64_t result;
  uint64_t stream_ts;   // ts extended with the wrap offset
  uint64_t ref_ts;      // timestamp we are measuring against
  uint64_t delta;       // absolute distance between the two, then scaled

  // Top bit went from set to clear: the 32 bit timestamp wrapped.
  const bool prev_top_bit = (m_ts & 0x80000000) != 0;
  const bool cur_top_bit = (ts & 0x80000000) != 0;
  if (prev_top_bit && !cur_top_bit) {
    wrap_offset += (I_LLU << 32);
  }

  if (m_stream_ondemand) {
    stream_ts = wrap_offset;
    stream_ts += ts;
    ref_ts = m_rtp_base_ts;

    const bool before_base = (ref_ts > stream_ts);
    delta = before_base ? (ref_ts - stream_ts) : (stream_ts - ref_ts);
    delta *= M_LLU;
    delta /= m_rtptime_tickpersec;

    if (!before_base) {
      result = delta;
      result += m_play_start_time;
    } else if (delta > m_play_start_time) {
      // would be before time 0 - clamp
      result = 0;
    } else {
      result = m_play_start_time - delta;
    }
  } else {
    // Snapshot the wallclock reference under the packet mutex.
    SDL_LockMutex(m_rtp_packet_mutex);
    stream_ts = wrap_offset;
    stream_ts += ts;
    ref_ts = m_wallclock_offset_wrap;
    ref_ts += m_wallclock_rtp_ts;
    SDL_UnlockMutex(m_rtp_packet_mutex);

    const bool at_or_after_ref = (stream_ts >= ref_ts);
    delta = at_or_after_ref ? (stream_ts - ref_ts) : (ref_ts - stream_ts);
    delta *= M_LLU;
    delta /= m_rtptime_tickpersec;

    if (at_or_after_ref) {
      result = delta;
      result += m_wallclock_offset;
    } else {
      result = m_wallclock_offset - delta;
    }
#ifdef DEBUG_RTP_BCAST
    rtp_message(LOG_DEBUG, "%s wcts %llu ts %llu wcntp %llu tp %llu",
                m_name, ref_ts, stream_ts, m_wallclock_offset,
                result);
#endif
  }

  // remember the last time we handed out
  m_last_realtime = result;
  return (result);
}
/*
 * CRtpByteStream constructor - forwards every argument to
 * CRtpByteStreamBase and sets up the frame reassembly buffer with an
 * initial size of 4096 bytes.
 */
CRtpByteStream::CRtpByteStream(const char *name,
                               format_list_t *fmt,
                               unsigned int rtp_pt,
                               int ondemand,
                               uint64_t tickpersec,
                               rtp_packet **head,
                               rtp_packet **tail,
                               int rtp_seq_set,
                               uint16_t rtp_base_seq,
                               int rtp_ts_set,
                               uint32_t rtp_base_ts,
                               int rtcp_received,
                               uint32_t ntp_frac,
                               uint32_t ntp_sec,
                               uint32_t rtp_ts) :
  CRtpByteStreamBase(name, fmt, rtp_pt, ondemand, tickpersec, head, tail,
                     rtp_seq_set, rtp_base_seq, rtp_ts_set, rtp_base_ts,
                     rtcp_received, ntp_frac, ntp_sec, rtp_ts)
{
  m_buffer = (uint8_t *)malloc(4096);
  // Only advertise capacity that was really allocated.  With a capacity
  // of 0, start_next_frame() takes its realloc path for the first payload
  // (realloc on a NULL pointer behaves like malloc) instead of copying
  // into a NULL buffer.
  m_buffer_len_max = (m_buffer != NULL) ? 4096 : 0;
  m_bytes_used = m_buffer_len = 0;
}
/*
 * Destructor - gives back the frame reassembly buffer.
 */
CRtpByteStream::~CRtpByteStream (void)
{
  if (m_buffer != NULL) {
    free(m_buffer);
  }
  m_buffer = NULL;
}
/*
 * reset - forget any partially consumed frame data, then let the base
 * class do its own reset.
 */
void CRtpByteStream::reset (void)
{
  m_bytes_used = 0;
  m_buffer_len = 0;
  CRtpByteStreamBase::reset();
}
/*
 * start_next_frame - hand the decoder the next chunk of frame data.
 *
 * buffer - out: start of the data to decode.
 * buflen - out: number of bytes available at *buffer.
 * ud     - unused by this implementation.
 *
 * If more than 2 unconsumed bytes remain from the previous call, those
 * are returned again together with m_last_realtime.  Otherwise packets
 * are pulled off the head of the rtp queue and their payloads (minus
 * m_skip_on_advance_bytes) are concatenated into m_buffer until a packet
 * with rtp_pak_m == 1 has been consumed.  A sequence or timestamp
 * discontinuity discards what has been collected so far and restarts
 * collection at the current packet.
 *
 * Returns the value rtp_ts_to_msec() produces for the frame's timestamp,
 * or m_last_realtime when no packet could be read.
 */
uint64_t CRtpByteStream::start_next_frame (uint8_t **buffer,
                                           uint32_t *buflen,
                                           void **ud)
{
  uint16_t seq = 0;
  uint32_t ts = 0;
  uint64_t timetick;
  int first = 0;
  int finished = 0;
  rtp_packet *rpak;
  int32_t diff;

  diff = m_buffer_len - m_bytes_used;
  m_doing_add = 0;
  if (diff > 2) {
    // Still bytes in the buffer...
    *buffer = m_buffer + m_bytes_used;
    *buflen = diff;
#ifdef DEBUG_RTP_PAKS
    rtp_message(LOG_DEBUG, "%s Still left - %d bytes", m_name, *buflen);
#endif
    return (m_last_realtime);
  }

  m_buffer_len = 0;
  while (finished == 0) {
    rpak = m_head;
    if (rpak == NULL) {
      // Queue emptied before the end-of-frame packet showed up.  Stop
      // here rather than dereference a NULL packet.
      rtp_message(LOG_INFO, "%s rtp queue empty before end of frame",
                  m_name);
      break;
    }
    remove_packet_rtp_queue(rpak, 0);

    if (first == 0) {
      seq = rpak->rtp_pak_seq + 1;
      ts = rpak->rtp_pak_ts;
      first = 1;
    } else {
      if ((seq != rpak->rtp_pak_seq) ||
          (ts != rpak->rtp_pak_ts)) {
        if (seq != rpak->rtp_pak_seq) {
          rtp_message(LOG_INFO, "%s missing rtp sequence - should be %u is %u",
                      m_name, seq, rpak->rtp_pak_seq);
        } else {
          rtp_message(LOG_INFO, "%s timestamp error - seq %u should be %x is %x",
                      m_name, seq, ts, rpak->rtp_pak_ts);
        }
        // throw away the partial frame and restart with this packet
        m_buffer_len = 0;
        ts = rpak->rtp_pak_ts;
      }
      seq = rpak->rtp_pak_seq + 1;
    }

    // Payload length after the per-packet skip.  A packet shorter than
    // the skip would make the unsigned subtraction wrap to a huge value
    // that then drives memcpy, so treat it as carrying no payload.
    uint32_t len = 0;
    if ((uint32_t)rpak->rtp_data_len >= (uint32_t)m_skip_on_advance_bytes) {
      len = rpak->rtp_data_len - m_skip_on_advance_bytes;
    } else {
      rtp_message(LOG_INFO,
                  "%s rtp packet seq %u shorter than skip bytes - payload ignored",
                  m_name, rpak->rtp_pak_seq);
    }

    if (len > 0) {
      if (m_buffer == NULL ||
          (m_buffer_len + len) > m_buffer_len_max) {
        // Grow through a temporary so the old buffer is not lost (and
        // m_buffer not set to NULL) if realloc fails.
        uint8_t *grown =
          (uint8_t *)realloc(m_buffer, m_buffer_len + len + 1024);
        if (grown == NULL) {
          rtp_message(LOG_INFO,
                      "%s can't grow frame buffer - dropping frame data",
                      m_name);
          // Same recovery as a discontinuity: discard what we have.
          m_buffer_len = 0;
          len = 0;
        } else {
          m_buffer = grown;
          m_buffer_len_max = m_buffer_len + len + 1024;
        }
      }
      if (len > 0) {
        const uint8_t *from =
          (uint8_t *)rpak->rtp_data + m_skip_on_advance_bytes;
        memcpy(m_buffer + m_buffer_len, from, len);
        m_buffer_len += len;
      }
    }

    if (rpak->rtp_pak_m == 1) {
      finished = 1;
    }
    xfree(rpak);
  }

  m_bytes_used = 0;
  *buffer = m_buffer + m_bytes_used;
  *buflen = m_buffer_len - m_bytes_used;
#ifdef DEBUG_RTP_PAKS
  rtp_message(LOG_DEBUG, "%s buffer len %d", m_name, m_buffer_len);
#endif

  if (first == 0) {
    // No packet was read, so there is no timestamp to convert; leave
    // m_ts and the wrap tracking untouched.
    return (m_last_realtime);
  }
  timetick = rtp_ts_to_msec(ts, m_wrap_offset);
  m_ts = ts;
  return (timetick);
}
/*
 * skip_next_frame - drop the frame at the head of the queue and start the
 * one after it.
 *
 * pts          - out: value returned by start_next_frame() for the new
 *                frame (only written when 1 is returned).
 * hasSyncFrame - out: always set to -1 (unknown).
 * buffer       - out: passed through to start_next_frame().
 * buflen       - out: passed through to start_next_frame().
 *
 * Every queued packet carrying the same timestamp as the head packet is
 * removed.  Returns 0 if the queue is, or becomes, empty; 1 otherwise.
 */
int CRtpByteStream::skip_next_frame (uint64_t *pts, int *hasSyncFrame,
                                     uint8_t **buffer,
                                     uint32_t *buflen)
{
  uint64_t frame_ts;

  *hasSyncFrame = -1; // we don't know if we have a sync frame
  m_bytes_used = 0;
  m_buffer_len = 0;
  if (m_head == NULL) {
    return 0;
  }

  frame_ts = m_head->rtp_pak_ts;
  for (;;) {
    remove_packet_rtp_queue(m_head, 1);
    if (m_head == NULL) {
      // nothing left after the skipped frame
      return 0;
    }
    if (m_head->rtp_pak_ts != frame_ts) {
      break;
    }
  }

  init();
  m_bytes_used = 0;
  m_buffer_len = 0;
  *pts = start_next_frame(buffer, buflen, NULL);
  return (1);
}
/*
 * used_bytes_for_frame - record that the caller consumed "bytes" more
 * bytes of the data last handed out.
 */
void CRtpByteStream::used_bytes_for_frame (uint32_t bytes)
{
#ifdef DEBUG_RTP_PAKS
  rtp_message(LOG_DEBUG, "%s Used %d bytes", m_name, bytes);
#endif
  m_bytes_used = m_bytes_used + bytes;
}
int CRtpByteStream::have_no_data (void)
{
rtp_packet *temp, *first;
first = temp = m_head;
if (temp == NULL) return TRUE;
do {
if (temp->rtp_pak_m == 1) return FALSE;
temp = temp->rtp_next;
} while (temp != NULL && temp != first);
return TRUE;
}
/*
 * flush_rtp_packets - let the base class flush, then mark the reassembly
 * buffer as empty.
 */
void CRtpByteStream::flush_rtp_packets (void)
{
  CRtpByteStreamBase::flush_rtp_packets();
  m_buffer_len = 0;
  m_bytes_used = 0;
}
/*
 * CAudioRtpByteStream constructor - builds a CRtpByteStream named "audio"
 * from the supplied arguments, runs init(), and starts with no working
 * packet.
 */
CAudioRtpByteStream::CAudioRtpByteStream (unsigned int rtp_pt,
                                          format_list_t *fmt,
                                          int ondemand,
                                          uint64_t tps,
                                          rtp_packet **head,
                                          rtp_packet **tail,
                                          int rtp_seq_set,
                                          uint16_t rtp_base_seq,
                                          int rtp_ts_set,
                                          uint32_t rtp_base_ts,
                                          int rtcp_received,
                                          uint32_t ntp_frac,
                                          uint32_t ntp_sec,
                                          uint32_t rtp_ts) :
  CRtpByteStream("audio", fmt, rtp_pt, ondemand, tps, head, tail,
                 rtp_seq_set, rtp_base_seq,
                 rtp_ts_set, rtp_base_ts,
                 rtcp_received, ntp_frac, ntp_sec, rtp_ts)
{
  init();
  // no packet is being handed out yet
  m_working_pak = NULL;
}
/*
 * Destructor - releases the working packet, if one is still held.
 *
 * start_next_frame() removes the working packet from the rtp queue before
 * handing it out, so nothing else visible in this file frees it except
 * reset().  Without this it leaks when the stream is destroyed while a
 * packet is held.
 */
CAudioRtpByteStream::~CAudioRtpByteStream(void)
{
  if (m_working_pak != NULL) {
    xfree(m_working_pak);
    m_working_pak = NULL;
  }
}
/*
 * have_no_data - TRUE when the rtp queue is empty, FALSE otherwise.
 */
int CAudioRtpByteStream::have_no_data (void)
{
  return (m_head == NULL) ? TRUE : FALSE;
}
/*
 * check_rtp_frame_complete_for_payload_type - for audio, any queued
 * packet counts as a complete frame.  Returns 1 if the queue is
 * non-empty, 0 otherwise.
 */
int CAudioRtpByteStream::check_rtp_frame_complete_for_payload_type (void)
{
  if (m_head == NULL) {
    return 0;
  }
  return 1;
}
void CAudioRtpByteStream::reset (void)
{
if (m_working_pak != NULL) {
xfree(m_working_pak);
m_working_pak = NULL;
}
CRtpByteStream::reset();
}
/*
 * start_next_frame - hand the decoder the next chunk of audio data.
 *
 * buffer - out: start of the data to decode (NULL when nothing is queued).
 * buflen - out: number of bytes available at *buffer.
 * ud     - unused by this implementation.
 *
 * Audio is handed out one rtp packet at a time, straight from the
 * packet's own data area.  If more than 2 unconsumed bytes remain in the
 * current working packet, the remainder is returned again together with
 * m_last_realtime.  Otherwise the working packet is freed and replaced by
 * the packet at the head of the rtp queue.
 *
 * Returns the value rtp_ts_to_msec() produces for the packet's timestamp,
 * or m_last_realtime when the leftover is returned or the queue is empty.
 */
uint64_t CAudioRtpByteStream::start_next_frame (uint8_t **buffer,
                                                uint32_t *buflen,
                                                void **ud)
{
  uint32_t ts;
  int32_t diff = 0;

  if (m_working_pak != NULL) {
    diff = m_working_pak->rtp_data_len - m_bytes_used;
  }

  m_doing_add = 0;
  if (diff > 2) {
    // Still bytes in the buffer...
    *buffer = (uint8_t *)m_working_pak->rtp_data + m_bytes_used;
    *buflen = diff;
#ifdef DEBUG_RTP_PAKS
    rtp_message(LOG_DEBUG, "%s Still left - %d bytes", m_name, *buflen);
#endif
    return (m_last_realtime);
  }

  if (m_working_pak != NULL) {
    xfree(m_working_pak);
    // don't keep a pointer to freed memory if we bail out below
    m_working_pak = NULL;
  }
  m_buffer_len = 0;
  // The consumed-byte count belongs to the packet just released.  Start
  // again from 0 so the leftover calculation on the next call is made
  // against the new packet, not a stale offset.
  m_bytes_used = 0;

  if (m_head == NULL) {
    // Nothing queued - report an empty buffer instead of dereferencing
    // a NULL packet.
    *buffer = NULL;
    *buflen = 0;
    return (m_last_realtime);
  }

  m_working_pak = m_head;
  remove_packet_rtp_queue(m_working_pak, 0);

  *buffer = (uint8_t *)m_working_pak->rtp_data;
  *buflen = m_working_pak->rtp_data_len;
  ts = m_working_pak->rtp_pak_ts;
#ifdef DEBUG_RTP_PAKS
  rtp_message(LOG_DEBUG, "%s buffer seq %d ts %x len %d", m_name,
              m_working_pak->rtp_pak_seq,
              m_working_pak->rtp_pak_ts, m_working_pak->rtp_data_len);
#endif

  // We're going to have to handle wrap better...
  uint64_t retts = rtp_ts_to_msec(ts, m_wrap_offset);
  m_ts = ts;
  return retts;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -