⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ioqueue_winnt.c

📁 一个开源SIP协议栈
💻 C
📖 第 1 页 / 共 3 页
字号:

    if (ioqueue->auto_delete_lock) {
        pj_lock_destroy(ioqueue->lock);
    }

    ioqueue->lock = lock;
    ioqueue->auto_delete_lock = auto_delete;

    return PJ_SUCCESS;
}

/*
 * pj_ioqueue_register_sock()
 *
 * Register a socket to the ioqueue: build a key record for it, switch the
 * socket to non-blocking mode and associate it with the ioqueue's I/O
 * completion port, using the key record's address as the completion key.
 *
 * pool       Used to allocate the key record when safe unregistration
 *            is disabled.
 * ioqueue    The ioqueue to register the socket to.
 * sock       The socket handle.
 * user_data  Arbitrary pointer stored in the key.
 * cb         Callback table, copied into the key.
 * key        On success, receives the key record.
 *
 * Returns PJ_SUCCESS on success, PJ_ETOOMANY when safe unregistration is
 * enabled and no free key record is available, or the OS error when
 * ioctlsocket() or CreateIoCompletionPort() fails.
 */
PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
					      pj_ioqueue_t *ioqueue,
					      pj_sock_t sock,
					      void *user_data,
					      const pj_ioqueue_callback *cb,
					      pj_ioqueue_key_t **key )
{
    HANDLE hioq;
    pj_ioqueue_key_t *rec;
    pj_status_t status;
    u_long value;
    int rc;

    PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL);

    pj_lock_acquire(ioqueue->lock);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* If safe unregistration is used, then get the key record from
     * the free list.
     */
    if (pj_list_empty(&ioqueue->free_list)) {
        pj_lock_release(ioqueue->lock);
        return PJ_ETOOMANY;
    }

    rec = ioqueue->free_list.next;
    pj_list_erase(rec);

    /* Set initial reference count to 1 */
    pj_assert(pj_atomic_get(rec->ref_count) == 0);
    pj_atomic_inc(rec->ref_count);

    rec->closing = 0;

#else
    rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
#endif

    /* Build the key for this socket. */
    rec->ioqueue = ioqueue;
    rec->hnd = (HANDLE)sock;
    rec->hnd_type = HND_IS_SOCKET;
    rec->user_data = user_data;
    pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback));

#if PJ_HAS_TCP
    rec->connecting = 0;
#endif

    /* Set socket to nonblocking. */
    value = 1;
    rc = ioctlsocket(sock, FIONBIO, &value);
    if (rc != 0) {
        /* Capture the error code before calling anything else. */
        status = PJ_RETURN_OS_ERROR(WSAGetLastError());
        goto on_error;
    }

    /* Associate with IOCP. The completion key is pointer sized, so the
     * key record's address must not be narrowed to a DWORD.
     */
    hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp,
                                  (ULONG_PTR)rec, 0);
    if (!hioq) {
        /* Capture the error code before calling anything else. */
        status = PJ_RETURN_OS_ERROR(GetLastError());
        goto on_error;
    }

    *key = rec;

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    pj_list_push_back(&ioqueue->active_list, rec);
#endif

    pj_lock_release(ioqueue->lock);

    return PJ_SUCCESS;

on_error:
#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Registration failed: drop the initial reference and hand the key
     * record back to the free list so that the slot is not lost.
     */
    pj_atomic_dec(rec->ref_count);
    pj_list_push_back(&ioqueue->free_list, rec);
#endif
    pj_lock_release(ioqueue->lock);

    return status;
}


/*
 * pj_ioqueue_get_user_data()
 *
 * Fetch the user data pointer currently stored in the key. The key is
 * checked with PJ_ASSERT_RETURN, with NULL given as the fallback return
 * value for a NULL key.
 */
PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
{
    void *stored;

    PJ_ASSERT_RETURN(key, NULL);

    stored = key->user_data;
    return stored;
}

/*
 * pj_ioqueue_set_user_data()
 *
 * Replace the user data pointer stored in the key. When old_data is not
 * NULL, the previous pointer is written to *old_data before the key is
 * updated. Always returns PJ_SUCCESS for a non-NULL key; a NULL key is
 * checked with PJ_ASSERT_RETURN, with PJ_EINVAL as the fallback value.
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
                                              void *user_data,
                                              void **old_data )
{
    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    /* Hand the previous value to the caller first, then overwrite it. */
    if (old_data != NULL) {
        *old_data = key->user_data;
    }
    key->user_data = user_data;

    return PJ_SUCCESS;
}


#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Drop one reference from the key. When that was the last reference, stamp
 * the key with the time at which it may be reused (current time plus
 * PJ_IOQUEUE_KEY_FREE_DELAY milliseconds) and move it to the ioqueue's
 * closing list, all under the ioqueue lock.
 */
static void decrement_counter(pj_ioqueue_key_t *key)
{
    pj_ioqueue_t *ioq;

    /* Other references remain: nothing more to do. */
    if (pj_atomic_dec_and_get(key->ref_count) != 0) {
        return;
    }

    ioq = key->ioqueue;

    pj_lock_acquire(ioq->lock);

    /* Only a key that is being unregistered should reach zero. */
    pj_assert(key->closing == 1);

    /* Compute the earliest time the key may go back to the free list. */
    pj_gettimeofday(&key->free_time);
    key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
    pj_time_val_normalize(&key->free_time);

    /* Unlink the key from its current list and queue it as closing. */
    pj_list_erase(key);
    pj_list_push_back(&ioq->closing_list, key);

    pj_lock_release(ioq->lock);
}
#endif

/*
 * Poll the I/O Completion Port, execute callback,
 * and return the key and bytes transferred of the last operation.
 *
 * hIocp      Completion port handle to poll.
 * dwTimeout  Timeout passed to GetQueuedCompletionStatus().
 * p_bytes    Optional; receives the byte count of the dequeued packet.
 * p_key      Optional; receives the key of the dequeued packet.
 *
 * Returns PJ_TRUE when a completion packet was dequeued (even when no
 * callback was invoked because the key is closing), PJ_FALSE otherwise.
 */
static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, 
			    pj_ssize_t *p_bytes, pj_ioqueue_key_t **p_key )
{
    DWORD dwBytesTransfered = 0;
    /* The completion key carries a pointer, so it must be pointer sized;
     * a DWORD would be overrun and the pointer truncated on 64-bit.
     */
    ULONG_PTR completionKey = 0;
    generic_overlapped *pOv = NULL;
    pj_ioqueue_key_t *key;
    pj_ssize_t size_status = -1;
    BOOL rcGetQueued;

    /* Poll for completion status. */
    rcGetQueued = GetQueuedCompletionStatus(hIocp, &dwBytesTransfered,
                                            &completionKey,
                                            (OVERLAPPED**)&pOv,
                                            dwTimeout);

    /* The return value is:
     * - nonzero if event was dequeued.
     * - zero and pOv==NULL if no event was dequeued.
     * - zero and pOv!=NULL if event for failed I/O was dequeued.
     * Only pOv is inspected below, which covers both dequeued cases.
     */
    if (pOv) {
        /* Event was dequeued for either successful or failed I/O */
        key = (pj_ioqueue_key_t*)completionKey;
        size_status = dwBytesTransfered;

        /* Report to caller regardless */
        if (p_bytes)
            *p_bytes = size_status;
        if (p_key)
            *p_key = key;

#if PJ_IOQUEUE_HAS_SAFE_UNREG
        /* We shouldn't call callbacks if key is quitting. */
        if (key->closing)
            return PJ_TRUE;

        /* Increment reference counter to prevent this key from being
         * deleted while the callback runs.
         */
        pj_atomic_inc(key->ref_count);
#endif

        /* Carry out the callback. The operation field is cleared before
         * the read/write callbacks are invoked.
         */
        switch (pOv->operation) {
        case PJ_IOQUEUE_OP_READ:
        case PJ_IOQUEUE_OP_RECV:
        case PJ_IOQUEUE_OP_RECV_FROM:
            pOv->operation = 0;
            if (key->cb.on_read_complete)
                key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv, 
                                         size_status);
            break;
        case PJ_IOQUEUE_OP_WRITE:
        case PJ_IOQUEUE_OP_SEND:
        case PJ_IOQUEUE_OP_SEND_TO:
            pOv->operation = 0;
            if (key->cb.on_write_complete)
                key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv, 
                                          size_status);
            break;
#if PJ_HAS_TCP
        case PJ_IOQUEUE_OP_ACCEPT:
            /* special case for accept. */
            ioqueue_on_accept_complete((ioqueue_accept_rec*)pOv);
            if (key->cb.on_accept_complete) {
                ioqueue_accept_rec *accept_rec = (ioqueue_accept_rec*)pOv;
                pj_status_t status = PJ_SUCCESS;

                /* No new socket means the accept failed; report the
                 * socket error, or "not connected" when none is set.
                 */
                if (accept_rec->newsock == PJ_INVALID_SOCKET) {
                    int dwError = WSAGetLastError();
                    if (dwError == 0) dwError = OSERR_ENOTCONN;
                    status = PJ_RETURN_OS_ERROR(dwError);
                }

                key->cb.on_accept_complete(key, 
                                           (pj_ioqueue_op_key_t*)pOv, 
                                           accept_rec->newsock,
                                           status);
                accept_rec->newsock = PJ_INVALID_SOCKET;
            }
            break;
        case PJ_IOQUEUE_OP_CONNECT:
#endif
        case PJ_IOQUEUE_OP_NONE:
            /* These operations are not expected on the completion port. */
            pj_assert(0);
            break;
        }

#if PJ_IOQUEUE_HAS_SAFE_UNREG
        /* Release the reference taken before the callback. */
        decrement_counter(key);
#endif

        return PJ_TRUE;
    }

    /* No event was queued. */
    return PJ_FALSE;
}

/*
 * pj_ioqueue_unregister()
 *
 * Unregister a key from the ioqueue: remove it from the list of
 * connecting keys when applicable, close its handle, and clear its
 * callbacks. With safe unregistration enabled the key is marked as
 * closing and the registration's reference is released.
 *
 * Returns PJ_SUCCESS for a non-NULL key; a NULL key is checked with
 * PJ_ASSERT_RETURN, with PJ_EINVAL as the fallback value.
 */
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
{
#if PJ_IOQUEUE_HAS_SAFE_UNREG
    unsigned i;
    enum { RETRY = 10 };
#endif

    PJ_ASSERT_RETURN(key, PJ_EINVAL);

#if PJ_HAS_TCP
    if (key->connecting) {
        unsigned pos;
        pj_ioqueue_t *ioqueue;

        ioqueue = key->ioqueue;

        /* Erase from connecting_handles */
        pj_lock_acquire(ioqueue->lock);
        for (pos=0; pos < ioqueue->connecting_count; ++pos) {
            if (ioqueue->connecting_keys[pos] == key) {
                erase_connecting_socket(ioqueue, pos);
                break;
            }
        }
        key->connecting = 0;
        pj_lock_release(ioqueue->lock);
    }
#endif

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Mark key as closing before closing handle. */
    key->closing = 1;
#endif
    
    /* Close handle (the only way to disassociate handle from IOCP). 
     * We also need to close handle to make sure that no further events
     * will come to the handle.
     *
     * Sockets must be closed with closesocket(); CloseHandle() is only
     * for the other handle types. The result is ignored (best effort).
     */
    if (key->hnd_type == HND_IS_SOCKET) {
        closesocket((SOCKET)key->hnd);
    } else {
        CloseHandle(key->hnd);
    }

    /* Reset callbacks */
    key->cb.on_accept_complete = NULL;
    key->cb.on_connect_complete = NULL;
    key->cb.on_read_complete = NULL;
    key->cb.on_write_complete = NULL;

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Even after handle is closed, I suspect that IOCP may still try to
     * do something with the handle, causing memory corruption when pool
     * debugging is enabled.
     *
     * Forcing context switch seems to have fixed that, but this is quite
     * an ugly solution..
     *
     * The wait is bounded by RETRY: waiting without a limit for the
     * reference count to drop to 1 would loop forever when
     * unregistration is done from inside a callback.
     */
    for (i=0; pj_atomic_get(key->ref_count) != 1 && i<RETRY; ++i) {
        pj_thread_sleep(0);
    }

    /* Decrement reference counter to destroy the key. */
    decrement_counter(key);
#endif

    return PJ_SUCCESS;
}

/*
 * pj_ioqueue_poll()
 *
 * Poll the ioqueue once: wait on the completion port for at most the
 * given timeout (or indefinitely when timeout is NULL), then check the
 * connecting sockets if nothing was dequeued, and finally recycle
 * closing keys whose idle time has elapsed.
 *
 * Returns the number of events processed.
 */
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
    DWORD wait_ms;
    int nevents;

    PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);

    /* Convert the timeout to milliseconds for GetQueuedCompletionStatus */
    if (timeout) {
        wait_ms = timeout->sec*1000 + timeout->msec;
    } else {
        wait_ms = INFINITE;
    }

    /* Poll for completion status. */
    nevents = poll_iocp(ioqueue->iocp, wait_ms, NULL, NULL);

#if PJ_HAS_TCP
    /* Check the connecting array, only when there's no activity. */
    if (nevents == 0) {
        int nconnected = check_connecting(ioqueue);

        if (nconnected > 0) {
            nevents += nconnected;
        }
    }
#endif

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Always check the closing list when it has pending keys, not only
     * when there is no activity. Otherwise on busy activity, this will
     * cause ioqueue to reject new registration.
     */
    if (!pj_list_empty(&ioqueue->closing_list)) {
        pj_time_val now;
        pj_ioqueue_key_t *key;
        pj_ioqueue_key_t *next;

        pj_gettimeofday(&now);

        /* Move closing keys to free list when they've finished the
         * closing idle time.
         */
        pj_lock_acquire(ioqueue->lock);
        for (key = ioqueue->closing_list.next;
             key != &ioqueue->closing_list;
             key = next)
        {
            /* Remember the successor; the key may be unlinked below. */
            next = key->next;

            pj_assert(key->closing != 0);

            if (!PJ_TIME_VAL_GTE(now, key->free_time)) {
                continue;
            }

            pj_list_erase(key);
            pj_list_push_back(&ioqueue->free_list, key);
        }
        pj_lock_release(ioqueue->lock);
    }
#endif

    /* Return number of events. */
    return nevents;
}

/*
 * pj_ioqueue_recv()
 *
 * Initiate overlapped WSARecv() operation.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recv(  pj_ioqueue_key_t *key,
                                      pj_ioqueue_op_key_t *op_key,
				      void *buffer,
				      pj_ssize_t *length,
				      pj_uint32_t flags )
{
    /*
     * Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and
     * addrlen here. But unfortunately it generates EINVAL... :-(
     *  -bennylp
     */
    int rc;
    DWORD bytesRead;
    DWORD dwFlags = 0;
    union operation_key *op_key_rec;

    PJ_CHECK_STACK();
    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Check key is not closing */
    if (key->closing)
	return PJ_ECANCELLED;
#endif

    op_key_rec = (union operation_key*)op_key->internal__;
    op_key_rec->overlapped.wsabuf.buf = buffer;
    op_key_rec->overlapped.wsabuf.len = *length;

    dwFlags = flags;
    
    /* Try non-overlapped received first to see if data is
     * immediately available.
     */
    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
	rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
		     &bytesRead, &dwFlags, NULL, NULL);
	if (rc == 0) {
	    *length = bytesRead;
	    return PJ_SUCCESS;
	} else {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -