⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ioq_perf.c

📁 一个开源的sip源代码
💻 C
📖 第 1 页 / 共 2 页
字号:
/* $Id: ioq_perf.c 1266 2007-05-11 15:14:34Z bennylp $ */
/* 
 * Copyright (C)2003-2007 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
 */
#include "test.h"
#include <pjlib.h>
#include <pj/compat/high_precision.h>

/**
 * \page page_pjlib_ioqueue_perf_test Test: I/O Queue Performance
 *
 * Test the performance of the I/O queue using a typical
 * producer-consumer test. The test should examine the effect of using
 * multiple threads on performance.
 *
 * This file is <b>pjlib-test/ioq_perf.c</b>
 *
 * \include pjlib-test/ioq_perf.c
 */

#if INCLUDE_IOQUEUE_PERF_TEST

#ifdef _MSC_VER
#   pragma warning ( disable: 4204)     // non-constant aggregate initializer
#endif

#define THIS_FILE	"ioq_perf"
//#define TRACE_(expr)	PJ_LOG(3,expr)
#define TRACE_(expr)


/* Stop request shared by the callbacks and the worker threads: once it is
 * non-zero the completion callbacks return immediately and worker_thread()
 * leaves its poll loop. on_read_complete() also raises it itself once a
 * pair has received more than buffer_size * 10000 bytes.
 */
static pj_bool_t thread_quit_flag;
/* Status most recently recorded by the recv error path of
 * on_read_complete(); compared against new errors so that an identical,
 * repeating error is not logged again.
 */
static pj_status_t last_error;
/* Number of errors that matched last_error and were therefore counted
 * instead of being logged.
 */
static unsigned last_error_counter;

/* State kept for one producer/consumer pair.
 * An instance is reachable from the ioqueue callbacks through
 * pj_ioqueue_get_user_data().
 */
typedef struct test_item
{
    /* Socket handles of the pair (presumably consumer and producer side
     * respectively -- TODO confirm against perform_test()).
     */
    pj_sock_t            server_fd;
    pj_sock_t            client_fd;

    /* The ioqueue this pair belongs to. */
    pj_ioqueue_t        *ioqueue;

    /* Ioqueue keys; client_key is the one used for pj_ioqueue_send(). */
    pj_ioqueue_key_t    *server_key;
    pj_ioqueue_key_t    *client_key;

    /* Operation keys; send_op is passed to pj_ioqueue_send() when a send
     * is started from the read callback.
     */
    pj_ioqueue_op_key_t  recv_op;
    pj_ioqueue_op_key_t  send_op;

    /* Non-zero while the last pj_ioqueue_send() returned PJ_EPENDING and
     * its completion callback has not run yet.
     */
    int                  has_pending_send;

    /* Number of bytes requested by every send and every recv. */
    pj_size_t            buffer_size;

    /* Payload handed to pj_ioqueue_send(). */
    char                *outgoing_buffer;

    /* Destination buffer handed to pj_ioqueue_recv(). */
    char                *incoming_buffer;

    /* Running byte totals maintained by the completion callbacks. */
    pj_size_t            bytes_sent;
    pj_size_t            bytes_recv;
} test_item;

/* Callback when data has been read.
 *
 * Adds the completed read to item->bytes_recv, then keeps issuing
 * pj_ioqueue_recv() for as long as it completes immediately. After every
 * recv attempt, a new send is started on the client key unless one is
 * already pending.
 *
 * key         Ioqueue key whose user data is the test_item of the pair.
 * op_key      Operation key of the completed read; reused for the next recv.
 * bytes_read  Bytes received; zero means the socket was closed and a
 *             negative value is the negated pj_status_t of the failure.
 */
static void on_read_complete(pj_ioqueue_key_t *key, 
                             pj_ioqueue_op_key_t *op_key,
                             pj_ssize_t bytes_read)
{
    test_item *item = (test_item*)pj_ioqueue_get_user_data(key);
    pj_status_t rc;
    int data_is_available = 1;

    //TRACE_((THIS_FILE, "     read complete, bytes_read=%d", bytes_read));

    do {
        if (thread_quit_flag)
            return;

        if (bytes_read < 0) {
            char errmsg[PJ_ERR_MSG_SIZE];

            /* Recover the status code from the negated byte count. */
            rc = (pj_status_t) -bytes_read;

            /* NOTE(review): last_error is not updated on this path (the
             * assignment was already disabled in the original code), so a
             * completion error is only counted instead of logged when the
             * same status was recorded earlier by the recv error path below.
             */
            if (rc != last_error) {
                pj_strerror(rc, errmsg, sizeof(errmsg));
                /* Cast explicitly: pj_ssize_t / pj_size_t may be wider than
                 * int, and variadic arguments must match their specifiers.
                 */
                PJ_LOG(3,(THIS_FILE,"...error: read error, bytes_read=%d (%s)", 
                          (int)bytes_read, errmsg));
                PJ_LOG(3,(THIS_FILE, 
                          ".....additional info: total read=%lu, total sent=%lu",
                          (unsigned long)item->bytes_recv,
                          (unsigned long)item->bytes_sent));
            } else {
                last_error_counter++;
            }

            /* A failed read contributes nothing to the total. */
            bytes_read = 0;

        } else if (bytes_read == 0) {
            PJ_LOG(3,(THIS_FILE, "...socket has closed!"));
        }

        item->bytes_recv += bytes_read;
    
        /* To assure that the test quits, even if main thread
         * doesn't have time to run.
         */
        if (item->bytes_recv > item->buffer_size * 10000) 
            thread_quit_flag = 1;

        /* Ask for the next buffer; on immediate completion bytes_read
         * holds the received size and the loop runs again.
         */
        bytes_read = item->buffer_size;
        rc = pj_ioqueue_recv( key, op_key,
                              item->incoming_buffer, &bytes_read, 0 );

        if (rc == PJ_SUCCESS) {
            data_is_available = 1;
        } else if (rc == PJ_EPENDING) {
            /* Completion will be reported through this callback later. */
            data_is_available = 0;
        } else {
            data_is_available = 0;
            /* Log a given recv error once; count identical repeats. */
            if (rc != last_error) {
                last_error = rc;
                app_perror("...error: read error(1)", rc);
            } else {
                last_error_counter++;
            }
        }

        /* Keep the producer side busy if no send is outstanding. */
        if (!item->has_pending_send) {
            pj_ssize_t sent = item->buffer_size;
            rc = pj_ioqueue_send(item->client_key, &item->send_op,
                                 item->outgoing_buffer, &sent, 0);
            if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
                app_perror("...error: write error", rc);
            }

            item->has_pending_send = (rc==PJ_EPENDING);
        }

    } while (data_is_available);
}

/* Callback when data has been written.
 *
 * Clears the pending-send marker, adds a successful write to
 * item->bytes_sent and immediately starts the next send. A zero or
 * negative completion is logged and no further send is started from here.
 *
 * key         Ioqueue key whose user data is the test_item of the pair.
 * op_key      Operation key of the completed send; reused for the next one.
 * bytes_sent  Bytes written; zero or a negative value is treated as failure.
 */
static void on_write_complete(pj_ioqueue_key_t *key, 
                              pj_ioqueue_op_key_t *op_key,
                              pj_ssize_t bytes_sent)
{
    test_item *item = (test_item*) pj_ioqueue_get_user_data(key);
    pj_status_t rc;
    
    //TRACE_((THIS_FILE, "     write complete: sent = %d", bytes_sent));

    if (thread_quit_flag)
        return;

    item->has_pending_send = 0;

    if (bytes_sent <= 0) {
        /* Do not add a negative completion value to the unsigned running
         * total: it would corrupt the counter. Cast for the variadic call
         * because pj_ssize_t may be wider than int.
         */
        PJ_LOG(3,(THIS_FILE, "...error: sending stopped. bytes_sent=%d", 
                  (int)bytes_sent));
        return;
    }

    item->bytes_sent += bytes_sent;

    /* Queue the next buffer right away. */
    bytes_sent = item->buffer_size;
    rc = pj_ioqueue_send( item->client_key, op_key,
                          item->outgoing_buffer, &bytes_sent, 0);
    if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
        app_perror("...error: write error", rc);
    }

    item->has_pending_send = (rc==PJ_EPENDING);
}

/* Argument block handed to worker_thread(). */
struct thread_arg
{
    /* Identifier printed in the worker's error log messages. */
    int           id;
    /* The ioqueue the worker polls. */
    pj_ioqueue_t *ioqueue;
    /* Number of poll iterations the worker has started. */
    unsigned int  counter;
};

/* Worker thread entry point.
 *
 * Polls the ioqueue named in the thread_arg passed as p until
 * thread_quit_flag becomes non-zero, counting every iteration in
 * arg->counter. A negative poll result is logged and polling continues.
 * Always returns 0.
 */
static int worker_thread(void *p)
{
    struct thread_arg *targ = (struct thread_arg*) p;
    const pj_time_val poll_wait = {0, 100};

    for (;;) {
        char errmsg[PJ_ERR_MSG_SIZE];
        int status;

        if (thread_quit_flag)
            break;

        targ->counter++;
        status = pj_ioqueue_poll(targ->ioqueue, &poll_wait);
        //TRACE_((THIS_FILE, "     thread: poll returned rc=%d", status));

        /* Non-negative results need no handling here. */
        if (status >= 0)
            continue;

        /* A negative result is the negated status code; report it but
         * keep the thread alive.
         */
        pj_strerror(-status, errmsg, sizeof(errmsg));
        PJ_LOG(3, (THIS_FILE, 
                   "...error in pj_ioqueue_poll() in thread %d "
                   "after %d loop: %s [pj_status_t=%d]", 
                   targ->id, targ->counter, errmsg, -status));
    }

    return 0;
}

/* Calculate the bandwidth for the specific test configuration.
 * The test is simple:
 *  - create sockpair_cnt producer-consumer socket pairs.
 *  - create thread_cnt number of worker threads.
 *  - each producer will send buffer_size bytes data as fast and
 *    as soon as it can.
 *  - each consumer will read buffer_size bytes of data as fast 
 *    as it could.
 *  - measure the total bytes received by all consumers during a
 *    period of time.
 */
static int perform_test(int sock_type, const char *type_name,
                        unsigned thread_cnt, unsigned sockpair_cnt,
                        pj_size_t buffer_size, 
                        pj_size_t *p_bandwidth)
{
    enum { MSEC_DURATION = 5000 };
    pj_pool_t *pool;
    test_item *items;
    pj_thread_t **thread;
    pj_ioqueue_t *ioqueue;
    pj_status_t rc;
    pj_ioqueue_callback ioqueue_callback;
    pj_uint32_t total_elapsed_usec, total_received;
    pj_highprec_t bandwidth;
    pj_timestamp start, stop;
    unsigned i;

    TRACE_((THIS_FILE, "    starting test.."));

    ioqueue_callback.on_read_complete = &on_read_complete;
    ioqueue_callback.on_write_complete = &on_write_complete;

    thread_quit_flag = 0;

    pool = pj_pool_create(mem, NULL, 4096, 4096, NULL);
    if (!pool)
        return -10;

    items = (test_item*) pj_pool_alloc(pool, sockpair_cnt*sizeof(test_item));
    thread = (pj_thread_t**)
    	     pj_pool_alloc(pool, thread_cnt*sizeof(pj_thread_t*));

    TRACE_((THIS_FILE, "     creating ioqueue.."));
    rc = pj_ioqueue_create(pool, sockpair_cnt*2, &ioqueue);
    if (rc != PJ_SUCCESS) {
        app_perror("...error: unable to create ioqueue", rc);
        return -15;
    }

    /* Initialize each producer-consumer pair. */
    for (i=0; i<sockpair_cnt; ++i) {
        pj_ssize_t bytes;

        items[i].ioqueue = ioqueue;
        items[i].buffer_size = buffer_size;
        items[i].outgoing_buffer = (char*) pj_pool_alloc(pool, buffer_size);
        items[i].incoming_buffer = (char*) pj_pool_alloc(pool, buffer_size);
        items[i].bytes_recv = items[i].bytes_sent = 0;

        /* randomize outgoing buffer. */
        pj_create_random_string(items[i].outgoing_buffer, buffer_size);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -