📄 transport_test.c
字号:
pj_timestamp total_rt_time; int sent_request_count, recv_response_count; pj_str_t call_id; pj_timer_entry timeout_timer; pj_timer_entry tx_timer; pj_mutex_t *mutex;} rt_test_data[16];static char rt_target_uri[64];static pj_bool_t rt_stop;static pj_str_t rt_call_id;static pj_bool_t rt_on_rx_request(pjsip_rx_data *rdata){ if (!pj_strncmp(&rdata->msg_info.cid->id, &rt_call_id, rt_call_id.slen)) { pjsip_tx_data *tdata; pjsip_response_addr res_addr; pj_status_t status; status = pjsip_endpt_create_response( endpt, rdata, 200, NULL, &tdata); if (status != PJ_SUCCESS) { app_perror(" error creating response", status); return PJ_TRUE; } status = pjsip_get_response_addr( tdata->pool, rdata, &res_addr); if (status != PJ_SUCCESS) { app_perror(" error in get response address", status); pjsip_tx_data_dec_ref(tdata); return PJ_TRUE; } status = pjsip_endpt_send_response( endpt, &res_addr, tdata, NULL, NULL); if (status != PJ_SUCCESS) { app_perror(" error sending response", status); pjsip_tx_data_dec_ref(tdata); return PJ_TRUE; } return PJ_TRUE; } return PJ_FALSE;}static pj_status_t rt_send_request(int thread_id){ pj_status_t status; pj_str_t target, from, to, contact, call_id; pjsip_tx_data *tdata; pj_time_val timeout_delay; pj_mutex_lock(rt_test_data[thread_id].mutex); /* Create a request message. */ target = pj_str(rt_target_uri); from = pj_str(FROM_HDR); to = pj_str(rt_target_uri); contact = pj_str(CONTACT_HDR); call_id = rt_test_data[thread_id].call_id; status = pjsip_endpt_create_request( endpt, &pjsip_options_method, &target, &from, &to, &contact, &call_id, -1, NULL, &tdata ); if (status != PJ_SUCCESS) { app_perror(" error: unable to create request", status); pj_mutex_unlock(rt_test_data[thread_id].mutex); return -610; } /* Start time. */ pj_get_timestamp(&rt_test_data[thread_id].send_time); /* Send the message (statelessly). */ status = pjsip_endpt_send_request_stateless( endpt, tdata, NULL, NULL); if (status != PJ_SUCCESS) { /* Immediate error! */ app_perror(" error: send request", status); pjsip_tx_data_dec_ref(tdata); pj_mutex_unlock(rt_test_data[thread_id].mutex); return -620; } /* Update counter. */ rt_test_data[thread_id].sent_request_count++; /* Set timeout timer. */ if (rt_test_data[thread_id].timeout_timer.user_data != NULL) { pjsip_endpt_cancel_timer(endpt, &rt_test_data[thread_id].timeout_timer); } timeout_delay.sec = 100; timeout_delay.msec = 0; rt_test_data[thread_id].timeout_timer.user_data = (void*)1; pjsip_endpt_schedule_timer(endpt, &rt_test_data[thread_id].timeout_timer, &timeout_delay); pj_mutex_unlock(rt_test_data[thread_id].mutex); return PJ_SUCCESS;}static pj_bool_t rt_on_rx_response(pjsip_rx_data *rdata){ if (!pj_strncmp(&rdata->msg_info.cid->id, &rt_call_id, rt_call_id.slen)) { char *pos = pj_strchr(&rdata->msg_info.cid->id, '/')+1; int thread_id = (*pos - '0'); pj_timestamp recv_time; pj_mutex_lock(rt_test_data[thread_id].mutex); /* Stop timer. */ pjsip_endpt_cancel_timer(endpt, &rt_test_data[thread_id].timeout_timer); /* Update counter and end-time. */ rt_test_data[thread_id].recv_response_count++; pj_get_timestamp(&recv_time); pj_sub_timestamp(&recv_time, &rt_test_data[thread_id].send_time); pj_add_timestamp(&rt_test_data[thread_id].total_rt_time, &recv_time); if (!rt_stop) { pj_time_val tx_delay = { 0, 0 }; pj_assert(rt_test_data[thread_id].tx_timer.user_data == NULL); rt_test_data[thread_id].tx_timer.user_data = (void*)1; pjsip_endpt_schedule_timer(endpt, &rt_test_data[thread_id].tx_timer, &tx_delay); } pj_mutex_unlock(rt_test_data[thread_id].mutex); return PJ_TRUE; } return PJ_FALSE;}static void rt_timeout_timer( pj_timer_heap_t *timer_heap, struct pj_timer_entry *entry ){ pj_mutex_lock(rt_test_data[entry->id].mutex); PJ_UNUSED_ARG(timer_heap); PJ_LOG(3,(THIS_FILE, " timeout waiting for response")); rt_test_data[entry->id].timeout_timer.user_data = NULL; if (rt_test_data[entry->id].tx_timer.user_data == NULL) { pj_time_val delay = { 0, 0 }; rt_test_data[entry->id].tx_timer.user_data = (void*)1; pjsip_endpt_schedule_timer(endpt, &rt_test_data[entry->id].tx_timer, &delay); } pj_mutex_unlock(rt_test_data[entry->id].mutex);}static void rt_tx_timer( pj_timer_heap_t *timer_heap, struct pj_timer_entry *entry ){ pj_mutex_lock(rt_test_data[entry->id].mutex); PJ_UNUSED_ARG(timer_heap); pj_assert(rt_test_data[entry->id].tx_timer.user_data != NULL); rt_test_data[entry->id].tx_timer.user_data = NULL; rt_send_request(entry->id); pj_mutex_unlock(rt_test_data[entry->id].mutex);}static int rt_worker_thread(void *arg){ int i; pj_time_val poll_delay = { 0, 10 }; /* Sleep to allow main threads to run. */ pj_thread_sleep(10); while (!rt_stop) { pjsip_endpt_handle_events(endpt, &poll_delay); } /* Exhaust responses. */ for (i=0; i<100; ++i) pjsip_endpt_handle_events(endpt, &poll_delay); return 0;}int transport_rt_test( pjsip_transport_type_e tp_type, pjsip_transport *ref_tp, char *target_url, int *lost){ enum { THREADS = 4, INTERVAL = 10 }; int i; pj_status_t status; pj_pool_t *pool; pj_bool_t logger_enabled; pj_timestamp zero_time, total_time; unsigned usec_rt; unsigned total_sent; unsigned total_recv; PJ_LOG(3,(THIS_FILE, " multithreaded round-trip test (%d threads)...", THREADS)); PJ_LOG(3,(THIS_FILE, " this will take approx %d seconds, please wait..", INTERVAL)); /* Make sure msg logger is disabled. */ logger_enabled = msg_logger_set_enabled(0); /* Register module (if not yet registered) */ if (rt_module.id == -1) { status = pjsip_endpt_register_module( endpt, &rt_module ); if (status != PJ_SUCCESS) { app_perror(" error: unable to register module", status); return -600; } } /* Create pool for this test. */ pool = pjsip_endpt_create_pool(endpt, NULL, 4000, 4000); if (!pool) return -610; /* Initialize static test data. */ pj_native_strcpy(rt_target_uri, target_url); rt_call_id = pj_str("RT-Call-Id/"); rt_stop = PJ_FALSE; /* Initialize thread data. */ for (i=0; i<THREADS; ++i) { char buf[1]; pj_str_t str_id = { buf, 1 }; pj_bzero(&rt_test_data[i], sizeof(rt_test_data[i])); /* Init timer entry */ rt_test_data[i].tx_timer.id = i; rt_test_data[i].tx_timer.cb = &rt_tx_timer; rt_test_data[i].timeout_timer.id = i; rt_test_data[i].timeout_timer.cb = &rt_timeout_timer; /* Generate Call-ID for each thread. */ rt_test_data[i].call_id.ptr = pj_pool_alloc(pool, rt_call_id.slen+1); pj_strcpy(&rt_test_data[i].call_id, &rt_call_id); buf[0] = '0' + i; pj_strcat(&rt_test_data[i].call_id, &str_id); /* Init mutex. */ status = pj_mutex_create_recursive(pool, "rt", &rt_test_data[i].mutex); if (status != PJ_SUCCESS) { app_perror(" error: unable to create mutex", status); return -615; } /* Create thread, suspended. */ status = pj_thread_create(pool, "rttest%p", &rt_worker_thread, (void*)(long)i, 0, PJ_THREAD_SUSPENDED, &rt_test_data[i].thread); if (status != PJ_SUCCESS) { app_perror(" error: unable to create thread", status); return -620; } } /* Start threads! */ for (i=0; i<THREADS; ++i) { pj_time_val delay = {0,0}; pj_thread_resume(rt_test_data[i].thread); /* Schedule first message transmissions. */ rt_test_data[i].tx_timer.user_data = (void*)1; pjsip_endpt_schedule_timer(endpt, &rt_test_data[i].tx_timer, &delay); } /* Sleep for some time. */ pj_thread_sleep(INTERVAL * 1000); /* Signal thread to stop. */ rt_stop = PJ_TRUE; /* Wait threads to complete. */ for (i=0; i<THREADS; ++i) { pj_thread_join(rt_test_data[i].thread); pj_thread_destroy(rt_test_data[i].thread); } /* Destroy rt_test_data */ for (i=0; i<THREADS; ++i) { pj_mutex_destroy(rt_test_data[i].mutex); pjsip_endpt_cancel_timer(endpt, &rt_test_data[i].timeout_timer); } /* Gather statistics. */ pj_bzero(&total_time, sizeof(total_time)); pj_bzero(&zero_time, sizeof(zero_time)); usec_rt = total_sent = total_recv = 0; for (i=0; i<THREADS; ++i) { total_sent += rt_test_data[i].sent_request_count; total_recv += rt_test_data[i].recv_response_count; pj_add_timestamp(&total_time, &rt_test_data[i].total_rt_time); } /* Display statistics. */ if (total_recv) total_time.u64 = total_time.u64/total_recv; else total_time.u64 = 0; usec_rt = pj_elapsed_usec(&zero_time, &total_time); PJ_LOG(3,(THIS_FILE, " done.")); PJ_LOG(3,(THIS_FILE, " total %d messages sent", total_sent)); PJ_LOG(3,(THIS_FILE, " average round-trip=%d usec", usec_rt)); pjsip_endpt_release_pool(endpt, pool); *lost = total_sent-total_recv; /* Flush events. */ flush_events(500); /* Restore msg logger. */ msg_logger_set_enabled(logger_enabled); return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -