⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ioq_udp.c

📁 一个开源的sip源代码
💻 C
📖 第 1 页 / 共 2 页
字号:
/* $Id: ioq_udp.c 1235 2007-04-30 21:03:32Z bennylp $ */
/* 
 * Copyright (C)2003-2007 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
 */
#include "test.h"


/**
 * \page page_pjlib_ioqueue_udp_test Test: I/O Queue (UDP)
 *
 * This file implements tests for the I/O queue
 * when UDP sockets are used.
 *
 *
 * This file is <b>pjlib-test/ioq_udp.c</b>
 *
 * \include pjlib-test/ioq_udp.c
 */


#if INCLUDE_UDP_IOQUEUE_TEST

#include <pjlib.h>

#include <pj/compat/socket.h>

/* Sender tag passed to PJ_LOG() by this file's log statements. */
#define THIS_FILE	    "test_udp"
/* UDP port that compliance_test() binds its server socket to and sends to. */
#define PORT		    51233
/* NOTE(review): LOOP is not referenced in the visible part of this file;
 * presumably it is used by tests further down -- confirm.
 */
#define LOOP		    2
///#define LOOP		    2
/* Buffer size used by compliance_test() for its send and receive buffers. */
#define BUF_MIN_SIZE	    32
/* Upper buffer size; in the visible code it only feeds POOL_SIZE. */
#define BUF_MAX_SIZE	    2048
/* Bounds on a number of "inactive" sockets; in the visible code only
 * SOCK_INACTIVE_MAX is used, as a term of POOL_SIZE.
 */
#define SOCK_INACTIVE_MIN   (1)
#define SOCK_INACTIVE_MAX   (PJ_IOQUEUE_MAX_HANDLES - 2)
/* Initial size given to pj_pool_create() by compliance_test(). */
#define POOL_SIZE	    (2*BUF_MAX_SIZE + SOCK_INACTIVE_MAX*128 + 2048)

/* TRACE_(msg): log the string literal msg at level 3, prefixed with ".....".
 * Any earlier definition of TRACE_ is discarded first.
 */
#undef TRACE_
#define TRACE_(msg)	    PJ_LOG(3,(THIS_FILE,"....." msg))

/* TRACE__(args): verbose tracing taking a parenthesised PJ_LOG argument
 * list. It expands to nothing unless the "#if 0" below is changed to 1.
 */
#if 0
#  define TRACE__(args)	    PJ_LOG(3,args)
#else
#  define TRACE__(args)
#endif


/*
 * Results recorded by the on_ioqueue_*() callbacks below. The tests reset
 * these before polling and inspect them afterwards.
 */

/* Byte counts / status values reported to the callbacks. */
static pj_ssize_t callback_read_size;
static pj_ssize_t callback_write_size;
static pj_ssize_t callback_accept_status;
static pj_ssize_t callback_connect_status;

/* Ioqueue key each callback was invoked for. */
static pj_ioqueue_key_t *callback_read_key;
static pj_ioqueue_key_t *callback_write_key;
static pj_ioqueue_key_t *callback_accept_key;
static pj_ioqueue_key_t *callback_connect_key;

/* Operation key each callback was invoked with. */
static pj_ioqueue_op_key_t *callback_read_op;
static pj_ioqueue_op_key_t *callback_write_op;
static pj_ioqueue_op_key_t *callback_accept_op;

/*
 * Read-completion callback for test_cb.
 *
 * Records the ioqueue key, the operation key and the number of bytes read
 * in the file-level callback_read_* variables so the test can verify them
 * after polling.
 *
 * key         Ioqueue key the read completed on.
 * op_key      Operation key that was supplied when the read was started.
 * bytes_read  Byte count reported by the ioqueue.
 */
static void on_ioqueue_read(pj_ioqueue_key_t *key, 
                            pj_ioqueue_op_key_t *op_key,
                            pj_ssize_t bytes_read)
{
    callback_read_key = key;
    callback_read_op = op_key;
    callback_read_size = bytes_read;

    /* The arguments are cast so they match the conversion specifiers
     * exactly: %p requires a void pointer and pj_ssize_t may be wider
     * than int.
     */
    TRACE__((THIS_FILE, "     callback_read_key = %p, bytes=%ld", 
	     (void*)key, (long)bytes_read));
}

/*
 * Write-completion callback for test_cb: stores the key, the operation key
 * and the written byte count in the file-level callback_write_* variables
 * for the test to inspect.
 */
static void on_ioqueue_write(pj_ioqueue_key_t *key, 
                             pj_ioqueue_op_key_t *op_key,
                             pj_ssize_t bytes_written)
{
    callback_write_size = bytes_written;
    callback_write_op   = op_key;
    callback_write_key  = key;
}

/*
 * Accept-completion callback for test_cb: stores the key, the operation key
 * and the status in the file-level callback_accept_* variables. The new
 * socket handle is ignored.
 */
static void on_ioqueue_accept(pj_ioqueue_key_t *key, 
                              pj_ioqueue_op_key_t *op_key,
                              pj_sock_t sock, int status)
{
    callback_accept_status = status;
    callback_accept_op     = op_key;
    callback_accept_key    = key;

    PJ_UNUSED_ARG(sock);
}

/*
 * Connect-completion callback for test_cb: stores the key and the status
 * in callback_connect_key / callback_connect_status.
 */
static void on_ioqueue_connect(pj_ioqueue_key_t *key, int status)
{
    callback_connect_status = status;
    callback_connect_key    = key;
}

/*
 * Callback table handed to pj_ioqueue_register_sock() by compliance_test().
 * Entries are positional: read, write, accept, connect.
 */
static pj_ioqueue_callback test_cb = 
{
    on_ioqueue_read,	    /* read completion    */
    on_ioqueue_write,	    /* write completion   */
    on_ioqueue_accept,	    /* accept completion  */
    on_ioqueue_connect	    /* connect completion */
};

/*
 * S_ADDR: member path that differs between Windows builds (S_un.S_addr)
 * and all other builds (s_addr).
 * NOTE(review): presumably this names the raw IPv4 address field of an
 * in_addr-style structure; it is not referenced in the visible part of
 * this file -- confirm against the remaining tests.
 */
#ifdef PJ_WIN32
#  define S_ADDR S_un.S_addr
#else
#  define S_ADDR s_addr
#endif

/*
 * compliance_test()
 * To test that the basic IOQueue functionality works. It will just exchange
 * data between two sockets.
 */ 
static int compliance_test(void)
{
    pj_sock_t ssock=-1, csock=-1;
    pj_sockaddr_in addr, dst_addr;
    int addrlen;
    pj_pool_t *pool = NULL;
    char *send_buf, *recv_buf;
    pj_ioqueue_t *ioque = NULL;
    pj_ioqueue_key_t *skey, *ckey;
    pj_ioqueue_op_key_t read_op, write_op;
    int bufsize = BUF_MIN_SIZE;
    pj_ssize_t bytes, status = -1;
    pj_str_t temp;
    pj_bool_t send_pending, recv_pending;
    pj_status_t rc;

    pj_set_os_error(PJ_SUCCESS);

    // Create pool.
    pool = pj_pool_create(mem, NULL, POOL_SIZE, 4000, NULL);

    // Allocate buffers for send and receive.
    send_buf = (char*)pj_pool_alloc(pool, bufsize);
    recv_buf = (char*)pj_pool_alloc(pool, bufsize);

    // Allocate sockets for sending and receiving.
    TRACE_("creating sockets...");
    rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &ssock);
    if (rc==PJ_SUCCESS)
        rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &csock);
    else
        csock = PJ_INVALID_SOCKET;
    if (rc != PJ_SUCCESS) {
        app_perror("...ERROR in pj_sock_socket()", rc);
	status=-1; goto on_error;
    }

    // Bind server socket.
    TRACE_("bind socket...");
    pj_bzero(&addr, sizeof(addr));
    addr.sin_family = PJ_AF_INET;
    addr.sin_port = pj_htons(PORT);
    if (pj_sock_bind(ssock, &addr, sizeof(addr))) {
	status=-10; goto on_error;
    }

    // Create I/O Queue.
    TRACE_("create ioqueue...");
    rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque);
    if (rc != PJ_SUCCESS) {
	status=-20; goto on_error;
    }

    // Register server and client socket.
    // We put this after inactivity socket, hopefully this can represent the
    // worst waiting time.
    TRACE_("registering first sockets...");
    rc = pj_ioqueue_register_sock(pool, ioque, ssock, NULL, 
			          &test_cb, &skey);
    if (rc != PJ_SUCCESS) {
	app_perror("...error(10): ioqueue_register error", rc);
	status=-25; goto on_error;
    }
    TRACE_("registering second sockets...");
    rc = pj_ioqueue_register_sock( pool, ioque, csock, NULL, 
			           &test_cb, &ckey);
    if (rc != PJ_SUCCESS) {
	app_perror("...error(11): ioqueue_register error", rc);
	status=-26; goto on_error;
    }

    // Randomize send_buf.
    pj_create_random_string(send_buf, bufsize);

    // Register reading from ioqueue.
    TRACE_("start recvfrom...");
    pj_bzero(&addr, sizeof(addr));
    addrlen = sizeof(addr);
    bytes = bufsize;
    rc = pj_ioqueue_recvfrom(skey, &read_op, recv_buf, &bytes, 0,
			     &addr, &addrlen);
    if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
        app_perror("...error: pj_ioqueue_recvfrom", rc);
	status=-28; goto on_error;
    } else if (rc == PJ_EPENDING) {
	recv_pending = 1;
	PJ_LOG(3, (THIS_FILE, 
		   "......ok: recvfrom returned pending"));
    } else {
	PJ_LOG(3, (THIS_FILE, 
		   "......error: recvfrom returned immediate ok!"));
	status=-29; goto on_error;
    }

    // Set destination address to send the packet.
    TRACE_("set destination address...");
    temp = pj_str("127.0.0.1");
    if ((rc=pj_sockaddr_in_init(&dst_addr, &temp, PORT)) != 0) {
	app_perror("...error: unable to resolve 127.0.0.1", rc);
	status=-290; goto on_error;
    }

    // Write must return the number of bytes.
    TRACE_("start sendto...");
    bytes = bufsize;
    rc = pj_ioqueue_sendto(ckey, &write_op, send_buf, &bytes, 0, &dst_addr, 
			   sizeof(dst_addr));
    if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
        app_perror("...error: pj_ioqueue_sendto", rc);
	status=-30; goto on_error;
    } else if (rc == PJ_EPENDING) {
	send_pending = 1;
	PJ_LOG(3, (THIS_FILE, 
		   "......ok: sendto returned pending"));
    } else {
	send_pending = 0;
	PJ_LOG(3, (THIS_FILE, 
		   "......ok: sendto returned immediate success"));
    }

    // reset callback variables.
    callback_read_size = callback_write_size = 0;
    callback_accept_status = callback_connect_status = -2;
    callback_read_key = callback_write_key = 
        callback_accept_key = callback_connect_key = NULL;
    callback_read_op = callback_write_op = NULL;

    // Poll if pending.
    while (send_pending || recv_pending) {
	int rc;
	pj_time_val timeout = { 5, 0 };

	TRACE_("poll...");
	rc = pj_ioqueue_poll(ioque, &timeout);

	if (rc == 0) {
	    PJ_LOG(1,(THIS_FILE, "...ERROR: timed out..."));
	    status=-45; goto on_error;
        } else if (rc < 0) {
            app_perror("...ERROR in ioqueue_poll()", -rc);
	    status=-50; goto on_error;
	}

	if (callback_read_key != NULL) {
            if (callback_read_size != bufsize) {
                status=-61; goto on_error;
            }
            if (callback_read_key != skey) {
                status=-65; goto on_error;
            }
            if (callback_read_op != &read_op) {
                status=-66; goto on_error;
            }

	    if (pj_memcmp(send_buf, recv_buf, bufsize) != 0) {
		status=-67; goto on_error;
	    }
	    if (addrlen != sizeof(pj_sockaddr_in)) {
		status=-68; goto on_error;
	    }
	    if (addr.sin_family != PJ_AF_INET) {
		status=-69; goto on_error;
	    }


	    recv_pending = 0;
	} 

        if (callback_write_key != NULL) {
            if (callback_write_size != bufsize) {
                status=-73; goto on_error;
            }
            if (callback_write_key != ckey) {
                status=-75; goto on_error;
            }
            if (callback_write_op != &write_op) {
                status=-76; goto on_error;
            }

            send_pending = 0;
	}
    } 
    
    // Success
    status = 0;

on_error:
    if (ssock)
	pj_sock_close(ssock);
    if (csock)
	pj_sock_close(csock);
    if (ioque != NULL)
	pj_ioqueue_destroy(ioque);
    pj_pool_release(pool);
    return status;

}


/*
 * Read-completion callback used by unregister_test().
 *
 * The key's user data is treated as a pointer to an unsigned packet
 * counter (unregister_test() registers &packet_cnt); every invocation
 * increments that counter by one. The operation key and byte count are
 * not examined.
 */
static void on_read_complete(pj_ioqueue_key_t *key, 
                             pj_ioqueue_op_key_t *op_key, 
                             pj_ssize_t bytes_read)
{
    unsigned *counter;

    PJ_UNUSED_ARG(op_key);
    PJ_UNUSED_ARG(bytes_read);

    counter = (unsigned*) pj_ioqueue_get_user_data(key);
    *counter += 1;
}

/*
 * unregister_test()
 * Check if callback is still called after socket has been unregistered or 
 * closed.
 */ 
static int unregister_test(void)
{
    enum { RPORT = 50000, SPORT = 50001 };
    pj_pool_t *pool;
    pj_ioqueue_t *ioqueue;
    pj_sock_t ssock;
    pj_sock_t rsock;
    int addrlen;
    pj_sockaddr_in addr;
    pj_ioqueue_key_t *key;
    pj_ioqueue_op_key_t opkey;
    pj_ioqueue_callback cb;
    unsigned packet_cnt;
    char sendbuf[10], recvbuf[10];
    pj_ssize_t bytes;
    pj_time_val timeout;
    pj_status_t status;

    pool = pj_pool_create(mem, "test", 4000, 4000, NULL);
    if (!pool) {
	app_perror("Unable to create pool", PJ_ENOMEM);
	return -100;
    }

    status = pj_ioqueue_create(pool, 16, &ioqueue);
    if (status != PJ_SUCCESS) {
	app_perror("Error creating ioqueue", status);
	return -110;
    }

    /* Create sender socket */
    status = app_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, SPORT, &ssock);
    if (status != PJ_SUCCESS) {
	app_perror("Error initializing socket", status);
	return -120;
    }

    /* Create receiver socket. */
    status = app_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, RPORT, &rsock);
    if (status != PJ_SUCCESS) {
	app_perror("Error initializing socket", status);
	return -130;
    }

    /* Register rsock to ioqueue. */
    pj_bzero(&cb, sizeof(cb));
    cb.on_read_complete = &on_read_complete;
    packet_cnt = 0;
    status = pj_ioqueue_register_sock(pool, ioqueue, rsock, &packet_cnt,
				      &cb, &key);
    if (status != PJ_SUCCESS) {
	app_perror("Error registering to ioqueue", status);
	return -140;
    }

    /* Init operation key. */
    pj_ioqueue_op_key_init(&opkey, sizeof(opkey));

    /* Start reading. */
    bytes = sizeof(recvbuf);
    status = pj_ioqueue_recv( key, &opkey, recvbuf, &bytes, 0);
    if (status != PJ_EPENDING) {
	app_perror("Expecting PJ_EPENDING, but got this", status);
	return -150;
    }

    /* Init destination address. */
    addrlen = sizeof(addr);
    status = pj_sock_getsockname(rsock, &addr, &addrlen);
    if (status != PJ_SUCCESS) {
	app_perror("getsockname error", status);
	return -160;
    }

    /* Override address with 127.0.0.1, since getsockname will return
     * zero in the address field.
     */
    addr.sin_addr = pj_inet_addr2("127.0.0.1");

    /* Init buffer to send */
    pj_ansi_strcpy(sendbuf, "Hello0123");

    /* Send one packet. */
    bytes = sizeof(sendbuf);
    status = pj_sock_sendto(ssock, sendbuf, &bytes, 0,
			    &addr, sizeof(addr));

    if (status != PJ_SUCCESS) {
	app_perror("sendto error", status);
	return -170;
    }

    /* Check if packet is received. */
    timeout.sec = 1; timeout.msec = 0;
    pj_ioqueue_poll(ioqueue, &timeout);

    if (packet_cnt != 1) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -