📄 ioqueue_winnt.c
字号:
/* $Id: ioqueue_winnt.c 1158 2007-04-06 10:25:23Z bennylp $ */
/*
 * Copyright (C)2003-2007 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
#include <pj/ioqueue.h>
#include <pj/os.h>
#include <pj/lock.h>
#include <pj/pool.h>
#include <pj/string.h>
#include <pj/sock.h>
#include <pj/array.h>
#include <pj/log.h>
#include <pj/assert.h>
#include <pj/errno.h>
#include <pj/compat/socket.h>

#if defined(PJ_HAS_WINSOCK2_H) && PJ_HAS_WINSOCK2_H != 0
#  include <winsock2.h>
#elif defined(PJ_HAS_WINSOCK_H) && PJ_HAS_WINSOCK_H != 0
#  include <winsock.h>
#endif

#if defined(PJ_HAS_MSWSOCK_H) && PJ_HAS_MSWSOCK_H != 0
#  include <mswsock.h>
#endif

/* The address specified in AcceptEx() must be 16 more than the size of
 * SOCKADDR (source: MSDN).
 */
#define ACCEPT_ADDR_LEN (sizeof(pj_sockaddr_in)+16)

/*
 * Common leading members of every pending-operation record below: the
 * overlapped header followed by the operation code.
 */
typedef struct generic_overlapped
{
    WSAOVERLAPPED          overlapped;
    pj_ioqueue_operation_e operation;
} generic_overlapped;

/*
 * OVERLAPPED structure for send and receive.
 */
typedef struct ioqueue_overlapped
{
    WSAOVERLAPPED          overlapped;
    pj_ioqueue_operation_e operation;
    WSABUF                 wsabuf;
    pj_sockaddr_in         dummy_addr;
    int                    dummy_addrlen;
} ioqueue_overlapped;

#if PJ_HAS_TCP
/*
 * OVERLAP structure for accept.
 */
typedef struct ioqueue_accept_rec
{
    WSAOVERLAPPED          overlapped;
    pj_ioqueue_operation_e operation;
    pj_sock_t              newsock;      /* Accepted socket.                 */
    pj_sock_t             *newsock_ptr;  /* Where to report newsock, or NULL */
    int                   *addrlen;      /* In: buffer size, out: addr length*/
    void                  *remote;       /* Receives the remote address.     */
    void                  *local;        /* Receives the local address.      */
    char                   accept_buf[2 * ACCEPT_ADDR_LEN];
} ioqueue_accept_rec;
#endif

/*
 * Structure to hold pending operation key.
 */
union operation_key
{
    generic_overlapped     generic;
    ioqueue_overlapped     overlapped;
#if PJ_HAS_TCP
    ioqueue_accept_rec     accept;
#endif
};

/* Type of handle in the key. */
enum handle_type
{
    HND_IS_UNKNOWN,
    HND_IS_FILE,
    HND_IS_SOCKET,
};

/* NOTE(review): not referenced in the visible part of this file; judging by
 * the name it is a sentinel length used to post a quit request -- confirm.
 */
enum { POST_QUIT_LEN = 0xFFFFDEADUL };

/*
 * Structure for individual socket.
 */
struct pj_ioqueue_key_t
{
    PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);

    pj_ioqueue_t       *ioqueue;
    HANDLE              hnd;
    void               *user_data;
    enum handle_type    hnd_type;
    pj_ioqueue_callback cb;

#if PJ_HAS_TCP
    int                 connecting;
#endif

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    pj_atomic_t        *ref_count;
    pj_bool_t           closing;
    pj_time_val         free_time;
#endif
};

/*
 * IO Queue structure.
 */
struct pj_ioqueue_t
{
    HANDLE            iocp;
    pj_lock_t        *lock;
    pj_bool_t         auto_delete_lock;  /* Destroy 'lock' with the queue.   */

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    pj_ioqueue_key_t  active_list;
    pj_ioqueue_key_t  free_list;
    pj_ioqueue_key_t  closing_list;
#endif

    /* These are to keep track of connecting sockets */
#if PJ_HAS_TCP
    unsigned          event_count;       /* Number of events in event_pool.  */
    HANDLE            event_pool[MAXIMUM_WAIT_OBJECTS+1];
    unsigned          connecting_count;  /* Entries used in the two arrays.  */
    HANDLE            connecting_handles[MAXIMUM_WAIT_OBJECTS+1];
    pj_ioqueue_key_t *connecting_keys[MAXIMUM_WAIT_OBJECTS+1];
#endif
};


#if PJ_HAS_TCP
/*
 * Process the socket when the overlapped accept() completed.
 *
 * Extracts the local and remote addresses from the AcceptEx() buffer into
 * the buffers recorded in the accept record, reports the address length and
 * the new socket, then clears the record's operation code.
 *
 * An address is copied only when the size given in *addrlen can hold it;
 * otherwise the destination buffer is zero filled. *addrlen is always set to
 * the length of the local address, as before.
 */
static void ioqueue_on_accept_complete(ioqueue_accept_rec *accept_overlapped)
{
    struct sockaddr *local;
    struct sockaddr *remote;
    int locallen, remotelen;

    PJ_CHECK_STACK();

    /* Operation complete immediately. */
    GetAcceptExSockaddrs( accept_overlapped->accept_buf,
                          0,
                          ACCEPT_ADDR_LEN,
                          ACCEPT_ADDR_LEN,
                          &local,
                          &locallen,
                          &remote,
                          &remotelen);

    /* The destination pointers come from the accept record.
     * NOTE(review): they are presumably optional in the public accept API,
     * so do not dereference them blindly -- confirm against the caller.
     */
    if (accept_overlapped->addrlen) {
        const int capacity = *accept_overlapped->addrlen;

        if (accept_overlapped->local) {
            if (capacity >= locallen)
                pj_memcpy(accept_overlapped->local, local, locallen);
            else if (capacity > 0)
                pj_bzero(accept_overlapped->local, capacity);
        }

        if (accept_overlapped->remote) {
            /* Use the remote address' own length, not the local one. */
            if (capacity >= remotelen)
                pj_memcpy(accept_overlapped->remote, remote, remotelen);
            else if (capacity > 0)
                pj_bzero(accept_overlapped->remote, capacity);
        }

        *accept_overlapped->addrlen = locallen;
    }

    if (accept_overlapped->newsock_ptr)
        *accept_overlapped->newsock_ptr = accept_overlapped->newsock;

    accept_overlapped->operation = 0;
}

/*
 * Remove entry 'pos' from the arrays of connecting keys/handles, detach the
 * socket from its event object and return the event object to the pool.
 */
static void erase_connecting_socket( pj_ioqueue_t *ioqueue, unsigned pos)
{
    pj_ioqueue_key_t *key = ioqueue->connecting_keys[pos];
    HANDLE hEvent = ioqueue->connecting_handles[pos];

    /* Remove key from array of connecting handles. */
    pj_array_erase(ioqueue->connecting_keys, sizeof(key),
                   ioqueue->connecting_count, pos);
    pj_array_erase(ioqueue->connecting_handles, sizeof(HANDLE),
                   ioqueue->connecting_count, pos);
    --ioqueue->connecting_count;

    /* Disassociate the socket from the event. */
    WSAEventSelect((pj_sock_t)key->hnd, hEvent, 0);

    /* Put event object to pool. */
    if (ioqueue->event_count < MAXIMUM_WAIT_OBJECTS) {
        ioqueue->event_pool[ioqueue->event_count++] = hEvent;
    } else {
        /* Shouldn't happen. There should be no more pending connections
         * than max.
         */
        pj_assert(0);
        CloseHandle(hEvent);
    }
}

/*
 * Poll for the completion of non-blocking connect().
 *
 * Every completed connect found (up to PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1
 * per call) is removed from the list of connecting sockets while the ioqueue
 * lock is held. After the lock is released, the on_connect_complete()
 * callback of each key (when set) is called with the connect() result:
 * PJ_SUCCESS when connect() succeeded, otherwise the error status.
 *
 * Returns the number of completed connects that were collected.
 */
static int check_connecting( pj_ioqueue_t *ioqueue )
{
    struct
    {
        pj_ioqueue_key_t *key;
        pj_status_t       status;
    } events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1];
    int i, count;

    if (ioqueue->connecting_count == 0)
        return 0;

    pj_lock_acquire(ioqueue->lock);
    for (count=0; count<PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1; ++count) {
        WSANETWORKEVENTS net_events;
        pj_ioqueue_key_t *key;
        unsigned pos;
        DWORD result;

        /* The list shrinks as completions are erased below, and
         * WaitForMultipleObjects() must not be called with a zero count.
         */
        if (ioqueue->connecting_count == 0)
            break;

        result = WaitForMultipleObjects(ioqueue->connecting_count,
                                        ioqueue->connecting_handles,
                                        FALSE, 0);
        if (!(result >= WAIT_OBJECT_0 &&
              result < WAIT_OBJECT_0+ioqueue->connecting_count))
        {
            /* No more events */
            break;
        }

        /* Got completed connect(). */
        pos = result - WAIT_OBJECT_0;
        key = ioqueue->connecting_keys[pos];
        events[count].key = key;

        /* See whether connect has succeeded. */
        if (WSAEnumNetworkEvents((pj_sock_t)key->hnd,
                                 ioqueue->connecting_handles[pos],
                                 &net_events) == SOCKET_ERROR)
        {
            /* net_events is not valid; report the enumeration failure
             * instead of reading an uninitialized error code.
             */
            int os_err = WSAGetLastError();
            events[count].status = PJ_RETURN_OS_ERROR(os_err);
        } else {
            events[count].status =
                PJ_STATUS_FROM_OS(net_events.iErrorCode[FD_CONNECT_BIT]);
        }

        /* Erase socket from pending connect. */
        erase_connecting_socket(ioqueue, pos);
    }
    pj_lock_release(ioqueue->lock);

    /* Call callbacks. */
    for (i=0; i<count; ++i) {
        if (events[i].key->cb.on_connect_complete) {
            events[i].key->cb.on_connect_complete(events[i].key,
                                                  events[i].status);
        }
    }

    return count;
}
#endif

#if PJ_IOQUEUE_HAS_SAFE_UNREG
/*
 * Destroy the reference counter of every key linked into 'list'.
 * 'list' is the list head (sentinel) itself and is not touched.
 */
static void destroy_key_ref_counts(pj_ioqueue_key_t *list)
{
    pj_ioqueue_key_t *key = list->next;

    while (key != list) {
        pj_atomic_destroy(key->ref_count);
        key = key->next;
    }
}
#endif

/*
 * pj_ioqueue_name()
 */
PJ_DEF(const char*) pj_ioqueue_name(void)
{
    return "iocp";
}

/*
 * pj_ioqueue_create()
 *
 * Creates the completion port and the ioqueue mutex and, when
 * PJ_IOQUEUE_HAS_SAFE_UNREG is enabled, preallocates max_fd keys into the
 * free list. On failure everything created so far (reference counters, lock,
 * completion port) is released and the error status is returned;
 * *p_ioqueue is only written on success.
 */
PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
                                       pj_size_t max_fd,
                                       pj_ioqueue_t **p_ioqueue)
{
    pj_ioqueue_t *ioqueue;
#if PJ_IOQUEUE_HAS_SAFE_UNREG
    pj_size_t i;
#endif
    pj_status_t rc;

    PJ_UNUSED_ARG(max_fd);
    PJ_ASSERT_RETURN(pool && p_ioqueue, PJ_EINVAL);

    /* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */
    PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
                     sizeof(union operation_key), PJ_EBUG);

    /* Create IOCP */
    ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue));
    if (ioqueue == NULL)
        return PJ_ENOMEM;

    ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    if (ioqueue->iocp == NULL)
        return PJ_RETURN_OS_ERROR(GetLastError());

    /* Create IOCP mutex */
    rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock);
    if (rc != PJ_SUCCESS) {
        CloseHandle(ioqueue->iocp);
        return rc;
    }

    ioqueue->auto_delete_lock = PJ_TRUE;

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /*
     * Create and initialize key pools.
     */
    pj_list_init(&ioqueue->active_list);
    pj_list_init(&ioqueue->free_list);
    pj_list_init(&ioqueue->closing_list);

    /* Preallocate keys according to max_fd setting, and put them
     * in free_list.
     */
    for (i=0; i<max_fd; ++i) {
        pj_ioqueue_key_t *key;

        /* Zero-fill so the fields not set here start from a known state. */
        key = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
        rc = key ? pj_atomic_create(pool, 0, &key->ref_count) : PJ_ENOMEM;
        if (rc != PJ_SUCCESS) {
            /* Undo everything created so far, including the lock. */
            destroy_key_ref_counts(&ioqueue->free_list);
            pj_lock_destroy(ioqueue->lock);
            CloseHandle(ioqueue->iocp);
            return rc;
        }

        pj_list_push_back(&ioqueue->free_list, key);
    }
#endif

    *p_ioqueue = ioqueue;

    PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue));
    return PJ_SUCCESS;
}

/*
 * pj_ioqueue_destroy()
 *
 * Closes the pooled event objects and the completion port, destroys the
 * reference counters of all keys and, when the ioqueue owns its lock,
 * destroys the lock. The lock is released on every return path.
 *
 * Returns PJ_SUCCESS, or the OS error when the completion port could not be
 * closed (in which case the remaining resources are left as they are).
 */
PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue )
{
#if PJ_HAS_TCP
    unsigned i;
#endif

    PJ_CHECK_STACK();
    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);

    pj_lock_acquire(ioqueue->lock);

#if PJ_HAS_TCP
    /* Destroy events in the pool */
    for (i=0; i<ioqueue->event_count; ++i) {
        CloseHandle(ioqueue->event_pool[i]);
    }
    ioqueue->event_count = 0;
#endif

    /* CloseHandle() is documented to return nonzero on success, which is
     * not necessarily TRUE.
     */
    if (!CloseHandle(ioqueue->iocp)) {
        /* Fetch the error before any other call can overwrite it. */
        DWORD os_err = GetLastError();

        pj_lock_release(ioqueue->lock);
        return PJ_RETURN_OS_ERROR(os_err);
    }

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Destroy reference counters */
    destroy_key_ref_counts(&ioqueue->active_list);
    destroy_key_ref_counts(&ioqueue->closing_list);
    destroy_key_ref_counts(&ioqueue->free_list);
#endif

    /* Never leave the lock held: it may be owned by the application. */
    pj_lock_release(ioqueue->lock);

    if (ioqueue->auto_delete_lock)
        pj_lock_destroy(ioqueue->lock);

    return PJ_SUCCESS;
}

/*
 * pj_ioqueue_set_lock()
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
                                         pj_lock_t *lock,
                                         pj_bool_t auto_delete )
{
    PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -