⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 siprtp.c

📁 一个开源SIP协议栈
💻 C
📖 第 1 页 / 共 4 页
字号:
#if defined(PJ_WIN32) && PJ_WIN32 != 0
#include <windows.h>
static void boost_priority(void)
{
    SetPriorityClass( GetCurrentProcess(), REALTIME_PRIORITY_CLASS);
    SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST);
}

#elif defined(PJ_LINUX) && PJ_LINUX != 0
#include <pthread.h>
static void boost_priority(void)
{
#define POLICY	SCHED_FIFO
    struct sched_param tp;
    int max_prio;
    int policy;
    int rc;

    if (sched_get_priority_min(POLICY) < sched_get_priority_max(POLICY))
	max_prio = sched_get_priority_max(POLICY)-1;
    else
	max_prio = sched_get_priority_max(POLICY)+1;

    /*
     * Adjust process scheduling algorithm and priority
     */
    rc = sched_getparam(0, &tp);
    if (rc != 0) {
	app_perror( THIS_FILE, "sched_getparam error",
		    PJ_RETURN_OS_ERROR(rc));
	return;
    }
    tp.__sched_priority = max_prio;

    rc = sched_setscheduler(0, POLICY, &tp);
    if (rc != 0) {
	app_perror( THIS_FILE, "sched_setscheduler error",
		    PJ_RETURN_OS_ERROR(rc));
    }

    PJ_LOG(4, (THIS_FILE, "New process policy=%d, priority=%d",
	      policy, tp.__sched_priority));

    /*
     * Adjust thread scheduling algorithm and priority
     */
    rc = pthread_getschedparam(pthread_self(), &policy, &tp);
    if (rc != 0) {
	app_perror( THIS_FILE, "pthread_getschedparam error",
		    PJ_RETURN_OS_ERROR(rc));
	return;
    }

    PJ_LOG(4, (THIS_FILE, "Old thread policy=%d, priority=%d",
	      policy, tp.__sched_priority));

    policy = POLICY;
    tp.__sched_priority = max_prio;

    rc = pthread_setschedparam(pthread_self(), policy, &tp);
    if (rc != 0) {
	app_perror( THIS_FILE, "pthread_setschedparam error",
		    PJ_RETURN_OS_ERROR(rc));
	return;
    }

    PJ_LOG(4, (THIS_FILE, "New thread policy=%d, priority=%d",
	      policy, tp.__sched_priority));
}

#else
#  define boost_priority()
#endif


/*
 * This callback is called by media transport on receipt of RTP packet.
 */
static void on_rx_rtp(void *user_data, const void *pkt, pj_ssize_t size)
{
    struct media_stream *strm;
    pj_status_t status;
    const pjmedia_rtp_hdr *hdr;
    const void *payload;
    unsigned payload_len;

    strm = user_data;

    /* Discard packet if media is inactive */
    if (!strm->active)
	return;

    /* Check for errors */
    if (size < 0) {
	app_perror(THIS_FILE, "RTP recv() error", -size);
	return;
    }

    /* Decode RTP packet. */
    status = pjmedia_rtp_decode_rtp(&strm->in_sess, 
				    pkt, size, 
				    &hdr, &payload, &payload_len);
    if (status != PJ_SUCCESS) {
	app_perror(THIS_FILE, "RTP decode error", status);
	return;
    }

    //PJ_LOG(4,(THIS_FILE, "Rx seq=%d", pj_ntohs(hdr->seq)));

    /* Update the RTCP session. */
    pjmedia_rtcp_rx_rtp(&strm->rtcp, pj_ntohs(hdr->seq),
			pj_ntohl(hdr->ts), payload_len);

    /* Update RTP session */
    pjmedia_rtp_session_update(&strm->in_sess, hdr, NULL);

}

/*
 * This callback is called by media transport on receipt of RTCP packet.
 */
static void on_rx_rtcp(void *user_data, const void *pkt, pj_ssize_t size)
{
    struct media_stream *strm;

    strm = user_data;

    /* Discard packet if media is inactive */
    if (!strm->active)
	return;

    /* Check for errors */
    if (size < 0) {
	app_perror(THIS_FILE, "Error receiving RTCP packet", -size);
	return;
    }

    /* Update RTCP session */
    pjmedia_rtcp_rx_rtcp(&strm->rtcp, pkt, size);
}


/* 
 * Media thread 
 *
 * This is the thread to send and receive both RTP and RTCP packets.
 */
static int media_thread(void *arg)
{
    enum { RTCP_INTERVAL = 5000, RTCP_RAND = 2000 };
    struct media_stream *strm = arg;
    char packet[1500];
    unsigned msec_interval;
    pj_timestamp freq, next_rtp, next_rtcp;


    /* Boost thread priority if necessary */
    boost_priority();

    /* Let things settle */
    pj_thread_sleep(100);

    msec_interval = strm->samples_per_frame * 1000 / strm->clock_rate;
    pj_get_timestamp_freq(&freq);

    pj_get_timestamp(&next_rtp);
    next_rtp.u64 += (freq.u64 * msec_interval / 1000);

    next_rtcp = next_rtp;
    next_rtcp.u64 += (freq.u64 * (RTCP_INTERVAL+(pj_rand()%RTCP_RAND)) / 1000);


    while (!strm->thread_quit_flag) {
	pj_timestamp now, lesser;
	pj_time_val timeout;
	pj_bool_t send_rtp, send_rtcp;

	send_rtp = send_rtcp = PJ_FALSE;

	/* Determine how long to sleep */
	if (next_rtp.u64 < next_rtcp.u64) {
	    lesser = next_rtp;
	    send_rtp = PJ_TRUE;
	} else {
	    lesser = next_rtcp;
	    send_rtcp = PJ_TRUE;
	}

	pj_get_timestamp(&now);
	if (lesser.u64 <= now.u64) {
	    timeout.sec = timeout.msec = 0;
	    //printf("immediate "); fflush(stdout);
	} else {
	    pj_uint64_t tick_delay;
	    tick_delay = lesser.u64 - now.u64;
	    timeout.sec = 0;
	    timeout.msec = (pj_uint32_t)(tick_delay * 1000 / freq.u64);
	    pj_time_val_normalize(&timeout);

	    //printf("%d:%03d ", timeout.sec, timeout.msec); fflush(stdout);
	}

	/* Wait for next interval */
	//if (timeout.sec!=0 && timeout.msec!=0) {
	    pj_thread_sleep(PJ_TIME_VAL_MSEC(timeout));
	    if (strm->thread_quit_flag)
		break;
	//}

	pj_get_timestamp(&now);

	if (send_rtp || next_rtp.u64 <= now.u64) {
	    /*
	     * Time to send RTP packet.
	     */
	    pj_status_t status;
	    const void *p_hdr;
	    const pjmedia_rtp_hdr *hdr;
	    pj_ssize_t size;
	    int hdrlen;

	    /* Format RTP header */
	    status = pjmedia_rtp_encode_rtp( &strm->out_sess, strm->si.tx_pt,
					     0, /* marker bit */
					     strm->bytes_per_frame, 
					     strm->samples_per_frame,
					     &p_hdr, &hdrlen);
	    if (status == PJ_SUCCESS) {

		//PJ_LOG(4,(THIS_FILE, "\t\tTx seq=%d", pj_ntohs(hdr->seq)));
		
		hdr = (const pjmedia_rtp_hdr*) p_hdr;

		/* Copy RTP header to packet */
		pj_memcpy(packet, hdr, hdrlen);

		/* Zero the payload */
		pj_bzero(packet+hdrlen, strm->bytes_per_frame);

		/* Send RTP packet */
		size = hdrlen + strm->bytes_per_frame;
		status = pjmedia_transport_send_rtp(strm->transport, 
						    packet, size);
		if (status != PJ_SUCCESS)
		    app_perror(THIS_FILE, "Error sending RTP packet", status);

	    } else {
		pj_assert(!"RTP encode() error");
	    }

	    /* Update RTCP SR */
	    pjmedia_rtcp_tx_rtp( &strm->rtcp, (pj_uint16_t)strm->bytes_per_frame);

	    /* Schedule next send */
	    next_rtp.u64 += (msec_interval * freq.u64 / 1000);
	}


	if (send_rtcp || next_rtcp.u64 <= now.u64) {
	    /*
	     * Time to send RTCP packet.
	     */
	    pjmedia_rtcp_pkt *rtcp_pkt;
	    int rtcp_len;
	    pj_ssize_t size;
	    pj_status_t status;

	    /* Build RTCP packet */
	    pjmedia_rtcp_build_rtcp(&strm->rtcp, &rtcp_pkt, &rtcp_len);

    
	    /* Send packet */
	    size = rtcp_len;
	    status = pjmedia_transport_send_rtcp(strm->transport,
						 rtcp_pkt, size);
	    if (status != PJ_SUCCESS) {
		app_perror(THIS_FILE, "Error sending RTCP packet", status);
	    }
	    
	    /* Schedule next send */
    	    next_rtcp.u64 += (freq.u64 * (RTCP_INTERVAL+(pj_rand()%RTCP_RAND)) /
			      1000);
	}
    }

    return 0;
}


/* Callback to be called when SDP negotiation is done in the call: */
static void call_on_media_update( pjsip_inv_session *inv,
				  pj_status_t status)
{
    struct call *call;
    pj_pool_t *pool;
    struct media_stream *audio;
    const pjmedia_sdp_session *local_sdp, *remote_sdp;
    struct codec *codec_desc = NULL;
    unsigned i;

    call = inv->mod_data[mod_siprtp.id];
    pool = inv->dlg->pool;
    audio = &call->media[0];

    /* If this is a mid-call media update, then destroy existing media */
    if (audio->thread != NULL)
	destroy_call_media(call->index);


    /* Do nothing if media negotiation has failed */
    if (status != PJ_SUCCESS) {
	app_perror(THIS_FILE, "SDP negotiation failed", status);
	return;
    }

    
    /* Capture stream definition from the SDP */
    pjmedia_sdp_neg_get_active_local(inv->neg, &local_sdp);
    pjmedia_sdp_neg_get_active_remote(inv->neg, &remote_sdp);

    status = pjmedia_stream_info_from_sdp(&audio->si, inv->pool, app.med_endpt,
					  local_sdp, remote_sdp, 0);
    if (status != PJ_SUCCESS) {
	app_perror(THIS_FILE, "Error creating stream info from SDP", status);
	return;
    }

    /* Get the remainder of codec information from codec descriptor */
    if (audio->si.fmt.pt == app.audio_codec.pt)
	codec_desc = &app.audio_codec;
    else {
	/* Find the codec description in codec array */
	for (i=0; i<PJ_ARRAY_SIZE(audio_codecs); ++i) {
	    if (audio_codecs[i].pt == audio->si.fmt.pt) {
		codec_desc = &audio_codecs[i];
		break;
	    }
	}

	if (codec_desc == NULL) {
	    PJ_LOG(3, (THIS_FILE, "Error: Invalid codec payload type"));
	    return;
	}
    }

    audio->clock_rate = audio->si.fmt.clock_rate;
    audio->samples_per_frame = audio->clock_rate * codec_desc->ptime / 1000;
    audio->bytes_per_frame = codec_desc->bit_rate * codec_desc->ptime / 1000 / 8;


    pjmedia_rtp_session_init(&audio->out_sess, audio->si.tx_pt, 
			     pj_rand());
    pjmedia_rtp_session_init(&audio->in_sess, audio->si.fmt.pt, 0);
    pjmedia_rtcp_init(&audio->rtcp, "rtcp", audio->clock_rate, 
		      audio->samples_per_frame, 0);


    /* Attach media to transport */
    status = pjmedia_transport_attach(audio->transport, audio, 
				      &audio->si.rem_addr, 
				      &audio->si.rem_rtcp, 
				      sizeof(pj_sockaddr_in),
				      &on_rx_rtp,
				      &on_rx_rtcp);
    if (status != PJ_SUCCESS) {
	app_perror(THIS_FILE, "Error on pjmedia_transport_attach()", status);
	return;
    }

    /* Start media thread. */
    audio->thread_quit_flag = 0;
    status = pj_thread_create( inv->pool, "media", &media_thread, audio,
			       0, 0, &audio->thread);
    if (status != PJ_SUCCESS) {
	app_perror(THIS_FILE, "Error creating media thread", status);
	return;
    }

    /* Set the media as active */
    audio->active = PJ_TRUE;
}



/* Destroy call's media */
static void destroy_call_media(unsigned call_index)
{
    struct media_stream *audio = &app.call[call_index].media[0];

    if (audio) {
	audio->active = PJ_FALSE;

	if (audio->thread) {
	    audio->thread_quit_flag = 1;
	    pj_thread_join(audio->thread);
	    pj_thread_destroy(audio->thread);
	    audio->thread = NULL;
	    audio->thread_quit_flag = 0;
	}

	pjmedia_transport_detach(audio->transport, audio);
    }
}

 
/*****************************************************************************
 * USER INTERFACE STUFFS
 */

static void call_get_duration(int call_index, pj_time_val *dur)
{
    struct call *call = &app.call[call_index];
    pjsip_inv_session *inv;

    dur->sec = dur->msec = 0;

    if (!call)
	return;

    inv = call->inv;
    if (!inv)
	return;

    if (inv->state >= PJSIP_INV_STATE_CONFIRMED && call->connect_time.sec) {

	pj_gettimeofday(dur);
	PJ_TIME_VAL_SUB((*dur), call->connect_time);
    }
}


static const char *good_number(char *buf, pj_int32_t val)
{
    if (val < 1000) {
	pj_ansi_sprintf(buf, "%d", val);
    } else if (val < 1000000) {
	pj_ansi_sprintf(buf, "%d.%02dK", 
			val / 1000,
			(val % 1000) / 100);
    } else {
	pj_ansi_sprintf(buf, "%d.%02dM", 
			val / 1000000,
			(val % 1000000) / 10000);
    }

    return buf;
}



static void print_avg_stat(void)
{
#define MIN_(var,val)	   if ((int)val < (int)var) var = val
#define MAX_(var,val)	   if ((int)val > (int)var) var = val
#define AVG_(var,val)	   var = ( ((var * count) + val) / (count+1) )
#define BIGVAL		    0x7FFFFFFFL
    struct stat_entry
    {
	int min, avg, max;
    };

    struct stat_entry call_dur, call_pdd;
    pjmedia_rtcp_stat min_stat, avg_stat, max_stat;

    char srx_min[16], srx_avg[16], srx_max[16];
    char brx_min[16], brx_avg[16], brx_max[16];
    char stx_min[16], stx_avg[16], stx_max[16];
    char btx_min[16], btx_avg[16], btx_max[16];


    unsigned i, count;

    pj_bzero(&call_dur, sizeof(call_dur)); 
    call_dur.min = BIGVAL;

    pj_bzero(&call_pdd, sizeof(call_pdd)); 
    call_pdd.min = BIGVAL;

    pj_bzero(&min_stat, sizeof(min_stat));
    min_stat.rx.pkt = min_stat.tx.pkt = BIGVAL;
    min_stat.rx.bytes = min_stat.tx.bytes = BIGVAL;
    min_stat.rx.loss = min_stat.tx.loss = BIGVAL;
    min_stat.rx.dup = min_stat.tx.dup = BIGVAL;
    min_stat.rx.reorder = min_stat.tx.reorder = BIGVAL;
    min_stat.rx.jitter.min = min_stat.tx.jitter.min = BIGVAL;
    min_stat.rtt.min = BIGVAL;

    pj_bzero(&avg_stat, sizeof(avg_stat));
    pj_bzero(&max_stat, sizeof(max_stat));


    for (i=0, count=0; i<app.max_calls; ++i) {

	struct call *call = &app.call[i];
	struct media_stream *audio = &call->media[0];
	pj_time_val dur;
	unsigned msec_dur;

	if (call->inv == NULL || 
	    call->inv->state < PJSIP_INV_STATE_CONFIRMED ||
	    call->connect_time.sec == 0) 
	{
	    continue;
	}

	/* Duration */
	call_get_duration(i, &dur);
	msec_dur = PJ_TIME_VAL_MSEC(dur);

	MIN_(call_dur.min, msec_dur);
	MAX_(call_dur.max, msec_dur);
	AVG_(call_dur.avg, msec_dur);

	/* Connect delay */
	if (call->connect_time.sec) {
	    pj_time_val t = call->connect_time;
	    PJ_TIME_VAL_SUB(t, call->start_time);
	    msec_dur = PJ_TIME_VAL_MSEC(t);
	} else {
	    msec_dur = 10;
	}

	MIN_(call_pdd.min, msec_dur);
	MAX_(call_pdd.max, msec_dur);
	AVG_(call_pdd.avg, msec_dur);

	/* RX Statistisc: */

	/* Packets */
	MIN_(min_stat.rx.pkt, audio->rtcp.stat.rx.pkt);
	MAX_(max_stat.rx.pkt, audio->rtcp.stat.rx.pkt);
	AVG_(avg_stat.rx.pkt, audio->rtcp.stat.rx.pkt);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -