📄 ioq_perf.c
字号:
/* Create socket pair. */
TRACE_((THIS_FILE, " calling socketpair.."));
rc = app_socketpair(PJ_AF_INET, sock_type, 0,
&items[i].server_fd, &items[i].client_fd);
if (rc != PJ_SUCCESS) {
app_perror("...error: unable to create socket pair", rc);
return -20;
}
/* Register server socket to ioqueue. */
TRACE_((THIS_FILE, " register(1).."));
rc = pj_ioqueue_register_sock(pool, ioqueue,
items[i].server_fd,
&items[i], &ioqueue_callback,
&items[i].server_key);
if (rc != PJ_SUCCESS) {
app_perror("...error: registering server socket to ioqueue", rc);
return -60;
}
/* Register client socket to ioqueue. */
TRACE_((THIS_FILE, " register(2).."));
rc = pj_ioqueue_register_sock(pool, ioqueue,
items[i].client_fd,
&items[i], &ioqueue_callback,
&items[i].client_key);
if (rc != PJ_SUCCESS) {
app_perror("...error: registering server socket to ioqueue", rc);
return -70;
}
/* Start reading. */
TRACE_((THIS_FILE, " pj_ioqueue_recv.."));
bytes = items[i].buffer_size;
rc = pj_ioqueue_recv(items[i].server_key, &items[i].recv_op,
items[i].incoming_buffer, &bytes,
0);
if (rc != PJ_EPENDING) {
app_perror("...error: pj_ioqueue_recv", rc);
return -73;
}
/* Start writing. */
TRACE_((THIS_FILE, " pj_ioqueue_write.."));
bytes = items[i].buffer_size;
rc = pj_ioqueue_send(items[i].client_key, &items[i].send_op,
items[i].outgoing_buffer, &bytes, 0);
if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
app_perror("...error: pj_ioqueue_write", rc);
return -76;
}
items[i].has_pending_send = (rc==PJ_EPENDING);
}
/* Create the threads. */
for (i=0; i<thread_cnt; ++i) {
struct thread_arg *arg;
arg = (struct thread_arg*) pj_pool_zalloc(pool, sizeof(*arg));
arg->id = i;
arg->ioqueue = ioqueue;
arg->counter = 0;
rc = pj_thread_create( pool, NULL,
&worker_thread,
arg,
PJ_THREAD_DEFAULT_STACK_SIZE,
PJ_THREAD_SUSPENDED, &thread[i] );
if (rc != PJ_SUCCESS) {
app_perror("...error: unable to create thread", rc);
return -80;
}
}
/* Mark start time. */
rc = pj_get_timestamp(&start);
if (rc != PJ_SUCCESS)
return -90;
/* Start the thread. */
TRACE_((THIS_FILE, " resuming all threads.."));
for (i=0; i<thread_cnt; ++i) {
rc = pj_thread_resume(thread[i]);
if (rc != 0)
return -100;
}
/* Wait for MSEC_DURATION seconds.
* This should be as simple as pj_thread_sleep(MSEC_DURATION) actually,
* but unfortunately it doesn't work when system doesn't employ
* timeslicing for threads.
*/
TRACE_((THIS_FILE, " wait for few seconds.."));
do {
pj_thread_sleep(1);
/* Mark end time. */
rc = pj_get_timestamp(&stop);
if (thread_quit_flag) {
TRACE_((THIS_FILE, " transfer limit reached.."));
break;
}
if (pj_elapsed_usec(&start,&stop)<MSEC_DURATION * 1000) {
TRACE_((THIS_FILE, " time limit reached.."));
break;
}
} while (1);
/* Terminate all threads. */
TRACE_((THIS_FILE, " terminating all threads.."));
thread_quit_flag = 1;
for (i=0; i<thread_cnt; ++i) {
TRACE_((THIS_FILE, " join thread %d..", i));
pj_thread_join(thread[i]);
}
/* Close all sockets. */
TRACE_((THIS_FILE, " closing all sockets.."));
for (i=0; i<sockpair_cnt; ++i) {
pj_ioqueue_unregister(items[i].server_key);
pj_ioqueue_unregister(items[i].client_key);
}
/* Destroy threads */
for (i=0; i<thread_cnt; ++i) {
pj_thread_destroy(thread[i]);
}
/* Destroy ioqueue. */
TRACE_((THIS_FILE, " destroying ioqueue.."));
pj_ioqueue_destroy(ioqueue);
/* Calculate actual time in usec. */
total_elapsed_usec = pj_elapsed_usec(&start, &stop);
/* Calculate total bytes received. */
total_received = 0;
for (i=0; i<sockpair_cnt; ++i) {
total_received = items[i].bytes_recv;
}
/* bandwidth = total_received*1000/total_elapsed_usec */
bandwidth = total_received;
pj_highprec_mul(bandwidth, 1000);
pj_highprec_div(bandwidth, total_elapsed_usec);
*p_bandwidth = (pj_uint32_t)bandwidth;
PJ_LOG(3,(THIS_FILE, " %.4s %2d %2d %8d KB/s",
type_name, thread_cnt, sockpair_cnt,
*p_bandwidth));
/* Done. */
pj_pool_release(pool);
TRACE_((THIS_FILE, " done.."));
return 0;
}
/*
* main test entry.
*/
int ioqueue_perf_test(void)
{
enum { BUF_SIZE = 512 };
int i, rc;
struct {
int type;
const char *type_name;
int thread_cnt;
int sockpair_cnt;
} test_param[] =
{
{ PJ_SOCK_DGRAM, "udp", 1, 1},
{ PJ_SOCK_DGRAM, "udp", 1, 2},
{ PJ_SOCK_DGRAM, "udp", 1, 4},
{ PJ_SOCK_DGRAM, "udp", 1, 8},
{ PJ_SOCK_DGRAM, "udp", 2, 1},
{ PJ_SOCK_DGRAM, "udp", 2, 2},
{ PJ_SOCK_DGRAM, "udp", 2, 4},
{ PJ_SOCK_DGRAM, "udp", 2, 8},
{ PJ_SOCK_DGRAM, "udp", 4, 1},
{ PJ_SOCK_DGRAM, "udp", 4, 2},
{ PJ_SOCK_DGRAM, "udp", 4, 4},
{ PJ_SOCK_DGRAM, "udp", 4, 8},
{ PJ_SOCK_DGRAM, "udp", 4, 16},
{ PJ_SOCK_STREAM, "tcp", 1, 1},
{ PJ_SOCK_STREAM, "tcp", 1, 2},
{ PJ_SOCK_STREAM, "tcp", 1, 4},
{ PJ_SOCK_STREAM, "tcp", 1, 8},
{ PJ_SOCK_STREAM, "tcp", 2, 1},
{ PJ_SOCK_STREAM, "tcp", 2, 2},
{ PJ_SOCK_STREAM, "tcp", 2, 4},
{ PJ_SOCK_STREAM, "tcp", 2, 8},
{ PJ_SOCK_STREAM, "tcp", 4, 1},
{ PJ_SOCK_STREAM, "tcp", 4, 2},
{ PJ_SOCK_STREAM, "tcp", 4, 4},
{ PJ_SOCK_STREAM, "tcp", 4, 8},
{ PJ_SOCK_STREAM, "tcp", 4, 16},
/*
{ PJ_SOCK_DGRAM, "udp", 32, 1},
{ PJ_SOCK_DGRAM, "udp", 32, 1},
{ PJ_SOCK_DGRAM, "udp", 32, 1},
{ PJ_SOCK_DGRAM, "udp", 32, 1},
{ PJ_SOCK_DGRAM, "udp", 1, 32},
{ PJ_SOCK_DGRAM, "udp", 1, 32},
{ PJ_SOCK_DGRAM, "udp", 1, 32},
{ PJ_SOCK_DGRAM, "udp", 1, 32},
{ PJ_SOCK_STREAM, "tcp", 32, 1},
{ PJ_SOCK_STREAM, "tcp", 32, 1},
{ PJ_SOCK_STREAM, "tcp", 32, 1},
{ PJ_SOCK_STREAM, "tcp", 32, 1},
{ PJ_SOCK_STREAM, "tcp", 1, 32},
{ PJ_SOCK_STREAM, "tcp", 1, 32},
{ PJ_SOCK_STREAM, "tcp", 1, 32},
{ PJ_SOCK_STREAM, "tcp", 1, 32},
*/
};
pj_size_t best_bandwidth;
int best_index = 0;
PJ_LOG(3,(THIS_FILE, " Benchmarking %s ioqueue:", pj_ioqueue_name()));
PJ_LOG(3,(THIS_FILE, " ======================================="));
PJ_LOG(3,(THIS_FILE, " Type Threads Skt.Pairs Bandwidth"));
PJ_LOG(3,(THIS_FILE, " ======================================="));
best_bandwidth = 0;
for (i=0; i<(int)(sizeof(test_param)/sizeof(test_param[0])); ++i) {
pj_size_t bandwidth;
rc = perform_test(test_param[i].type,
test_param[i].type_name,
test_param[i].thread_cnt,
test_param[i].sockpair_cnt,
BUF_SIZE,
&bandwidth);
if (rc != 0)
return rc;
if (bandwidth > best_bandwidth)
best_bandwidth = bandwidth, best_index = i;
/* Give it a rest before next test, to allow system to close the
* sockets properly.
*/
pj_thread_sleep(500);
}
PJ_LOG(3,(THIS_FILE,
" Best: Type=%s Threads=%d, Skt.Pairs=%d, Bandwidth=%u KB/s",
test_param[best_index].type_name,
test_param[best_index].thread_cnt,
test_param[best_index].sockpair_cnt,
best_bandwidth));
PJ_LOG(3,(THIS_FILE, " (Note: packet size=%d, total errors=%u)",
BUF_SIZE, last_error_counter));
return 0;
}
#else
/* To prevent warning about "translation unit is empty"
* when this test is disabled.
*/
int dummy_uiq_perf_test;
#endif /* INCLUDE_IOQUEUE_PERF_TEST */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -