📄 ioqueue_winnt.c
字号:
DWORD dwError = WSAGetLastError(); if (dwError != WSAEWOULDBLOCK) { *length = -1; return PJ_RETURN_OS_ERROR(dwError); } } } dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC); /* * No immediate data available. * Register overlapped Recv() operation. */ pj_bzero( &op_key_rec->overlapped.overlapped, sizeof(op_key_rec->overlapped.overlapped)); op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV; rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, &bytesRead, &dwFlags, &op_key_rec->overlapped.overlapped, NULL); if (rc == SOCKET_ERROR) { DWORD dwStatus = WSAGetLastError(); if (dwStatus!=WSA_IO_PENDING) { *length = -1; return PJ_STATUS_FROM_OS(dwStatus); } } /* Pending operation has been scheduled. */ return PJ_EPENDING;}/* * pj_ioqueue_recvfrom() * * Initiate overlapped RecvFrom() operation. */PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, void *buffer, pj_ssize_t *length, pj_uint32_t flags, pj_sockaddr_t *addr, int *addrlen){ int rc; DWORD bytesRead; DWORD dwFlags = 0; union operation_key *op_key_rec; PJ_CHECK_STACK(); PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL);#if PJ_IOQUEUE_HAS_SAFE_UNREG /* Check key is not closing */ if (key->closing) return PJ_ECANCELLED;#endif op_key_rec = (union operation_key*)op_key->internal__; op_key_rec->overlapped.wsabuf.buf = buffer; op_key_rec->overlapped.wsabuf.len = *length; dwFlags = flags; /* Try non-overlapped received first to see if data is * immediately available. */ if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) { rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, &bytesRead, &dwFlags, addr, addrlen, NULL, NULL); if (rc == 0) { *length = bytesRead; return PJ_SUCCESS; } else { DWORD dwError = WSAGetLastError(); if (dwError != WSAEWOULDBLOCK) { *length = -1; return PJ_RETURN_OS_ERROR(dwError); } } } dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC); /* * No immediate data available. * Register overlapped Recv() operation. */ pj_bzero( &op_key_rec->overlapped.overlapped, sizeof(op_key_rec->overlapped.overlapped)); op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV; rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, &bytesRead, &dwFlags, addr, addrlen, &op_key_rec->overlapped.overlapped, NULL); if (rc == SOCKET_ERROR) { DWORD dwStatus = WSAGetLastError(); if (dwStatus!=WSA_IO_PENDING) { *length = -1; return PJ_STATUS_FROM_OS(dwStatus); } } /* Pending operation has been scheduled. */ return PJ_EPENDING;}/* * pj_ioqueue_send() * * Initiate overlapped Send operation. */PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, const void *data, pj_ssize_t *length, pj_uint32_t flags ){ return pj_ioqueue_sendto(key, op_key, data, length, flags, NULL, 0);}/* * pj_ioqueue_sendto() * * Initiate overlapped SendTo operation. */PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, const void *data, pj_ssize_t *length, pj_uint32_t flags, const pj_sockaddr_t *addr, int addrlen){ int rc; DWORD bytesWritten; DWORD dwFlags; union operation_key *op_key_rec; PJ_CHECK_STACK(); PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL);#if PJ_IOQUEUE_HAS_SAFE_UNREG /* Check key is not closing */ if (key->closing) return PJ_ECANCELLED;#endif op_key_rec = (union operation_key*)op_key->internal__; /* * First try blocking write. */ op_key_rec->overlapped.wsabuf.buf = (void*)data; op_key_rec->overlapped.wsabuf.len = *length; dwFlags = flags; if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) { rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, &bytesWritten, dwFlags, addr, addrlen, NULL, NULL); if (rc == 0) { *length = bytesWritten; return PJ_SUCCESS; } else { DWORD dwStatus = WSAGetLastError(); if (dwStatus != WSAEWOULDBLOCK) { *length = -1; return PJ_RETURN_OS_ERROR(dwStatus); } } } dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC); /* * Data can't be sent immediately. * Schedule asynchronous WSASend(). */ pj_bzero( &op_key_rec->overlapped.overlapped, sizeof(op_key_rec->overlapped.overlapped)); op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_SEND; rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, &bytesWritten, dwFlags, addr, addrlen, &op_key_rec->overlapped.overlapped, NULL); if (rc == SOCKET_ERROR) { DWORD dwStatus = WSAGetLastError(); if (dwStatus!=WSA_IO_PENDING) return PJ_STATUS_FROM_OS(dwStatus); } /* Asynchronous operation successfully submitted. */ return PJ_EPENDING;}#if PJ_HAS_TCP/* * pj_ioqueue_accept() * * Initiate overlapped accept() operation. */PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_sock_t *new_sock, pj_sockaddr_t *local, pj_sockaddr_t *remote, int *addrlen){ BOOL rc; DWORD bytesReceived; pj_status_t status; union operation_key *op_key_rec; SOCKET sock; PJ_CHECK_STACK(); PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);#if PJ_IOQUEUE_HAS_SAFE_UNREG /* Check key is not closing */ if (key->closing) return PJ_ECANCELLED;#endif /* * See if there is a new connection immediately available. */ sock = WSAAccept((SOCKET)key->hnd, remote, addrlen, NULL, 0); if (sock != INVALID_SOCKET) { /* Yes! New socket is available! */ int status; status = getsockname(sock, local, addrlen); if (status != 0) { DWORD dwError = WSAGetLastError(); closesocket(sock); return PJ_RETURN_OS_ERROR(dwError); } *new_sock = sock; return PJ_SUCCESS; } else { DWORD dwError = WSAGetLastError(); if (dwError != WSAEWOULDBLOCK) { return PJ_RETURN_OS_ERROR(dwError); } } /* * No connection is immediately available. * Must schedule an asynchronous operation. */ op_key_rec = (union operation_key*)op_key->internal__; status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0, &op_key_rec->accept.newsock); if (status != PJ_SUCCESS) return status; /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket * addresses can be obtained with getsockname() and getpeername(). */ status = setsockopt(op_key_rec->accept.newsock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&key->hnd, sizeof(SOCKET)); /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later. * So ignore the error status. */ op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT; op_key_rec->accept.addrlen = addrlen; op_key_rec->accept.local = local; op_key_rec->accept.remote = remote; op_key_rec->accept.newsock_ptr = new_sock; pj_bzero( &op_key_rec->accept.overlapped, sizeof(op_key_rec->accept.overlapped)); rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)op_key_rec->accept.newsock, op_key_rec->accept.accept_buf, 0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN, &bytesReceived, &op_key_rec->accept.overlapped ); if (rc == TRUE) { ioqueue_on_accept_complete(&op_key_rec->accept); return PJ_SUCCESS; } else { DWORD dwStatus = WSAGetLastError(); if (dwStatus!=WSA_IO_PENDING) return PJ_STATUS_FROM_OS(dwStatus); } /* Asynchronous Accept() has been submitted. */ return PJ_EPENDING;}/* * pj_ioqueue_connect() * * Initiate overlapped connect() operation (well, it's non-blocking actually, * since there's no overlapped version of connect()). */PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, const pj_sockaddr_t *addr, int addrlen ){ HANDLE hEvent; pj_ioqueue_t *ioqueue; PJ_CHECK_STACK(); PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);#if PJ_IOQUEUE_HAS_SAFE_UNREG /* Check key is not closing */ if (key->closing) return PJ_ECANCELLED;#endif /* Initiate connect() */ if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) { DWORD dwStatus; dwStatus = WSAGetLastError(); if (dwStatus != WSAEWOULDBLOCK) { return PJ_RETURN_OS_ERROR(dwStatus); } } else { /* Connect has completed immediately! */ return PJ_SUCCESS; } ioqueue = key->ioqueue; /* Add to the array of connecting socket to be polled */ pj_lock_acquire(ioqueue->lock); if (ioqueue->connecting_count >= MAXIMUM_WAIT_OBJECTS) { pj_lock_release(ioqueue->lock); return PJ_ETOOMANYCONN; } /* Get or create event object. */ if (ioqueue->event_count) { hEvent = ioqueue->event_pool[ioqueue->event_count - 1]; --ioqueue->event_count; } else { hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); if (hEvent == NULL) { DWORD dwStatus = GetLastError(); pj_lock_release(ioqueue->lock); return PJ_STATUS_FROM_OS(dwStatus); } } /* Mark key as connecting. * We can't use array index since key can be removed dynamically. */ key->connecting = 1; /* Associate socket events to the event object. */ if (WSAEventSelect((pj_sock_t)key->hnd, hEvent, FD_CONNECT) != 0) { CloseHandle(hEvent); pj_lock_release(ioqueue->lock); return PJ_RETURN_OS_ERROR(WSAGetLastError()); } /* Add to array. */ ioqueue->connecting_keys[ ioqueue->connecting_count ] = key; ioqueue->connecting_handles[ ioqueue->connecting_count ] = hEvent; ioqueue->connecting_count++; pj_lock_release(ioqueue->lock); return PJ_EPENDING;}#endif /* #if PJ_HAS_TCP */PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key, pj_size_t size ){ pj_bzero(op_key, size);}PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key ){ BOOL rc; DWORD bytesTransfered; rc = GetOverlappedResult( key->hnd, (LPOVERLAPPED)op_key, &bytesTransfered, FALSE ); if (rc == FALSE) { return GetLastError()==ERROR_IO_INCOMPLETE; } return FALSE;}PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_status ){ BOOL rc; rc = PostQueuedCompletionStatus(key->ioqueue->iocp, bytes_status, (long)key, (OVERLAPPED*)op_key ); if (rc == FALSE) { return PJ_RETURN_OS_ERROR(GetLastError()); } return PJ_SUCCESS;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -