📄 ioqueue_winnt.c
字号:
if (ioqueue->auto_delete_lock) {
pj_lock_destroy(ioqueue->lock);
}
ioqueue->lock = lock;
ioqueue->auto_delete_lock = auto_delete;
return PJ_SUCCESS;
}
/*
 * pj_ioqueue_register_sock()
 *
 * Register a socket with the ioqueue: obtain a key record, switch the
 * socket to non-blocking mode and associate it with the ioqueue's I/O
 * completion port, using the key record's address as the completion key.
 *
 * Parameters:
 *  pool       Pool used to allocate the key record when safe
 *             unregistration is disabled (otherwise the record is taken
 *             from the ioqueue's free list).
 *  ioqueue    The ioqueue to register the socket to.
 *  sock       The socket to register.
 *  user_data  Opaque pointer stored in the key.
 *  cb         Callbacks; the structure is copied into the key.
 *  key        Receives the new key on success.
 *
 * Returns PJ_SUCCESS on success, PJ_EINVAL on invalid arguments,
 * PJ_ETOOMANY when the free list is empty (safe unregistration only), or
 * the OS error reported by ioctlsocket()/CreateIoCompletionPort().
 *
 * On failure after a key has been taken from the free list, the key is
 * handed back to the free list so that it is not lost.
 */
PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
                                              pj_ioqueue_t *ioqueue,
                                              pj_sock_t sock,
                                              void *user_data,
                                              const pj_ioqueue_callback *cb,
                                              pj_ioqueue_key_t **key )
{
    HANDLE hioq;
    pj_ioqueue_key_t *rec;
    u_long value;
    int rc;
    pj_status_t status;

    PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL);

    pj_lock_acquire(ioqueue->lock);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* If safe unregistration is used, then get the key record from
     * the free list.
     */
    if (pj_list_empty(&ioqueue->free_list)) {
        pj_lock_release(ioqueue->lock);
        return PJ_ETOOMANY;
    }

    rec = ioqueue->free_list.next;
    pj_list_erase(rec);

    /* Set initial reference count to 1 */
    pj_assert(pj_atomic_get(rec->ref_count) == 0);
    pj_atomic_inc(rec->ref_count);

    rec->closing = 0;
#else
    rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
#endif

    /* Build the key for this socket. */
    rec->ioqueue = ioqueue;
    rec->hnd = (HANDLE)sock;
    rec->hnd_type = HND_IS_SOCKET;
    rec->user_data = user_data;
    pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback));

#if PJ_HAS_TCP
    rec->connecting = 0;
#endif

    /* Set socket to nonblocking. */
    value = 1;
    rc = ioctlsocket(sock, FIONBIO, &value);
    if (rc != 0) {
        /* Capture the error code before doing anything else. */
        status = PJ_RETURN_OS_ERROR(WSAGetLastError());
        goto on_error;
    }

    /* Associate with IOCP. The completion key parameter is pointer-sized
     * (ULONG_PTR); casting the key address to DWORD would truncate it on
     * 64-bit Windows.
     */
    hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp,
                                  (ULONG_PTR)rec, 0);
    if (!hioq) {
        status = PJ_RETURN_OS_ERROR(GetLastError());
        goto on_error;
    }

    *key = rec;

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    pj_list_push_back(&ioqueue->active_list, rec);
#endif

    pj_lock_release(ioqueue->lock);

    return PJ_SUCCESS;

on_error:
#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Undo the reservation made above: drop the reference count back to
     * zero and return the record to the free list, otherwise every failed
     * registration would permanently consume one key slot.
     */
    pj_atomic_dec(rec->ref_count);
    pj_list_push_back(&ioqueue->free_list, rec);
#endif
    pj_lock_release(ioqueue->lock);
    return status;
}
/*
 * pj_ioqueue_get_user_data()
 *
 * Return the user data pointer currently stored in the key.
 * Yields NULL when `key` itself is NULL.
 */
PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
{
    void *data;

    PJ_ASSERT_RETURN(key, NULL);

    data = key->user_data;
    return data;
}
/*
 * pj_ioqueue_set_user_data()
 *
 * Replace the user data pointer stored in the key. When `old_data` is
 * not NULL, the previous pointer is written to it before the new value
 * is stored. Returns PJ_EINVAL for a NULL key, PJ_SUCCESS otherwise.
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
                                              void *user_data,
                                              void **old_data )
{
    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    /* Hand the previous value back to the caller first, if requested. */
    if (old_data != NULL) {
        *old_data = key->user_data;
    }

    key->user_data = user_data;

    return PJ_SUCCESS;
}
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/*
 * Drop one reference from the key. When the count reaches zero the key
 * is stamped with a release time (current time plus
 * PJ_IOQUEUE_KEY_FREE_DELAY milliseconds), unlinked from the list it is
 * currently on and appended to the ioqueue's closing list. The list
 * manipulation is done while holding the ioqueue lock.
 */
static void decrement_counter(pj_ioqueue_key_t *key)
{
    pj_ioqueue_t *ioq;

    /* Other references remain: nothing more to do. */
    if (pj_atomic_dec_and_get(key->ref_count) != 0)
        return;

    ioq = key->ioqueue;

    pj_lock_acquire(ioq->lock);

    pj_assert(key->closing == 1);

    /* Compute the earliest time at which the key may be reused. */
    pj_gettimeofday(&key->free_time);
    key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
    pj_time_val_normalize(&key->free_time);

    /* Move the key to the closing list. */
    pj_list_erase(key);
    pj_list_push_back(&ioq->closing_list, key);

    pj_lock_release(ioq->lock);
}
#endif
/*
 * Poll the I/O Completion Port once, execute the matching callback,
 * and report the key and bytes transferred of the dequeued operation.
 *
 * Parameters:
 *  hIocp      The completion port to poll.
 *  dwTimeout  Timeout in milliseconds passed to
 *             GetQueuedCompletionStatus().
 *  p_bytes    Optional; receives the number of bytes transferred.
 *  p_key      Optional; receives the key associated with the event.
 *
 * Returns PJ_TRUE when a completion packet (for successful or failed
 * I/O) was dequeued, PJ_FALSE when nothing was dequeued. With safe
 * unregistration enabled, a packet for a key that is closing is still
 * reported as dequeued but no callback is invoked.
 */
static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
                            pj_ssize_t *p_bytes, pj_ioqueue_key_t **p_key )
{
    DWORD dwBytesTransfered;
    /* The completion key is written through a PULONG_PTR. It must be
     * pointer-sized, otherwise the call overruns the variable and the key
     * address is truncated on 64-bit Windows.
     */
    ULONG_PTR dwKey;
    /* Defensive: start from "nothing dequeued". */
    generic_overlapped *pOv = NULL;
    pj_ioqueue_key_t *key;
    pj_ssize_t size_status = -1;
    BOOL rcGetQueued;

    /* Poll for completion status. */
    rcGetQueued = GetQueuedCompletionStatus(hIocp, &dwBytesTransfered,
                                            &dwKey, (OVERLAPPED**)&pOv,
                                            dwTimeout);

    /* The return value is:
     * - nonzero if event was dequeued.
     * - zero and pOv==NULL if no event was dequeued.
     * - zero and pOv!=NULL if event for failed I/O was dequeued.
     */
    if (pOv) {
        /* Event was dequeued for either successful or failed I/O */
        key = (pj_ioqueue_key_t*)dwKey;
        size_status = dwBytesTransfered;

        /* Report to caller regardless */
        if (p_bytes)
            *p_bytes = size_status;
        if (p_key)
            *p_key = key;

#if PJ_IOQUEUE_HAS_SAFE_UNREG
        /* We shouldn't call callbacks if key is quitting. */
        if (key->closing)
            return PJ_TRUE;

        /* Increment reference counter to prevent this key from being
         * deleted while the callback is running.
         */
        pj_atomic_inc(key->ref_count);
#endif

        /* Carry out the callback */
        switch (pOv->operation) {
        case PJ_IOQUEUE_OP_READ:
        case PJ_IOQUEUE_OP_RECV:
        case PJ_IOQUEUE_OP_RECV_FROM:
            /* Clear the pending operation before invoking the callback. */
            pOv->operation = 0;
            if (key->cb.on_read_complete)
                key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv,
                                         size_status);
            break;
        case PJ_IOQUEUE_OP_WRITE:
        case PJ_IOQUEUE_OP_SEND:
        case PJ_IOQUEUE_OP_SEND_TO:
            pOv->operation = 0;
            if (key->cb.on_write_complete)
                key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv,
                                          size_status);
            break;
#if PJ_HAS_TCP
        case PJ_IOQUEUE_OP_ACCEPT:
            /* Special case for accept: the overlapped record is an
             * ioqueue_accept_rec and is post-processed first.
             */
            ioqueue_on_accept_complete((ioqueue_accept_rec*)pOv);
            if (key->cb.on_accept_complete) {
                ioqueue_accept_rec *accept_rec = (ioqueue_accept_rec*)pOv;
                pj_status_t status = PJ_SUCCESS;

                if (accept_rec->newsock == PJ_INVALID_SOCKET) {
                    int dwError = WSAGetLastError();
                    if (dwError == 0) dwError = OSERR_ENOTCONN;
                    status = PJ_RETURN_OS_ERROR(dwError);
                }

                key->cb.on_accept_complete(key,
                                           (pj_ioqueue_op_key_t*)pOv,
                                           accept_rec->newsock,
                                           status);
                /* Reset the socket slot after it has been reported. */
                accept_rec->newsock = PJ_INVALID_SOCKET;
            }
            break;
        case PJ_IOQUEUE_OP_CONNECT:
#endif
        case PJ_IOQUEUE_OP_NONE:
            /* These operations are not expected to arrive here. */
            pj_assert(0);
            break;
        }

#if PJ_IOQUEUE_HAS_SAFE_UNREG
        decrement_counter(key);
#endif

        return PJ_TRUE;
    }

    /* No event was queued. */
    return PJ_FALSE;
}
/*
 * pj_ioqueue_unregister()
 *
 * Unregister a key from its ioqueue. The key is removed from the list of
 * connecting sockets when applicable, the underlying handle is closed,
 * and all callbacks are cleared. With safe unregistration enabled the key
 * is marked as closing and its reference is dropped so that it can be
 * recycled later.
 *
 * Returns PJ_EINVAL for a NULL key, PJ_SUCCESS otherwise.
 */
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
{
    PJ_ASSERT_RETURN(key, PJ_EINVAL);

#if PJ_HAS_TCP
    if (key->connecting) {
        unsigned pos;
        pj_ioqueue_t *ioqueue;

        ioqueue = key->ioqueue;

        /* Erase from connecting_handles */
        pj_lock_acquire(ioqueue->lock);
        for (pos=0; pos < ioqueue->connecting_count; ++pos) {
            if (ioqueue->connecting_keys[pos] == key) {
                erase_connecting_socket(ioqueue, pos);
                break;
            }
        }
        key->connecting = 0;
        pj_lock_release(ioqueue->lock);
    }
#endif

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Mark key as closing before closing handle. */
    key->closing = 1;
#endif

    /* Close handle (the only way to disassociate handle from IOCP).
     * We also need to close handle to make sure that no further events
     * will come to the handle.
     *
     * Socket handles must be released with closesocket(); CloseHandle()
     * is only appropriate for other kinds of handles.
     */
    if (key->hnd_type == HND_IS_SOCKET)
        closesocket((SOCKET)key->hnd);
    else
        CloseHandle(key->hnd);

    /* Reset callbacks */
    key->cb.on_accept_complete = NULL;
    key->cb.on_connect_complete = NULL;
    key->cb.on_read_complete = NULL;
    key->cb.on_write_complete = NULL;

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Even after handle is closed, I suspect that IOCP may still try to
     * do something with the handle, causing memory corruption when pool
     * debugging is enabled.
     *
     * Forcing context switch seems to have fixed that, but this is quite
     * an ugly solution..
     *
     * The wait is bounded by RETRY: waiting unconditionally until the
     * reference count drops to 1 would loop forever when unregistration
     * is done from inside a callback (which holds its own reference).
     */
    {
        enum { RETRY = 10 };
        unsigned i;

        for (i=0; pj_atomic_get(key->ref_count) != 1 && i<RETRY; ++i)
            pj_thread_sleep(0);
    }

    /* Decrement reference counter to destroy the key. */
    decrement_counter(key);
#endif

    return PJ_SUCCESS;
}
/*
 * pj_ioqueue_poll()
 *
 * Poll the ioqueue for events. A single call to poll_iocp() is made with
 * the given timeout (or an infinite wait when `timeout` is NULL). When it
 * reports nothing and TCP support is enabled, the connecting sockets are
 * checked as well. With safe unregistration enabled, keys on the closing
 * list whose release time has passed are moved back to the free list.
 *
 * Returns the number of events, or -PJ_EINVAL when `ioqueue` is NULL.
 */
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
    DWORD wait_ms;
    int n_events;

    PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);

    /* Convert the timeout to milliseconds for GetQueuedCompletionStatus */
    if (timeout)
        wait_ms = timeout->sec*1000 + timeout->msec;
    else
        wait_ms = INFINITE;

    /* Poll for completion status. */
    n_events = poll_iocp(ioqueue->iocp, wait_ms, NULL, NULL);

#if PJ_HAS_TCP
    /* Check the connecting array, only when there's no activity. */
    if (n_events == 0) {
        int n_connected = check_connecting(ioqueue);

        if (n_connected > 0)
            n_events += n_connected;
    }
#endif

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Always inspect the closing list, even when events were processed.
     * Restricting this to idle polls would, under busy activity, keep
     * keys from returning to the free list and cause the ioqueue to
     * reject new registrations.
     */
    if (!pj_list_empty(&ioqueue->closing_list)) {
        pj_time_val now;
        pj_ioqueue_key_t *key;
        pj_ioqueue_key_t *following;

        pj_gettimeofday(&now);

        /* Move closing keys to free list when they've finished the closing
         * idle time.
         */
        pj_lock_acquire(ioqueue->lock);
        for (key = ioqueue->closing_list.next;
             key != &ioqueue->closing_list;
             key = following)
        {
            /* Remember the successor first; the key may be relinked. */
            following = key->next;

            pj_assert(key->closing != 0);

            if (PJ_TIME_VAL_GTE(now, key->free_time)) {
                pj_list_erase(key);
                pj_list_push_back(&ioqueue->free_list, key);
            }
        }
        pj_lock_release(ioqueue->lock);
    }
#endif

    /* Return number of events. */
    return n_events;
}
/*
* pj_ioqueue_recv()
*
* Initiate overlapped WSARecv() operation.
*/
PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
void *buffer,
pj_ssize_t *length,
pj_uint32_t flags )
{
/*
* Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and
* addrlen here. But unfortunately it generates EINVAL... :-(
* -bennylp
*/
int rc;
DWORD bytesRead;
DWORD dwFlags = 0;
union operation_key *op_key_rec;
PJ_CHECK_STACK();
PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Check key is not closing */
if (key->closing)
return PJ_ECANCELLED;
#endif
op_key_rec = (union operation_key*)op_key->internal__;
op_key_rec->overlapped.wsabuf.buf = buffer;
op_key_rec->overlapped.wsabuf.len = *length;
dwFlags = flags;
/* Try non-overlapped received first to see if data is
* immediately available.
*/
if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
&bytesRead, &dwFlags, NULL, NULL);
if (rc == 0) {
*length = bytesRead;
return PJ_SUCCESS;
} else {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -