⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ioq_perf.c

📁 基于sip协议的网络电话源码
💻 C
📖 第 1 页 / 共 2 页
字号:
/* $Id: ioq_perf.c 974 2007-02-19 01:13:53Z bennylp $ *//*  * Copyright (C)2003-2007 Benny Prijono <benny@prijono.org> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA  */#include "test.h"#include <pjlib.h>#include <pj/compat/high_precision.h>/** * \page page_pjlib_ioqueue_perf_test Test: I/O Queue Performance * * Test the performance of the I/O queue, using typical producer * consumer test. The test should examine the effect of using multiple * threads on the performance. * * This file is <b>pjlib-test/ioq_perf.c</b> * * \include pjlib-test/ioq_perf.c */#if INCLUDE_IOQUEUE_PERF_TEST#ifdef _MSC_VER#   pragma warning ( disable: 4204)     // non-constant aggregate initializer#endif#define THIS_FILE	"ioq_perf"//#define TRACE_(expr)	PJ_LOG(3,expr)#define TRACE_(expr)static pj_bool_t thread_quit_flag;static pj_status_t last_error;static unsigned last_error_counter;/* Descriptor for each producer/consumer pair. */typedef struct test_item{    pj_sock_t            server_fd,                          client_fd;    pj_ioqueue_t        *ioqueue;    pj_ioqueue_key_t    *server_key,                        *client_key;    pj_ioqueue_op_key_t  recv_op,                         send_op;    int                  has_pending_send;    pj_size_t            buffer_size;    char                *outgoing_buffer;    char                *incoming_buffer;    pj_size_t            bytes_sent,                          bytes_recv;} test_item;/* Callback when data has been read. * Increment item->bytes_recv and ready to read the next data. */static void on_read_complete(pj_ioqueue_key_t *key,                              pj_ioqueue_op_key_t *op_key,                             pj_ssize_t bytes_read){    test_item *item = pj_ioqueue_get_user_data(key);    pj_status_t rc;    int data_is_available = 1;    //TRACE_((THIS_FILE, "     read complete, bytes_read=%d", bytes_read));    do {        if (thread_quit_flag)            return;        if (bytes_read < 0) {            pj_status_t rc = -bytes_read;            char errmsg[PJ_ERR_MSG_SIZE];	    if (rc != last_error) {	        //last_error = rc;	        pj_strerror(rc, errmsg, sizeof(errmsg));	        PJ_LOG(3,(THIS_FILE,"...error: read error, bytes_read=%d (%s)", 		          bytes_read, errmsg));	        PJ_LOG(3,(THIS_FILE, 		          ".....additional info: total read=%u, total sent=%u",		          item->bytes_recv, item->bytes_sent));	    } else {	        last_error_counter++;	    }            bytes_read = 0;        } else if (bytes_read == 0) {            PJ_LOG(3,(THIS_FILE, "...socket has closed!"));        }        item->bytes_recv += bytes_read;            /* To assure that the test quits, even if main thread         * doesn't have time to run.         */        if (item->bytes_recv > item->buffer_size * 10000) 	    thread_quit_flag = 1;        bytes_read = item->buffer_size;        rc = pj_ioqueue_recv( key, op_key,                              item->incoming_buffer, &bytes_read, 0 );        if (rc == PJ_SUCCESS) {            data_is_available = 1;        } else if (rc == PJ_EPENDING) {            data_is_available = 0;        } else {            data_is_available = 0;	    if (rc != last_error) {	        last_error = rc;	        app_perror("...error: read error(1)", rc);	    } else {	        last_error_counter++;	    }        }        if (!item->has_pending_send) {            pj_ssize_t sent = item->buffer_size;            rc = pj_ioqueue_send(item->client_key, &item->send_op,                                 item->outgoing_buffer, &sent, 0);            if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {                app_perror("...error: write error", rc);            }            item->has_pending_send = (rc==PJ_EPENDING);        }    } while (data_is_available);}/* Callback when data has been written. * Increment item->bytes_sent and write the next data. */static void on_write_complete(pj_ioqueue_key_t *key,                               pj_ioqueue_op_key_t *op_key,                              pj_ssize_t bytes_sent){    test_item *item = pj_ioqueue_get_user_data(key);        //TRACE_((THIS_FILE, "     write complete: sent = %d", bytes_sent));    if (thread_quit_flag)        return;    item->has_pending_send = 0;    item->bytes_sent += bytes_sent;    if (bytes_sent <= 0) {        PJ_LOG(3,(THIS_FILE, "...error: sending stopped. bytes_sent=%d",                   bytes_sent));    }     else {        pj_status_t rc;        bytes_sent = item->buffer_size;        rc = pj_ioqueue_send( item->client_key, op_key,                              item->outgoing_buffer, &bytes_sent, 0);        if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {            app_perror("...error: write error", rc);        }        item->has_pending_send = (rc==PJ_EPENDING);    }}struct thread_arg{    int		  id;    pj_ioqueue_t *ioqueue;    unsigned	  counter;};/* The worker thread. */static int worker_thread(void *p){    struct thread_arg *arg = p;    const pj_time_val timeout = {0, 100};    int rc;    while (!thread_quit_flag) {	++arg->counter;        rc = pj_ioqueue_poll(arg->ioqueue, &timeout);	//TRACE_((THIS_FILE, "     thread: poll returned rc=%d", rc));        if (rc < 0) {	    char errmsg[PJ_ERR_MSG_SIZE];	    pj_strerror(-rc, errmsg, sizeof(errmsg));            PJ_LOG(3, (THIS_FILE, 		       "...error in pj_ioqueue_poll() in thread %d "		       "after %d loop: %s [pj_status_t=%d]", 		       arg->id, arg->counter, errmsg, -rc));            //return -1;        }    }    return 0;}/* Calculate the bandwidth for the specific test configuration. * The test is simple: *  - create sockpair_cnt number of producer-consumer socket pair. *  - create thread_cnt number of worker threads. *  - each producer will send buffer_size bytes data as fast and *    as soon as it can. *  - each consumer will read buffer_size bytes of data as fast  *    as it could. *  - measure the total bytes received by all consumers during a *    period of time. */static int perform_test(int sock_type, const char *type_name,                        unsigned thread_cnt, unsigned sockpair_cnt,                        pj_size_t buffer_size,                         pj_size_t *p_bandwidth){    enum { MSEC_DURATION = 5000 };    pj_pool_t *pool;    test_item *items;    pj_thread_t **thread;    pj_ioqueue_t *ioqueue;    pj_status_t rc;    pj_ioqueue_callback ioqueue_callback;    pj_uint32_t total_elapsed_usec, total_received;    pj_highprec_t bandwidth;    pj_timestamp start, stop;    unsigned i;    TRACE_((THIS_FILE, "    starting test.."));    ioqueue_callback.on_read_complete = &on_read_complete;    ioqueue_callback.on_write_complete = &on_write_complete;    thread_quit_flag = 0;    pool = pj_pool_create(mem, NULL, 4096, 4096, NULL);    if (!pool)        return -10;    items = pj_pool_alloc(pool, sockpair_cnt*sizeof(test_item));    thread = pj_pool_alloc(pool, thread_cnt*sizeof(pj_thread_t*));    TRACE_((THIS_FILE, "     creating ioqueue.."));    rc = pj_ioqueue_create(pool, sockpair_cnt*2, &ioqueue);    if (rc != PJ_SUCCESS) {        app_perror("...error: unable to create ioqueue", rc);        return -15;    }    /* Initialize each producer-consumer pair. */    for (i=0; i<sockpair_cnt; ++i) {        pj_ssize_t bytes;        items[i].ioqueue = ioqueue;        items[i].buffer_size = buffer_size;        items[i].outgoing_buffer = pj_pool_alloc(pool, buffer_size);        items[i].incoming_buffer = pj_pool_alloc(pool, buffer_size);        items[i].bytes_recv = items[i].bytes_sent = 0;        /* randomize outgoing buffer. */        pj_create_random_string(items[i].outgoing_buffer, buffer_size);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -