📄 transport_test.c
字号:
pj_timestamp total_rt_time;
int sent_request_count, recv_response_count;
pj_str_t call_id;
pj_timer_entry timeout_timer;
pj_timer_entry tx_timer;
pj_mutex_t *mutex;
} rt_test_data[16];
/* Target URI of the round-trip test. transport_rt_test() copies its
 * target_url argument here, and rt_send_request() uses it both as the
 * request target and as the To URI of every request it creates.
 */
static char rt_target_uri[64];
/* Stop flag of the round-trip test. transport_rt_test() clears it before
 * starting the worker threads and sets it once the test interval is over;
 * rt_worker_thread() polls it and rt_on_rx_response() stops scheduling
 * new transmissions when it is set.
 */
static pj_bool_t rt_stop;
/* Call-ID prefix shared by all test messages. transport_rt_test() sets it
 * to "RT-Call-Id/" and appends one digit per thread to build each thread's
 * Call-ID; the rx callbacks compare the prefix of incoming Call-IDs against
 * it to recognise messages that belong to this test.
 */
static pj_str_t rt_call_id;
/*
 * Incoming-request callback of the round-trip test.
 *
 * A request is considered ours when its Call-ID starts with rt_call_id.
 * Such a request is answered statelessly with a 200 response.
 *
 * Returns PJ_TRUE when the Call-ID prefix matched (even when building or
 * sending the response failed), PJ_FALSE otherwise.
 */
static pj_bool_t rt_on_rx_request(pjsip_rx_data *rdata)
{
    pjsip_response_addr reply_addr;
    pjsip_tx_data *reply;
    pj_status_t rc;

    /* Not one of the test's messages. */
    if (pj_strncmp(&rdata->msg_info.cid->id, &rt_call_id,
                   rt_call_id.slen) != 0)
    {
        return PJ_FALSE;
    }

    rc = pjsip_endpt_create_response( endpt, rdata, 200, NULL, &reply);
    if (rc != PJ_SUCCESS) {
        /* Nothing was created, so there is nothing to release. */
        app_perror(" error creating response", rc);
        return PJ_TRUE;
    }

    rc = pjsip_get_response_addr( reply->pool, rdata, &reply_addr);
    if (rc == PJ_SUCCESS) {
        rc = pjsip_endpt_send_response( endpt, &reply_addr, reply,
                                        NULL, NULL);
        if (rc == PJ_SUCCESS)
            return PJ_TRUE;

        app_perror(" error sending response", rc);
    } else {
        app_perror(" error in get response address", rc);
    }

    /* Both failure paths after creation drop our reference. */
    pjsip_tx_data_dec_ref(reply);
    return PJ_TRUE;
}
static pj_status_t rt_send_request(int thread_id)
{
pj_status_t status;
pj_str_t target, from, to, contact, call_id;
pjsip_tx_data *tdata;
pj_time_val timeout_delay;
pj_mutex_lock(rt_test_data[thread_id].mutex);
/* Create a request message. */
target = pj_str(rt_target_uri);
from = pj_str(FROM_HDR);
to = pj_str(rt_target_uri);
contact = pj_str(CONTACT_HDR);
call_id = rt_test_data[thread_id].call_id;
status = pjsip_endpt_create_request( endpt, &pjsip_options_method,
&target, &from, &to,
&contact, &call_id, -1,
NULL, &tdata );
if (status != PJ_SUCCESS) {
app_perror(" error: unable to create request", status);
pj_mutex_unlock(rt_test_data[thread_id].mutex);
return -610;
}
/* Start time. */
pj_get_timestamp(&rt_test_data[thread_id].send_time);
/* Send the message (statelessly). */
status = pjsip_endpt_send_request_stateless( endpt, tdata, NULL, NULL);
if (status != PJ_SUCCESS) {
/* Immediate error! */
app_perror(" error: send request", status);
pjsip_tx_data_dec_ref(tdata);
pj_mutex_unlock(rt_test_data[thread_id].mutex);
return -620;
}
/* Update counter. */
rt_test_data[thread_id].sent_request_count++;
/* Set timeout timer. */
if (rt_test_data[thread_id].timeout_timer.user_data != NULL) {
pjsip_endpt_cancel_timer(endpt, &rt_test_data[thread_id].timeout_timer);
}
timeout_delay.sec = 100; timeout_delay.msec = 0;
rt_test_data[thread_id].timeout_timer.user_data = (void*)1;
pjsip_endpt_schedule_timer(endpt, &rt_test_data[thread_id].timeout_timer,
&timeout_delay);
pj_mutex_unlock(rt_test_data[thread_id].mutex);
return PJ_SUCCESS;
}
/*
 * Incoming-response callback of the round-trip test.
 *
 * A response belongs to the test when its Call-ID starts with rt_call_id;
 * the character following the first '/' selects the rt_test_data slot
 * ('0' + index, as generated by transport_rt_test()). For a valid slot the
 * timeout timer is cancelled, the response counter and accumulated
 * round-trip time are updated and, unless the test is stopping, the next
 * transmission is scheduled.
 *
 * The Call-ID comes from the network, so the slot index is validated
 * before use: a missing or out-of-range digit, or a slot whose mutex was
 * never created, makes the response be treated as not ours instead of
 * indexing outside rt_test_data or locking a NULL mutex.
 *
 * Returns PJ_TRUE when the response was consumed by the test, PJ_FALSE
 * otherwise.
 */
static pj_bool_t rt_on_rx_response(pjsip_rx_data *rdata)
{
    pj_str_t *cid = &rdata->msg_info.cid->id;
    char *slash;
    int thread_id;
    pj_timestamp recv_time;

    if (pj_strncmp(cid, &rt_call_id, rt_call_id.slen) != 0)
        return PJ_FALSE;

    /* The slot digit must exist inside the Call-ID string. */
    slash = pj_strchr(cid, '/');
    if (slash == NULL || slash + 1 >= cid->ptr + cid->slen)
        return PJ_FALSE;

    thread_id = slash[1] - '0';
    if (thread_id < 0 ||
        thread_id >= (int)(sizeof(rt_test_data) / sizeof(rt_test_data[0])) ||
        rt_test_data[thread_id].mutex == NULL)
    {
        /* Not a slot that the test has set up. */
        return PJ_FALSE;
    }

    pj_mutex_lock(rt_test_data[thread_id].mutex);

    /* Stop timer. */
    pjsip_endpt_cancel_timer(endpt, &rt_test_data[thread_id].timeout_timer);

    /* Update counter and accumulate (receive time - send time). */
    rt_test_data[thread_id].recv_response_count++;
    pj_get_timestamp(&recv_time);
    pj_sub_timestamp(&recv_time, &rt_test_data[thread_id].send_time);
    pj_add_timestamp(&rt_test_data[thread_id].total_rt_time, &recv_time);

    if (!rt_stop) {
        /* Schedule the next request immediately. */
        pj_time_val tx_delay = { 0, 0 };

        pj_assert(rt_test_data[thread_id].tx_timer.user_data == NULL);
        rt_test_data[thread_id].tx_timer.user_data = (void*)1;
        pjsip_endpt_schedule_timer(endpt, &rt_test_data[thread_id].tx_timer,
                                   &tx_delay);
    }

    pj_mutex_unlock(rt_test_data[thread_id].mutex);

    return PJ_TRUE;
}
/*
 * Timer callback fired when no response arrived for a slot's last request.
 *
 * entry->id is the rt_test_data slot index. The slot's timeout timer is
 * marked as not armed, and a new transmission is scheduled immediately
 * unless one is already pending (tx_timer.user_data != NULL).
 */
static void rt_timeout_timer( pj_timer_heap_t *timer_heap,
                              struct pj_timer_entry *entry )
{
    int slot = entry->id;

    PJ_UNUSED_ARG(timer_heap);

    pj_mutex_lock(rt_test_data[slot].mutex);

    PJ_LOG(3,(THIS_FILE, " timeout waiting for response"));
    rt_test_data[slot].timeout_timer.user_data = NULL;

    if (!rt_test_data[slot].tx_timer.user_data) {
        pj_time_val no_delay = { 0, 0 };

        /* Mark the tx timer as pending before scheduling it. */
        rt_test_data[slot].tx_timer.user_data = (void*)1;
        pjsip_endpt_schedule_timer(endpt, &rt_test_data[slot].tx_timer,
                                   &no_delay);
    }

    pj_mutex_unlock(rt_test_data[slot].mutex);
}
/*
 * Timer callback that transmits the next request of a slot.
 *
 * entry->id is the rt_test_data slot index. The pending mark of the tx
 * timer is cleared and rt_send_request() is called with the slot's mutex
 * held; the result of rt_send_request() is not inspected here.
 */
static void rt_tx_timer( pj_timer_heap_t *timer_heap,
                         struct pj_timer_entry *entry )
{
    int slot = entry->id;

    PJ_UNUSED_ARG(timer_heap);

    pj_mutex_lock(rt_test_data[slot].mutex);

    /* The timer must have been marked pending by whoever scheduled it. */
    pj_assert(rt_test_data[slot].tx_timer.user_data != NULL);
    rt_test_data[slot].tx_timer.user_data = NULL;

    rt_send_request(slot);

    pj_mutex_unlock(rt_test_data[slot].mutex);
}
/*
 * Worker thread of the round-trip test.
 *
 * Polls the endpoint for events (10 msec per poll) until rt_stop is set,
 * then polls 100 more times to drain responses that are still arriving.
 *
 * arg  Not used.
 *
 * Always returns 0.
 *
 * NOTE(review): rt_stop is a plain pj_bool_t written by another thread;
 * the loop relies on the write becoming visible here.
 */
static int rt_worker_thread(void *arg)
{
    enum { DRAIN_POLLS = 100 };
    pj_time_val max_wait = { 0, 10 };
    int remaining = DRAIN_POLLS;

    (void)arg;

    /* Give the main thread a chance to run first. */
    pj_thread_sleep(10);

    for (;;) {
        if (rt_stop)
            break;
        pjsip_endpt_handle_events(endpt, &max_wait);
    }

    /* Exhaust responses. */
    while (remaining-- > 0) {
        pjsip_endpt_handle_events(endpt, &max_wait);
    }

    return 0;
}
/*
 * Multithreaded round-trip test.
 *
 * Starts THREADS worker threads that poll the endpoint, and lets each
 * rt_test_data slot send OPTIONS requests to target_url back-to-back for
 * INTERVAL seconds. Afterwards the number of sent requests and the average
 * round-trip time are logged.
 *
 * tp_type     Not used.
 * ref_tp      Not used.
 * target_url  Target URI of the requests; must fit in rt_target_uri.
 * lost        Receives (requests sent - responses received); only written
 *             when the test ran (return value 0).
 *
 * Returns 0 on success, or a negative code:
 *   -600  module registration failed
 *   -605  target_url is NULL or too long for rt_target_uri
 *   -610  pool creation failed
 *   -615  mutex creation failed
 *   -620  thread creation failed
 *
 * Every exit path goes through the same cleanup, so the message logger is
 * always restored and threads, mutexes and the pool are always released.
 * On the success path the timers are cancelled and pending events are
 * flushed BEFORE the mutexes are destroyed and the pool is released, so
 * that a late response handled during the flush still finds valid mutexes.
 */
int transport_rt_test( pjsip_transport_type_e tp_type,
                       pjsip_transport *ref_tp,
                       char *target_url,
                       int *lost)
{
    enum { THREADS = 4, INTERVAL = 10 };
    int i;
    int rc = 0;
    int mutex_cnt = 0;      /* Number of mutexes created so far.  */
    int thread_cnt = 0;     /* Number of threads created so far.  */
    pj_status_t status;
    pj_pool_t *pool = NULL;
    pj_bool_t logger_enabled;
    pj_timestamp zero_time, total_time;
    unsigned usec_rt;
    unsigned total_sent;
    unsigned total_recv;

    (void)tp_type;
    (void)ref_tp;

    PJ_LOG(3,(THIS_FILE, " multithreaded round-trip test (%d threads)...",
              THREADS));
    PJ_LOG(3,(THIS_FILE, " this will take approx %d seconds, please wait..",
              INTERVAL));

    /* Make sure msg logger is disabled. */
    logger_enabled = msg_logger_set_enabled(0);

    /* Register module (if not yet registered) */
    if (rt_module.id == -1) {
        status = pjsip_endpt_register_module( endpt, &rt_module );
        if (status != PJ_SUCCESS) {
            app_perror(" error: unable to register module", status);
            rc = -600;
            goto on_return;
        }
    }

    /* The URL is copied into a fixed-size buffer; refuse what won't fit
     * (including the terminating NUL).
     */
    if (target_url == NULL ||
        pj_str(target_url).slen >= (int)sizeof(rt_target_uri))
    {
        PJ_LOG(3,(THIS_FILE, " error: target URL is missing or too long"));
        rc = -605;
        goto on_return;
    }

    /* Create pool for this test. */
    pool = pjsip_endpt_create_pool(endpt, NULL, 4000, 4000);
    if (!pool) {
        rc = -610;
        goto on_return;
    }

    /* Initialize static test data. */
    pj_native_strcpy(rt_target_uri, target_url);
    rt_call_id = pj_str("RT-Call-Id/");
    rt_stop = PJ_FALSE;

    /* Initialize thread data. */
    for (i=0; i<THREADS; ++i) {
        char buf[1];
        pj_str_t str_id = { buf, 1 };

        pj_bzero(&rt_test_data[i], sizeof(rt_test_data[i]));

        /* Init timer entry */
        rt_test_data[i].tx_timer.id = i;
        rt_test_data[i].tx_timer.cb = &rt_tx_timer;
        rt_test_data[i].timeout_timer.id = i;
        rt_test_data[i].timeout_timer.cb = &rt_timeout_timer;

        /* Generate Call-ID for each thread: prefix + one digit. */
        rt_test_data[i].call_id.ptr = pj_pool_alloc(pool, rt_call_id.slen+1);
        pj_strcpy(&rt_test_data[i].call_id, &rt_call_id);
        buf[0] = '0' + i;
        pj_strcat(&rt_test_data[i].call_id, &str_id);

        /* Init mutex. */
        status = pj_mutex_create_recursive(pool, "rt", &rt_test_data[i].mutex);
        if (status != PJ_SUCCESS) {
            app_perror(" error: unable to create mutex", status);
            rc = -615;
            goto on_return;
        }
        ++mutex_cnt;

        /* Create thread, suspended. */
        status = pj_thread_create(pool, "rttest%p", &rt_worker_thread,
                                  (void*)(long)i, 0,
                                  PJ_THREAD_SUSPENDED,
                                  &rt_test_data[i].thread);
        if (status != PJ_SUCCESS) {
            app_perror(" error: unable to create thread", status);
            rc = -620;
            goto on_return;
        }
        ++thread_cnt;
    }

    /* Start threads! */
    for (i=0; i<THREADS; ++i) {
        pj_time_val delay = {0,0};

        pj_thread_resume(rt_test_data[i].thread);

        /* Schedule first message transmissions. */
        rt_test_data[i].tx_timer.user_data = (void*)1;
        pjsip_endpt_schedule_timer(endpt, &rt_test_data[i].tx_timer, &delay);
    }

    /* Sleep for some time. */
    pj_thread_sleep(INTERVAL * 1000);

on_return:
    /* Signal thread to stop. */
    rt_stop = PJ_TRUE;

    /* On failure the threads created so far are still suspended; resume
     * them so that they observe rt_stop and terminate.
     */
    if (rc != 0) {
        for (i=0; i<thread_cnt; ++i)
            pj_thread_resume(rt_test_data[i].thread);
    }

    /* Wait threads to complete. */
    for (i=0; i<thread_cnt; ++i) {
        pj_thread_join(rt_test_data[i].thread);
        pj_thread_destroy(rt_test_data[i].thread);
    }

    if (rc == 0) {
        /* Cancel timers so that nothing fires during the flush below. */
        for (i=0; i<THREADS; ++i) {
            pjsip_endpt_cancel_timer(endpt, &rt_test_data[i].timeout_timer);
            if (rt_test_data[i].tx_timer.user_data != NULL) {
                pjsip_endpt_cancel_timer(endpt, &rt_test_data[i].tx_timer);
                rt_test_data[i].tx_timer.user_data = NULL;
            }
        }

        /* Gather statistics. */
        pj_bzero(&total_time, sizeof(total_time));
        pj_bzero(&zero_time, sizeof(zero_time));
        usec_rt = total_sent = total_recv = 0;
        for (i=0; i<THREADS; ++i) {
            total_sent += rt_test_data[i].sent_request_count;
            total_recv += rt_test_data[i].recv_response_count;
            pj_add_timestamp(&total_time, &rt_test_data[i].total_rt_time);
        }

        /* Display statistics (average is taken over received responses). */
        if (total_recv)
            total_time.u64 = total_time.u64/total_recv;
        else
            total_time.u64 = 0;
        usec_rt = pj_elapsed_usec(&zero_time, &total_time);
        PJ_LOG(3,(THIS_FILE, " done."));
        PJ_LOG(3,(THIS_FILE, " total %u messages sent", total_sent));
        PJ_LOG(3,(THIS_FILE, " average round-trip=%u usec", usec_rt));

        *lost = total_sent-total_recv;

        /* Flush events while mutexes and pool are still valid. */
        flush_events(500);
    }

    /* Destroy rt_test_data; clear the pointers so stale mutexes are not
     * left behind in the static array.
     */
    for (i=0; i<mutex_cnt; ++i) {
        pj_mutex_destroy(rt_test_data[i].mutex);
        rt_test_data[i].mutex = NULL;
    }

    if (pool)
        pjsip_endpt_release_pool(endpt, pool);

    /* Restore msg logger. */
    msg_logger_set_enabled(logger_enabled);

    return rc;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -