⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 transport_test.c

📁 一个开源SIP协议栈
💻 C
📖 第 1 页 / 共 2 页
字号:
    pj_timestamp total_rt_time;
    int sent_request_count, recv_response_count;
    pj_str_t call_id;
    pj_timer_entry timeout_timer;
    pj_timer_entry tx_timer;
    pj_mutex_t *mutex;
} rt_test_data[16];

/* NUL-terminated copy of the target URL for the current round-trip test;
 * rt_send_request() uses it as both the request URI and the To header.
 */
static char	 rt_target_uri[64];
/* Stop flag: set to PJ_TRUE by transport_rt_test() so that the worker
 * threads leave their polling loop and no further requests are re-armed.
 */
static pj_bool_t rt_stop;
/* Call-ID prefix ("RT-Call-Id/") used by the rx callbacks to recognise
 * messages that belong to this test.
 */
static pj_str_t  rt_call_id;

/*
 * Incoming-request callback for the round-trip test.
 *
 * A request whose Call-ID starts with rt_call_id is answered statelessly
 * with a 200 response and reported as handled (PJ_TRUE), even when building
 * or sending the response fails. Any other request is left alone and
 * PJ_FALSE is returned.
 */
static pj_bool_t rt_on_rx_request(pjsip_rx_data *rdata)
{
    pjsip_tx_data *response;
    pjsip_response_addr dest;
    pj_status_t rc;

    /* Not one of the round-trip test requests. */
    if (pj_strncmp(&rdata->msg_info.cid->id, &rt_call_id,
		   rt_call_id.slen) != 0)
    {
	return PJ_FALSE;
    }

    rc = pjsip_endpt_create_response(endpt, rdata, 200, NULL, &response);
    if (rc != PJ_SUCCESS) {
	/* Nothing was allocated, so there is nothing to release. */
	app_perror("    error creating response", rc);
	return PJ_TRUE;
    }

    rc = pjsip_get_response_addr(response->pool, rdata, &dest);
    if (rc != PJ_SUCCESS) {
	app_perror("    error in get response address", rc);
    } else {
	rc = pjsip_endpt_send_response(endpt, &dest, response, NULL, NULL);
	if (rc == PJ_SUCCESS)
	    return PJ_TRUE;

	app_perror("    error sending response", rc);
    }

    /* Either failure above leaves the response buffer with us: drop it. */
    pjsip_tx_data_dec_ref(response);
    return PJ_TRUE;
}

static pj_status_t rt_send_request(int thread_id)
{
    pj_status_t status;
    pj_str_t target, from, to, contact, call_id;
    pjsip_tx_data *tdata;
    pj_time_val timeout_delay;

    pj_mutex_lock(rt_test_data[thread_id].mutex);

    /* Create a request message. */
    target = pj_str(rt_target_uri);
    from = pj_str(FROM_HDR);
    to = pj_str(rt_target_uri);
    contact = pj_str(CONTACT_HDR);
    call_id = rt_test_data[thread_id].call_id;

    status = pjsip_endpt_create_request( endpt, &pjsip_options_method, 
					 &target, &from, &to,
					 &contact, &call_id, -1, 
					 NULL, &tdata );
    if (status != PJ_SUCCESS) {
	app_perror("    error: unable to create request", status);
	pj_mutex_unlock(rt_test_data[thread_id].mutex);
	return -610;
    }

    /* Start time. */
    pj_get_timestamp(&rt_test_data[thread_id].send_time);

    /* Send the message (statelessly). */
    status = pjsip_endpt_send_request_stateless( endpt, tdata, NULL, NULL);
    if (status != PJ_SUCCESS) {
	/* Immediate error! */
	app_perror("    error: send request", status);
	pjsip_tx_data_dec_ref(tdata);
	pj_mutex_unlock(rt_test_data[thread_id].mutex);
	return -620;
    }

    /* Update counter. */
    rt_test_data[thread_id].sent_request_count++;

    /* Set timeout timer. */
    if (rt_test_data[thread_id].timeout_timer.user_data != NULL) {
	pjsip_endpt_cancel_timer(endpt, &rt_test_data[thread_id].timeout_timer);
    }
    timeout_delay.sec = 100; timeout_delay.msec = 0;
    rt_test_data[thread_id].timeout_timer.user_data = (void*)1;
    pjsip_endpt_schedule_timer(endpt, &rt_test_data[thread_id].timeout_timer,
			       &timeout_delay);

    pj_mutex_unlock(rt_test_data[thread_id].mutex);
    return PJ_SUCCESS;
}

/*
 * Incoming-response callback for the round-trip test.
 *
 * A response is considered ours when its Call-ID starts with rt_call_id and
 * the character following the '/' is a digit that selects an initialised
 * slot of rt_test_data. For such a response the slot's timeout timer is
 * cancelled, the response counter and accumulated round-trip time are
 * updated and, unless the test is stopping, the slot's tx timer is scheduled
 * with zero delay to send the next request.
 *
 * The slot index comes straight from the received message, so it is
 * validated before use; a response that does not map to a usable slot is
 * ignored.
 *
 * Returns PJ_TRUE when the response was consumed, PJ_FALSE otherwise.
 */
static pj_bool_t rt_on_rx_response(pjsip_rx_data *rdata)
{
    pj_str_t *cid = &rdata->msg_info.cid->id;
    char *pos;
    int thread_id;
    pj_timestamp recv_time;

    if (pj_strncmp(cid, &rt_call_id, rt_call_id.slen) != 0)
	return PJ_FALSE;

    /* The slot digit must exist and lie inside the Call-ID string. */
    pos = pj_strchr(cid, '/');
    if (pos == NULL)
	return PJ_FALSE;
    ++pos;
    if (pos >= cid->ptr + cid->slen)
	return PJ_FALSE;

    /* Reject indexes outside the table and slots that were never set up
     * (their mutex pointer is still NULL).
     */
    thread_id = (*pos - '0');
    if (thread_id < 0 ||
	thread_id >= (int)(sizeof(rt_test_data)/sizeof(rt_test_data[0])) ||
	rt_test_data[thread_id].mutex == NULL)
    {
	return PJ_FALSE;
    }

    pj_mutex_lock(rt_test_data[thread_id].mutex);

    /* Stop timer. */
    pjsip_endpt_cancel_timer(endpt, &rt_test_data[thread_id].timeout_timer);

    /* Update counter and end-time. */
    rt_test_data[thread_id].recv_response_count++;
    pj_get_timestamp(&recv_time);

    /* recv_time becomes the elapsed time since the request was sent. */
    pj_sub_timestamp(&recv_time, &rt_test_data[thread_id].send_time);
    pj_add_timestamp(&rt_test_data[thread_id].total_rt_time, &recv_time);

    if (!rt_stop) {
	/* Fire the tx timer immediately to send the next request. */
	pj_time_val tx_delay = { 0, 0 };
	pj_assert(rt_test_data[thread_id].tx_timer.user_data == NULL);
	rt_test_data[thread_id].tx_timer.user_data = (void*)1;
	pjsip_endpt_schedule_timer(endpt, &rt_test_data[thread_id].tx_timer,
				   &tx_delay);
    }

    pj_mutex_unlock(rt_test_data[thread_id].mutex);

    return PJ_TRUE;
}

/*
 * Timer callback invoked when no response arrived in time for the slot
 * identified by entry->id.
 *
 * Logs the timeout, marks the timeout timer as idle and, if the slot's tx
 * timer is not already pending, schedules it with zero delay so that a new
 * request gets sent. Runs with the slot's mutex held.
 */
static void rt_timeout_timer( pj_timer_heap_t *timer_heap,
			      struct pj_timer_entry *entry )
{
    const int slot = entry->id;
    pj_mutex_t *lock = rt_test_data[slot].mutex;
    pj_timer_entry *resend = &rt_test_data[slot].tx_timer;

    PJ_UNUSED_ARG(timer_heap);

    pj_mutex_lock(lock);

    PJ_LOG(3,(THIS_FILE, "    timeout waiting for response"));

    /* user_data == NULL means "not armed". */
    rt_test_data[slot].timeout_timer.user_data = NULL;

    if (resend->user_data == NULL) {
	pj_time_val now = { 0, 0 };

	resend->user_data = (void*)1;
	pjsip_endpt_schedule_timer(endpt, resend, &now);
    }

    pj_mutex_unlock(lock);
}

/*
 * Timer callback that sends the next request for the slot identified by
 * entry->id.
 *
 * Clears the tx timer's "pending" marker (user_data) and calls
 * rt_send_request() while holding the slot's mutex. The result of
 * rt_send_request() is not inspected.
 */
static void rt_tx_timer( pj_timer_heap_t *timer_heap,
			 struct pj_timer_entry *entry )
{
    const int slot = entry->id;
    pj_mutex_t *lock = rt_test_data[slot].mutex;
    pj_timer_entry *self = &rt_test_data[slot].tx_timer;

    PJ_UNUSED_ARG(timer_heap);

    pj_mutex_lock(lock);

    /* The timer must have been marked pending by whoever scheduled it. */
    pj_assert(self->user_data != NULL);
    self->user_data = NULL;

    rt_send_request(slot);

    pj_mutex_unlock(lock);
}


/*
 * Worker thread body: polls the endpoint for events until rt_stop is set,
 * then polls another 100 times to drain responses that are still arriving.
 *
 * The thread argument is not used. Always returns 0.
 */
static int rt_worker_thread(void *arg)
{
    pj_time_val poll_interval = { 0, 10 };
    int remaining = 100;

    /* Give the main thread a moment to run first. */
    pj_thread_sleep(10);

    for (;;) {
	if (rt_stop)
	    break;
	pjsip_endpt_handle_events(endpt, &poll_interval);
    }

    /* Drain whatever responses are still pending. */
    while (remaining > 0) {
	pjsip_endpt_handle_events(endpt, &poll_interval);
	--remaining;
    }

    return 0;
}

/*
 * Multithreaded round-trip test.
 *
 * Starts THREADS worker threads that poll the endpoint, kicks off one
 * request/response ping-pong per slot of rt_test_data, lets it run for
 * INTERVAL seconds, then stops the workers and reports the number of
 * messages sent and the average round-trip time.
 *
 * Parameters:
 *   tp_type     not used by this function.
 *   ref_tp      not used by this function.
 *   target_url  NUL-terminated target URI; it is copied into rt_target_uri
 *               and therefore must be shorter than that buffer.
 *   lost        receives (requests sent - responses received).
 *
 * Returns 0 on success, or a negative code: -600 module registration
 * failed, -605 target URL too long, -610 pool creation failed, -615 mutex
 * creation failed, -620 thread creation failed. On every error path the
 * message logger setting is restored and the resources created so far are
 * released.
 */
int transport_rt_test( pjsip_transport_type_e tp_type,
		       pjsip_transport *ref_tp,
		       char *target_url,
		       int *lost)
{
    enum { THREADS = 4, INTERVAL = 10 };
    int i;
    int rc = 0;
    int mutex_cnt = 0;		/* Mutexes created so far.	*/
    int thread_cnt = 0;		/* Threads created so far.	*/
    pj_status_t status;
    pj_pool_t *pool;
    pj_bool_t logger_enabled;
    pj_str_t url;

    pj_timestamp zero_time, total_time;
    unsigned usec_rt;
    unsigned total_sent;
    unsigned total_recv;

    PJ_LOG(3,(THIS_FILE, "  multithreaded round-trip test (%d threads)...",
		  THREADS));
    PJ_LOG(3,(THIS_FILE, "    this will take approx %d seconds, please wait..",
		INTERVAL));

    /* The URL is copied into a fixed-size buffer below; refuse anything
     * that would not fit together with its terminating NUL.
     */
    url = pj_str(target_url);
    if (url.slen >= (int)sizeof(rt_target_uri)) {
	PJ_LOG(3,(THIS_FILE, "    error: target URL is too long"));
	return -605;
    }

    /* Make sure msg logger is disabled. */
    logger_enabled = msg_logger_set_enabled(0);

    /* Register module (if not yet registered) */
    if (rt_module.id == -1) {
	status = pjsip_endpt_register_module( endpt, &rt_module );
	if (status != PJ_SUCCESS) {
	    app_perror("   error: unable to register module", status);
	    msg_logger_set_enabled(logger_enabled);
	    return -600;
	}
    }

    /* Create pool for this test. */
    pool = pjsip_endpt_create_pool(endpt, NULL, 4000, 4000);
    if (!pool) {
	msg_logger_set_enabled(logger_enabled);
	return -610;
    }

    /* Initialize static test data. */
    pj_native_strcpy(rt_target_uri, target_url);
    rt_call_id = pj_str("RT-Call-Id/");
    rt_stop = PJ_FALSE;

    /* Initialize thread data. */
    for (i=0; i<THREADS; ++i) {
	char buf[1];
	pj_str_t str_id = { buf, 1 };

	pj_bzero(&rt_test_data[i], sizeof(rt_test_data[i]));

	/* Init timer entry */
	rt_test_data[i].tx_timer.id = i;
	rt_test_data[i].tx_timer.cb = &rt_tx_timer;
	rt_test_data[i].timeout_timer.id = i;
	rt_test_data[i].timeout_timer.cb = &rt_timeout_timer;

	/* Generate Call-ID for each thread: the common prefix followed by
	 * a single digit identifying the slot.
	 */
	rt_test_data[i].call_id.ptr = pj_pool_alloc(pool, rt_call_id.slen+1);
	pj_strcpy(&rt_test_data[i].call_id, &rt_call_id);
	buf[0] = '0' + i;
	pj_strcat(&rt_test_data[i].call_id, &str_id);

	/* Init mutex. */
	status = pj_mutex_create_recursive(pool, "rt", &rt_test_data[i].mutex);
	if (status != PJ_SUCCESS) {
	    app_perror("   error: unable to create mutex", status);
	    rc = -615;
	    goto on_error;
	}
	++mutex_cnt;

	/* Create thread, suspended. */
	status = pj_thread_create(pool, "rttest%p", &rt_worker_thread, (void*)(long)i, 0,
				  PJ_THREAD_SUSPENDED, &rt_test_data[i].thread);
	if (status != PJ_SUCCESS) {
	    app_perror("   error: unable to create thread", status);
	    rc = -620;
	    goto on_error;
	}
	++thread_cnt;
    }

    /* Start threads! */
    for (i=0; i<THREADS; ++i) {
	pj_time_val delay = {0,0};
	pj_thread_resume(rt_test_data[i].thread);

	/* Schedule first message transmissions. */
	rt_test_data[i].tx_timer.user_data = (void*)1;
	pjsip_endpt_schedule_timer(endpt, &rt_test_data[i].tx_timer, &delay);
    }

    /* Sleep for some time. */
    pj_thread_sleep(INTERVAL * 1000);

    /* Signal thread to stop. */
    rt_stop = PJ_TRUE;

    /* Wait threads to complete. */
    for (i=0; i<THREADS; ++i) {
	pj_thread_join(rt_test_data[i].thread);
	pj_thread_destroy(rt_test_data[i].thread);
    }

    /* Destroy rt_test_data. Both timers are cancelled before the mutex
     * goes away, so that no callback can fire later (e.g. from
     * flush_events() below) and lock a destroyed mutex.
     */
    for (i=0; i<THREADS; ++i) {
	pjsip_endpt_cancel_timer(endpt, &rt_test_data[i].timeout_timer);
	pjsip_endpt_cancel_timer(endpt, &rt_test_data[i].tx_timer);
	pj_mutex_destroy(rt_test_data[i].mutex);
    }

    /* Gather statistics. */
    pj_bzero(&total_time, sizeof(total_time));
    pj_bzero(&zero_time, sizeof(zero_time));
    usec_rt = total_sent = total_recv = 0;
    for (i=0; i<THREADS; ++i) {
	total_sent += rt_test_data[i].sent_request_count;
	total_recv +=  rt_test_data[i].recv_response_count;
	pj_add_timestamp(&total_time, &rt_test_data[i].total_rt_time);
    }

    /* Display statistics. The average is taken over received responses;
     * with none received it is reported as zero.
     */
    if (total_recv)
	total_time.u64 = total_time.u64/total_recv;
    else
	total_time.u64 = 0;
    usec_rt = pj_elapsed_usec(&zero_time, &total_time);
    PJ_LOG(3,(THIS_FILE, "    done."));
    PJ_LOG(3,(THIS_FILE, "    total %d messages sent", total_sent));
    PJ_LOG(3,(THIS_FILE, "    average round-trip=%d usec", usec_rt));

    pjsip_endpt_release_pool(endpt, pool);

    *lost = total_sent-total_recv;

    /* Flush events. */
    flush_events(500);

    /* Restore msg logger. */
    msg_logger_set_enabled(logger_enabled);

    return 0;

on_error:
    /* Setup failed part-way through. No timer has been scheduled yet, so
     * only threads, mutexes, the pool and the logger setting need undoing.
     * The threads were created suspended: raise the stop flag, let them run
     * to completion and reap them before their pool is released.
     */
    rt_stop = PJ_TRUE;
    for (i=0; i<thread_cnt; ++i) {
	pj_thread_resume(rt_test_data[i].thread);
	pj_thread_join(rt_test_data[i].thread);
	pj_thread_destroy(rt_test_data[i].thread);
    }
    for (i=0; i<mutex_cnt; ++i) {
	pj_mutex_destroy(rt_test_data[i].mutex);
    }
    pjsip_endpt_release_pool(endpt, pool);
    msg_logger_set_enabled(logger_enabled);

    return rc;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -