📄 ioq_perf.c
字号:
/* $Id: ioq_perf.c 974 2007-02-19 01:13:53Z bennylp $ */
/*
 * Copyright (C)2003-2007 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
#include "test.h"
#include <pjlib.h>
#include <pj/compat/high_precision.h>

/**
 * \page page_pjlib_ioqueue_perf_test Test: I/O Queue Performance
 *
 * Test the performance of the I/O queue, using typical producer
 * consumer test. The test should examine the effect of using multiple
 * threads on the performance.
 *
 * This file is <b>pjlib-test/ioq_perf.c</b>
 *
 * \include pjlib-test/ioq_perf.c
 */

#if INCLUDE_IOQUEUE_PERF_TEST

#ifdef _MSC_VER
    /* C4204: non-constant aggregate initializer */
#   pragma warning ( disable: 4204)
#endif

#define THIS_FILE   "ioq_perf"

/* Tracing is compiled out.  To enable it, define TRACE_(expr) as
 * PJ_LOG(3,expr) instead of the empty expansion below.
 */
#define TRACE_(expr)

/* Set to non-zero to make the callbacks and the worker threads stop. */
static pj_bool_t thread_quit_flag;

/* Most recent error status recorded by the read path, used to avoid
 * logging the same error repeatedly.
 */
static pj_status_t last_error;

/* Number of times an error equal to last_error was seen (and not logged). */
static unsigned last_error_counter;

/* Descriptor for each producer/consumer pair. */
typedef struct test_item
{
    /* Sockets of the pair (NOTE(review): created outside this view). */
    pj_sock_t            server_fd;
    pj_sock_t            client_fd;

    /* The ioqueue this pair belongs to. */
    pj_ioqueue_t        *ioqueue;

    /* ioqueue keys; the callbacks send through client_key. */
    pj_ioqueue_key_t    *server_key;
    pj_ioqueue_key_t    *client_key;

    /* Operation keys; send_op is used for sends started by the read
     * callback.
     */
    pj_ioqueue_op_key_t  recv_op;
    pj_ioqueue_op_key_t  send_op;

    /* Non-zero while a send that returned PJ_EPENDING is outstanding. */
    int                  has_pending_send;

    /* Size in bytes of each send and each receive request. */
    pj_size_t            buffer_size;

    /* Data handed to pj_ioqueue_send(). */
    char                *outgoing_buffer;

    /* Destination passed to pj_ioqueue_recv(). */
    char                *incoming_buffer;

    /* Running totals maintained by the write and read callbacks. */
    pj_size_t            bytes_sent;
    pj_size_t            bytes_recv;
} test_item;

/* Callback when data has been read.
 * Increment item->bytes_recv and ready to read the next data.
*/static void on_read_complete(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_read){ test_item *item = pj_ioqueue_get_user_data(key); pj_status_t rc; int data_is_available = 1; //TRACE_((THIS_FILE, " read complete, bytes_read=%d", bytes_read)); do { if (thread_quit_flag) return; if (bytes_read < 0) { pj_status_t rc = -bytes_read; char errmsg[PJ_ERR_MSG_SIZE]; if (rc != last_error) { //last_error = rc; pj_strerror(rc, errmsg, sizeof(errmsg)); PJ_LOG(3,(THIS_FILE,"...error: read error, bytes_read=%d (%s)", bytes_read, errmsg)); PJ_LOG(3,(THIS_FILE, ".....additional info: total read=%u, total sent=%u", item->bytes_recv, item->bytes_sent)); } else { last_error_counter++; } bytes_read = 0; } else if (bytes_read == 0) { PJ_LOG(3,(THIS_FILE, "...socket has closed!")); } item->bytes_recv += bytes_read; /* To assure that the test quits, even if main thread * doesn't have time to run. */ if (item->bytes_recv > item->buffer_size * 10000) thread_quit_flag = 1; bytes_read = item->buffer_size; rc = pj_ioqueue_recv( key, op_key, item->incoming_buffer, &bytes_read, 0 ); if (rc == PJ_SUCCESS) { data_is_available = 1; } else if (rc == PJ_EPENDING) { data_is_available = 0; } else { data_is_available = 0; if (rc != last_error) { last_error = rc; app_perror("...error: read error(1)", rc); } else { last_error_counter++; } } if (!item->has_pending_send) { pj_ssize_t sent = item->buffer_size; rc = pj_ioqueue_send(item->client_key, &item->send_op, item->outgoing_buffer, &sent, 0); if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { app_perror("...error: write error", rc); } item->has_pending_send = (rc==PJ_EPENDING); } } while (data_is_available);}/* Callback when data has been written. * Increment item->bytes_sent and write the next data. 
*/static void on_write_complete(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_sent){ test_item *item = pj_ioqueue_get_user_data(key); //TRACE_((THIS_FILE, " write complete: sent = %d", bytes_sent)); if (thread_quit_flag) return; item->has_pending_send = 0; item->bytes_sent += bytes_sent; if (bytes_sent <= 0) { PJ_LOG(3,(THIS_FILE, "...error: sending stopped. bytes_sent=%d", bytes_sent)); } else { pj_status_t rc; bytes_sent = item->buffer_size; rc = pj_ioqueue_send( item->client_key, op_key, item->outgoing_buffer, &bytes_sent, 0); if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { app_perror("...error: write error", rc); } item->has_pending_send = (rc==PJ_EPENDING); }}struct thread_arg{ int id; pj_ioqueue_t *ioqueue; unsigned counter;};/* The worker thread. */static int worker_thread(void *p){ struct thread_arg *arg = p; const pj_time_val timeout = {0, 100}; int rc; while (!thread_quit_flag) { ++arg->counter; rc = pj_ioqueue_poll(arg->ioqueue, &timeout); //TRACE_((THIS_FILE, " thread: poll returned rc=%d", rc)); if (rc < 0) { char errmsg[PJ_ERR_MSG_SIZE]; pj_strerror(-rc, errmsg, sizeof(errmsg)); PJ_LOG(3, (THIS_FILE, "...error in pj_ioqueue_poll() in thread %d " "after %d loop: %s [pj_status_t=%d]", arg->id, arg->counter, errmsg, -rc)); //return -1; } } return 0;}/* Calculate the bandwidth for the specific test configuration. * The test is simple: * - create sockpair_cnt number of producer-consumer socket pair. * - create thread_cnt number of worker threads. * - each producer will send buffer_size bytes data as fast and * as soon as it can. * - each consumer will read buffer_size bytes of data as fast * as it could. * - measure the total bytes received by all consumers during a * period of time. 
*/static int perform_test(int sock_type, const char *type_name, unsigned thread_cnt, unsigned sockpair_cnt, pj_size_t buffer_size, pj_size_t *p_bandwidth){ enum { MSEC_DURATION = 5000 }; pj_pool_t *pool; test_item *items; pj_thread_t **thread; pj_ioqueue_t *ioqueue; pj_status_t rc; pj_ioqueue_callback ioqueue_callback; pj_uint32_t total_elapsed_usec, total_received; pj_highprec_t bandwidth; pj_timestamp start, stop; unsigned i; TRACE_((THIS_FILE, " starting test..")); ioqueue_callback.on_read_complete = &on_read_complete; ioqueue_callback.on_write_complete = &on_write_complete; thread_quit_flag = 0; pool = pj_pool_create(mem, NULL, 4096, 4096, NULL); if (!pool) return -10; items = pj_pool_alloc(pool, sockpair_cnt*sizeof(test_item)); thread = pj_pool_alloc(pool, thread_cnt*sizeof(pj_thread_t*)); TRACE_((THIS_FILE, " creating ioqueue..")); rc = pj_ioqueue_create(pool, sockpair_cnt*2, &ioqueue); if (rc != PJ_SUCCESS) { app_perror("...error: unable to create ioqueue", rc); return -15; } /* Initialize each producer-consumer pair. */ for (i=0; i<sockpair_cnt; ++i) { pj_ssize_t bytes; items[i].ioqueue = ioqueue; items[i].buffer_size = buffer_size; items[i].outgoing_buffer = pj_pool_alloc(pool, buffer_size); items[i].incoming_buffer = pj_pool_alloc(pool, buffer_size); items[i].bytes_recv = items[i].bytes_sent = 0; /* randomize outgoing buffer. */ pj_create_random_string(items[i].outgoing_buffer, buffer_size);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -