⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ioqueue_winnt.c

📁 一个开源SIP协议栈
💻 C
📖 第 1 页 / 共 3 页
字号:
	    DWORD dwError = WSAGetLastError();
	    if (dwError != WSAEWOULDBLOCK) {
		*length = -1;
		return PJ_RETURN_OS_ERROR(dwError);
	    }
	}
    }

    dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /*
     * No immediate data available.
     * Register overlapped Recv() operation.
     */
    pj_bzero( &op_key_rec->overlapped.overlapped, 
              sizeof(op_key_rec->overlapped.overlapped));
    op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;

    rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, 
                  &bytesRead, &dwFlags, 
		  &op_key_rec->overlapped.overlapped, NULL);
    if (rc == SOCKET_ERROR) {
	DWORD dwStatus = WSAGetLastError();
        if (dwStatus!=WSA_IO_PENDING) {
            *length = -1;
            return PJ_STATUS_FROM_OS(dwStatus);
        }
    }

    /* Pending operation has been scheduled. */
    return PJ_EPENDING;
}

/*
 * pj_ioqueue_recvfrom()
 *
 * Initiate overlapped RecvFrom() operation.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
                                         pj_ioqueue_op_key_t *op_key,
					 void *buffer,
					 pj_ssize_t *length,
                                         pj_uint32_t flags,
					 pj_sockaddr_t *addr,
					 int *addrlen)
{
    int rc;
    DWORD bytesRead;
    DWORD dwFlags = 0;
    union operation_key *op_key_rec;

    PJ_CHECK_STACK();
    PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Check key is not closing */
    if (key->closing)
	return PJ_ECANCELLED;
#endif

    op_key_rec = (union operation_key*)op_key->internal__;
    op_key_rec->overlapped.wsabuf.buf = buffer;
    op_key_rec->overlapped.wsabuf.len = *length;

    dwFlags = flags;
    
    /* Try non-overlapped received first to see if data is
     * immediately available.
     */
    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
	rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
			 &bytesRead, &dwFlags, addr, addrlen, NULL, NULL);
	if (rc == 0) {
	    *length = bytesRead;
	    return PJ_SUCCESS;
	} else {
	    DWORD dwError = WSAGetLastError();
	    if (dwError != WSAEWOULDBLOCK) {
		*length = -1;
		return PJ_RETURN_OS_ERROR(dwError);
	    }
	}
    }

    dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /*
     * No immediate data available.
     * Register overlapped Recv() operation.
     */
    pj_bzero( &op_key_rec->overlapped.overlapped, 
              sizeof(op_key_rec->overlapped.overlapped));
    op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;

    rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, 
                     &bytesRead, &dwFlags, addr, addrlen,
		     &op_key_rec->overlapped.overlapped, NULL);
    if (rc == SOCKET_ERROR) {
	DWORD dwStatus = WSAGetLastError();
        if (dwStatus!=WSA_IO_PENDING) {
            *length = -1;
            return PJ_STATUS_FROM_OS(dwStatus);
        }
    } 
    
    /* Pending operation has been scheduled. */
    return PJ_EPENDING;
}

/*
 * pj_ioqueue_send()
 *
 * Initiate overlapped Send operation.
 */
PJ_DEF(pj_status_t) pj_ioqueue_send(  pj_ioqueue_key_t *key,
                                      pj_ioqueue_op_key_t *op_key,
				      const void *data,
				      pj_ssize_t *length,
				      pj_uint32_t flags )
{
    return pj_ioqueue_sendto(key, op_key, data, length, flags, NULL, 0);
}


/*
 * pj_ioqueue_sendto()
 *
 * Initiate overlapped SendTo operation.
 */
PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
				       const void *data,
				       pj_ssize_t *length,
                                       pj_uint32_t flags,
				       const pj_sockaddr_t *addr,
				       int addrlen)
{
    int rc;
    DWORD bytesWritten;
    DWORD dwFlags;
    union operation_key *op_key_rec;

    PJ_CHECK_STACK();
    PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Check key is not closing */
    if (key->closing)
	return PJ_ECANCELLED;
#endif

    op_key_rec = (union operation_key*)op_key->internal__;

    /*
     * First try blocking write.
     */
    op_key_rec->overlapped.wsabuf.buf = (void*)data;
    op_key_rec->overlapped.wsabuf.len = *length;

    dwFlags = flags;

    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
	rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
		       &bytesWritten, dwFlags, addr, addrlen,
		       NULL, NULL);
	if (rc == 0) {
	    *length = bytesWritten;
	    return PJ_SUCCESS;
	} else {
	    DWORD dwStatus = WSAGetLastError();
	    if (dwStatus != WSAEWOULDBLOCK) {
		*length = -1;
		return PJ_RETURN_OS_ERROR(dwStatus);
	    }
	}
    }

    dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /*
     * Data can't be sent immediately.
     * Schedule asynchronous WSASend().
     */
    pj_bzero( &op_key_rec->overlapped.overlapped, 
              sizeof(op_key_rec->overlapped.overlapped));
    op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_SEND;

    rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
                   &bytesWritten,  dwFlags, addr, addrlen,
		   &op_key_rec->overlapped.overlapped, NULL);
    if (rc == SOCKET_ERROR) {
	DWORD dwStatus = WSAGetLastError();
        if (dwStatus!=WSA_IO_PENDING)
            return PJ_STATUS_FROM_OS(dwStatus);
    }

    /* Asynchronous operation successfully submitted. */
    return PJ_EPENDING;
}

#if PJ_HAS_TCP

/*
 * pj_ioqueue_accept()
 *
 * Initiate overlapped accept() operation.
 */
PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
			               pj_sock_t *new_sock,
			               pj_sockaddr_t *local,
			               pj_sockaddr_t *remote,
			               int *addrlen)
{
    BOOL rc;
    DWORD bytesReceived;
    pj_status_t status;
    union operation_key *op_key_rec;
    SOCKET sock;

    PJ_CHECK_STACK();
    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Check key is not closing */
    if (key->closing)
	return PJ_ECANCELLED;
#endif

    /*
     * See if there is a new connection immediately available.
     */
    sock = WSAAccept((SOCKET)key->hnd, remote, addrlen, NULL, 0);
    if (sock != INVALID_SOCKET) {
        /* Yes! New socket is available! */
        int status;

        status = getsockname(sock, local, addrlen);
        if (status != 0) {
            DWORD dwError = WSAGetLastError();
            closesocket(sock);
            return PJ_RETURN_OS_ERROR(dwError);
        }

        *new_sock = sock;
        return PJ_SUCCESS;

    } else {
        DWORD dwError = WSAGetLastError();
        if (dwError != WSAEWOULDBLOCK) {
            return PJ_RETURN_OS_ERROR(dwError);
        }
    }

    /*
     * No connection is immediately available.
     * Must schedule an asynchronous operation.
     */
    op_key_rec = (union operation_key*)op_key->internal__;
    
    status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0, 
                            &op_key_rec->accept.newsock);
    if (status != PJ_SUCCESS)
	return status;

    /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket 
     * addresses can be obtained with getsockname() and getpeername().
     */
    status = setsockopt(op_key_rec->accept.newsock, SOL_SOCKET,
                        SO_UPDATE_ACCEPT_CONTEXT, 
                        (char*)&key->hnd, sizeof(SOCKET));
    /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later.
     * So ignore the error status.
     */

    op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT;
    op_key_rec->accept.addrlen = addrlen;
    op_key_rec->accept.local = local;
    op_key_rec->accept.remote = remote;
    op_key_rec->accept.newsock_ptr = new_sock;
    pj_bzero( &op_key_rec->accept.overlapped, 
	      sizeof(op_key_rec->accept.overlapped));

    rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)op_key_rec->accept.newsock,
		   op_key_rec->accept.accept_buf,
		   0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN,
		   &bytesReceived,
		   &op_key_rec->accept.overlapped );

    if (rc == TRUE) {
	ioqueue_on_accept_complete(&op_key_rec->accept);
	return PJ_SUCCESS;
    } else {
	DWORD dwStatus = WSAGetLastError();
	if (dwStatus!=WSA_IO_PENDING)
            return PJ_STATUS_FROM_OS(dwStatus);
    }

    /* Asynchronous Accept() has been submitted. */
    return PJ_EPENDING;
}


/*
 * pj_ioqueue_connect()
 *
 * Initiate overlapped connect() operation (well, it's non-blocking actually,
 * since there's no overlapped version of connect()).
 */
PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
					const pj_sockaddr_t *addr,
					int addrlen )
{
    HANDLE hEvent;
    pj_ioqueue_t *ioqueue;

    PJ_CHECK_STACK();
    PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Check key is not closing */
    if (key->closing)
	return PJ_ECANCELLED;
#endif

    /* Initiate connect() */
    if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) {
	DWORD dwStatus;
	dwStatus = WSAGetLastError();
        if (dwStatus != WSAEWOULDBLOCK) {
	    return PJ_RETURN_OS_ERROR(dwStatus);
	}
    } else {
	/* Connect has completed immediately! */
	return PJ_SUCCESS;
    }

    ioqueue = key->ioqueue;

    /* Add to the array of connecting socket to be polled */
    pj_lock_acquire(ioqueue->lock);

    if (ioqueue->connecting_count >= MAXIMUM_WAIT_OBJECTS) {
	pj_lock_release(ioqueue->lock);
	return PJ_ETOOMANYCONN;
    }

    /* Get or create event object. */
    if (ioqueue->event_count) {
	hEvent = ioqueue->event_pool[ioqueue->event_count - 1];
	--ioqueue->event_count;
    } else {
	hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
	if (hEvent == NULL) {
	    DWORD dwStatus = GetLastError();
	    pj_lock_release(ioqueue->lock);
	    return PJ_STATUS_FROM_OS(dwStatus);
	}
    }

    /* Mark key as connecting.
     * We can't use array index since key can be removed dynamically. 
     */
    key->connecting = 1;

    /* Associate socket events to the event object. */
    if (WSAEventSelect((pj_sock_t)key->hnd, hEvent, FD_CONNECT) != 0) {
	CloseHandle(hEvent);
	pj_lock_release(ioqueue->lock);
	return PJ_RETURN_OS_ERROR(WSAGetLastError());
    }

    /* Add to array. */
    ioqueue->connecting_keys[ ioqueue->connecting_count ] = key;
    ioqueue->connecting_handles[ ioqueue->connecting_count ] = hEvent;
    ioqueue->connecting_count++;

    pj_lock_release(ioqueue->lock);

    return PJ_EPENDING;
}
#endif	/* #if PJ_HAS_TCP */


PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
				     pj_size_t size )
{
    pj_bzero(op_key, size);
}

PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
                                         pj_ioqueue_op_key_t *op_key )
{
    BOOL rc;
    DWORD bytesTransfered;

    rc = GetOverlappedResult( key->hnd, (LPOVERLAPPED)op_key,
                              &bytesTransfered, FALSE );

    if (rc == FALSE) {
        return GetLastError()==ERROR_IO_INCOMPLETE;
    }

    return FALSE;
}


PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
                                                pj_ioqueue_op_key_t *op_key,
                                                pj_ssize_t bytes_status )
{
    BOOL rc;

    rc = PostQueuedCompletionStatus(key->ioqueue->iocp, bytes_status,
                                    (long)key, (OVERLAPPED*)op_key );
    if (rc == FALSE) {
        return PJ_RETURN_OS_ERROR(GetLastError());
    }

    return PJ_SUCCESS;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -