⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ioq_udp.c

📁 基于sip协议的网络电话源码
💻 C
📖 第 1 页 / 共 2 页
字号:
/* $Id: ioq_udp.c 974 2007-02-19 01:13:53Z bennylp $ */
/* 
 * Copyright (C)2003-2007 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
 */
#include "test.h"

/**
 * \page page_pjlib_ioqueue_udp_test Test: I/O Queue (UDP)
 *
 * This file provides implementation to test the
 * functionality of the I/O queue when UDP socket is used.
 *
 *
 * This file is <b>pjlib-test/ioq_udp.c</b>
 *
 * \include pjlib-test/ioq_udp.c
 */

#if INCLUDE_UDP_IOQUEUE_TEST

#include <pjlib.h>

#include <pj/compat/socket.h>

#define THIS_FILE           "test_udp"
#define PORT                51233
#define LOOP                2
#define BUF_MIN_SIZE        32
#define BUF_MAX_SIZE        2048
#define SOCK_INACTIVE_MIN   (1)
#define SOCK_INACTIVE_MAX   (PJ_IOQUEUE_MAX_HANDLES - 2)
#define POOL_SIZE           (2*BUF_MAX_SIZE + SOCK_INACTIVE_MAX*128 + 2048)

/* Progress trace: logs the literal message at level 3 with a "....." prefix. */
#undef TRACE_
#define TRACE_(msg)         PJ_LOG(3,(THIS_FILE,"....." msg))

/* Verbose per-callback trace; change "#if 0" to "#if 1" to enable it. */
#if 0
#  define TRACE__(args)     PJ_LOG(3,args)
#else
#  define TRACE__(args)
#endif

/*
 * Arguments of the most recent invocation of each ioqueue callback below.
 * The test functions reset these and then inspect them after polling.
 */
static pj_ssize_t            callback_read_size,
                             callback_write_size,
                             callback_accept_status,
                             callback_connect_status;
static pj_ioqueue_key_t     *callback_read_key,
                            *callback_write_key,
                            *callback_accept_key,
                            *callback_connect_key;
static pj_ioqueue_op_key_t  *callback_read_op,
                            *callback_write_op,
                            *callback_accept_op;

/* Read callback: records the key, operation key and byte count. */
static void on_ioqueue_read(pj_ioqueue_key_t *key,
                            pj_ioqueue_op_key_t *op_key,
                            pj_ssize_t bytes_read)
{
    callback_read_key = key;
    callback_read_op = op_key;
    callback_read_size = bytes_read;
    /* Casts keep the arguments in step with the %p / %d conversions. */
    TRACE__((THIS_FILE, "     callback_read_key = %p, bytes=%d",
             (void*)key, (int)bytes_read));
}

/* Write callback: records the key, operation key and byte count. */
static void on_ioqueue_write(pj_ioqueue_key_t *key,
                             pj_ioqueue_op_key_t *op_key,
                             pj_ssize_t bytes_written)
{
    callback_write_key = key;
    callback_write_op = op_key;
    callback_write_size = bytes_written;
}

/* Accept callback: records the key, operation key and status; the new
 * socket itself is ignored.
 */
static void on_ioqueue_accept(pj_ioqueue_key_t *key,
                              pj_ioqueue_op_key_t *op_key,
                              pj_sock_t sock, int status)
{
    PJ_UNUSED_ARG(sock);

    callback_accept_key = key;
    callback_accept_op = op_key;
    callback_accept_status = status;
}

/* Connect callback: records the key and status. */
static void on_ioqueue_connect(pj_ioqueue_key_t *key, int status)
{
    callback_connect_key = key;
    callback_connect_status = status;
}

/* Callback table handed to pj_ioqueue_register_sock() by the tests. */
static pj_ioqueue_callback test_cb =
{
    &on_ioqueue_read,
    &on_ioqueue_write,
    &on_ioqueue_accept,
    &on_ioqueue_connect,
};

#ifdef PJ_WIN32
#  define S_ADDR S_un.S_addr
#else
#  define S_ADDR s_addr
#endif

/*
 * compliance_test()
 * To test that the basic IOQueue functionality works. It will just exchange
 * data between two sockets.
*/ static int compliance_test(void){    pj_sock_t ssock=-1, csock=-1;    pj_sockaddr_in addr, dst_addr;    int addrlen;    pj_pool_t *pool = NULL;    char *send_buf, *recv_buf;    pj_ioqueue_t *ioque = NULL;    pj_ioqueue_key_t *skey, *ckey;    pj_ioqueue_op_key_t read_op, write_op;    int bufsize = BUF_MIN_SIZE;    pj_ssize_t bytes, status = -1;    pj_str_t temp;    pj_bool_t send_pending, recv_pending;    pj_status_t rc;    pj_set_os_error(PJ_SUCCESS);    // Create pool.    pool = pj_pool_create(mem, NULL, POOL_SIZE, 4000, NULL);    // Allocate buffers for send and receive.    send_buf = (char*)pj_pool_alloc(pool, bufsize);    recv_buf = (char*)pj_pool_alloc(pool, bufsize);    // Allocate sockets for sending and receiving.    TRACE_("creating sockets...");    rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &ssock);    if (rc==PJ_SUCCESS)        rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &csock);    else        csock = PJ_INVALID_SOCKET;    if (rc != PJ_SUCCESS) {        app_perror("...ERROR in pj_sock_socket()", rc);	status=-1; goto on_error;    }    // Bind server socket.    TRACE_("bind socket...");    pj_bzero(&addr, sizeof(addr));    addr.sin_family = PJ_AF_INET;    addr.sin_port = pj_htons(PORT);    if (pj_sock_bind(ssock, &addr, sizeof(addr))) {	status=-10; goto on_error;    }    // Create I/O Queue.    TRACE_("create ioqueue...");    rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque);    if (rc != PJ_SUCCESS) {	status=-20; goto on_error;    }    // Register server and client socket.    // We put this after inactivity socket, hopefully this can represent the    // worst waiting time.    
TRACE_("registering first sockets...");    rc = pj_ioqueue_register_sock(pool, ioque, ssock, NULL, 			          &test_cb, &skey);    if (rc != PJ_SUCCESS) {	app_perror("...error(10): ioqueue_register error", rc);	status=-25; goto on_error;    }    TRACE_("registering second sockets...");    rc = pj_ioqueue_register_sock( pool, ioque, csock, NULL, 			           &test_cb, &ckey);    if (rc != PJ_SUCCESS) {	app_perror("...error(11): ioqueue_register error", rc);	status=-26; goto on_error;    }    // Randomize send_buf.    pj_create_random_string(send_buf, bufsize);    // Register reading from ioqueue.    TRACE_("start recvfrom...");    pj_bzero(&addr, sizeof(addr));    addrlen = sizeof(addr);    bytes = bufsize;    rc = pj_ioqueue_recvfrom(skey, &read_op, recv_buf, &bytes, 0,			     &addr, &addrlen);    if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {        app_perror("...error: pj_ioqueue_recvfrom", rc);	status=-28; goto on_error;    } else if (rc == PJ_EPENDING) {	recv_pending = 1;	PJ_LOG(3, (THIS_FILE, 		   "......ok: recvfrom returned pending"));    } else {	PJ_LOG(3, (THIS_FILE, 		   "......error: recvfrom returned immediate ok!"));	status=-29; goto on_error;    }    // Set destination address to send the packet.    TRACE_("set destination address...");    temp = pj_str("127.0.0.1");    if ((rc=pj_sockaddr_in_init(&dst_addr, &temp, PORT)) != 0) {	app_perror("...error: unable to resolve 127.0.0.1", rc);	status=-290; goto on_error;    }    // Write must return the number of bytes.    
TRACE_("start sendto...");    bytes = bufsize;    rc = pj_ioqueue_sendto(ckey, &write_op, send_buf, &bytes, 0, &dst_addr, 			   sizeof(dst_addr));    if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {        app_perror("...error: pj_ioqueue_sendto", rc);	status=-30; goto on_error;    } else if (rc == PJ_EPENDING) {	send_pending = 1;	PJ_LOG(3, (THIS_FILE, 		   "......ok: sendto returned pending"));    } else {	send_pending = 0;	PJ_LOG(3, (THIS_FILE, 		   "......ok: sendto returned immediate success"));    }    // reset callback variables.    callback_read_size = callback_write_size = 0;    callback_accept_status = callback_connect_status = -2;    callback_read_key = callback_write_key =         callback_accept_key = callback_connect_key = NULL;    callback_read_op = callback_write_op = NULL;    // Poll if pending.    while (send_pending || recv_pending) {	int rc;	pj_time_val timeout = { 5, 0 };	TRACE_("poll...");	rc = pj_ioqueue_poll(ioque, &timeout);	if (rc == 0) {	    PJ_LOG(1,(THIS_FILE, "...ERROR: timed out..."));	    status=-45; goto on_error;        } else if (rc < 0) {            app_perror("...ERROR in ioqueue_poll()", -rc);	    status=-50; goto on_error;	}	if (callback_read_key != NULL) {            if (callback_read_size != bufsize) {                status=-61; goto on_error;            }            if (callback_read_key != skey) {                status=-65; goto on_error;            }            if (callback_read_op != &read_op) {                status=-66; goto on_error;            }	    if (pj_memcmp(send_buf, recv_buf, bufsize) != 0) {		status=-67; goto on_error;	    }	    if (addrlen != sizeof(pj_sockaddr_in)) {		status=-68; goto on_error;	    }	    if (addr.sin_family != PJ_AF_INET) {		status=-69; goto on_error;	    }	    recv_pending = 0;	}         if (callback_write_key != NULL) {            if (callback_write_size != bufsize) {                status=-73; goto on_error;            }            if (callback_write_key != ckey) {                status=-75; 
goto on_error;            }            if (callback_write_op != &write_op) {                status=-76; goto on_error;            }            send_pending = 0;	}    }         // Success    status = 0;on_error:    if (ssock)	pj_sock_close(ssock);    if (csock)	pj_sock_close(csock);    if (ioque != NULL)	pj_ioqueue_destroy(ioque);    pj_pool_release(pool);    return status;}static void on_read_complete(pj_ioqueue_key_t *key,                              pj_ioqueue_op_key_t *op_key,                              pj_ssize_t bytes_read){    unsigned *p_packet_cnt = pj_ioqueue_get_user_data(key);    PJ_UNUSED_ARG(op_key);    PJ_UNUSED_ARG(bytes_read);    (*p_packet_cnt)++;}/* * unregister_test() * Check if callback is still called after socket has been unregistered or  * closed. */ static int unregister_test(void){    enum { RPORT = 50000, SPORT = 50001 };    pj_pool_t *pool;    pj_ioqueue_t *ioqueue;    pj_sock_t ssock;    pj_sock_t rsock;    int addrlen;    pj_sockaddr_in addr;    pj_ioqueue_key_t *key;    pj_ioqueue_op_key_t opkey;    pj_ioqueue_callback cb;    unsigned packet_cnt;    char sendbuf[10], recvbuf[10];    pj_ssize_t bytes;    pj_time_val timeout;    pj_status_t status;    pool = pj_pool_create(mem, "test", 4000, 4000, NULL);    if (!pool) {	app_perror("Unable to create pool", PJ_ENOMEM);	return -100;    }    status = pj_ioqueue_create(pool, 16, &ioqueue);    if (status != PJ_SUCCESS) {	app_perror("Error creating ioqueue", status);	return -110;    }    /* Create sender socket */    status = app_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, SPORT, &ssock);    if (status != PJ_SUCCESS) {	app_perror("Error initializing socket", status);	return -120;    }    /* Create receiver socket. */    status = app_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, RPORT, &rsock);    if (status != PJ_SUCCESS) {	app_perror("Error initializing socket", status);	return -130;    }    /* Register rsock to ioqueue. 
*/    pj_bzero(&cb, sizeof(cb));    cb.on_read_complete = &on_read_complete;    packet_cnt = 0;    status = pj_ioqueue_register_sock(pool, ioqueue, rsock, &packet_cnt,				      &cb, &key);    if (status != PJ_SUCCESS) {	app_perror("Error registering to ioqueue", status);	return -140;    }    /* Init operation key. */    pj_ioqueue_op_key_init(&opkey, sizeof(opkey));    /* Start reading. */    bytes = sizeof(recvbuf);    status = pj_ioqueue_recv( key, &opkey, recvbuf, &bytes, 0);    if (status != PJ_EPENDING) {	app_perror("Expecting PJ_EPENDING, but got this", status);	return -150;    }    /* Init destination address. */    addrlen = sizeof(addr);    status = pj_sock_getsockname(rsock, &addr, &addrlen);    if (status != PJ_SUCCESS) {	app_perror("getsockname error", status);	return -160;    }    /* Override address with 127.0.0.1, since getsockname will return     * zero in the address field.     */    addr.sin_addr = pj_inet_addr2("127.0.0.1");    /* Init buffer to send */    pj_ansi_strcpy(sendbuf, "Hello0123");    /* Send one packet. */    bytes = sizeof(sendbuf);    status = pj_sock_sendto(ssock, sendbuf, &bytes, 0,			    &addr, sizeof(addr));    if (status != PJ_SUCCESS) {	app_perror("sendto error", status);	return -170;    }    /* Check if packet is received. */    timeout.sec = 1; timeout.msec = 0;    pj_ioqueue_poll(ioqueue, &timeout);    if (packet_cnt != 1) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -