stream.c
来自「基于sip协议的网络电话源码」· C语言 代码 · 共 1,670 行 · 第 1/3 页
C
1,670 行
stream->tx_event_pt, 0, frame_out.size, ts_len, (const void**)&rtphdr, &rtphdrlen); } else if (frame->type != PJMEDIA_FRAME_TYPE_NONE) { unsigned ts, codec_samples_per_frame; /* Repeatedly call encode if there are multiple frames to be * sent. */ codec_samples_per_frame = stream->codec_param.info.enc_ptime * stream->codec_param.info.clock_rate / 1000; if (codec_samples_per_frame == 0) { codec_samples_per_frame = stream->codec_param.info.frm_ptime * stream->codec_param.info.clock_rate / 1000; } for (ts=0; ts<ts_len; ts += codec_samples_per_frame) { pjmedia_frame tmp_out_frame, tmp_in_frame; unsigned bytes_per_sample, max_size; /* Nb of bytes in PCM sample */ bytes_per_sample = stream->codec_param.info.pcm_bits_per_sample/8; /* Split original PCM input frame into base frame size */ tmp_in_frame.timestamp.u64 = frame->timestamp.u64 + ts; tmp_in_frame.buf = ((char*)frame->buf) + ts * bytes_per_sample; tmp_in_frame.size = codec_samples_per_frame * bytes_per_sample; tmp_in_frame.type = PJMEDIA_FRAME_TYPE_AUDIO; /* Set output frame position */ tmp_out_frame.buf = ((char*)frame_out.buf) + frame_out.size; max_size = channel->out_pkt_size - sizeof(pjmedia_rtp_hdr) - frame_out.size; /* Encode! */ status = stream->codec->op->encode( stream->codec, &tmp_in_frame, max_size, &tmp_out_frame); if (status != PJ_SUCCESS) { LOGERR_((stream->port.info.name.ptr, "Codec encode() error", status)); return status; } /* tmp_out_frame.size may be zero for silence frame. */ frame_out.size += tmp_out_frame.size; /* Stop processing next PCM frame when encode() returns either * CNG frame or NULL frame. */ if (tmp_out_frame.type!=PJMEDIA_FRAME_TYPE_AUDIO || tmp_out_frame.size==0) { break; } } /* Encapsulate. */ status = pjmedia_rtp_encode_rtp( &channel->rtp, channel->pt, 0, frame_out.size, ts_len, (const void**)&rtphdr, &rtphdrlen); } else { /* Just update RTP session's timestamp. */ status = pjmedia_rtp_encode_rtp( &channel->rtp, 0, 0, 0, ts_len, (const void**)&rtphdr, &rtphdrlen); } if (status != PJ_SUCCESS) { LOGERR_((stream->port.info.name.ptr, "RTP encode_rtp() error", status)); return status; } /* Check if now is the time to transmit RTCP SR/RR report. * We only do this when stream direction is not "decoding only", because * when it is, check_tx_rtcp() will be handled by get_frame(). */ if (stream->dir != PJMEDIA_DIR_DECODING) { check_tx_rtcp(stream, pj_ntohl(channel->rtp.out_hdr.ts)); } /* Do nothing if we have nothing to transmit */ if (frame_out.size == 0) { if (stream->is_streaming) { PJ_LOG(5,(stream->port.info.name.ptr,"Starting silence")); stream->is_streaming = PJ_FALSE; } return PJ_SUCCESS; } /* Copy RTP header to the beginning of packet */ pj_memcpy(channel->out_pkt, rtphdr, sizeof(pjmedia_rtp_hdr)); /* Set RTP marker bit if currently not streaming */ if (stream->is_streaming == PJ_FALSE) { pjmedia_rtp_hdr *rtp = channel->out_pkt; rtp->m = 1; PJ_LOG(5,(stream->port.info.name.ptr,"Start talksprut..")); } stream->is_streaming = PJ_TRUE; /* Send the RTP packet to the transport. */ (*stream->transport->op->send_rtp)(stream->transport, channel->out_pkt, frame_out.size + sizeof(pjmedia_rtp_hdr)); /* Update stat */ pjmedia_rtcp_tx_rtp(&stream->rtcp, frame_out.size); return PJ_SUCCESS;}/** * put_frame() * * This callback is called by upstream component when it has PCM frame * to transmit. This function encodes the PCM frame, pack it into * RTP packet, and transmit to peer. */static pj_status_t put_frame( pjmedia_port *port, const pjmedia_frame *frame ){ pjmedia_stream *stream = port->port_data.pdata; pjmedia_frame tmp_zero_frame; unsigned samples_per_frame; samples_per_frame = stream->enc_samples_per_frame; /* http://www.pjsip.org/trac/ticket/56: * when input is PJMEDIA_FRAME_TYPE_NONE, feed zero PCM frame * instead so that encoder can decide whether or not to transmit * silence frame. */ if (frame->type == PJMEDIA_FRAME_TYPE_NONE && samples_per_frame <= ZERO_PCM_MAX_SIZE) { pj_memcpy(&tmp_zero_frame, frame, sizeof(pjmedia_frame)); frame = &tmp_zero_frame; tmp_zero_frame.buf = zero_frame; tmp_zero_frame.size = samples_per_frame * 2; tmp_zero_frame.type = PJMEDIA_FRAME_TYPE_AUDIO; }#if 0 // This is no longer needed because each TYPE_NONE frame will // be converted into zero frame above /* If VAD is temporarily disabled during creation, feed zero PCM frame * to the codec. */ if (stream->vad_enabled != stream->codec_param.setting.vad && stream->vad_enabled != 0 && frame->type == PJMEDIA_FRAME_TYPE_NONE && samples_per_frame <= ZERO_PCM_MAX_SIZE) { pj_memcpy(&tmp_in_frame, frame, sizeof(pjmedia_frame)); frame = &tmp_in_frame; tmp_in_frame.buf = zero_frame; tmp_in_frame.size = samples_per_frame * 2; tmp_in_frame.type = PJMEDIA_FRAME_TYPE_AUDIO; }#endif /* If VAD is temporarily disabled during creation, enable it * after transmitting for VAD_SUSPEND_SEC seconds. */ if (stream->vad_enabled != stream->codec_param.setting.vad && (stream->tx_duration - stream->ts_vad_disabled) > stream->port.info.clock_rate * PJMEDIA_STREAM_VAD_SUSPEND_MSEC / 1000) { stream->codec_param.setting.vad = stream->vad_enabled; stream->codec->op->modify(stream->codec, &stream->codec_param); PJ_LOG(4,(stream->port.info.name.ptr,"VAD re-enabled")); } /* If encoder has different ptime than decoder, then the frame must * be passed through the encoding buffer via rebuffer() function. */ if (stream->enc_buf != NULL) { pjmedia_frame tmp_rebuffer_frame; pj_status_t status = PJ_SUCCESS; /* Copy original frame to temporary frame since we need * to modify it. */ pj_memcpy(&tmp_rebuffer_frame, frame, sizeof(pjmedia_frame)); /* Loop while we have full frame in enc_buffer */ for (;;) { pj_status_t st; /* Run rebuffer() */ rebuffer(stream, &tmp_rebuffer_frame); /* Process this frame */ st = put_frame_imp(port, &tmp_rebuffer_frame); if (st != PJ_SUCCESS) status = st; /* If we still have full frame in the buffer, re-run * rebuffer() with NULL frame. */ if (stream->enc_buf_count >= stream->enc_samples_per_frame) { tmp_rebuffer_frame.type = PJMEDIA_FRAME_TYPE_NONE; } else { /* Otherwise break */ break; } } return status; } else { return put_frame_imp(port, frame); }}#if 0static void dump_bin(const char *buf, unsigned len){ unsigned i; PJ_LOG(3,(THIS_FILE, "begin dump")); for (i=0; i<len; ++i) { int j; char bits[9]; unsigned val = buf[i] & 0xFF; bits[8] = '\0'; for (j=0; j<8; ++j) { if (val & (1 << (7-j))) bits[j] = '1'; else bits[j] = '0'; } PJ_LOG(3,(THIS_FILE, "%2d %s [%d]", i, bits, val)); } PJ_LOG(3,(THIS_FILE, "end dump"));}#endif/* * Handle incoming DTMF digits. */static void handle_incoming_dtmf( pjmedia_stream *stream, const void *payload, unsigned payloadlen){ const pjmedia_rtp_dtmf_event *event = payload; /* Check compiler packing. */ pj_assert(sizeof(pjmedia_rtp_dtmf_event)==4); /* Must have sufficient length before we proceed. */ if (payloadlen < sizeof(pjmedia_rtp_dtmf_event)) return; //dump_bin(payload, payloadlen); /* Check if this is the same/current digit of the last packet. */ if (stream->last_dtmf != -1 && event->event == stream->last_dtmf && pj_ntohs(event->duration) >= stream->last_dtmf_dur) { /* Yes, this is the same event. */ stream->last_dtmf_dur = pj_ntohs(event->duration); return; } /* Ignore unknown event. */ if (event->event > 15) { PJ_LOG(5,(stream->port.info.name.ptr, "Ignored RTP pkt with bad DTMF event %d", event->event)); return; } /* New event! */ PJ_LOG(5,(stream->port.info.name.ptr, "Received DTMF digit %c, vol=%d", digitmap[event->event], (event->e_vol & 0x3F))); stream->last_dtmf = event->event; stream->last_dtmf_dur = pj_ntohs(event->duration); /* If DTMF callback is installed, call the callback, otherwise keep * the DTMF digits in the buffer. */ if (stream->dtmf_cb) { stream->dtmf_cb(stream, stream->dtmf_cb_user_data, digitmap[event->event]); } else { /* By convention, we use jitter buffer's mutex to access shared * DTMF variables. */ pj_mutex_lock(stream->jb_mutex); if (stream->rx_dtmf_count >= PJ_ARRAY_SIZE(stream->rx_dtmf_buf)) { /* DTMF digits overflow. Discard the oldest digit. */ pj_array_erase(stream->rx_dtmf_buf, sizeof(stream->rx_dtmf_buf[0]), stream->rx_dtmf_count, 0); --stream->rx_dtmf_count; } stream->rx_dtmf_buf[stream->rx_dtmf_count++] = digitmap[event->event]; pj_mutex_unlock(stream->jb_mutex); }}/* * This callback is called by stream transport on receipt of packets * in the RTP socket. */static void on_rx_rtp( void *data, const void *pkt, pj_ssize_t bytes_read){ pjmedia_stream *stream = data; pjmedia_channel *channel = stream->dec; const pjmedia_rtp_hdr *hdr; const void *payload; unsigned payloadlen; pjmedia_rtp_status seq_st; pj_status_t status; /* Check for errors */ if (bytes_read < 0) { LOGERR_((stream->port.info.name.ptr, "RTP recv() error", -bytes_read)); return; } /* Ignore keep-alive packets */ if (bytes_read < sizeof(pjmedia_rtp_hdr)) return; /* Update RTP and RTCP session. */ status = pjmedia_rtp_decode_rtp(&channel->rtp, pkt, bytes_read, &hdr, &payload, &payloadlen); if (status != PJ_SUCCESS) { LOGERR_((stream->port.info.name.ptr, "RTP decode error", status)); stream->rtcp.stat.rx.discard++; return; } /* Inform RTCP session */ pjmedia_rtcp_rx_rtp(&stream->rtcp, pj_ntohs(hdr->seq), pj_ntohl(hdr->ts), payloadlen); /* Ignore the packet if decoder is paused */ if (channel->paused) return; /* Handle incoming DTMF. */ if (hdr->pt == stream->rx_event_pt) { handle_incoming_dtmf(stream, payload, payloadlen); return; } /* Update RTP session (also checks if RTP session can accept * the incoming packet. */ pjmedia_rtp_session_update(&channel->rtp, hdr, &seq_st); if (seq_st.status.value) { TRC_ ((stream->port.info.name.ptr, "RTP status: badpt=%d, badssrc=%d, dup=%d, " "outorder=%d, probation=%d, restart=%d", seq_st.status.flag.badpt, seq_st.status.flag.badssrc, seq_st.status.flag.dup, seq_st.status.flag.outorder, seq_st.status.flag.probation, seq_st.status.flag.restart)); if (seq_st.status.flag.badpt) { PJ_LOG(4,(stream->port.info.name.ptr, "Bad RTP pt %d (expecting %d)", hdr->pt, channel->rtp.out_pt)); } } /* Skip bad RTP packet */ if (seq_st.status.flag.bad) return; /* Put "good" packet to jitter buffer, or reset the jitter buffer * when RTP session is restarted. */ pj_mutex_lock( stream->jb_mutex ); if (seq_st.status.flag.restart) { status = pjmedia_jbuf_reset(stream->jb); PJ_LOG(4,(stream->port.info.name.ptr, "Jitter buffer reset")); } else { /* * Packets may contain more than one frames, while the jitter * buffer can only take one frame per "put" operation. So we need * to ask the codec to "parse" the payload into multiple frames. */ enum { MAX = 16 }; pj_timestamp ts; unsigned i, count = MAX; unsigned samples_per_frame; pjmedia_frame frames[MAX]; /* Get the timestamp of the first sample */ ts.u64 = pj_ntohl(hdr->ts); /* Parse the payload. */ status = (*stream->codec->op->parse)(stream->codec, (void*)payload, payloadlen, &ts, &count, frames); if (status != PJ_SUCCESS) { LOGERR_((stream->port.info.name.ptr, "Codec parse() error", status)); count = 0; } /* Put each frame to jitter buffer. */ samples_per_frame = stream->codec_param.info.frm_ptime * stream->codec_param.info.clock_rate * stream->codec_param.info.channel_cnt / 1000; for (i=0; i<count; ++i) { unsigned ext_seq; ext_seq = (unsigned)(frames[i].timestamp.u64 / samples_per_frame); pjmedia_jbuf_put_frame(stream->jb, frames[i].buf, frames[i].size, ext_seq); } } pj_mutex_unlock( stream->jb_mutex ); /* Check if now is the time to transmit RTCP SR/RR report. * We only do this when stream direction is "decoding only", * because otherwise check_tx_rtcp() will be handled by put_frame() */ if (stream->dir == PJMEDIA_DIR_DECODING) { check_tx_rtcp(stream, pj_ntohl(hdr->ts)); } if (status != 0) { LOGERR_((stream->port.info.name.ptr, "Jitter buffer put() error", status)); return; }}/* * This callback is called by stream transport on receipt of packets * in the RTCP socket. */static void on_rx_rtcp( void *data, const void *pkt, pj_ssize_t bytes_read){ pjmedia_stream *stream = data; /* Check for errors */ if (bytes_read < 0) { LOGERR_((stream->port.info.name.ptr, "RTCP recv() error", -bytes_read)); return; } pjmedia_rtcp_rx_rtcp(&stream->rtcp, pkt, bytes_read);}/* * Create media channel. */static pj_status_t create_channel( pj_pool_t *pool, pjmedia_stream *stream, pjmedia_dir dir, unsigned pt, const pjmedia_stream_info *param, pjmedia_channel **p_channel){ pjmedia_channel *channel; pj_status_t status; /* Allocate memory for channel descriptor */ channel = pj_pool_zalloc(pool, sizeof(pjmedia_channel)); PJ_ASSERT_RETURN(channel != NULL, PJ_ENOMEM); /* Init channel info. */ channel->stream = stream; channel->dir = dir; channel->paused = 1; channel->pt = pt; /* Allocate buffer for incoming packet. */ channel->in_pkt_size = PJMEDIA_MAX_MTU; channel->in_pkt = pj_pool_alloc( pool, channel->in_pkt_size ); PJ_ASSERT_RETURN(channel->in_pkt != NULL, PJ_ENOMEM); /* Allocate buffer for outgoing packet. */ channel->out_pkt_size = sizeof(pjmedia_rtp_hdr) + stream->codec_param.info.avg_bps/8 * PJMEDIA_MAX_FRAME_DURATION_MS / 1000; if (channel->out_pkt_size > PJMEDIA_MAX_MTU) channel->out_pkt_size = PJMEDIA_MAX_MTU; channel->out_pkt = pj_pool_alloc(pool, channel->out_pkt_size); PJ_ASSERT_RETURN(channel->out_pkt != NULL, PJ_ENOMEM);
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?