⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ioqueue_winnt.c

📁 基于sip协议的网络电话源码
💻 C
📖 第 1 页 / 共 3 页
字号:
    if (ioqueue->auto_delete_lock) {        pj_lock_destroy(ioqueue->lock);    }    ioqueue->lock = lock;    ioqueue->auto_delete_lock = auto_delete;    return PJ_SUCCESS;}/* * pj_ioqueue_register_sock() */PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,					      pj_ioqueue_t *ioqueue,					      pj_sock_t sock,					      void *user_data,					      const pj_ioqueue_callback *cb,					      pj_ioqueue_key_t **key ){    HANDLE hioq;    pj_ioqueue_key_t *rec;    u_long value;    int rc;    PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL);    pj_lock_acquire(ioqueue->lock);#if PJ_IOQUEUE_HAS_SAFE_UNREG    /* If safe unregistration is used, then get the key record from     * the free list.     */    if (pj_list_empty(&ioqueue->free_list)) {	pj_lock_release(ioqueue->lock);	return PJ_ETOOMANY;    }    rec = ioqueue->free_list.next;    pj_list_erase(rec);    /* Set initial reference count to 1 */    pj_assert(pj_atomic_get(rec->ref_count) == 0);    pj_atomic_inc(rec->ref_count);    rec->closing = 0;#else    rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));#endif    /* Build the key for this socket. */    rec->ioqueue = ioqueue;    rec->hnd = (HANDLE)sock;    rec->hnd_type = HND_IS_SOCKET;    rec->user_data = user_data;    pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback));#if PJ_HAS_TCP    rec->connecting = 0;#endif    /* Set socket to nonblocking. */    value = 1;    rc = ioctlsocket(sock, FIONBIO, &value);    if (rc != 0) {	pj_lock_release(ioqueue->lock);        return PJ_RETURN_OS_ERROR(WSAGetLastError());    }    /* Associate with IOCP */    hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0);    if (!hioq) {	pj_lock_release(ioqueue->lock);	return PJ_RETURN_OS_ERROR(GetLastError());    }    *key = rec;#if PJ_IOQUEUE_HAS_SAFE_UNREG    pj_list_push_back(&ioqueue->active_list, rec);#endif    pj_lock_release(ioqueue->lock);    return PJ_SUCCESS;}/* * pj_ioqueue_get_user_data() */PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ){    PJ_ASSERT_RETURN(key, NULL);    return key->user_data;}/* * pj_ioqueue_set_user_data() */PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,                                              void *user_data,                                              void **old_data ){    PJ_ASSERT_RETURN(key, PJ_EINVAL);        if (old_data)        *old_data = key->user_data;    key->user_data = user_data;    return PJ_SUCCESS;}#if PJ_IOQUEUE_HAS_SAFE_UNREG/* Decrement the key's reference counter, and when the counter reach zero, * destroy the key. */static void decrement_counter(pj_ioqueue_key_t *key){    if (pj_atomic_dec_and_get(key->ref_count) == 0) {	pj_lock_acquire(key->ioqueue->lock);	pj_assert(key->closing == 1);	pj_gettimeofday(&key->free_time);	key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;	pj_time_val_normalize(&key->free_time);	pj_list_erase(key);	pj_list_push_back(&key->ioqueue->closing_list, key);	pj_lock_release(key->ioqueue->lock);    }}#endif/* * Poll the I/O Completion Port, execute callback,  * and return the key and bytes transfered of the last operation. */static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, 			    pj_ssize_t *p_bytes, pj_ioqueue_key_t **p_key ){    DWORD dwBytesTransfered, dwKey;    generic_overlapped *pOv;    pj_ioqueue_key_t *key;    pj_ssize_t size_status = -1;    BOOL rcGetQueued;    /* Poll for completion status. */    rcGetQueued = GetQueuedCompletionStatus(hIocp, &dwBytesTransfered,					    &dwKey, (OVERLAPPED**)&pOv, 					    dwTimeout);    /* The return value is:     * - nonzero if event was dequeued.     * - zero and pOv==NULL if no event was dequeued.     * - zero and pOv!=NULL if event for failed I/O was dequeued.     */    if (pOv) {	/* Event was dequeued for either successfull or failed I/O */	key = (pj_ioqueue_key_t*)dwKey;	size_status = dwBytesTransfered;	/* Report to caller regardless */	if (p_bytes)	    *p_bytes = size_status;	if (p_key)	    *p_key = key;#if PJ_IOQUEUE_HAS_SAFE_UNREG	/* We shouldn't call callbacks if key is quitting. */	if (key->closing)	    return PJ_TRUE;	/* Increment reference counter to prevent this key from being	 * deleted	 */	pj_atomic_inc(key->ref_count);#endif	/* Carry out the callback */	switch (pOv->operation) {	case PJ_IOQUEUE_OP_READ:	case PJ_IOQUEUE_OP_RECV:	case PJ_IOQUEUE_OP_RECV_FROM:            pOv->operation = 0;            if (key->cb.on_read_complete)	        key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv,                                          size_status);	    break;	case PJ_IOQUEUE_OP_WRITE:	case PJ_IOQUEUE_OP_SEND:	case PJ_IOQUEUE_OP_SEND_TO:            pOv->operation = 0;            if (key->cb.on_write_complete)	        key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv,                                                 size_status);	    break;#if PJ_HAS_TCP	case PJ_IOQUEUE_OP_ACCEPT:	    /* special case for accept. */	    ioqueue_on_accept_complete((ioqueue_accept_rec*)pOv);            if (key->cb.on_accept_complete) {                ioqueue_accept_rec *accept_rec = (ioqueue_accept_rec*)pOv;		pj_status_t status = PJ_SUCCESS;		if (accept_rec->newsock == PJ_INVALID_SOCKET) {		    int dwError = WSAGetLastError();		    if (dwError == 0) dwError = OSERR_ENOTCONN;		    status = PJ_RETURN_OS_ERROR(dwError);		}	        key->cb.on_accept_complete(key,                                            (pj_ioqueue_op_key_t*)pOv,                                            accept_rec->newsock,                                           status);		accept_rec->newsock = PJ_INVALID_SOCKET;            }	    break;	case PJ_IOQUEUE_OP_CONNECT:#endif	case PJ_IOQUEUE_OP_NONE:	    pj_assert(0);	    break;	}#if PJ_IOQUEUE_HAS_SAFE_UNREG	decrement_counter(key);#endif	return PJ_TRUE;    }    /* No event was queued. */    return PJ_FALSE;}/* * pj_ioqueue_unregister() */PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ){    unsigned i;    enum { RETRY = 10 };    PJ_ASSERT_RETURN(key, PJ_EINVAL);#if PJ_HAS_TCP    if (key->connecting) {	unsigned pos;        pj_ioqueue_t *ioqueue;        ioqueue = key->ioqueue;	/* Erase from connecting_handles */	pj_lock_acquire(ioqueue->lock);	for (pos=0; pos < ioqueue->connecting_count; ++pos) {	    if (ioqueue->connecting_keys[pos] == key) {		erase_connecting_socket(ioqueue, pos);		break;	    }	}	key->connecting = 0;	pj_lock_release(ioqueue->lock);    }#endif#if PJ_IOQUEUE_HAS_SAFE_UNREG    /* Mark key as closing before closing handle. */    key->closing = 1;#endif        /* Close handle (the only way to disassociate handle from IOCP).      * We also need to close handle to make sure that no further events     * will come to the handle.     */    CloseHandle(key->hnd);    /* Reset callbacks */    key->cb.on_accept_complete = NULL;    key->cb.on_connect_complete = NULL;    key->cb.on_read_complete = NULL;    key->cb.on_write_complete = NULL;#if PJ_IOQUEUE_HAS_SAFE_UNREG    /* Even after handle is closed, I suspect that IOCP may still try to     * do something with the handle, causing memory corruption when pool     * debugging is enabled.     *     * Forcing context switch seems to have fixed that, but this is quite     * an ugly solution..     */    //This will loop forever if unregistration is done on the callback.    //Doing this with RETRY I think should solve the IOCP setting the     //socket signalled, without causing the deadlock.    //while (pj_atomic_get(key->ref_count) != 1)    //	pj_thread_sleep(0);    for (i=0; pj_atomic_get(key->ref_count) != 1 && i<RETRY; ++i)	pj_thread_sleep(0);    /* Decrement reference counter to destroy the key. */    decrement_counter(key);#endif    return PJ_SUCCESS;}/* * pj_ioqueue_poll() * * Poll for events. */PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout){    DWORD dwMsec;#if PJ_HAS_TCP    int connect_count = 0;#endif    int event_count = 0;    PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);    /* Calculate miliseconds timeout for GetQueuedCompletionStatus */    dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE;    /* Poll for completion status. */    event_count = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL);#if PJ_HAS_TCP    /* Check the connecting array, only when there's no activity. */    if (event_count == 0) {	connect_count = check_connecting(ioqueue);	if (connect_count > 0)	    event_count += connect_count;    }#endif#if PJ_IOQUEUE_HAS_SAFE_UNREG    /* Check the closing keys only when there's no activity and when there are     * pending closing keys.     * blp:     *	no, always check the list. Otherwise on busy activity, this will cause     *  ioqueue to reject new registration.     */    if (/*event_count == 0 &&*/ !pj_list_empty(&ioqueue->closing_list)) {	pj_time_val now;	pj_ioqueue_key_t *key;	pj_gettimeofday(&now);		/* Move closing keys to free list when they've finished the closing	 * idle time.	 */	pj_lock_acquire(ioqueue->lock);	key = ioqueue->closing_list.next;	while (key != &ioqueue->closing_list) {	    pj_ioqueue_key_t *next = key->next;	    pj_assert(key->closing != 0);	    if (PJ_TIME_VAL_GTE(now, key->free_time)) {		pj_list_erase(key);		pj_list_push_back(&ioqueue->free_list, key);	    }	    key = next;	}	pj_lock_release(ioqueue->lock);    }#endif    /* Return number of events. */    return event_count;}/* * pj_ioqueue_recv() * * Initiate overlapped WSARecv() operation. */PJ_DEF(pj_status_t) pj_ioqueue_recv(  pj_ioqueue_key_t *key,                                      pj_ioqueue_op_key_t *op_key,				      void *buffer,				      pj_ssize_t *length,				      pj_uint32_t flags ){    /*     * Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and     * addrlen here. But unfortunately it generates EINVAL... :-(     *  -bennylp     */    int rc;    DWORD bytesRead;    DWORD dwFlags = 0;    union operation_key *op_key_rec;    PJ_CHECK_STACK();    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);#if PJ_IOQUEUE_HAS_SAFE_UNREG    /* Check key is not closing */    if (key->closing)	return PJ_ECANCELLED;#endif    op_key_rec = (union operation_key*)op_key->internal__;    op_key_rec->overlapped.wsabuf.buf = buffer;    op_key_rec->overlapped.wsabuf.len = *length;    dwFlags = flags;        /* Try non-overlapped received first to see if data is     * immediately available.     */    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {	rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,		     &bytesRead, &dwFlags, NULL, NULL);	if (rc == 0) {	    *length = bytesRead;	    return PJ_SUCCESS;	} else {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -