/* $Id: ioq_perf.c 1266 2007-05-11 15:14:34Z bennylp $ */
/*
* Copyright (C)2003-2007 Benny Prijono <benny@prijono.org>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "test.h"
#include <pjlib.h>
#include <pj/compat/high_precision.h>
/**
 * \page page_pjlib_ioqueue_perf_test Test: I/O Queue Performance
 *
 * Test the performance of the I/O queue, using a typical producer-
 * consumer test. The test examines the effect of using multiple
 * worker threads on performance.
 *
 * This file is <b>pjlib-test/ioq_perf.c</b>
 *
 * \include pjlib-test/ioq_perf.c
 */
#if INCLUDE_IOQUEUE_PERF_TEST
#ifdef _MSC_VER
# pragma warning ( disable: 4204) // non-constant aggregate initializer
#endif
#define THIS_FILE "ioq_perf"
//#define TRACE_(expr) PJ_LOG(3,expr)
#define TRACE_(expr)
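
/* Illustrative only: a minimal driver sketch showing how perform_test()
 * (defined below) might be called for a few thread counts. The
 * configuration values, the sample_driver() name, and the KB/s scaling
 * are assumptions, not part of the original test suite, so the block is
 * compiled out.
 */
#if 0
static int sample_driver(void)
{
    unsigned thread_cnt;

    for (thread_cnt = 1; thread_cnt <= 4; thread_cnt *= 2) {
        pj_size_t bandwidth = 0;
        int rc;

        rc = perform_test(PJ_SOCK_DGRAM, "udp", thread_cnt,
                          4,   /* socket pairs */
                          512, /* buffer size  */
                          &bandwidth);
        if (rc != 0)
            return rc;

        PJ_LOG(3,(THIS_FILE, "...%u thread(s): %lu KB/s", thread_cnt,
                  (unsigned long)(bandwidth / 1000)));
    }
    return 0;
}
#endif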
static pj_bool_t thread_quit_flag;
static pj_status_t last_error;
static unsigned last_error_counter;
/* Descriptor for each producer/consumer pair. */
typedef struct test_item
{
    pj_sock_t            server_fd,
                         client_fd;
    pj_ioqueue_t        *ioqueue;
    pj_ioqueue_key_t    *server_key,
                        *client_key;
    pj_ioqueue_op_key_t  recv_op,
                         send_op;
    int                  has_pending_send;
    pj_size_t            buffer_size;
    char                *outgoing_buffer;
    char                *incoming_buffer;
    pj_size_t            bytes_sent,
                         bytes_recv;
} test_item;

/* Callback when data has been read.
 * Increment item->bytes_recv and issue the next read.
 */
static void on_read_complete(pj_ioqueue_key_t *key,
                             pj_ioqueue_op_key_t *op_key,
                             pj_ssize_t bytes_read)
{
    test_item *item = (test_item*)pj_ioqueue_get_user_data(key);
    pj_status_t rc;
    int data_is_available = 1;

    //TRACE_((THIS_FILE, " read complete, bytes_read=%d", bytes_read));

    do {
        if (thread_quit_flag)
            return;

        if (bytes_read < 0) {
            pj_status_t rc = -bytes_read;
            char errmsg[PJ_ERR_MSG_SIZE];

            if (rc != last_error) {
                //last_error = rc;
                pj_strerror(rc, errmsg, sizeof(errmsg));
                PJ_LOG(3,(THIS_FILE,"...error: read error, bytes_read=%d (%s)",
                          bytes_read, errmsg));
                PJ_LOG(3,(THIS_FILE,
                          ".....additional info: total read=%u, total sent=%u",
                          item->bytes_recv, item->bytes_sent));
            } else {
                last_error_counter++;
            }
            bytes_read = 0;

        } else if (bytes_read == 0) {
            PJ_LOG(3,(THIS_FILE, "...socket has closed!"));
        }

        item->bytes_recv += bytes_read;

        /* To assure that the test quits, even if main thread
         * doesn't have time to run.
         */
        if (item->bytes_recv > item->buffer_size * 10000)
            thread_quit_flag = 1;

        bytes_read = item->buffer_size;
        rc = pj_ioqueue_recv( key, op_key,
                              item->incoming_buffer, &bytes_read, 0 );

        if (rc == PJ_SUCCESS) {
            data_is_available = 1;
        } else if (rc == PJ_EPENDING) {
            data_is_available = 0;
        } else {
            data_is_available = 0;
            if (rc != last_error) {
                last_error = rc;
                app_perror("...error: read error(1)", rc);
            } else {
                last_error_counter++;
            }
        }

        if (!item->has_pending_send) {
            pj_ssize_t sent = item->buffer_size;
            rc = pj_ioqueue_send(item->client_key, &item->send_op,
                                 item->outgoing_buffer, &sent, 0);
            if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
                app_perror("...error: write error", rc);
            }

            item->has_pending_send = (rc==PJ_EPENDING);
        }

    } while (data_is_available);
}

/* Callback when data has been written.
 * Increment item->bytes_sent and write the next data.
 */
static void on_write_complete(pj_ioqueue_key_t *key,
                              pj_ioqueue_op_key_t *op_key,
                              pj_ssize_t bytes_sent)
{
    test_item *item = (test_item*) pj_ioqueue_get_user_data(key);

    //TRACE_((THIS_FILE, " write complete: sent = %d", bytes_sent));

    if (thread_quit_flag)
        return;

    item->has_pending_send = 0;
    item->bytes_sent += bytes_sent;

    if (bytes_sent <= 0) {
        PJ_LOG(3,(THIS_FILE, "...error: sending stopped. bytes_sent=%d",
                  bytes_sent));
    }
    else {
        pj_status_t rc;

        bytes_sent = item->buffer_size;
        rc = pj_ioqueue_send( item->client_key, op_key,
                              item->outgoing_buffer, &bytes_sent, 0);
        if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
            app_perror("...error: write error", rc);
        }

        item->has_pending_send = (rc==PJ_EPENDING);
    }
}

struct thread_arg
{
    int           id;
    pj_ioqueue_t *ioqueue;
    unsigned      counter;
};

/* The worker thread. */
static int worker_thread(void *p)
{
    struct thread_arg *arg = (struct thread_arg*) p;
    const pj_time_val timeout = {0, 100};
    int rc;

    while (!thread_quit_flag) {
        ++arg->counter;
        rc = pj_ioqueue_poll(arg->ioqueue, &timeout);
        //TRACE_((THIS_FILE, " thread: poll returned rc=%d", rc));
        if (rc < 0) {
            char errmsg[PJ_ERR_MSG_SIZE];
            pj_strerror(-rc, errmsg, sizeof(errmsg));
            PJ_LOG(3, (THIS_FILE,
                       "...error in pj_ioqueue_poll() in thread %d "
                       "after %d loop: %s [pj_status_t=%d]",
                       arg->id, arg->counter, errmsg, -rc));
            //return -1;
        }
    }
    return 0;
}
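
/* Illustrative only: a sketch of how one such worker could be launched
 * with pj_thread_create(). The spawn_worker() helper and the thread
 * name are assumptions (the test itself creates its threads inside
 * perform_test() below), so the block is compiled out.
 */
#if 0
static pj_status_t spawn_worker(pj_pool_t *pool, pj_ioqueue_t *ioqueue,
                                int id, pj_thread_t **p_thread)
{
    struct thread_arg *arg;

    arg = (struct thread_arg*) pj_pool_zalloc(pool, sizeof(*arg));
    arg->id = id;
    arg->ioqueue = ioqueue;

    /* worker_thread() keeps polling the ioqueue until thread_quit_flag
     * is raised. */
    return pj_thread_create(pool, "ioq_worker", &worker_thread, arg,
                            PJ_THREAD_DEFAULT_STACK_SIZE, 0, p_thread);
}
#endif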

/* Calculate the bandwidth for the specific test configuration.
 * The test is simple:
 *  - create sockpair_cnt number of producer-consumer socket pairs.
 *  - create thread_cnt number of worker threads.
 *  - each producer will send buffer_size bytes of data as fast
 *    and as soon as it can.
 *  - each consumer will read buffer_size bytes of data as fast
 *    as it can.
 *  - measure the total bytes received by all consumers during a
 *    period of time.
 */
static int perform_test(int sock_type, const char *type_name,
                        unsigned thread_cnt, unsigned sockpair_cnt,
                        pj_size_t buffer_size,
                        pj_size_t *p_bandwidth)
{
    enum { MSEC_DURATION = 5000 };
    pj_pool_t *pool;
    test_item *items;
    pj_thread_t **thread;
    pj_ioqueue_t *ioqueue;
    pj_status_t rc;
    pj_ioqueue_callback ioqueue_callback;
    pj_uint32_t total_elapsed_usec, total_received;
    pj_highprec_t bandwidth;
    pj_timestamp start, stop;
    unsigned i;

    TRACE_((THIS_FILE, " starting test.."));

    ioqueue_callback.on_read_complete = &on_read_complete;
    ioqueue_callback.on_write_complete = &on_write_complete;
    /* Not used by this test, but initialize them so the struct is
     * never left with indeterminate members. */
    ioqueue_callback.on_accept_complete = NULL;
    ioqueue_callback.on_connect_complete = NULL;

    thread_quit_flag = 0;

    pool = pj_pool_create(mem, NULL, 4096, 4096, NULL);
    if (!pool)
        return -10;

    items = (test_item*) pj_pool_alloc(pool, sockpair_cnt*sizeof(test_item));
    thread = (pj_thread_t**)
             pj_pool_alloc(pool, thread_cnt*sizeof(pj_thread_t*));

    TRACE_((THIS_FILE, " creating ioqueue.."));
    rc = pj_ioqueue_create(pool, sockpair_cnt*2, &ioqueue);
    if (rc != PJ_SUCCESS) {
        app_perror("...error: unable to create ioqueue", rc);
        return -15;
    }

    /* Initialize each producer-consumer pair. */
    for (i=0; i<sockpair_cnt; ++i) {
        pj_ssize_t bytes;

        items[i].ioqueue = ioqueue;
        items[i].buffer_size = buffer_size;
        items[i].outgoing_buffer = (char*) pj_pool_alloc(pool, buffer_size);
        items[i].incoming_buffer = (char*) pj_pool_alloc(pool, buffer_size);
        items[i].bytes_recv = items[i].bytes_sent = 0;

        /* randomize outgoing buffer. */
        pj_create_random_string(items[i].outgoing_buffer, buffer_size);
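
        /* What follows is a hedged sketch of the remaining steps listed
         * in the comment above perform_test(): finish the pair setup,
         * spawn the workers, measure for MSEC_DURATION, and derive the
         * bandwidth. app_socketpair() (a pjlib-test helper), the thread
         * names, and the error handling are assumptions, so the block
         * is compiled out.
         */
#if 0
        /* Create the socket pair and register both ends with the
         * ioqueue, then kick off the first asynchronous recv and send
         * on the pair. */
        rc = app_socketpair(PJ_AF_INET, sock_type, 0,
                            &items[i].server_fd, &items[i].client_fd);
        rc = pj_ioqueue_register_sock(pool, ioqueue, items[i].server_fd,
                                      &items[i], &ioqueue_callback,
                                      &items[i].server_key);
        rc = pj_ioqueue_register_sock(pool, ioqueue, items[i].client_fd,
                                      &items[i], &ioqueue_callback,
                                      &items[i].client_key);

        bytes = items[i].buffer_size;
        rc = pj_ioqueue_recv(items[i].server_key, &items[i].recv_op,
                             items[i].incoming_buffer, &bytes, 0);
        bytes = items[i].buffer_size;
        rc = pj_ioqueue_send(items[i].client_key, &items[i].send_op,
                             items[i].outgoing_buffer, &bytes, 0);
        items[i].has_pending_send = (rc == PJ_EPENDING);
    }

    /* Spawn the worker threads, let the test run for MSEC_DURATION,
     * then signal the workers to stop and wait for them. */
    pj_get_timestamp(&start);
    for (i = 0; i < thread_cnt; ++i) {
        struct thread_arg *arg;

        arg = (struct thread_arg*) pj_pool_zalloc(pool, sizeof(*arg));
        arg->id = i;
        arg->ioqueue = ioqueue;
        rc = pj_thread_create(pool, NULL, &worker_thread, arg,
                              PJ_THREAD_DEFAULT_STACK_SIZE, 0, &thread[i]);
    }
    pj_thread_sleep(MSEC_DURATION);
    thread_quit_flag = 1;
    pj_get_timestamp(&stop);

    for (i = 0; i < thread_cnt; ++i) {
        pj_thread_join(thread[i]);
        pj_thread_destroy(thread[i]);
    }

    /* Sum everything the consumers received and convert it to bytes
     * per second with the high-precision helpers. */
    total_received = 0;
    for (i = 0; i < sockpair_cnt; ++i)
        total_received += items[i].bytes_recv;

    total_elapsed_usec = pj_elapsed_usec(&start, &stop);
    bandwidth = total_received;
    pj_highprec_mul(bandwidth, 1000000);
    pj_highprec_div(bandwidth, total_elapsed_usec);
    *p_bandwidth = (pj_size_t)bandwidth;

    /* Clean up sockets, ioqueue, and pool. */
    for (i = 0; i < sockpair_cnt; ++i) {
        pj_sock_close(items[i].server_fd);
        pj_sock_close(items[i].client_fd);
    }
    pj_ioqueue_destroy(ioqueue);
    pj_pool_release(pool);
    return 0;
#endif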