📄 ioqueue_winnt.c
字号:
if (ioqueue->auto_delete_lock) { pj_lock_destroy(ioqueue->lock); } ioqueue->lock = lock; ioqueue->auto_delete_lock = auto_delete; return PJ_SUCCESS;}/* * pj_ioqueue_register_sock() */PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, pj_ioqueue_t *ioqueue, pj_sock_t sock, void *user_data, const pj_ioqueue_callback *cb, pj_ioqueue_key_t **key ){ HANDLE hioq; pj_ioqueue_key_t *rec; u_long value; int rc; PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL); pj_lock_acquire(ioqueue->lock);#if PJ_IOQUEUE_HAS_SAFE_UNREG /* If safe unregistration is used, then get the key record from * the free list. */ if (pj_list_empty(&ioqueue->free_list)) { pj_lock_release(ioqueue->lock); return PJ_ETOOMANY; } rec = ioqueue->free_list.next; pj_list_erase(rec); /* Set initial reference count to 1 */ pj_assert(pj_atomic_get(rec->ref_count) == 0); pj_atomic_inc(rec->ref_count); rec->closing = 0;#else rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));#endif /* Build the key for this socket. */ rec->ioqueue = ioqueue; rec->hnd = (HANDLE)sock; rec->hnd_type = HND_IS_SOCKET; rec->user_data = user_data; pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback));#if PJ_HAS_TCP rec->connecting = 0;#endif /* Set socket to nonblocking. */ value = 1; rc = ioctlsocket(sock, FIONBIO, &value); if (rc != 0) { pj_lock_release(ioqueue->lock); return PJ_RETURN_OS_ERROR(WSAGetLastError()); } /* Associate with IOCP */ hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0); if (!hioq) { pj_lock_release(ioqueue->lock); return PJ_RETURN_OS_ERROR(GetLastError()); } *key = rec;#if PJ_IOQUEUE_HAS_SAFE_UNREG pj_list_push_back(&ioqueue->active_list, rec);#endif pj_lock_release(ioqueue->lock); return PJ_SUCCESS;}/* * pj_ioqueue_get_user_data() */PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ){ PJ_ASSERT_RETURN(key, NULL); return key->user_data;}/* * pj_ioqueue_set_user_data() */PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, void *user_data, void **old_data ){ PJ_ASSERT_RETURN(key, PJ_EINVAL); if (old_data) *old_data = key->user_data; key->user_data = user_data; return PJ_SUCCESS;}#if PJ_IOQUEUE_HAS_SAFE_UNREG/* Decrement the key's reference counter, and when the counter reach zero, * destroy the key. */static void decrement_counter(pj_ioqueue_key_t *key){ if (pj_atomic_dec_and_get(key->ref_count) == 0) { pj_lock_acquire(key->ioqueue->lock); pj_assert(key->closing == 1); pj_gettimeofday(&key->free_time); key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY; pj_time_val_normalize(&key->free_time); pj_list_erase(key); pj_list_push_back(&key->ioqueue->closing_list, key); pj_lock_release(key->ioqueue->lock); }}#endif/* * Poll the I/O Completion Port, execute callback, * and return the key and bytes transfered of the last operation. */static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, pj_ssize_t *p_bytes, pj_ioqueue_key_t **p_key ){ DWORD dwBytesTransfered, dwKey; generic_overlapped *pOv; pj_ioqueue_key_t *key; pj_ssize_t size_status = -1; BOOL rcGetQueued; /* Poll for completion status. */ rcGetQueued = GetQueuedCompletionStatus(hIocp, &dwBytesTransfered, &dwKey, (OVERLAPPED**)&pOv, dwTimeout); /* The return value is: * - nonzero if event was dequeued. * - zero and pOv==NULL if no event was dequeued. * - zero and pOv!=NULL if event for failed I/O was dequeued. */ if (pOv) { /* Event was dequeued for either successfull or failed I/O */ key = (pj_ioqueue_key_t*)dwKey; size_status = dwBytesTransfered; /* Report to caller regardless */ if (p_bytes) *p_bytes = size_status; if (p_key) *p_key = key;#if PJ_IOQUEUE_HAS_SAFE_UNREG /* We shouldn't call callbacks if key is quitting. */ if (key->closing) return PJ_TRUE; /* Increment reference counter to prevent this key from being * deleted */ pj_atomic_inc(key->ref_count);#endif /* Carry out the callback */ switch (pOv->operation) { case PJ_IOQUEUE_OP_READ: case PJ_IOQUEUE_OP_RECV: case PJ_IOQUEUE_OP_RECV_FROM: pOv->operation = 0; if (key->cb.on_read_complete) key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv, size_status); break; case PJ_IOQUEUE_OP_WRITE: case PJ_IOQUEUE_OP_SEND: case PJ_IOQUEUE_OP_SEND_TO: pOv->operation = 0; if (key->cb.on_write_complete) key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv, size_status); break;#if PJ_HAS_TCP case PJ_IOQUEUE_OP_ACCEPT: /* special case for accept. */ ioqueue_on_accept_complete((ioqueue_accept_rec*)pOv); if (key->cb.on_accept_complete) { ioqueue_accept_rec *accept_rec = (ioqueue_accept_rec*)pOv; pj_status_t status = PJ_SUCCESS; if (accept_rec->newsock == PJ_INVALID_SOCKET) { int dwError = WSAGetLastError(); if (dwError == 0) dwError = OSERR_ENOTCONN; status = PJ_RETURN_OS_ERROR(dwError); } key->cb.on_accept_complete(key, (pj_ioqueue_op_key_t*)pOv, accept_rec->newsock, status); accept_rec->newsock = PJ_INVALID_SOCKET; } break; case PJ_IOQUEUE_OP_CONNECT:#endif case PJ_IOQUEUE_OP_NONE: pj_assert(0); break; }#if PJ_IOQUEUE_HAS_SAFE_UNREG decrement_counter(key);#endif return PJ_TRUE; } /* No event was queued. */ return PJ_FALSE;}/* * pj_ioqueue_unregister() */PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ){ unsigned i; enum { RETRY = 10 }; PJ_ASSERT_RETURN(key, PJ_EINVAL);#if PJ_HAS_TCP if (key->connecting) { unsigned pos; pj_ioqueue_t *ioqueue; ioqueue = key->ioqueue; /* Erase from connecting_handles */ pj_lock_acquire(ioqueue->lock); for (pos=0; pos < ioqueue->connecting_count; ++pos) { if (ioqueue->connecting_keys[pos] == key) { erase_connecting_socket(ioqueue, pos); break; } } key->connecting = 0; pj_lock_release(ioqueue->lock); }#endif#if PJ_IOQUEUE_HAS_SAFE_UNREG /* Mark key as closing before closing handle. */ key->closing = 1;#endif /* Close handle (the only way to disassociate handle from IOCP). * We also need to close handle to make sure that no further events * will come to the handle. */ CloseHandle(key->hnd); /* Reset callbacks */ key->cb.on_accept_complete = NULL; key->cb.on_connect_complete = NULL; key->cb.on_read_complete = NULL; key->cb.on_write_complete = NULL;#if PJ_IOQUEUE_HAS_SAFE_UNREG /* Even after handle is closed, I suspect that IOCP may still try to * do something with the handle, causing memory corruption when pool * debugging is enabled. * * Forcing context switch seems to have fixed that, but this is quite * an ugly solution.. */ //This will loop forever if unregistration is done on the callback. //Doing this with RETRY I think should solve the IOCP setting the //socket signalled, without causing the deadlock. //while (pj_atomic_get(key->ref_count) != 1) // pj_thread_sleep(0); for (i=0; pj_atomic_get(key->ref_count) != 1 && i<RETRY; ++i) pj_thread_sleep(0); /* Decrement reference counter to destroy the key. */ decrement_counter(key);#endif return PJ_SUCCESS;}/* * pj_ioqueue_poll() * * Poll for events. */PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout){ DWORD dwMsec;#if PJ_HAS_TCP int connect_count = 0;#endif int event_count = 0; PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL); /* Calculate miliseconds timeout for GetQueuedCompletionStatus */ dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE; /* Poll for completion status. */ event_count = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL);#if PJ_HAS_TCP /* Check the connecting array, only when there's no activity. */ if (event_count == 0) { connect_count = check_connecting(ioqueue); if (connect_count > 0) event_count += connect_count; }#endif#if PJ_IOQUEUE_HAS_SAFE_UNREG /* Check the closing keys only when there's no activity and when there are * pending closing keys. * blp: * no, always check the list. Otherwise on busy activity, this will cause * ioqueue to reject new registration. */ if (/*event_count == 0 &&*/ !pj_list_empty(&ioqueue->closing_list)) { pj_time_val now; pj_ioqueue_key_t *key; pj_gettimeofday(&now); /* Move closing keys to free list when they've finished the closing * idle time. */ pj_lock_acquire(ioqueue->lock); key = ioqueue->closing_list.next; while (key != &ioqueue->closing_list) { pj_ioqueue_key_t *next = key->next; pj_assert(key->closing != 0); if (PJ_TIME_VAL_GTE(now, key->free_time)) { pj_list_erase(key); pj_list_push_back(&ioqueue->free_list, key); } key = next; } pj_lock_release(ioqueue->lock); }#endif /* Return number of events. */ return event_count;}/* * pj_ioqueue_recv() * * Initiate overlapped WSARecv() operation. */PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, void *buffer, pj_ssize_t *length, pj_uint32_t flags ){ /* * Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and * addrlen here. But unfortunately it generates EINVAL... :-( * -bennylp */ int rc; DWORD bytesRead; DWORD dwFlags = 0; union operation_key *op_key_rec; PJ_CHECK_STACK(); PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);#if PJ_IOQUEUE_HAS_SAFE_UNREG /* Check key is not closing */ if (key->closing) return PJ_ECANCELLED;#endif op_key_rec = (union operation_key*)op_key->internal__; op_key_rec->overlapped.wsabuf.buf = buffer; op_key_rec->overlapped.wsabuf.len = *length; dwFlags = flags; /* Try non-overlapped received first to see if data is * immediately available. */ if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) { rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, &bytesRead, &dwFlags, NULL, NULL); if (rc == 0) { *length = bytesRead; return PJ_SUCCESS; } else {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -