📄 siprtp.c
字号:
#if defined(PJ_WIN32) && PJ_WIN32 != 0#include <windows.h>static void boost_priority(void){ SetPriorityClass( GetCurrentProcess(), REALTIME_PRIORITY_CLASS); SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST);}#elif defined(PJ_LINUX) && PJ_LINUX != 0#include <pthread.h>static void boost_priority(void){#define POLICY SCHED_FIFO struct sched_param tp; int max_prio; int policy; int rc; if (sched_get_priority_min(POLICY) < sched_get_priority_max(POLICY)) max_prio = sched_get_priority_max(POLICY)-1; else max_prio = sched_get_priority_max(POLICY)+1; /* * Adjust process scheduling algorithm and priority */ rc = sched_getparam(0, &tp); if (rc != 0) { app_perror( THIS_FILE, "sched_getparam error", PJ_RETURN_OS_ERROR(rc)); return; } tp.__sched_priority = max_prio; rc = sched_setscheduler(0, POLICY, &tp); if (rc != 0) { app_perror( THIS_FILE, "sched_setscheduler error", PJ_RETURN_OS_ERROR(rc)); } PJ_LOG(4, (THIS_FILE, "New process policy=%d, priority=%d", policy, tp.__sched_priority)); /* * Adjust thread scheduling algorithm and priority */ rc = pthread_getschedparam(pthread_self(), &policy, &tp); if (rc != 0) { app_perror( THIS_FILE, "pthread_getschedparam error", PJ_RETURN_OS_ERROR(rc)); return; } PJ_LOG(4, (THIS_FILE, "Old thread policy=%d, priority=%d", policy, tp.__sched_priority)); policy = POLICY; tp.__sched_priority = max_prio; rc = pthread_setschedparam(pthread_self(), policy, &tp); if (rc != 0) { app_perror( THIS_FILE, "pthread_setschedparam error", PJ_RETURN_OS_ERROR(rc)); return; } PJ_LOG(4, (THIS_FILE, "New thread policy=%d, priority=%d", policy, tp.__sched_priority));}#else# define boost_priority()#endif/* * This callback is called by media transport on receipt of RTP packet. */static void on_rx_rtp(void *user_data, const void *pkt, pj_ssize_t size){ struct media_stream *strm; pj_status_t status; const pjmedia_rtp_hdr *hdr; const void *payload; unsigned payload_len; strm = user_data; /* Discard packet if media is inactive */ if (!strm->active) return; /* Check for errors */ if (size < 0) { app_perror(THIS_FILE, "RTP recv() error", -size); return; } /* Decode RTP packet. */ status = pjmedia_rtp_decode_rtp(&strm->in_sess, pkt, size, &hdr, &payload, &payload_len); if (status != PJ_SUCCESS) { app_perror(THIS_FILE, "RTP decode error", status); return; } //PJ_LOG(4,(THIS_FILE, "Rx seq=%d", pj_ntohs(hdr->seq))); /* Update the RTCP session. */ pjmedia_rtcp_rx_rtp(&strm->rtcp, pj_ntohs(hdr->seq), pj_ntohl(hdr->ts), payload_len); /* Update RTP session */ pjmedia_rtp_session_update(&strm->in_sess, hdr, NULL);}/* * This callback is called by media transport on receipt of RTCP packet. */static void on_rx_rtcp(void *user_data, const void *pkt, pj_ssize_t size){ struct media_stream *strm; strm = user_data; /* Discard packet if media is inactive */ if (!strm->active) return; /* Check for errors */ if (size < 0) { app_perror(THIS_FILE, "Error receiving RTCP packet", -size); return; } /* Update RTCP session */ pjmedia_rtcp_rx_rtcp(&strm->rtcp, pkt, size);}/* * Media thread * * This is the thread to send and receive both RTP and RTCP packets. */static int media_thread(void *arg){ enum { RTCP_INTERVAL = 5000, RTCP_RAND = 2000 }; struct media_stream *strm = arg; char packet[1500]; unsigned msec_interval; pj_timestamp freq, next_rtp, next_rtcp; /* Boost thread priority if necessary */ boost_priority(); /* Let things settle */ pj_thread_sleep(100); msec_interval = strm->samples_per_frame * 1000 / strm->clock_rate; pj_get_timestamp_freq(&freq); pj_get_timestamp(&next_rtp); next_rtp.u64 += (freq.u64 * msec_interval / 1000); next_rtcp = next_rtp; next_rtcp.u64 += (freq.u64 * (RTCP_INTERVAL+(pj_rand()%RTCP_RAND)) / 1000); while (!strm->thread_quit_flag) { pj_timestamp now, lesser; pj_time_val timeout; pj_bool_t send_rtp, send_rtcp; send_rtp = send_rtcp = PJ_FALSE; /* Determine how long to sleep */ if (next_rtp.u64 < next_rtcp.u64) { lesser = next_rtp; send_rtp = PJ_TRUE; } else { lesser = next_rtcp; send_rtcp = PJ_TRUE; } pj_get_timestamp(&now); if (lesser.u64 <= now.u64) { timeout.sec = timeout.msec = 0; //printf("immediate "); fflush(stdout); } else { pj_uint64_t tick_delay; tick_delay = lesser.u64 - now.u64; timeout.sec = 0; timeout.msec = (pj_uint32_t)(tick_delay * 1000 / freq.u64); pj_time_val_normalize(&timeout); //printf("%d:%03d ", timeout.sec, timeout.msec); fflush(stdout); } /* Wait for next interval */ //if (timeout.sec!=0 && timeout.msec!=0) { pj_thread_sleep(PJ_TIME_VAL_MSEC(timeout)); if (strm->thread_quit_flag) break; //} pj_get_timestamp(&now); if (send_rtp || next_rtp.u64 <= now.u64) { /* * Time to send RTP packet. */ pj_status_t status; const void *p_hdr; const pjmedia_rtp_hdr *hdr; pj_ssize_t size; int hdrlen; /* Format RTP header */ status = pjmedia_rtp_encode_rtp( &strm->out_sess, strm->si.tx_pt, 0, /* marker bit */ strm->bytes_per_frame, strm->samples_per_frame, &p_hdr, &hdrlen); if (status == PJ_SUCCESS) { //PJ_LOG(4,(THIS_FILE, "\t\tTx seq=%d", pj_ntohs(hdr->seq))); hdr = (const pjmedia_rtp_hdr*) p_hdr; /* Copy RTP header to packet */ pj_memcpy(packet, hdr, hdrlen); /* Zero the payload */ pj_bzero(packet+hdrlen, strm->bytes_per_frame); /* Send RTP packet */ size = hdrlen + strm->bytes_per_frame; status = pjmedia_transport_send_rtp(strm->transport, packet, size); if (status != PJ_SUCCESS) app_perror(THIS_FILE, "Error sending RTP packet", status); } else { pj_assert(!"RTP encode() error"); } /* Update RTCP SR */ pjmedia_rtcp_tx_rtp( &strm->rtcp, (pj_uint16_t)strm->bytes_per_frame); /* Schedule next send */ next_rtp.u64 += (msec_interval * freq.u64 / 1000); } if (send_rtcp || next_rtcp.u64 <= now.u64) { /* * Time to send RTCP packet. */ pjmedia_rtcp_pkt *rtcp_pkt; int rtcp_len; pj_ssize_t size; pj_status_t status; /* Build RTCP packet */ pjmedia_rtcp_build_rtcp(&strm->rtcp, &rtcp_pkt, &rtcp_len); /* Send packet */ size = rtcp_len; status = pjmedia_transport_send_rtcp(strm->transport, rtcp_pkt, size); if (status != PJ_SUCCESS) { app_perror(THIS_FILE, "Error sending RTCP packet", status); } /* Schedule next send */ next_rtcp.u64 += (freq.u64 * (RTCP_INTERVAL+(pj_rand()%RTCP_RAND)) / 1000); } } return 0;}/* Callback to be called when SDP negotiation is done in the call: */static void call_on_media_update( pjsip_inv_session *inv, pj_status_t status){ struct call *call; pj_pool_t *pool; struct media_stream *audio; const pjmedia_sdp_session *local_sdp, *remote_sdp; struct codec *codec_desc = NULL; unsigned i; call = inv->mod_data[mod_siprtp.id]; pool = inv->dlg->pool; audio = &call->media[0]; /* If this is a mid-call media update, then destroy existing media */ if (audio->thread != NULL) destroy_call_media(call->index); /* Do nothing if media negotiation has failed */ if (status != PJ_SUCCESS) { app_perror(THIS_FILE, "SDP negotiation failed", status); return; } /* Capture stream definition from the SDP */ pjmedia_sdp_neg_get_active_local(inv->neg, &local_sdp); pjmedia_sdp_neg_get_active_remote(inv->neg, &remote_sdp); status = pjmedia_stream_info_from_sdp(&audio->si, inv->pool, app.med_endpt, local_sdp, remote_sdp, 0); if (status != PJ_SUCCESS) { app_perror(THIS_FILE, "Error creating stream info from SDP", status); return; } /* Get the remainder of codec information from codec descriptor */ if (audio->si.fmt.pt == app.audio_codec.pt) codec_desc = &app.audio_codec; else { /* Find the codec description in codec array */ for (i=0; i<PJ_ARRAY_SIZE(audio_codecs); ++i) { if (audio_codecs[i].pt == audio->si.fmt.pt) { codec_desc = &audio_codecs[i]; break; } } if (codec_desc == NULL) { PJ_LOG(3, (THIS_FILE, "Error: Invalid codec payload type")); return; } } audio->clock_rate = audio->si.fmt.clock_rate; audio->samples_per_frame = audio->clock_rate * codec_desc->ptime / 1000; audio->bytes_per_frame = codec_desc->bit_rate * codec_desc->ptime / 1000 / 8; pjmedia_rtp_session_init(&audio->out_sess, audio->si.tx_pt, pj_rand()); pjmedia_rtp_session_init(&audio->in_sess, audio->si.fmt.pt, 0); pjmedia_rtcp_init(&audio->rtcp, "rtcp", audio->clock_rate, audio->samples_per_frame, 0); /* Attach media to transport */ status = pjmedia_transport_attach(audio->transport, audio, &audio->si.rem_addr, &audio->si.rem_rtcp, sizeof(pj_sockaddr_in), &on_rx_rtp, &on_rx_rtcp); if (status != PJ_SUCCESS) { app_perror(THIS_FILE, "Error on pjmedia_transport_attach()", status); return; } /* Start media thread. */ audio->thread_quit_flag = 0; status = pj_thread_create( inv->pool, "media", &media_thread, audio, 0, 0, &audio->thread); if (status != PJ_SUCCESS) { app_perror(THIS_FILE, "Error creating media thread", status); return; } /* Set the media as active */ audio->active = PJ_TRUE;}/* Destroy call's media */static void destroy_call_media(unsigned call_index){ struct media_stream *audio = &app.call[call_index].media[0]; if (audio) { audio->active = PJ_FALSE; if (audio->thread) { audio->thread_quit_flag = 1; pj_thread_join(audio->thread); pj_thread_destroy(audio->thread); audio->thread = NULL; audio->thread_quit_flag = 0; } pjmedia_transport_detach(audio->transport, audio); }} /***************************************************************************** * USER INTERFACE STUFFS */static void call_get_duration(int call_index, pj_time_val *dur){ struct call *call = &app.call[call_index]; pjsip_inv_session *inv; dur->sec = dur->msec = 0; if (!call) return; inv = call->inv; if (!inv) return; if (inv->state >= PJSIP_INV_STATE_CONFIRMED && call->connect_time.sec) { pj_gettimeofday(dur); PJ_TIME_VAL_SUB((*dur), call->connect_time); }}static const char *good_number(char *buf, pj_int32_t val){ if (val < 1000) { pj_ansi_sprintf(buf, "%d", val); } else if (val < 1000000) { pj_ansi_sprintf(buf, "%d.%02dK", val / 1000, (val % 1000) / 100); } else { pj_ansi_sprintf(buf, "%d.%02dM", val / 1000000, (val % 1000000) / 10000); } return buf;}static void print_avg_stat(void){#define MIN_(var,val) if ((int)val < (int)var) var = val#define MAX_(var,val) if ((int)val > (int)var) var = val#define AVG_(var,val) var = ( ((var * count) + val) / (count+1) )#define BIGVAL 0x7FFFFFFFL struct stat_entry { int min, avg, max; }; struct stat_entry call_dur, call_pdd; pjmedia_rtcp_stat min_stat, avg_stat, max_stat; char srx_min[16], srx_avg[16], srx_max[16]; char brx_min[16], brx_avg[16], brx_max[16]; char stx_min[16], stx_avg[16], stx_max[16]; char btx_min[16], btx_avg[16], btx_max[16]; unsigned i, count; pj_bzero(&call_dur, sizeof(call_dur)); call_dur.min = BIGVAL; pj_bzero(&call_pdd, sizeof(call_pdd)); call_pdd.min = BIGVAL; pj_bzero(&min_stat, sizeof(min_stat)); min_stat.rx.pkt = min_stat.tx.pkt = BIGVAL; min_stat.rx.bytes = min_stat.tx.bytes = BIGVAL; min_stat.rx.loss = min_stat.tx.loss = BIGVAL; min_stat.rx.dup = min_stat.tx.dup = BIGVAL; min_stat.rx.reorder = min_stat.tx.reorder = BIGVAL; min_stat.rx.jitter.min = min_stat.tx.jitter.min = BIGVAL; min_stat.rtt.min = BIGVAL; pj_bzero(&avg_stat, sizeof(avg_stat)); pj_bzero(&max_stat, sizeof(max_stat)); for (i=0, count=0; i<app.max_calls; ++i) { struct call *call = &app.call[i]; struct media_stream *audio = &call->media[0]; pj_time_val dur; unsigned msec_dur; if (call->inv == NULL || call->inv->state < PJSIP_INV_STATE_CONFIRMED || call->connect_time.sec == 0) { continue; } /* Duration */ call_get_duration(i, &dur); msec_dur = PJ_TIME_VAL_MSEC(dur); MIN_(call_dur.min, msec_dur); MAX_(call_dur.max, msec_dur); AVG_(call_dur.avg, msec_dur); /* Connect delay */ if (call->connect_time.sec) { pj_time_val t = call->connect_time; PJ_TIME_VAL_SUB(t, call->start_time); msec_dur = PJ_TIME_VAL_MSEC(t); } else { msec_dur = 10; } MIN_(call_pdd.min, msec_dur); MAX_(call_pdd.max, msec_dur); AVG_(call_pdd.avg, msec_dur); /* RX Statistisc: */ /* Packets */ MIN_(min_stat.rx.pkt, audio->rtcp.stat.rx.pkt); MAX_(max_stat.rx.pkt, audio->rtcp.stat.rx.pkt); AVG_(avg_stat.rx.pkt, audio->rtcp.stat.rx.pkt);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -