📄 ioq_udp.c
字号:
/* $Id: ioq_udp.c 974 2007-02-19 01:13:53Z bennylp $ */
/*
 * Copyright (C)2003-2007 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
#include "test.h"

/**
 * \page page_pjlib_ioqueue_udp_test Test: I/O Queue (UDP)
 *
 * This file provides implementation to test the
 * functionality of the I/O queue when UDP socket is used.
 *
 *
 * This file is <b>pjlib-test/ioq_udp.c</b>
 *
 * \include pjlib-test/ioq_udp.c
 */

#if INCLUDE_UDP_IOQUEUE_TEST

#include <pjlib.h>

#include <pj/compat/socket.h>

#define THIS_FILE           "test_udp"
#define PORT                51233
#define LOOP                2
///#define LOOP 2
#define BUF_MIN_SIZE        32
#define BUF_MAX_SIZE        2048
#define SOCK_INACTIVE_MIN   (1)
#define SOCK_INACTIVE_MAX   (PJ_IOQUEUE_MAX_HANDLES - 2)
#define POOL_SIZE           (2*BUF_MAX_SIZE + SOCK_INACTIVE_MAX*128 + 2048)

/* Progress trace, always logged at level 3 with a "....." prefix. */
#undef TRACE_
#define TRACE_(msg)         PJ_LOG(3,(THIS_FILE,"....." msg))

/* Verbose trace; compiled out unless the "#if 0" below is flipped. */
#if 0
#  define TRACE__(args)     PJ_LOG(3,args)
#else
#  define TRACE__(args)
#endif

/*
 * Values captured by the most recent ioqueue callbacks. The callbacks only
 * record their arguments here; the tests inspect them after polling.
 */
static pj_ssize_t            callback_read_size,
                             callback_write_size,
                             callback_accept_status,
                             callback_connect_status;
static pj_ioqueue_key_t     *callback_read_key,
                            *callback_write_key,
                            *callback_accept_key,
                            *callback_connect_key;
static pj_ioqueue_op_key_t  *callback_read_op,
                            *callback_write_op,
                            *callback_accept_op;

/* Read-completion callback: records key, operation key and byte count. */
static void on_ioqueue_read(pj_ioqueue_key_t *key,
                            pj_ioqueue_op_key_t *op_key,
                            pj_ssize_t bytes_read)
{
    callback_read_key = key;
    callback_read_op = op_key;
    callback_read_size = bytes_read;
    TRACE__((THIS_FILE, " callback_read_key = %p, bytes=%d", key, bytes_read));
}

/* Write-completion callback: records key, operation key and byte count. */
static void on_ioqueue_write(pj_ioqueue_key_t *key,
                             pj_ioqueue_op_key_t *op_key,
                             pj_ssize_t bytes_written)
{
    callback_write_key = key;
    callback_write_op = op_key;
    callback_write_size = bytes_written;
}

/* Accept-completion callback: records key, operation key and status. */
static void on_ioqueue_accept(pj_ioqueue_key_t *key,
                              pj_ioqueue_op_key_t *op_key,
                              pj_sock_t sock, int status)
{
    PJ_UNUSED_ARG(sock);
    callback_accept_key = key;
    callback_accept_op = op_key;
    callback_accept_status = status;
}

/* Connect-completion callback: records key and status. */
static void on_ioqueue_connect(pj_ioqueue_key_t *key, int status)
{
    callback_connect_key = key;
    callback_connect_status = status;
}

/* Callback table registered with every socket in compliance_test(). */
static pj_ioqueue_callback test_cb =
{
    &on_ioqueue_read,
    &on_ioqueue_write,
    &on_ioqueue_accept,
    &on_ioqueue_connect,
};

#ifdef PJ_WIN32
#  define S_ADDR S_un.S_addr
#else
#  define S_ADDR s_addr
#endif

/*
 * compliance_test()
 * To test that the basic IOQueue functionality works. It will just exchange
 * data between two sockets.
 *
 * Returns 0 on success, or a negative code identifying the step that failed.
 */
static int compliance_test(void)
{
    /* Start from the invalid-socket sentinel so cleanup can tell which
     * sockets were really created.
     */
    pj_sock_t ssock = PJ_INVALID_SOCKET, csock = PJ_INVALID_SOCKET;
    pj_sockaddr_in addr, dst_addr;
    int addrlen;
    pj_pool_t *pool = NULL;
    char *send_buf, *recv_buf;
    pj_ioqueue_t *ioque = NULL;
    pj_ioqueue_key_t *skey, *ckey;
    pj_ioqueue_op_key_t read_op, write_op;
    int bufsize = BUF_MIN_SIZE;
    pj_ssize_t bytes, status = -1;
    pj_str_t temp;
    pj_bool_t send_pending = 0, recv_pending = 0;
    pj_status_t rc;

    pj_set_os_error(PJ_SUCCESS);

    // Create pool.
    pool = pj_pool_create(mem, NULL, POOL_SIZE, 4000, NULL);
    if (pool == NULL) {
        app_perror("...ERROR: unable to create pool", PJ_ENOMEM);
        status = -5;
        goto on_error;
    }

    // Allocate buffers for send and receive.
    send_buf = (char*)pj_pool_alloc(pool, bufsize);
    recv_buf = (char*)pj_pool_alloc(pool, bufsize);

    // Allocate sockets for sending and receiving.
    TRACE_("creating sockets...");
    rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &ssock);
    if (rc == PJ_SUCCESS) {
        rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &csock);
    } else {
        csock = PJ_INVALID_SOCKET;
    }
    if (rc != PJ_SUCCESS) {
        app_perror("...ERROR in pj_sock_socket()", rc);
        status = -1;
        goto on_error;
    }

    // Bind server socket.
    TRACE_("bind socket...");
    pj_bzero(&addr, sizeof(addr));
    addr.sin_family = PJ_AF_INET;
    addr.sin_port = pj_htons(PORT);
    if (pj_sock_bind(ssock, &addr, sizeof(addr))) {
        status = -10;
        goto on_error;
    }

    // Create I/O Queue.
    TRACE_("create ioqueue...");
    rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque);
    if (rc != PJ_SUCCESS) {
        status = -20;
        goto on_error;
    }

    // Register server and client socket.
    // We put this after inactivity socket, hopefully this can represent the
    // worst waiting time.
    TRACE_("registering first sockets...");
    rc = pj_ioqueue_register_sock(pool, ioque, ssock, NULL,
                                  &test_cb, &skey);
    if (rc != PJ_SUCCESS) {
        app_perror("...error(10): ioqueue_register error", rc);
        status = -25;
        goto on_error;
    }
    TRACE_("registering second sockets...");
    rc = pj_ioqueue_register_sock(pool, ioque, csock, NULL,
                                  &test_cb, &ckey);
    if (rc != PJ_SUCCESS) {
        app_perror("...error(11): ioqueue_register error", rc);
        status = -26;
        goto on_error;
    }

    // Randomize send_buf.
    pj_create_random_string(send_buf, bufsize);

    // Init operation keys before they are handed to the ioqueue.
    pj_ioqueue_op_key_init(&read_op, sizeof(read_op));
    pj_ioqueue_op_key_init(&write_op, sizeof(write_op));

    // Register reading from ioqueue.
    TRACE_("start recvfrom...");
    pj_bzero(&addr, sizeof(addr));
    addrlen = sizeof(addr);
    bytes = bufsize;
    rc = pj_ioqueue_recvfrom(skey, &read_op, recv_buf, &bytes, 0,
                             &addr, &addrlen);
    if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
        app_perror("...error: pj_ioqueue_recvfrom", rc);
        status = -28;
        goto on_error;
    } else if (rc == PJ_EPENDING) {
        recv_pending = 1;
        PJ_LOG(3, (THIS_FILE, "......ok: recvfrom returned pending"));
    } else {
        // Nothing has been sent yet, so an immediate read is a failure.
        PJ_LOG(3, (THIS_FILE, "......error: recvfrom returned immediate ok!"));
        status = -29;
        goto on_error;
    }

    // Set destination address to send the packet.
    TRACE_("set destination address...");
    temp = pj_str("127.0.0.1");
    if ((rc = pj_sockaddr_in_init(&dst_addr, &temp, PORT)) != 0) {
        app_perror("...error: unable to resolve 127.0.0.1", rc);
        status = -290;
        goto on_error;
    }

    // Write must return the number of bytes.
    TRACE_("start sendto...");
    bytes = bufsize;
    rc = pj_ioqueue_sendto(ckey, &write_op, send_buf, &bytes, 0,
                           &dst_addr, sizeof(dst_addr));
    if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
        app_perror("...error: pj_ioqueue_sendto", rc);
        status = -30;
        goto on_error;
    } else if (rc == PJ_EPENDING) {
        send_pending = 1;
        PJ_LOG(3, (THIS_FILE, "......ok: sendto returned pending"));
    } else {
        send_pending = 0;
        PJ_LOG(3, (THIS_FILE, "......ok: sendto returned immediate success"));
    }

    // reset callback variables.
    callback_read_size = callback_write_size = 0;
    callback_accept_status = callback_connect_status = -2;
    callback_read_key = callback_write_key =
        callback_accept_key = callback_connect_key = NULL;
    callback_read_op = callback_write_op = NULL;

    // Poll if pending.
    while (send_pending || recv_pending) {
        int n_events;
        pj_time_val timeout = { 5, 0 };

        TRACE_("poll...");
        n_events = pj_ioqueue_poll(ioque, &timeout);

        if (n_events == 0) {
            PJ_LOG(1,(THIS_FILE, "...ERROR: timed out..."));
            status = -45;
            goto on_error;
        } else if (n_events < 0) {
            app_perror("...ERROR in ioqueue_poll()", -n_events);
            status = -50;
            goto on_error;
        }

        // The read callback fired: validate everything it reported.
        if (callback_read_key != NULL) {
            if (callback_read_size != bufsize) {
                status = -61;
                goto on_error;
            }
            if (callback_read_key != skey) {
                status = -65;
                goto on_error;
            }
            if (callback_read_op != &read_op) {
                status = -66;
                goto on_error;
            }
            if (pj_memcmp(send_buf, recv_buf, bufsize) != 0) {
                status = -67;
                goto on_error;
            }
            if (addrlen != sizeof(pj_sockaddr_in)) {
                status = -68;
                goto on_error;
            }
            if (addr.sin_family != PJ_AF_INET) {
                status = -69;
                goto on_error;
            }
            recv_pending = 0;
        }

        // The write callback fired: validate everything it reported.
        if (callback_write_key != NULL) {
            if (callback_write_size != bufsize) {
                status = -73;
                goto on_error;
            }
            if (callback_write_key != ckey) {
                status = -75;
                goto on_error;
            }
            if (callback_write_op != &write_op) {
                status = -76;
                goto on_error;
            }
            send_pending = 0;
        }
    }

    // Success
    status = 0;

on_error:
    // Only release what was actually acquired.
    if (ssock != PJ_INVALID_SOCKET)
        pj_sock_close(ssock);
    if (csock != PJ_INVALID_SOCKET)
        pj_sock_close(csock);
    if (ioque != NULL)
        pj_ioqueue_destroy(ioque);
    if (pool != NULL)
        pj_pool_release(pool);
    return status;
}

/*
 * Read-completion callback used by unregister_test(): increments the
 * unsigned counter stored as the key's user data.
 */
static void on_read_complete(pj_ioqueue_key_t *key,
                             pj_ioqueue_op_key_t *op_key,
                             pj_ssize_t bytes_read)
{
    unsigned *p_packet_cnt = pj_ioqueue_get_user_data(key);

    PJ_UNUSED_ARG(op_key);
    PJ_UNUSED_ARG(bytes_read);

    (*p_packet_cnt)++;
}

/*
 * unregister_test()
 * Check if callback is still called after socket has been unregistered or
 * closed.
*/ static int unregister_test(void){ enum { RPORT = 50000, SPORT = 50001 }; pj_pool_t *pool; pj_ioqueue_t *ioqueue; pj_sock_t ssock; pj_sock_t rsock; int addrlen; pj_sockaddr_in addr; pj_ioqueue_key_t *key; pj_ioqueue_op_key_t opkey; pj_ioqueue_callback cb; unsigned packet_cnt; char sendbuf[10], recvbuf[10]; pj_ssize_t bytes; pj_time_val timeout; pj_status_t status; pool = pj_pool_create(mem, "test", 4000, 4000, NULL); if (!pool) { app_perror("Unable to create pool", PJ_ENOMEM); return -100; } status = pj_ioqueue_create(pool, 16, &ioqueue); if (status != PJ_SUCCESS) { app_perror("Error creating ioqueue", status); return -110; } /* Create sender socket */ status = app_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, SPORT, &ssock); if (status != PJ_SUCCESS) { app_perror("Error initializing socket", status); return -120; } /* Create receiver socket. */ status = app_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, RPORT, &rsock); if (status != PJ_SUCCESS) { app_perror("Error initializing socket", status); return -130; } /* Register rsock to ioqueue. */ pj_bzero(&cb, sizeof(cb)); cb.on_read_complete = &on_read_complete; packet_cnt = 0; status = pj_ioqueue_register_sock(pool, ioqueue, rsock, &packet_cnt, &cb, &key); if (status != PJ_SUCCESS) { app_perror("Error registering to ioqueue", status); return -140; } /* Init operation key. */ pj_ioqueue_op_key_init(&opkey, sizeof(opkey)); /* Start reading. */ bytes = sizeof(recvbuf); status = pj_ioqueue_recv( key, &opkey, recvbuf, &bytes, 0); if (status != PJ_EPENDING) { app_perror("Expecting PJ_EPENDING, but got this", status); return -150; } /* Init destination address. */ addrlen = sizeof(addr); status = pj_sock_getsockname(rsock, &addr, &addrlen); if (status != PJ_SUCCESS) { app_perror("getsockname error", status); return -160; } /* Override address with 127.0.0.1, since getsockname will return * zero in the address field. 
*/ addr.sin_addr = pj_inet_addr2("127.0.0.1"); /* Init buffer to send */ pj_ansi_strcpy(sendbuf, "Hello0123"); /* Send one packet. */ bytes = sizeof(sendbuf); status = pj_sock_sendto(ssock, sendbuf, &bytes, 0, &addr, sizeof(addr)); if (status != PJ_SUCCESS) { app_perror("sendto error", status); return -170; } /* Check if packet is received. */ timeout.sec = 1; timeout.msec = 0; pj_ioqueue_poll(ioqueue, &timeout); if (packet_cnt != 1) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -