⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ioqueue_winnt.c

📁 一个开源SIP协议栈
💻 C
📖 第 1 页 / 共 3 页
字号:
/* $Id: ioqueue_winnt.c 1158 2007-04-06 10:25:23Z bennylp $ */
/* 
 * Copyright (C)2003-2007 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
 */
#include <pj/ioqueue.h>
#include <pj/os.h>
#include <pj/lock.h>
#include <pj/pool.h>
#include <pj/string.h>
#include <pj/sock.h>
#include <pj/array.h>
#include <pj/log.h>
#include <pj/assert.h>
#include <pj/errno.h>
#include <pj/compat/socket.h>


#if defined(PJ_HAS_WINSOCK2_H) && PJ_HAS_WINSOCK2_H != 0
#  include <winsock2.h>
#elif defined(PJ_HAS_WINSOCK_H) && PJ_HAS_WINSOCK_H != 0
#  include <winsock.h>
#endif

#if defined(PJ_HAS_MSWSOCK_H) && PJ_HAS_MSWSOCK_H != 0
#  include <mswsock.h>
#endif


/* Number of bytes reserved for EACH of the two addresses (local and remote)
 * that AcceptEx() writes into its output buffer. Per MSDN, this must be at
 * least 16 bytes larger than the maximum address length of the transport
 * protocol in use. The size is derived from pj_sockaddr_in here.
 */
#define ACCEPT_ADDR_LEN	    (sizeof(pj_sockaddr_in)+16)

/*
 * Common leading part of every pending-operation record in this file:
 * a WSAOVERLAPPED followed by the operation code. ioqueue_overlapped and
 * ioqueue_accept_rec start with the same two members in the same order,
 * and all three types are members of union operation_key, so do not
 * reorder these members.
 */
typedef struct generic_overlapped
{
    WSAOVERLAPPED	   overlapped;
    pj_ioqueue_operation_e operation;
} generic_overlapped;

/*
 * OVERLAPPED structure for send and receive.
 *
 * The first two members mirror generic_overlapped and must stay first and
 * in this order. 'wsabuf' is the WSABUF descriptor for the operation.
 * NOTE(review): 'dummy_addr'/'dummy_addrlen' are presumably placeholders
 * used when the caller supplies no address storage -- their use is not
 * visible in this part of the file; confirm against the send/recv code.
 */
typedef struct ioqueue_overlapped
{
    WSAOVERLAPPED	   overlapped;
    pj_ioqueue_operation_e operation;
    WSABUF		   wsabuf;
    pj_sockaddr_in         dummy_addr;
    int                    dummy_addrlen;
} ioqueue_overlapped;

#if PJ_HAS_TCP
/*
 * OVERLAPPED structure for accept.
 *
 * The first two members mirror generic_overlapped and must stay first and
 * in this order. The remaining members are consumed by
 * ioqueue_on_accept_complete():
 *  - newsock      socket value copied to *newsock_ptr on completion.
 *  - newsock_ptr  optional (may be NULL) destination for 'newsock'.
 *  - addrlen      on input the capacity of the 'local'/'remote' buffers,
 *                 on completion overwritten with the local address length.
 *  - remote/local destinations for the addresses extracted from accept_buf.
 *  - accept_buf   address buffer parsed by GetAcceptExSockaddrs(); holds
 *                 two addresses of ACCEPT_ADDR_LEN bytes each.
 */
typedef struct ioqueue_accept_rec
{
    WSAOVERLAPPED	    overlapped;
    pj_ioqueue_operation_e  operation;
    pj_sock_t		    newsock;
    pj_sock_t		   *newsock_ptr;
    int			   *addrlen;
    void		   *remote;
    void		   *local;
    char		    accept_buf[2 * ACCEPT_ADDR_LEN];
} ioqueue_accept_rec;
#endif

/*
 * Structure to hold pending operation key.
 *
 * Overlays the three record types, which all begin with the same
 * WSAOVERLAPPED + operation pair. pj_ioqueue_create() asserts that this
 * union fits inside pj_ioqueue_op_key_t (minus one pointer), so growing
 * any member may require enlarging pj_ioqueue_op_key_t as well.
 */
union operation_key
{
    generic_overlapped      generic;
    ioqueue_overlapped      overlapped;
#if PJ_HAS_TCP
    ioqueue_accept_rec      accept;
#endif
};

/* Type of handle in the key (stored in pj_ioqueue_key_t::hnd_type).
 * HND_IS_UNKNOWN is the zero value.
 */
enum handle_type
{
    HND_IS_UNKNOWN,
    HND_IS_FILE,
    HND_IS_SOCKET,
};

/* NOTE(review): judging by the name, this is a sentinel "length" value used
 * to signal a quit request through the completion port -- its use is not
 * visible in this part of the file; confirm against the poll code.
 * The value does not fit in an int; ISO C before C23 requires enumerator
 * values to be representable as int, so this relies on the compiler
 * accepting a wider enumerator.
 */
enum { POST_QUIT_LEN = 0xFFFFDEADUL };

/*
 * Structure for individual socket.
 *
 * One key describes one handle registered to an ioqueue. Keys are list
 * nodes (PJ_DECL_LIST_MEMBER supplies the link members walked through
 * 'next' elsewhere in this file).
 */
struct pj_ioqueue_key_t
{
    PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);

    /* Owning ioqueue, the OS handle (cast to pj_sock_t for socket calls),
     * opaque user data, the kind of handle, and the callback table
     * (e.g. cb.on_connect_complete is invoked by check_connecting()).
     */
    pj_ioqueue_t       *ioqueue;
    HANDLE		hnd;
    void	       *user_data;
    enum handle_type    hnd_type;
    pj_ioqueue_callback	cb;

#if PJ_HAS_TCP
    /* NOTE(review): presumably non-zero while a non-blocking connect() is
     * pending on this key -- not set anywhere in this part of the file.
     */
    int			connecting;
#endif

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* ref_count is created (initial value 0) when the key is preallocated
     * in pj_ioqueue_create() and destroyed in pj_ioqueue_destroy().
     * NOTE(review): 'closing' and 'free_time' presumably drive delayed
     * recycling of unregistered keys -- confirm against the unregister code.
     */
    pj_atomic_t	       *ref_count;
    pj_bool_t		closing;
    pj_time_val		free_time;
#endif

};

/*
 * IO Queue structure.
 */
struct pj_ioqueue_t
{
    /* Completion port handle created by CreateIoCompletionPort(), the lock
     * guarding the ioqueue state, and a flag telling pj_ioqueue_destroy()
     * whether it should also destroy 'lock'.
     */
    HANDLE	      iocp;
    pj_lock_t        *lock;
    pj_bool_t         auto_delete_lock;

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* List heads for keys. free_list is filled with preallocated keys by
     * pj_ioqueue_create(); the reference counters of the keys in all three
     * lists are destroyed by pj_ioqueue_destroy().
     */
    pj_ioqueue_key_t  active_list;
    pj_ioqueue_key_t  free_list;
    pj_ioqueue_key_t  closing_list;
#endif

    /* These are to keep track of connecting sockets */
#if PJ_HAS_TCP
    /* event_pool[0..event_count-1] holds idle event handles recycled by
     * erase_connecting_socket(). connecting_handles[] and connecting_keys[]
     * are parallel arrays of connecting_count entries: the event handle
     * waited on and the key it belongs to share the same index.
     */
    unsigned	      event_count;
    HANDLE	      event_pool[MAXIMUM_WAIT_OBJECTS+1];
    unsigned	      connecting_count;
    HANDLE	      connecting_handles[MAXIMUM_WAIT_OBJECTS+1];
    pj_ioqueue_key_t *connecting_keys[MAXIMUM_WAIT_OBJECTS+1];
#endif
};


#if PJ_HAS_TCP
/*
 * Process the socket when the overlapped accept() completed.
 *
 * Extracts the local and remote addresses from the accept buffer with
 * GetAcceptExSockaddrs() and hands the results back to the caller of the
 * accept operation:
 *  - If the caller's buffers (capacity *addrlen) can hold both addresses,
 *    each address is copied using its OWN length; otherwise *addrlen bytes
 *    of both buffers are zeroed.
 *  - *addrlen is set to the local address length.
 *  - The accepted socket is stored to *newsock_ptr when newsock_ptr is set.
 *  - The operation field is reset to 0 to mark the record as idle.
 */
static void ioqueue_on_accept_complete(ioqueue_accept_rec *accept_overlapped)
{
    struct sockaddr *local;
    struct sockaddr *remote;
    int locallen, remotelen;

    PJ_CHECK_STACK();

    /* Operation complete immediately. */
    GetAcceptExSockaddrs( accept_overlapped->accept_buf,
			  0, 
			  ACCEPT_ADDR_LEN,
			  ACCEPT_ADDR_LEN,
			  &local,
			  &locallen,
			  &remote,
			  &remotelen);

    /* Both destination buffers share the single capacity in *addrlen, so
     * the capacity must cover the longer of the two addresses before
     * anything is copied.
     */
    if (*accept_overlapped->addrlen >= locallen &&
	*accept_overlapped->addrlen >= remotelen)
    {
        pj_memcpy(accept_overlapped->local, local, locallen);
        /* Use the remote address' own length, not the local one. */
        pj_memcpy(accept_overlapped->remote, remote, remotelen);
    } else {
        pj_bzero(accept_overlapped->local, *accept_overlapped->addrlen);
        pj_bzero(accept_overlapped->remote, *accept_overlapped->addrlen);
    }
    *accept_overlapped->addrlen = locallen;
    if (accept_overlapped->newsock_ptr)
        *accept_overlapped->newsock_ptr = accept_overlapped->newsock;
    accept_overlapped->operation = 0;
}

/*
 * Remove entry 'pos' from the arrays of pending connects, detach the
 * socket from its event object, and return the event object to the
 * ioqueue's event pool.
 */
static void erase_connecting_socket( pj_ioqueue_t *ioqueue, unsigned pos)
{
    pj_ioqueue_key_t *conn_key = ioqueue->connecting_keys[pos];
    HANDLE conn_event = ioqueue->connecting_handles[pos];

    /* Drop the entry from both parallel arrays, then shrink the count. */
    pj_array_erase(ioqueue->connecting_keys,
		   sizeof(ioqueue->connecting_keys[0]),
		   ioqueue->connecting_count, pos);
    pj_array_erase(ioqueue->connecting_handles,
		   sizeof(ioqueue->connecting_handles[0]),
		   ioqueue->connecting_count, pos);
    ioqueue->connecting_count -= 1;

    /* Stop delivering network events for this socket to the event. */
    WSAEventSelect((pj_sock_t)conn_key->hnd, conn_event, 0);

    if (ioqueue->event_count >= MAXIMUM_WAIT_OBJECTS) {
	/* Not expected: there should never be more pending connections
	 * than the maximum, so the pool should always have room.
	 */
	pj_assert(0);
	CloseHandle(conn_event);
	return;
    }

    /* Recycle the event object for a later connect. */
    ioqueue->event_pool[ioqueue->event_count] = conn_event;
    ioqueue->event_count += 1;
}

/*
 * Poll for the completion of non-blocking connect().
 *
 * While holding the ioqueue lock, collects up to
 * PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1 sockets whose connect event is
 * signaled, records the connect status of each, and removes them from the
 * list of pending connects. After the lock is released, the
 * on_connect_complete() callback of each collected key (if set) is called
 * with that status: PJ_SUCCESS-equivalent when connect() succeeded,
 * otherwise the converted OS error code.
 *
 * Returns the number of completed connects that were processed (zero when
 * nothing is pending or nothing has completed).
 */
static int check_connecting( pj_ioqueue_t *ioqueue )
{
    int i, count;
    struct 
    {
	pj_ioqueue_key_t *key;
	pj_status_t	  status;
    } events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1];

    if (ioqueue->connecting_count == 0)
	return 0;

    pj_lock_acquire(ioqueue->lock);
    for (count=0; count<PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1; ++count) {
	WSANETWORKEVENTS net_events;
	unsigned pos;
	DWORD result;

	/* Zero timeout: only pick up events that are already signaled. */
	result = WaitForMultipleObjects(ioqueue->connecting_count,
					ioqueue->connecting_handles,
					FALSE, 0);
	if (!(result >= WAIT_OBJECT_0 && 
	      result < WAIT_OBJECT_0+ioqueue->connecting_count))
	{
	    /* No more events */
	    break;
	}

	/* Got completed connect(). */
	pos = result - WAIT_OBJECT_0;
	events[count].key = ioqueue->connecting_keys[pos];

	/* See whether connect has succeeded. If the query itself fails,
	 * net_events is not filled in, so report the query's error rather
	 * than reading an uninitialized structure. The error must be
	 * fetched before any other Winsock call is made.
	 */
	if (WSAEnumNetworkEvents((pj_sock_t)events[count].key->hnd, 
				 ioqueue->connecting_handles[pos], 
				 &net_events) == SOCKET_ERROR)
	{
	    events[count].status = PJ_STATUS_FROM_OS(WSAGetLastError());
	} else {
	    events[count].status = 
		PJ_STATUS_FROM_OS(net_events.iErrorCode[FD_CONNECT_BIT]);
	}

	/* Erase socket from pending connect. */
	erase_connecting_socket(ioqueue, pos);
    }
    pj_lock_release(ioqueue->lock);

    /* Call callbacks (outside the lock). */
    for (i=0; i<count; ++i) {
	if (events[i].key->cb.on_connect_complete) {
	    events[i].key->cb.on_connect_complete(events[i].key, 
						  events[i].status);
	}
    }

    return count;
}
#endif

/*
 * pj_ioqueue_name()
 *
 * Return the name of this ioqueue backend.
 */
PJ_DEF(const char*) pj_ioqueue_name(void)
{
    static const char backend_name[] = "iocp";

    return backend_name;
}

/*
 * pj_ioqueue_create()
 *
 * Create an ioqueue backed by a new I/O completion port.
 *
 *  pool       Pool from which the ioqueue, its lock and (with
 *             PJ_IOQUEUE_HAS_SAFE_UNREG) its preallocated keys are taken.
 *  max_fd     Number of keys to preallocate when PJ_IOQUEUE_HAS_SAFE_UNREG
 *             is enabled; unused otherwise.
 *  p_ioqueue  Receives the new ioqueue on success; untouched on failure.
 *
 * Returns PJ_SUCCESS, PJ_EINVAL for NULL arguments, PJ_EBUG if the
 * operation key does not fit in pj_ioqueue_op_key_t, PJ_ENOMEM if the pool
 * returns NULL, or the error from the failing OS/pjlib call. Everything
 * acquired so far (completion port, lock, reference counters) is released
 * before an error is returned.
 */
PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, 
				       pj_size_t max_fd,
				       pj_ioqueue_t **p_ioqueue)
{
    pj_ioqueue_t *ioqueue;
#if PJ_IOQUEUE_HAS_SAFE_UNREG
    pj_size_t i;
#endif
    pj_status_t rc;

    PJ_UNUSED_ARG(max_fd);
    PJ_ASSERT_RETURN(pool && p_ioqueue, PJ_EINVAL);

    /* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */
    PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= 
                     sizeof(union operation_key), PJ_EBUG);

    /* Create IOCP */
    ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue));
    if (ioqueue == NULL)
	return PJ_ENOMEM;

    ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    if (ioqueue->iocp == NULL)
	return PJ_RETURN_OS_ERROR(GetLastError());

    /* Create IOCP mutex */
    rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock);
    if (rc != PJ_SUCCESS) {
	CloseHandle(ioqueue->iocp);
	return rc;
    }

    ioqueue->auto_delete_lock = PJ_TRUE;

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /*
     * Create and initialize key pools.
     */
    pj_list_init(&ioqueue->active_list);
    pj_list_init(&ioqueue->free_list);
    pj_list_init(&ioqueue->closing_list);

    /* Preallocate keys according to max_fd setting, and put them
     * in free_list.
     */
    for (i=0; i<max_fd; ++i) {
	pj_ioqueue_key_t *key;

	key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t));
	rc = key ? pj_atomic_create(pool, 0, &key->ref_count) : PJ_ENOMEM;
	if (rc != PJ_SUCCESS) {
	    /* Undo everything created so far. The key that just failed
	     * was never linked and has no counter, so only the keys
	     * already in free_list need their counters destroyed.
	     */
	    key = ioqueue->free_list.next;
	    while (key != &ioqueue->free_list) {
		pj_atomic_destroy(key->ref_count);
		key = key->next;
	    }
	    pj_lock_destroy(ioqueue->lock);
	    CloseHandle(ioqueue->iocp);
	    return rc;
	}

	pj_list_push_back(&ioqueue->free_list, key);

    }
#endif

    *p_ioqueue = ioqueue;

    PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue));
    return PJ_SUCCESS;
}

/*
 * pj_ioqueue_destroy()
 *
 * Release the resources owned by the ioqueue: the pooled event handles,
 * the completion port, the key reference counters (with
 * PJ_IOQUEUE_HAS_SAFE_UNREG) and, when auto_delete_lock is set, the lock.
 * The ioqueue memory itself belongs to the pool and is not freed here.
 *
 * Returns PJ_SUCCESS, PJ_EINVAL if ioqueue is NULL, or the OS error if
 * closing the completion port fails. The ioqueue lock is always released
 * before returning, so an externally supplied lock is never left held.
 *
 * NOTE(review): only event handles in event_pool are closed; handles still
 * listed in connecting_handles for pending connects are not closed here.
 */
PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue )
{
#if PJ_HAS_TCP
    unsigned i;
#endif
#if PJ_IOQUEUE_HAS_SAFE_UNREG
    pj_ioqueue_key_t *key;
#endif

    PJ_CHECK_STACK();
    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);

    pj_lock_acquire(ioqueue->lock);

#if PJ_HAS_TCP
    /* Destroy events in the pool */
    for (i=0; i<ioqueue->event_count; ++i) {
	CloseHandle(ioqueue->event_pool[i]);
    }
    ioqueue->event_count = 0;
#endif

    /* CloseHandle() reports success as any non-zero value, so test for
     * zero instead of comparing against TRUE.
     */
    if (!CloseHandle(ioqueue->iocp)) {
	/* Save the error first; releasing the lock may overwrite it. */
	DWORD os_err = GetLastError();

	pj_lock_release(ioqueue->lock);
	return PJ_RETURN_OS_ERROR(os_err);
    }

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Destroy reference counters */
    key = ioqueue->active_list.next;
    while (key != &ioqueue->active_list) {
	pj_atomic_destroy(key->ref_count);
	key = key->next;
    }

    key = ioqueue->closing_list.next;
    while (key != &ioqueue->closing_list) {
	pj_atomic_destroy(key->ref_count);
	key = key->next;
    }

    key = ioqueue->free_list.next;
    while (key != &ioqueue->free_list) {
	pj_atomic_destroy(key->ref_count);
	key = key->next;
    }
#endif

    /* Release before (optionally) destroying: a lock owned by someone
     * else must not stay acquired after the ioqueue is gone.
     */
    pj_lock_release(ioqueue->lock);

    if (ioqueue->auto_delete_lock)
        pj_lock_destroy(ioqueue->lock);

    return PJ_SUCCESS;
}

/*
 * pj_ioqueue_set_lock()
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue, 
					 pj_lock_t *lock,
					 pj_bool_t auto_delete )
{
    PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -