📄 ioqueue_winnt.c
字号:
/* $Id: ioqueue_winnt.c 1158 2007-04-06 10:25:23Z bennylp $ */
/*
* Copyright (C)2003-2007 Benny Prijono <benny@prijono.org>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <pj/ioqueue.h>
#include <pj/os.h>
#include <pj/lock.h>
#include <pj/pool.h>
#include <pj/string.h>
#include <pj/sock.h>
#include <pj/array.h>
#include <pj/log.h>
#include <pj/assert.h>
#include <pj/errno.h>
#include <pj/compat/socket.h>
#if defined(PJ_HAS_WINSOCK2_H) && PJ_HAS_WINSOCK2_H != 0
# include <winsock2.h>
#elif defined(PJ_HAS_WINSOCK_H) && PJ_HAS_WINSOCK_H != 0
# include <winsock.h>
#endif
#if defined(PJ_HAS_MSWSOCK_H) && PJ_HAS_MSWSOCK_H != 0
# include <mswsock.h>
#endif
/* The address specified in AcceptEx() must be 16 more than the size of
* SOCKADDR (source: MSDN).
*/
#define ACCEPT_ADDR_LEN (sizeof(pj_sockaddr_in)+16)
typedef struct generic_overlapped
{
WSAOVERLAPPED overlapped;
pj_ioqueue_operation_e operation;
} generic_overlapped;
/*
* OVERLAPPPED structure for send and receive.
*/
typedef struct ioqueue_overlapped
{
WSAOVERLAPPED overlapped;
pj_ioqueue_operation_e operation;
WSABUF wsabuf;
pj_sockaddr_in dummy_addr;
int dummy_addrlen;
} ioqueue_overlapped;
#if PJ_HAS_TCP
/*
* OVERLAP structure for accept.
*/
typedef struct ioqueue_accept_rec
{
WSAOVERLAPPED overlapped;
pj_ioqueue_operation_e operation;
pj_sock_t newsock;
pj_sock_t *newsock_ptr;
int *addrlen;
void *remote;
void *local;
char accept_buf[2 * ACCEPT_ADDR_LEN];
} ioqueue_accept_rec;
#endif
/*
* Structure to hold pending operation key.
*/
union operation_key
{
generic_overlapped generic;
ioqueue_overlapped overlapped;
#if PJ_HAS_TCP
ioqueue_accept_rec accept;
#endif
};
/* Type of handle in the key. */
enum handle_type
{
HND_IS_UNKNOWN,
HND_IS_FILE,
HND_IS_SOCKET,
};
enum { POST_QUIT_LEN = 0xFFFFDEADUL };
/*
* Structure for individual socket.
*/
struct pj_ioqueue_key_t
{
PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);
pj_ioqueue_t *ioqueue;
HANDLE hnd;
void *user_data;
enum handle_type hnd_type;
pj_ioqueue_callback cb;
#if PJ_HAS_TCP
int connecting;
#endif
#if PJ_IOQUEUE_HAS_SAFE_UNREG
pj_atomic_t *ref_count;
pj_bool_t closing;
pj_time_val free_time;
#endif
};
/*
* IO Queue structure.
*/
struct pj_ioqueue_t
{
HANDLE iocp;
pj_lock_t *lock;
pj_bool_t auto_delete_lock;
#if PJ_IOQUEUE_HAS_SAFE_UNREG
pj_ioqueue_key_t active_list;
pj_ioqueue_key_t free_list;
pj_ioqueue_key_t closing_list;
#endif
/* These are to keep track of connecting sockets */
#if PJ_HAS_TCP
unsigned event_count;
HANDLE event_pool[MAXIMUM_WAIT_OBJECTS+1];
unsigned connecting_count;
HANDLE connecting_handles[MAXIMUM_WAIT_OBJECTS+1];
pj_ioqueue_key_t *connecting_keys[MAXIMUM_WAIT_OBJECTS+1];
#endif
};
#if PJ_HAS_TCP
/*
* Process the socket when the overlapped accept() completed.
*/
static void ioqueue_on_accept_complete(ioqueue_accept_rec *accept_overlapped)
{
struct sockaddr *local;
struct sockaddr *remote;
int locallen, remotelen;
PJ_CHECK_STACK();
/* Operation complete immediately. */
GetAcceptExSockaddrs( accept_overlapped->accept_buf,
0,
ACCEPT_ADDR_LEN,
ACCEPT_ADDR_LEN,
&local,
&locallen,
&remote,
&remotelen);
if (*accept_overlapped->addrlen >= locallen) {
pj_memcpy(accept_overlapped->local, local, locallen);
pj_memcpy(accept_overlapped->remote, remote, locallen);
} else {
pj_bzero(accept_overlapped->local, *accept_overlapped->addrlen);
pj_bzero(accept_overlapped->remote, *accept_overlapped->addrlen);
}
*accept_overlapped->addrlen = locallen;
if (accept_overlapped->newsock_ptr)
*accept_overlapped->newsock_ptr = accept_overlapped->newsock;
accept_overlapped->operation = 0;
}
static void erase_connecting_socket( pj_ioqueue_t *ioqueue, unsigned pos)
{
pj_ioqueue_key_t *key = ioqueue->connecting_keys[pos];
HANDLE hEvent = ioqueue->connecting_handles[pos];
/* Remove key from array of connecting handles. */
pj_array_erase(ioqueue->connecting_keys, sizeof(key),
ioqueue->connecting_count, pos);
pj_array_erase(ioqueue->connecting_handles, sizeof(HANDLE),
ioqueue->connecting_count, pos);
--ioqueue->connecting_count;
/* Disassociate the socket from the event. */
WSAEventSelect((pj_sock_t)key->hnd, hEvent, 0);
/* Put event object to pool. */
if (ioqueue->event_count < MAXIMUM_WAIT_OBJECTS) {
ioqueue->event_pool[ioqueue->event_count++] = hEvent;
} else {
/* Shouldn't happen. There should be no more pending connections
* than max.
*/
pj_assert(0);
CloseHandle(hEvent);
}
}
/*
* Poll for the completion of non-blocking connect().
* If there's a completion, the function return the key of the completed
* socket, and 'result' argument contains the connect() result. If connect()
* succeeded, 'result' will have value zero, otherwise will have the error
* code.
*/
static int check_connecting( pj_ioqueue_t *ioqueue )
{
if (ioqueue->connecting_count) {
int i, count;
struct
{
pj_ioqueue_key_t *key;
pj_status_t status;
} events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1];
pj_lock_acquire(ioqueue->lock);
for (count=0; count<PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1; ++count) {
DWORD result;
result = WaitForMultipleObjects(ioqueue->connecting_count,
ioqueue->connecting_handles,
FALSE, 0);
if (result >= WAIT_OBJECT_0 &&
result < WAIT_OBJECT_0+ioqueue->connecting_count)
{
WSANETWORKEVENTS net_events;
/* Got completed connect(). */
unsigned pos = result - WAIT_OBJECT_0;
events[count].key = ioqueue->connecting_keys[pos];
/* See whether connect has succeeded. */
WSAEnumNetworkEvents((pj_sock_t)events[count].key->hnd,
ioqueue->connecting_handles[pos],
&net_events);
events[count].status =
PJ_STATUS_FROM_OS(net_events.iErrorCode[FD_CONNECT_BIT]);
/* Erase socket from pending connect. */
erase_connecting_socket(ioqueue, pos);
} else {
/* No more events */
break;
}
}
pj_lock_release(ioqueue->lock);
/* Call callbacks. */
for (i=0; i<count; ++i) {
if (events[i].key->cb.on_connect_complete) {
events[i].key->cb.on_connect_complete(events[i].key,
events[i].status);
}
}
return count;
}
return 0;
}
#endif
/*
* pj_ioqueue_name()
*/
PJ_DEF(const char*) pj_ioqueue_name(void)
{
return "iocp";
}
/*
* pj_ioqueue_create()
*/
PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
pj_size_t max_fd,
pj_ioqueue_t **p_ioqueue)
{
pj_ioqueue_t *ioqueue;
unsigned i;
pj_status_t rc;
PJ_UNUSED_ARG(max_fd);
PJ_ASSERT_RETURN(pool && p_ioqueue, PJ_EINVAL);
rc = sizeof(union operation_key);
/* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */
PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
sizeof(union operation_key), PJ_EBUG);
/* Create IOCP */
ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue));
ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (ioqueue->iocp == NULL)
return PJ_RETURN_OS_ERROR(GetLastError());
/* Create IOCP mutex */
rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock);
if (rc != PJ_SUCCESS) {
CloseHandle(ioqueue->iocp);
return rc;
}
ioqueue->auto_delete_lock = PJ_TRUE;
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/*
* Create and initialize key pools.
*/
pj_list_init(&ioqueue->active_list);
pj_list_init(&ioqueue->free_list);
pj_list_init(&ioqueue->closing_list);
/* Preallocate keys according to max_fd setting, and put them
* in free_list.
*/
for (i=0; i<max_fd; ++i) {
pj_ioqueue_key_t *key;
key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t));
rc = pj_atomic_create(pool, 0, &key->ref_count);
if (rc != PJ_SUCCESS) {
key = ioqueue->free_list.next;
while (key != &ioqueue->free_list) {
pj_atomic_destroy(key->ref_count);
key = key->next;
}
CloseHandle(ioqueue->iocp);
return rc;
}
pj_list_push_back(&ioqueue->free_list, key);
}
#endif
*p_ioqueue = ioqueue;
PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue));
return PJ_SUCCESS;
}
/*
* pj_ioqueue_destroy()
*/
PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue )
{
#if PJ_HAS_TCP
unsigned i;
#endif
pj_ioqueue_key_t *key;
PJ_CHECK_STACK();
PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
pj_lock_acquire(ioqueue->lock);
#if PJ_HAS_TCP
/* Destroy events in the pool */
for (i=0; i<ioqueue->event_count; ++i) {
CloseHandle(ioqueue->event_pool[i]);
}
ioqueue->event_count = 0;
#endif
if (CloseHandle(ioqueue->iocp) != TRUE)
return PJ_RETURN_OS_ERROR(GetLastError());
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Destroy reference counters */
key = ioqueue->active_list.next;
while (key != &ioqueue->active_list) {
pj_atomic_destroy(key->ref_count);
key = key->next;
}
key = ioqueue->closing_list.next;
while (key != &ioqueue->closing_list) {
pj_atomic_destroy(key->ref_count);
key = key->next;
}
key = ioqueue->free_list.next;
while (key != &ioqueue->free_list) {
pj_atomic_destroy(key->ref_count);
key = key->next;
}
#endif
if (ioqueue->auto_delete_lock)
pj_lock_destroy(ioqueue->lock);
return PJ_SUCCESS;
}
/*
* pj_ioqueue_set_lock()
*/
PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
pj_lock_t *lock,
pj_bool_t auto_delete )
{
PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -