📄 ioqueue_select.c
字号:
/* $Id: ioqueue_select.c 1158 2007-04-06 10:25:23Z bennylp $ *//* * Copyright (C)2003-2007 Benny Prijono <benny@prijono.org> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA *//* * sock_select.c * * This is the implementation of IOQueue using pj_sock_select(). * It runs anywhere where pj_sock_select() is available (currently * Win32, Linux, Linux kernel, etc.). */#include <pj/ioqueue.h>#include <pj/os.h>#include <pj/lock.h>#include <pj/log.h>#include <pj/list.h>#include <pj/pool.h>#include <pj/string.h>#include <pj/assert.h>#include <pj/sock.h>#include <pj/compat/socket.h>#include <pj/sock_select.h>#include <pj/errno.h>/* * Include declaration from common abstraction. */#include "ioqueue_common_abs.h"/* * ISSUES with ioqueue_select() * * EAGAIN/EWOULDBLOCK error in recv(): * - when multiple threads are working with the ioqueue, application * may receive EAGAIN or EWOULDBLOCK in the receive callback. * This error happens because more than one thread is watching for * the same descriptor set, so when all of them call recv() or recvfrom() * simultaneously, only one will succeed and the rest will get the error. * */#define THIS_FILE "ioq_select"/* * The select ioqueue relies on socket functions (pj_sock_xxx()) to return * the correct error code. */#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)# error "Error reporting must be enabled for this function to work!"#endif/** * Get the number of descriptors in the set. This is defined in sock_select.c * This function will only return the number of sockets set from PJ_FD_SET * operation. When the set is modified by other means (such as by select()), * the count will not be reflected here. * * That's why don't export this function in the header file, to avoid * misunderstanding. * * @param fdsetp The descriptor set. * * @return Number of descriptors in the set. */PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);/* * During debugging build, VALIDATE_FD_SET is set. * This will check the validity of the fd_sets. *//*#if defined(PJ_DEBUG) && PJ_DEBUG != 0# define VALIDATE_FD_SET 1#else# define VALIDATE_FD_SET 0#endif*/#define VALIDATE_FD_SET 0#if 0# define TRACE__(args) PJ_LOG(3,args)#else# define TRACE__(args)#endif/* * This describes each key. */struct pj_ioqueue_key_t{ DECLARE_COMMON_KEY};/* * This describes the I/O queue itself. */struct pj_ioqueue_t{ DECLARE_COMMON_IOQUEUE unsigned max, count; /* Max and current key count */ int nfds; /* The largest fd value (for select)*/ pj_ioqueue_key_t active_list; /* List of active keys. */ pj_fd_set_t rfdset; pj_fd_set_t wfdset;#if PJ_HAS_TCP pj_fd_set_t xfdset;#endif#if PJ_IOQUEUE_HAS_SAFE_UNREG pj_mutex_t *ref_cnt_mutex; pj_ioqueue_key_t closing_list; pj_ioqueue_key_t free_list;#endif};/* Include implementation for common abstraction after we declare * pj_ioqueue_key_t and pj_ioqueue_t. */#include "ioqueue_common_abs.c"/* * pj_ioqueue_name() */PJ_DEF(const char*) pj_ioqueue_name(void){ return "select";}/* * Scan the socket descriptor sets for the largest descriptor. * This value is needed by select(). */#if defined(PJ_SELECT_NEEDS_NFDS) && PJ_SELECT_NEEDS_NFDS!=0static void rescan_fdset(pj_ioqueue_t *ioqueue){ pj_ioqueue_key_t *key = ioqueue->active_list.next; int max = 0; while (key != &ioqueue->active_list) { if (key->fd > max) max = key->fd; key = key->next; } ioqueue->nfds = max;}#elsestatic void rescan_fdset(pj_ioqueue_t *ioqueue){ ioqueue->nfds = FD_SETSIZE-1;}#endif/* * pj_ioqueue_create() * * Create select ioqueue. */PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, pj_size_t max_fd, pj_ioqueue_t **p_ioqueue){ pj_ioqueue_t *ioqueue; pj_lock_t *lock; unsigned i; pj_status_t rc; /* Check that arguments are valid. */ PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL && max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES, PJ_EINVAL); /* Check that size of pj_ioqueue_op_key_t is sufficient */ PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= sizeof(union operation_key), PJ_EBUG); /* Create and init common ioqueue stuffs */ ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); ioqueue_init(ioqueue); ioqueue->max = max_fd; ioqueue->count = 0; PJ_FD_ZERO(&ioqueue->rfdset); PJ_FD_ZERO(&ioqueue->wfdset);#if PJ_HAS_TCP PJ_FD_ZERO(&ioqueue->xfdset);#endif pj_list_init(&ioqueue->active_list); rescan_fdset(ioqueue);#if PJ_IOQUEUE_HAS_SAFE_UNREG /* When safe unregistration is used (the default), we pre-create * all keys and put them in the free list. */ /* Mutex to protect key's reference counter * We don't want to use key's mutex or ioqueue's mutex because * that would create deadlock situation in some cases. */ rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex); if (rc != PJ_SUCCESS) return rc; /* Init key list */ pj_list_init(&ioqueue->free_list); pj_list_init(&ioqueue->closing_list); /* Pre-create all keys according to max_fd */ for (i=0; i<max_fd; ++i) { pj_ioqueue_key_t *key; key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t)); key->ref_count = 0; rc = pj_mutex_create_recursive(pool, NULL, &key->mutex); if (rc != PJ_SUCCESS) { key = ioqueue->free_list.next; while (key != &ioqueue->free_list) { pj_mutex_destroy(key->mutex); key = key->next; } pj_mutex_destroy(ioqueue->ref_cnt_mutex); return rc; } pj_list_push_back(&ioqueue->free_list, key); }#endif /* Create and init ioqueue mutex */ rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock); if (rc != PJ_SUCCESS) return rc; rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE); if (rc != PJ_SUCCESS) return rc; PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue)); *p_ioqueue = ioqueue; return PJ_SUCCESS;}/* * pj_ioqueue_destroy() * * Destroy ioqueue. */PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue){ pj_ioqueue_key_t *key; PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); pj_lock_acquire(ioqueue->lock);#if PJ_IOQUEUE_HAS_SAFE_UNREG /* Destroy reference counters */ key = ioqueue->active_list.next; while (key != &ioqueue->active_list) { pj_mutex_destroy(key->mutex); key = key->next; } key = ioqueue->closing_list.next; while (key != &ioqueue->closing_list) { pj_mutex_destroy(key->mutex); key = key->next; } key = ioqueue->free_list.next; while (key != &ioqueue->free_list) { pj_mutex_destroy(key->mutex); key = key->next; } pj_mutex_destroy(ioqueue->ref_cnt_mutex);#endif return ioqueue_destroy(ioqueue);}/* * pj_ioqueue_register_sock() * * Register socket handle to ioqueue. */PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, pj_ioqueue_t *ioqueue, pj_sock_t sock, void *user_data, const pj_ioqueue_callback *cb, pj_ioqueue_key_t **p_key){ pj_ioqueue_key_t *key = NULL;#if defined(PJ_WIN32) && PJ_WIN32!=0 || \ defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0 u_long value;#else pj_uint32_t value;#endif pj_status_t rc = PJ_SUCCESS; PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET && cb && p_key, PJ_EINVAL); pj_lock_acquire(ioqueue->lock); if (ioqueue->count >= ioqueue->max) { rc = PJ_ETOOMANY; goto on_return; } /* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get * the key from the free list. Otherwise allocate a new one. */#if PJ_IOQUEUE_HAS_SAFE_UNREG pj_assert(!pj_list_empty(&ioqueue->free_list)); if (pj_list_empty(&ioqueue->free_list)) { rc = PJ_ETOOMANY; goto on_return; } key = ioqueue->free_list.next; pj_list_erase(key);#else key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));#endif rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); if (rc != PJ_SUCCESS) { key = NULL; goto on_return; } /* Set socket to nonblocking. */ value = 1;#if defined(PJ_WIN32) && PJ_WIN32!=0 || \ defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0 if (ioctlsocket(sock, FIONBIO, &value)) {#else if (ioctl(sock, FIONBIO, &value)) {#endif rc = pj_get_netos_error(); goto on_return; } /* Put in active list. */ pj_list_insert_before(&ioqueue->active_list, key); ++ioqueue->count; /* Rescan fdset to get max descriptor */ rescan_fdset(ioqueue);on_return: /* On error, socket may be left in non-blocking mode. */ *p_key = key; pj_lock_release(ioqueue->lock); return rc;}#if PJ_IOQUEUE_HAS_SAFE_UNREG/* Increment key's reference counter */static void increment_counter(pj_ioqueue_key_t *key){ pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -