📄 siprtp.c
字号:
#if defined(PJ_WIN32) && PJ_WIN32 != 0
#include <windows.h>
/*
 * Raise scheduling priority (Windows build).
 *
 * Requests the REALTIME priority class for the current process and the
 * HIGHEST priority level for the calling thread. The results of both
 * calls are deliberately ignored (best effort).
 */
static void boost_priority(void)
{
    /* Process-wide priority class first ... */
    (void)SetPriorityClass(GetCurrentProcess(), REALTIME_PRIORITY_CLASS);

    /* ... then the priority of the calling thread within that class. */
    (void)SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST);
}
#elif defined(PJ_LINUX) && PJ_LINUX != 0
#include <errno.h>
#include <pthread.h>
#include <sched.h>
static void boost_priority(void)
{
#define POLICY SCHED_FIFO
struct sched_param tp;
int max_prio;
int policy;
int rc;
if (sched_get_priority_min(POLICY) < sched_get_priority_max(POLICY))
max_prio = sched_get_priority_max(POLICY)-1;
else
max_prio = sched_get_priority_max(POLICY)+1;
/*
* Adjust process scheduling algorithm and priority
*/
rc = sched_getparam(0, &tp);
if (rc != 0) {
app_perror( THIS_FILE, "sched_getparam error",
PJ_RETURN_OS_ERROR(rc));
return;
}
tp.__sched_priority = max_prio;
rc = sched_setscheduler(0, POLICY, &tp);
if (rc != 0) {
app_perror( THIS_FILE, "sched_setscheduler error",
PJ_RETURN_OS_ERROR(rc));
}
PJ_LOG(4, (THIS_FILE, "New process policy=%d, priority=%d",
policy, tp.__sched_priority));
/*
* Adjust thread scheduling algorithm and priority
*/
rc = pthread_getschedparam(pthread_self(), &policy, &tp);
if (rc != 0) {
app_perror( THIS_FILE, "pthread_getschedparam error",
PJ_RETURN_OS_ERROR(rc));
return;
}
PJ_LOG(4, (THIS_FILE, "Old thread policy=%d, priority=%d",
policy, tp.__sched_priority));
policy = POLICY;
tp.__sched_priority = max_prio;
rc = pthread_setschedparam(pthread_self(), policy, &tp);
if (rc != 0) {
app_perror( THIS_FILE, "pthread_setschedparam error",
PJ_RETURN_OS_ERROR(rc));
return;
}
PJ_LOG(4, (THIS_FILE, "New thread policy=%d, priority=%d",
policy, tp.__sched_priority));
}
#else
/* Neither PJ_WIN32 nor PJ_LINUX is set: boost_priority() expands to nothing. */
# define boost_priority()
#endif
/*
 * RTP receive callback, invoked by the media transport for every incoming
 * RTP packet.
 *
 * user_data  The media_stream this callback was registered for.
 * pkt        Raw packet buffer.
 * size       Packet length; a negative value reports a receive error, and
 *            its negation is passed to app_perror() as the status.
 *
 * Packets are dropped while the stream is inactive. Valid packets are
 * decoded and used to update the RTCP statistics and the inbound RTP
 * session.
 */
static void on_rx_rtp(void *user_data, const void *pkt, pj_ssize_t size)
{
    struct media_stream *strm = user_data;
    const pjmedia_rtp_hdr *hdr;
    const void *payload;
    unsigned payload_len;
    pj_status_t status;

    /* Nothing to do while the media is not active. */
    if (!strm->active)
        return;

    /* The transport reports receive failures as a negative size. */
    if (size < 0) {
        app_perror(THIS_FILE, "RTP recv() error", -size);
        return;
    }

    /* Split the packet into RTP header and payload. */
    status = pjmedia_rtp_decode_rtp(&strm->in_sess, pkt, size,
                                    &hdr, &payload, &payload_len);
    if (status != PJ_SUCCESS) {
        app_perror(THIS_FILE, "RTP decode error", status);
        return;
    }

    /* Feed sequence number, timestamp and payload size to RTCP. */
    pjmedia_rtcp_rx_rtp(&strm->rtcp, pj_ntohs(hdr->seq),
                        pj_ntohl(hdr->ts), payload_len);

    /* Advance the inbound RTP session state. */
    pjmedia_rtp_session_update(&strm->in_sess, hdr, NULL);
}
/*
 * RTCP receive callback, invoked by the media transport for every incoming
 * RTCP packet.
 *
 * user_data  The media_stream this callback was registered for.
 * pkt        Raw packet buffer.
 * size       Packet length; a negative value reports a receive error, and
 *            its negation is passed to app_perror() as the status.
 */
static void on_rx_rtcp(void *user_data, const void *pkt, pj_ssize_t size)
{
    struct media_stream *strm = user_data;

    /* Nothing to do while the media is not active. */
    if (!strm->active)
        return;

    /* The transport reports receive failures as a negative size. */
    if (size < 0) {
        app_perror(THIS_FILE, "Error receiving RTCP packet", -size);
        return;
    }

    /* Hand the packet to the RTCP session. */
    pjmedia_rtcp_rx_rtcp(&strm->rtcp, pkt, size);
}
/*
 * Media thread
 *
 * Worker thread of one media stream. It periodically transmits an RTP
 * packet with an all-zero payload, once per frame interval, and an RTCP
 * packet every RTCP_INTERVAL ms plus a random offset below RTCP_RAND ms.
 * This function only transmits; incoming packets are delivered through
 * the on_rx_rtp()/on_rx_rtcp() callbacks.
 *
 * arg      The media_stream to serve.
 * Returns  0 once strm->thread_quit_flag has been observed set.
 */
static int media_thread(void *arg)
{
    enum { RTCP_INTERVAL = 5000, RTCP_RAND = 2000 };
    struct media_stream *strm = arg;
    char packet[1500];
    unsigned msec_interval;
    pj_timestamp freq, next_rtp, next_rtcp;

    /* Boost thread priority if necessary */
    boost_priority();

    /* Let things settle */
    pj_thread_sleep(100);

    /* Frame duration in milliseconds. */
    msec_interval = strm->samples_per_frame * 1000 / strm->clock_rate;

    /* Schedule the first RTP and RTCP transmissions in timestamp ticks. */
    pj_get_timestamp_freq(&freq);
    pj_get_timestamp(&next_rtp);
    next_rtp.u64 += (freq.u64 * msec_interval / 1000);

    next_rtcp = next_rtp;
    next_rtcp.u64 += (freq.u64 * (RTCP_INTERVAL+(pj_rand()%RTCP_RAND)) / 1000);

    while (!strm->thread_quit_flag) {
        pj_timestamp now, lesser;
        pj_time_val timeout;
        pj_bool_t send_rtp, send_rtcp;

        send_rtp = send_rtcp = PJ_FALSE;

        /* Determine which deadline comes first; that packet is always sent. */
        if (next_rtp.u64 < next_rtcp.u64) {
            lesser = next_rtp;
            send_rtp = PJ_TRUE;
        } else {
            lesser = next_rtcp;
            send_rtcp = PJ_TRUE;
        }

        /* Determine how long to sleep */
        pj_get_timestamp(&now);
        if (lesser.u64 <= now.u64) {
            /* Already late: do not sleep. */
            timeout.sec = timeout.msec = 0;
        } else {
            pj_uint64_t tick_delay;

            tick_delay = lesser.u64 - now.u64;
            timeout.sec = 0;
            timeout.msec = (pj_uint32_t)(tick_delay * 1000 / freq.u64);
            pj_time_val_normalize(&timeout);
        }

        /* Wait for next interval */
        pj_thread_sleep(PJ_TIME_VAL_MSEC(timeout));
        if (strm->thread_quit_flag)
            break;

        pj_get_timestamp(&now);

        if (send_rtp || next_rtp.u64 <= now.u64) {
            /*
             * Time to send RTP packet.
             */
            pj_status_t status;
            const void *p_hdr;
            pj_ssize_t size;
            int hdrlen;

            /* Format RTP header */
            status = pjmedia_rtp_encode_rtp( &strm->out_sess, strm->si.tx_pt,
                                             0, /* marker bit */
                                             strm->bytes_per_frame,
                                             strm->samples_per_frame,
                                             &p_hdr, &hdrlen);
            if (status != PJ_SUCCESS) {
                pj_assert(!"RTP encode() error");
            } else if (hdrlen < 0 ||
                       (unsigned)hdrlen > sizeof(packet) ||
                       strm->bytes_per_frame > sizeof(packet) - (unsigned)hdrlen)
            {
                /* Header plus payload would overrun the stack buffer:
                 * skip this packet instead of writing past packet[].
                 */
                pj_assert(!"RTP packet exceeds send buffer");
            } else {
                /* Copy RTP header to packet */
                pj_memcpy(packet, p_hdr, hdrlen);

                /* Zero the payload */
                pj_bzero(packet+hdrlen, strm->bytes_per_frame);

                /* Send RTP packet */
                size = hdrlen + strm->bytes_per_frame;
                status = pjmedia_transport_send_rtp(strm->transport,
                                                    packet, size);
                if (status != PJ_SUCCESS)
                    app_perror(THIS_FILE, "Error sending RTP packet", status);
            }

            /* Update RTCP SR */
            pjmedia_rtcp_tx_rtp( &strm->rtcp, (pj_uint16_t)strm->bytes_per_frame);

            /* Schedule next send */
            next_rtp.u64 += (msec_interval * freq.u64 / 1000);
        }

        if (send_rtcp || next_rtcp.u64 <= now.u64) {
            /*
             * Time to send RTCP packet.
             */
            pjmedia_rtcp_pkt *rtcp_pkt;
            int rtcp_len;
            pj_ssize_t size;
            pj_status_t status;

            /* Build RTCP packet */
            pjmedia_rtcp_build_rtcp(&strm->rtcp, &rtcp_pkt, &rtcp_len);

            /* Send packet */
            size = rtcp_len;
            status = pjmedia_transport_send_rtcp(strm->transport,
                                                 rtcp_pkt, size);
            if (status != PJ_SUCCESS) {
                app_perror(THIS_FILE, "Error sending RTCP packet", status);
            }

            /* Schedule next send */
            next_rtcp.u64 += (freq.u64 * (RTCP_INTERVAL+(pj_rand()%RTCP_RAND)) /
                              1000);
        }
    }

    return 0;
}
/*
 * Callback to be called when SDP negotiation is done in the call.
 *
 * inv     The invite session whose media was (re)negotiated.
 * status  Result of the negotiation.
 *
 * Any media already running for the call is destroyed first. On successful
 * negotiation the stream info is derived from the active local/remote SDP,
 * the RTP/RTCP sessions are initialised, the stream is attached to its
 * transport, and the media thread is started. Every failure is reported
 * and leaves the stream inactive.
 */
static void call_on_media_update( pjsip_inv_session *inv,
                                  pj_status_t status)
{
    struct call *call;
    struct media_stream *audio;
    const pjmedia_sdp_session *local_sdp, *remote_sdp;
    struct codec *codec_desc = NULL;
    unsigned i;

    call = inv->mod_data[mod_siprtp.id];
    audio = &call->media[0];

    /* If this is a mid-call media update, then destroy existing media */
    if (audio->thread != NULL)
        destroy_call_media(call->index);

    /* Do nothing if media negotiation has failed */
    if (status != PJ_SUCCESS) {
        app_perror(THIS_FILE, "SDP negotiation failed", status);
        return;
    }

    /* Capture stream definition from the SDP. The getters are checked so
     * that local_sdp/remote_sdp are never used uninitialised.
     */
    status = pjmedia_sdp_neg_get_active_local(inv->neg, &local_sdp);
    if (status != PJ_SUCCESS) {
        app_perror(THIS_FILE, "Error getting active local SDP", status);
        return;
    }

    status = pjmedia_sdp_neg_get_active_remote(inv->neg, &remote_sdp);
    if (status != PJ_SUCCESS) {
        app_perror(THIS_FILE, "Error getting active remote SDP", status);
        return;
    }

    status = pjmedia_stream_info_from_sdp(&audio->si, inv->pool, app.med_endpt,
                                          local_sdp, remote_sdp, 0);
    if (status != PJ_SUCCESS) {
        app_perror(THIS_FILE, "Error creating stream info from SDP", status);
        return;
    }

    /* Get the remainder of codec information from codec descriptor */
    if (audio->si.fmt.pt == app.audio_codec.pt)
        codec_desc = &app.audio_codec;
    else {
        /* Find the codec description in codec array */
        for (i=0; i<PJ_ARRAY_SIZE(audio_codecs); ++i) {
            if (audio_codecs[i].pt == audio->si.fmt.pt) {
                codec_desc = &audio_codecs[i];
                break;
            }
        }

        if (codec_desc == NULL) {
            PJ_LOG(3, (THIS_FILE, "Error: Invalid codec payload type"));
            return;
        }
    }

    /* Frame geometry derived from the codec's ptime and bit rate. */
    audio->clock_rate = audio->si.fmt.clock_rate;
    audio->samples_per_frame = audio->clock_rate * codec_desc->ptime / 1000;
    audio->bytes_per_frame = codec_desc->bit_rate * codec_desc->ptime / 1000 / 8;

    pjmedia_rtp_session_init(&audio->out_sess, audio->si.tx_pt,
                             pj_rand());
    pjmedia_rtp_session_init(&audio->in_sess, audio->si.fmt.pt, 0);
    pjmedia_rtcp_init(&audio->rtcp, "rtcp", audio->clock_rate,
                      audio->samples_per_frame, 0);

    /* Attach media to transport */
    status = pjmedia_transport_attach(audio->transport, audio,
                                      &audio->si.rem_addr,
                                      &audio->si.rem_rtcp,
                                      sizeof(pj_sockaddr_in),
                                      &on_rx_rtp,
                                      &on_rx_rtcp);
    if (status != PJ_SUCCESS) {
        app_perror(THIS_FILE, "Error on pjmedia_transport_attach()", status);
        return;
    }

    /* Start media thread. */
    audio->thread_quit_flag = 0;
    status = pj_thread_create( inv->pool, "media", &media_thread, audio,
                               0, 0, &audio->thread);
    if (status != PJ_SUCCESS) {
        app_perror(THIS_FILE, "Error creating media thread", status);

        /* Undo the attach above so a later media update can attach again.
         * The thread handle is cleared first so that destroy_call_media()
         * has no thread to join and only detaches the transport.
         */
        audio->thread = NULL;
        destroy_call_media(call->index);
        return;
    }

    /* Set the media as active */
    audio->active = PJ_TRUE;
}
/*
 * Destroy call's media.
 *
 * call_index  Index into app.call[] of the call whose first media stream
 *             is torn down.
 *
 * Marks the stream inactive, stops and destroys the media thread if one
 * exists, then detaches the stream from its transport.
 */
static void destroy_call_media(unsigned call_index)
{
    /* Address of an array element: never NULL, so no null check is needed. */
    struct media_stream *audio = &app.call[call_index].media[0];

    audio->active = PJ_FALSE;

    if (audio->thread) {
        /* Ask the thread to stop and wait for it before destroying it. */
        audio->thread_quit_flag = 1;
        pj_thread_join(audio->thread);
        pj_thread_destroy(audio->thread);
        audio->thread = NULL;
        audio->thread_quit_flag = 0;
    }

    pjmedia_transport_detach(audio->transport, audio);
}
/*****************************************************************************
* USER INTERFACE STUFF
*/
/*
 * Compute how long a call has been connected.
 *
 * call_index  Index into app.call[].
 * dur         Receives the elapsed time since call->connect_time. It is
 *             left at zero when the call has no invite session, has not
 *             reached the CONFIRMED state, or has no connect time.
 */
static void call_get_duration(int call_index, pj_time_val *dur)
{
    /* Address of an array element: never NULL, so no null check is needed. */
    struct call *call = &app.call[call_index];
    pjsip_inv_session *inv = call->inv;

    dur->sec = dur->msec = 0;

    if (!inv)
        return;

    if (inv->state >= PJSIP_INV_STATE_CONFIRMED && call->connect_time.sec) {
        /* Elapsed time = now - connect_time. */
        pj_gettimeofday(dur);
        PJ_TIME_VAL_SUB((*dur), call->connect_time);
    }
}
/*
 * Format a number in compact, human-readable form.
 *
 * buf  Destination buffer; the longest output is "-2147483648"
 *      (12 bytes including the terminating NUL).
 * val  Value to format.
 *
 * Values below 1000 are printed as-is. Values below 1000000 are printed
 * in thousands with two decimals and a "K" suffix (1500 -> "1.50K").
 * Larger values are printed in millions with two decimals and an "M"
 * suffix (2500000 -> "2.50M").
 *
 * Returns buf.
 */
static const char *good_number(char *buf, pj_int32_t val)
{
    if (val < 1000) {
        pj_ansi_sprintf(buf, "%d", val);
    } else if (val < 1000000) {
        /* Two-digit fraction = hundredths of a thousand, i.e. tens. */
        pj_ansi_sprintf(buf, "%d.%02dK",
                        val / 1000,
                        (val % 1000) / 10);
    } else {
        /* Two-digit fraction = hundredths of a million. */
        pj_ansi_sprintf(buf, "%d.%02dM",
                        val / 1000000,
                        (val % 1000000) / 10000);
    }

    return buf;
}
static void print_avg_stat(void)
{
#define MIN_(var,val) if ((int)val < (int)var) var = val
#define MAX_(var,val) if ((int)val > (int)var) var = val
#define AVG_(var,val) var = ( ((var * count) + val) / (count+1) )
#define BIGVAL 0x7FFFFFFFL
struct stat_entry
{
int min, avg, max;
};
struct stat_entry call_dur, call_pdd;
pjmedia_rtcp_stat min_stat, avg_stat, max_stat;
char srx_min[16], srx_avg[16], srx_max[16];
char brx_min[16], brx_avg[16], brx_max[16];
char stx_min[16], stx_avg[16], stx_max[16];
char btx_min[16], btx_avg[16], btx_max[16];
unsigned i, count;
pj_bzero(&call_dur, sizeof(call_dur));
call_dur.min = BIGVAL;
pj_bzero(&call_pdd, sizeof(call_pdd));
call_pdd.min = BIGVAL;
pj_bzero(&min_stat, sizeof(min_stat));
min_stat.rx.pkt = min_stat.tx.pkt = BIGVAL;
min_stat.rx.bytes = min_stat.tx.bytes = BIGVAL;
min_stat.rx.loss = min_stat.tx.loss = BIGVAL;
min_stat.rx.dup = min_stat.tx.dup = BIGVAL;
min_stat.rx.reorder = min_stat.tx.reorder = BIGVAL;
min_stat.rx.jitter.min = min_stat.tx.jitter.min = BIGVAL;
min_stat.rtt.min = BIGVAL;
pj_bzero(&avg_stat, sizeof(avg_stat));
pj_bzero(&max_stat, sizeof(max_stat));
for (i=0, count=0; i<app.max_calls; ++i) {
struct call *call = &app.call[i];
struct media_stream *audio = &call->media[0];
pj_time_val dur;
unsigned msec_dur;
if (call->inv == NULL ||
call->inv->state < PJSIP_INV_STATE_CONFIRMED ||
call->connect_time.sec == 0)
{
continue;
}
/* Duration */
call_get_duration(i, &dur);
msec_dur = PJ_TIME_VAL_MSEC(dur);
MIN_(call_dur.min, msec_dur);
MAX_(call_dur.max, msec_dur);
AVG_(call_dur.avg, msec_dur);
/* Connect delay */
if (call->connect_time.sec) {
pj_time_val t = call->connect_time;
PJ_TIME_VAL_SUB(t, call->start_time);
msec_dur = PJ_TIME_VAL_MSEC(t);
} else {
msec_dur = 10;
}
MIN_(call_pdd.min, msec_dur);
MAX_(call_pdd.max, msec_dur);
AVG_(call_pdd.avg, msec_dur);
/* RX Statistisc: */
/* Packets */
MIN_(min_stat.rx.pkt, audio->rtcp.stat.rx.pkt);
MAX_(max_stat.rx.pkt, audio->rtcp.stat.rx.pkt);
AVG_(avg_stat.rx.pkt, audio->rtcp.stat.rx.pkt);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -