📄 stream.c
字号:
/* Must have sufficient length before we proceed. */
if (payloadlen < sizeof(pjmedia_rtp_dtmf_event))
return;
//dump_bin(payload, payloadlen);
/* Check if this is the same/current digit of the last packet. */
if (stream->last_dtmf != -1 &&
event->event == stream->last_dtmf &&
pj_ntohs(event->duration) >= stream->last_dtmf_dur)
{
/* Yes, this is the same event. */
stream->last_dtmf_dur = pj_ntohs(event->duration);
return;
}
/* Ignore unknown event. */
if (event->event > 15) {
PJ_LOG(5,(stream->port.info.name.ptr,
"Ignored RTP pkt with bad DTMF event %d",
event->event));
return;
}
/* New event! */
PJ_LOG(5,(stream->port.info.name.ptr, "Received DTMF digit %c, vol=%d",
digitmap[event->event],
(event->e_vol & 0x3F)));
stream->last_dtmf = event->event;
stream->last_dtmf_dur = pj_ntohs(event->duration);
/* If DTMF callback is installed, call the callback, otherwise keep
* the DTMF digits in the buffer.
*/
if (stream->dtmf_cb) {
stream->dtmf_cb(stream, stream->dtmf_cb_user_data,
digitmap[event->event]);
} else {
/* By convention, we use jitter buffer's mutex to access shared
* DTMF variables.
*/
pj_mutex_lock(stream->jb_mutex);
if (stream->rx_dtmf_count >= PJ_ARRAY_SIZE(stream->rx_dtmf_buf)) {
/* DTMF digits overflow. Discard the oldest digit. */
pj_array_erase(stream->rx_dtmf_buf,
sizeof(stream->rx_dtmf_buf[0]),
stream->rx_dtmf_count, 0);
--stream->rx_dtmf_count;
}
stream->rx_dtmf_buf[stream->rx_dtmf_count++] = digitmap[event->event];
pj_mutex_unlock(stream->jb_mutex);
}
}
/*
* This callback is called by stream transport on receipt of packets
* in the RTP socket.
*/
static void on_rx_rtp( void *data,
const void *pkt,
pj_ssize_t bytes_read)
{
pjmedia_stream *stream = (pjmedia_stream*) data;
pjmedia_channel *channel = stream->dec;
const pjmedia_rtp_hdr *hdr;
const void *payload;
unsigned payloadlen;
pjmedia_rtp_status seq_st;
pj_status_t status;
/* Check for errors */
if (bytes_read < 0) {
LOGERR_((stream->port.info.name.ptr, "RTP recv() error", -bytes_read));
return;
}
/* Ignore keep-alive packets */
if (bytes_read < (pj_ssize_t) sizeof(pjmedia_rtp_hdr))
return;
/* Update RTP and RTCP session. */
status = pjmedia_rtp_decode_rtp(&channel->rtp, pkt, bytes_read,
&hdr, &payload, &payloadlen);
if (status != PJ_SUCCESS) {
LOGERR_((stream->port.info.name.ptr, "RTP decode error", status));
stream->rtcp.stat.rx.discard++;
return;
}
/* Inform RTCP session */
pjmedia_rtcp_rx_rtp(&stream->rtcp, pj_ntohs(hdr->seq),
pj_ntohl(hdr->ts), payloadlen);
/* Ignore the packet if decoder is paused */
if (channel->paused)
return;
/* Handle incoming DTMF. */
if (hdr->pt == stream->rx_event_pt) {
handle_incoming_dtmf(stream, payload, payloadlen);
return;
}
/* Update RTP session (also checks if RTP session can accept
* the incoming packet.
*/
pjmedia_rtp_session_update(&channel->rtp, hdr, &seq_st);
if (seq_st.status.value) {
TRC_ ((stream->port.info.name.ptr,
"RTP status: badpt=%d, badssrc=%d, dup=%d, "
"outorder=%d, probation=%d, restart=%d",
seq_st.status.flag.badpt,
seq_st.status.flag.badssrc,
seq_st.status.flag.dup,
seq_st.status.flag.outorder,
seq_st.status.flag.probation,
seq_st.status.flag.restart));
if (seq_st.status.flag.badpt) {
PJ_LOG(4,(stream->port.info.name.ptr,
"Bad RTP pt %d (expecting %d)",
hdr->pt, channel->rtp.out_pt));
}
}
/* Skip bad RTP packet */
if (seq_st.status.flag.bad)
return;
/* Put "good" packet to jitter buffer, or reset the jitter buffer
* when RTP session is restarted.
*/
pj_mutex_lock( stream->jb_mutex );
if (seq_st.status.flag.restart) {
status = pjmedia_jbuf_reset(stream->jb);
PJ_LOG(4,(stream->port.info.name.ptr, "Jitter buffer reset"));
} else {
/*
* Packets may contain more than one frames, while the jitter
* buffer can only take one frame per "put" operation. So we need
* to ask the codec to "parse" the payload into multiple frames.
*/
enum { MAX = 16 };
pj_timestamp ts;
unsigned i, count = MAX;
unsigned samples_per_frame;
pjmedia_frame frames[MAX];
/* Get the timestamp of the first sample */
ts.u64 = pj_ntohl(hdr->ts);
/* Parse the payload. */
status = (*stream->codec->op->parse)(stream->codec,
(void*)payload,
payloadlen,
&ts,
&count,
frames);
if (status != PJ_SUCCESS) {
LOGERR_((stream->port.info.name.ptr,
"Codec parse() error",
status));
count = 0;
}
/* Put each frame to jitter buffer. */
samples_per_frame = stream->codec_param.info.frm_ptime *
stream->codec_param.info.clock_rate *
stream->codec_param.info.channel_cnt /
1000;
for (i=0; i<count; ++i) {
unsigned ext_seq;
ext_seq = (unsigned)(frames[i].timestamp.u64 /
samples_per_frame);
pjmedia_jbuf_put_frame(stream->jb, frames[i].buf,
frames[i].size, ext_seq);
}
}
pj_mutex_unlock( stream->jb_mutex );
/* Check if now is the time to transmit RTCP SR/RR report.
* We only do this when stream direction is "decoding only",
* because otherwise check_tx_rtcp() will be handled by put_frame()
*/
if (stream->dir == PJMEDIA_DIR_DECODING) {
check_tx_rtcp(stream, pj_ntohl(hdr->ts));
}
if (status != 0) {
LOGERR_((stream->port.info.name.ptr, "Jitter buffer put() error",
status));
return;
}
}
/*
* This callback is called by stream transport on receipt of packets
* in the RTCP socket.
*/
static void on_rx_rtcp( void *data,
const void *pkt,
pj_ssize_t bytes_read)
{
pjmedia_stream *stream = (pjmedia_stream*) data;
/* Check for errors */
if (bytes_read < 0) {
LOGERR_((stream->port.info.name.ptr, "RTCP recv() error",
-bytes_read));
return;
}
pjmedia_rtcp_rx_rtcp(&stream->rtcp, pkt, bytes_read);
}
/*
* Create media channel.
*/
static pj_status_t create_channel( pj_pool_t *pool,
pjmedia_stream *stream,
pjmedia_dir dir,
unsigned pt,
const pjmedia_stream_info *param,
pjmedia_channel **p_channel)
{
pjmedia_channel *channel;
pj_status_t status;
/* Allocate memory for channel descriptor */
channel = PJ_POOL_ZALLOC_T(pool, pjmedia_channel);
PJ_ASSERT_RETURN(channel != NULL, PJ_ENOMEM);
/* Init channel info. */
channel->stream = stream;
channel->dir = dir;
channel->paused = 1;
channel->pt = pt;
/* Allocate buffer for incoming packet. */
channel->in_pkt_size = PJMEDIA_MAX_MTU;
channel->in_pkt = pj_pool_alloc( pool, channel->in_pkt_size );
PJ_ASSERT_RETURN(channel->in_pkt != NULL, PJ_ENOMEM);
/* Allocate buffer for outgoing packet. */
channel->out_pkt_size = sizeof(pjmedia_rtp_hdr) +
stream->codec_param.info.avg_bps/8 *
PJMEDIA_MAX_FRAME_DURATION_MS /
1000;
if (channel->out_pkt_size > PJMEDIA_MAX_MTU)
channel->out_pkt_size = PJMEDIA_MAX_MTU;
channel->out_pkt = pj_pool_alloc(pool, channel->out_pkt_size);
PJ_ASSERT_RETURN(channel->out_pkt != NULL, PJ_ENOMEM);
/* Create RTP and RTCP sessions: */
status = pjmedia_rtp_session_init(&channel->rtp, pt, param->ssrc);
if (status != PJ_SUCCESS)
return status;
/* Done. */
*p_channel = channel;
return PJ_SUCCESS;
}
/*
* Create media stream.
*/
PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt,
pj_pool_t *pool,
const pjmedia_stream_info *info,
pjmedia_transport *tp,
void *user_data,
pjmedia_stream **p_stream)
{
enum { M = 32 };
pjmedia_stream *stream;
pj_str_t name;
unsigned jb_init, jb_max, jb_min_pre, jb_max_pre;
pj_status_t status;
PJ_ASSERT_RETURN(pool && info && p_stream, PJ_EINVAL);
/* Allocate the media stream: */
stream = PJ_POOL_ZALLOC_T(pool, pjmedia_stream);
PJ_ASSERT_RETURN(stream != NULL, PJ_ENOMEM);
/* Init stream/port name */
name.ptr = (char*) pj_pool_alloc(pool, M);
name.slen = pj_ansi_snprintf(name.ptr, M, "strm%p", stream);
/* Init some port-info. Some parts of the info will be set later
* once we have more info about the codec.
*/
pjmedia_port_info_init(&stream->port.info, &name,
PJMEDIA_PORT_SIGNATURE('S', 'T', 'R', 'M'),
info->fmt.clock_rate, info->fmt.channel_cnt,
16, 80);
/* Init port. */
pj_strdup(pool, &stream->port.info.encoding_name, &info->fmt.encoding_name);
stream->port.info.clock_rate = info->fmt.clock_rate;
stream->port.info.channel_count = info->fmt.channel_cnt;
stream->port.port_data.pdata = stream;
stream->port.put_frame = &put_frame;
stream->port.get_frame = &get_frame;
/* Init stream: */
stream->endpt = endpt;
stream->codec_mgr = pjmedia_endpt_get_codec_mgr(endpt);
stream->dir = info->dir;
stream->user_data = user_data;
stream->rtcp_interval = (PJMEDIA_RTCP_INTERVAL + (pj_rand() % 8000)) *
info->fmt.clock_rate / 1000;
stream->tx_event_pt = info->tx_event_pt ? info->tx_event_pt : -1;
stream->rx_event_pt = info->rx_event_pt ? info->rx_event_pt : -1;
stream->last_dtmf = -1;
/* Create mutex to protect jitter buffer: */
status = pj_mutex_create_simple(pool, NULL, &stream->jb_mutex);
if (status != PJ_SUCCESS)
goto err_cleanup;
/* Create and initialize codec: */
status = pjmedia_codec_mgr_alloc_codec( stream->codec_mgr,
&info->fmt, &stream->codec);
if (status != PJ_SUCCESS)
goto err_cleanup;
/* Get codec param: */
if (info->param)
stream->codec_param = *info->param;
else {
status = pjmedia_codec_mgr_get_default_param(stream->codec_mgr,
&info->fmt,
&stream->codec_param);
if (status != PJ_SUCCESS)
goto err_cleanup;
}
/* Check for invalid frame per packet. */
if (stream->codec_param.setting.frm_per_pkt < 1)
stream->codec_param.setting.frm_per_pkt = 1;
/* Set additional info. */
stream->port.info.bits_per_sample = 16;
stream->port.info.samples_per_frame = info->fmt.clock_rate *
stream->codec_param.info.frm_ptime *
stream->codec_param.setting.frm_per_pkt /
1000;
stream->port.info.bytes_per_frame = stream->codec_param.info.avg_bps/8 *
stream->codec_param.info.frm_ptime *
stream->codec_param.setting.frm_per_pkt /
1000;
/* Open the codec: */
status = stream->codec->op->open(stream->codec, &stream->codec_param);
if (status != PJ_SUCCESS)
goto err_cleanup;
/* If encoder and decoder's ptime are asymmetric, then we need to
* create buffer on the encoder side. This could happen for example
* with iLBC
*/
if (stream->codec_param.info.enc_ptime!=0 &&
stream->codec_param.info.enc_ptime!=stream->codec_param.info.frm_ptime)
{
unsigned ptime;
stream->enc_samples_per_frame = stream->codec_param.info.enc_ptime *
stream->port.info.clock_rate / 1000;
/* Set buffer size as twice the largest ptime value between
* stream's ptime, encoder ptime, or decoder ptime.
*/
ptime = stream->port.info.samples_per_frame * 1000 /
stream->port.info.clock_rate;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -