📄 ioqueue_winnt.c
字号:
DWORD dwError = WSAGetLastError();
if (dwError != WSAEWOULDBLOCK) {
*length = -1;
return PJ_RETURN_OS_ERROR(dwError);
}
}
}
dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
/*
* No immediate data available.
* Register overlapped Recv() operation.
*/
pj_bzero( &op_key_rec->overlapped.overlapped,
sizeof(op_key_rec->overlapped.overlapped));
op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
&bytesRead, &dwFlags,
&op_key_rec->overlapped.overlapped, NULL);
if (rc == SOCKET_ERROR) {
DWORD dwStatus = WSAGetLastError();
if (dwStatus!=WSA_IO_PENDING) {
*length = -1;
return PJ_STATUS_FROM_OS(dwStatus);
}
}
/* Pending operation has been scheduled. */
return PJ_EPENDING;
}
/*
 * pj_ioqueue_recvfrom()
 *
 * Receive data (and the sender address) from the socket behind 'key'.
 *
 * Unless PJ_IOQUEUE_ALWAYS_ASYNC is set in 'flags', a non-overlapped
 * WSARecvFrom() is attempted first. If that call cannot complete
 * immediately (WSAEWOULDBLOCK), or if PJ_IOQUEUE_ALWAYS_ASYNC was
 * requested, an overlapped WSARecvFrom() is queued instead.
 *
 * On input *length is the capacity of 'buffer'. On PJ_SUCCESS it is set to
 * the number of bytes received; on error it is set to -1.
 *
 * Returns:
 *  - PJ_SUCCESS     data was read immediately.
 *  - PJ_EPENDING    an overlapped receive has been scheduled.
 *  - PJ_EINVAL      key, op_key, buffer or length is NULL.
 *  - PJ_ECANCELLED  the key is closing (PJ_IOQUEUE_HAS_SAFE_UNREG builds).
 *  - otherwise the status mapped from the Winsock error.
 *
 * NOTE(review): when PJ_EPENDING is returned, 'buffer', 'addr' and
 * 'addrlen' were handed to an overlapped call, so they presumably must
 * stay valid until the operation completes -- confirm against callers.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
                                         pj_ioqueue_op_key_t *op_key,
                                         void *buffer,
                                         pj_ssize_t *length,
                                         pj_uint32_t flags,
                                         pj_sockaddr_t *addr,
                                         int *addrlen)
{
    int rc;
    DWORD bytesRead;
    DWORD dwFlags = 0;
    union operation_key *op_key_rec;

    PJ_CHECK_STACK();
    /* 'length' is dereferenced below, so it has to be validated as well. */
    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Check key is not closing */
    if (key->closing) {
        return PJ_ECANCELLED;
    }
#endif

    /* The operation record lives inside the caller supplied op_key. */
    op_key_rec = (union operation_key*)op_key->internal__;
    op_key_rec->overlapped.wsabuf.buf = buffer;
    op_key_rec->overlapped.wsabuf.len = *length;

    dwFlags = flags;

    /* Try a non-overlapped receive first to see if data is
     * immediately available.
     */
    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
        rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
                         &bytesRead, &dwFlags, addr, addrlen, NULL, NULL);
        if (rc == 0) {
            *length = bytesRead;
            return PJ_SUCCESS;
        } else {
            DWORD dwError = WSAGetLastError();
            if (dwError != WSAEWOULDBLOCK) {
                *length = -1;
                return PJ_RETURN_OS_ERROR(dwError);
            }
        }
    }

    /* PJ_IOQUEUE_ALWAYS_ASYNC is an ioqueue flag; strip it before the
     * flags are handed to Winsock.
     */
    dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /*
     * No immediate data available.
     * Register overlapped RecvFrom() operation.
     */
    pj_bzero( &op_key_rec->overlapped.overlapped,
              sizeof(op_key_rec->overlapped.overlapped));
    op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;

    rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
                     &bytesRead, &dwFlags, addr, addrlen,
                     &op_key_rec->overlapped.overlapped, NULL);
    if (rc == SOCKET_ERROR) {
        DWORD dwStatus = WSAGetLastError();
        if (dwStatus != WSA_IO_PENDING) {
            *length = -1;
            return PJ_STATUS_FROM_OS(dwStatus);
        }
    }

    /* Pending operation has been scheduled. */
    return PJ_EPENDING;
}
/*
* pj_ioqueue_send()
*
* Send data on the socket behind 'key'.
*
* Thin wrapper: forwards every argument to pj_ioqueue_sendto() with a NULL
* destination address and an address length of 0, and returns whatever
* pj_ioqueue_sendto() returns.
*/
PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
const void *data,
pj_ssize_t *length,
pj_uint32_t flags )
{
return pj_ioqueue_sendto(key, op_key, data, length, flags, NULL, 0);
}
/*
 * pj_ioqueue_sendto()
 *
 * Send data on the socket behind 'key', optionally to the destination
 * given by 'addr'/'addrlen'.
 *
 * Unless PJ_IOQUEUE_ALWAYS_ASYNC is set in 'flags', a non-overlapped
 * WSASendTo() is attempted first. If that call cannot complete immediately
 * (WSAEWOULDBLOCK), or if PJ_IOQUEUE_ALWAYS_ASYNC was requested, an
 * overlapped WSASendTo() is queued instead.
 *
 * On input *length is the number of bytes to send. When the data is sent
 * immediately it is updated with the number of bytes written; when the
 * non-overlapped attempt fails it is set to -1.
 *
 * Returns:
 *  - PJ_SUCCESS     data was sent immediately.
 *  - PJ_EPENDING    an overlapped send has been submitted.
 *  - PJ_EINVAL      key, op_key, data or length is NULL.
 *  - PJ_ECANCELLED  the key is closing (PJ_IOQUEUE_HAS_SAFE_UNREG builds).
 *  - otherwise the status mapped from the Winsock error.
 *
 * NOTE(review): when PJ_EPENDING is returned, 'data' was handed to an
 * overlapped call, so it presumably must stay valid until the operation
 * completes -- confirm against callers.
 */
PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
                                       const void *data,
                                       pj_ssize_t *length,
                                       pj_uint32_t flags,
                                       const pj_sockaddr_t *addr,
                                       int addrlen)
{
    int rc;
    DWORD bytesWritten;
    DWORD dwFlags;
    union operation_key *op_key_rec;

    PJ_CHECK_STACK();
    /* 'length' is dereferenced below, so it has to be validated as well. */
    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Check key is not closing */
    if (key->closing) {
        return PJ_ECANCELLED;
    }
#endif

    /* The operation record lives inside the caller supplied op_key. */
    op_key_rec = (union operation_key*)op_key->internal__;

    /*
     * First try a non-overlapped write; WSAEWOULDBLOCK means the data
     * could not be sent right away.
     */
    op_key_rec->overlapped.wsabuf.buf = (void*)data;
    op_key_rec->overlapped.wsabuf.len = *length;

    dwFlags = flags;

    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
        rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
                       &bytesWritten, dwFlags, addr, addrlen,
                       NULL, NULL);
        if (rc == 0) {
            *length = bytesWritten;
            return PJ_SUCCESS;
        } else {
            DWORD dwStatus = WSAGetLastError();
            if (dwStatus != WSAEWOULDBLOCK) {
                *length = -1;
                return PJ_RETURN_OS_ERROR(dwStatus);
            }
        }
    }

    /* PJ_IOQUEUE_ALWAYS_ASYNC is an ioqueue flag; strip it before the
     * flags are handed to Winsock.
     */
    dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /*
     * Data can't be sent immediately.
     * Schedule asynchronous WSASendTo().
     */
    pj_bzero( &op_key_rec->overlapped.overlapped,
              sizeof(op_key_rec->overlapped.overlapped));
    op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_SEND;

    rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
                   &bytesWritten, dwFlags, addr, addrlen,
                   &op_key_rec->overlapped.overlapped, NULL);
    if (rc == SOCKET_ERROR) {
        DWORD dwStatus = WSAGetLastError();
        if (dwStatus != WSA_IO_PENDING) {
            /* Unlike the non-overlapped path, *length is left untouched
             * on this failure.
             */
            return PJ_STATUS_FROM_OS(dwStatus);
        }
    }

    /* Asynchronous operation successfully submitted. */
    return PJ_EPENDING;
}
#if PJ_HAS_TCP
/*
 * pj_ioqueue_accept()
 *
 * Accept an incoming connection on the listening socket behind 'key'.
 *
 * A non-overlapped WSAAccept() is tried first. When a connection is already
 * waiting, the new socket is stored in *new_sock, the peer address is
 * written to 'remote' by WSAAccept(), and the local address is fetched with
 * getsockname() when both 'local' and 'addrlen' are supplied.
 *
 * When no connection is waiting (WSAEWOULDBLOCK), a socket is created up
 * front and an overlapped AcceptEx() is issued. The caller's output
 * pointers are stored in the operation record for the completion handler.
 *
 * Returns:
 *  - PJ_SUCCESS     a connection was accepted immediately.
 *  - PJ_EPENDING    an overlapped accept has been submitted.
 *  - PJ_EINVAL      key, op_key or new_sock is NULL.
 *  - PJ_ECANCELLED  the key is closing (PJ_IOQUEUE_HAS_SAFE_UNREG builds).
 *  - otherwise the status from socket creation or the mapped Winsock error.
 *
 * NOTE(review): 'local', 'remote' and 'addrlen' are not validated by the
 * argument check, so they are treated as optional here -- confirm against
 * the public API contract.
 */
PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
                                       pj_sock_t *new_sock,
                                       pj_sockaddr_t *local,
                                       pj_sockaddr_t *remote,
                                       int *addrlen)
{
    BOOL rc;
    DWORD bytesReceived;
    pj_status_t status;
    union operation_key *op_key_rec;
    SOCKET sock;

    PJ_CHECK_STACK();
    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Check key is not closing */
    if (key->closing) {
        return PJ_ECANCELLED;
    }
#endif

    /*
     * See if there is a new connection immediately available.
     */
    sock = WSAAccept((SOCKET)key->hnd, remote, addrlen, NULL, 0);
    if (sock != INVALID_SOCKET) {
        /* Yes! New socket is available!
         * Only query the local address when the caller gave somewhere to
         * put it; calling getsockname() with NULL arguments would fail and
         * discard a perfectly good connection.
         */
        if (local && addrlen) {
            if (getsockname(sock, local, addrlen) != 0) {
                DWORD dwError = WSAGetLastError();
                closesocket(sock);
                return PJ_RETURN_OS_ERROR(dwError);
            }
        }

        *new_sock = sock;
        return PJ_SUCCESS;

    } else {
        DWORD dwError = WSAGetLastError();
        if (dwError != WSAEWOULDBLOCK) {
            return PJ_RETURN_OS_ERROR(dwError);
        }
    }

    /*
     * No connection is immediately available.
     * Must schedule an asynchronous operation.
     */
    op_key_rec = (union operation_key*)op_key->internal__;

    /* NOTE(review): the accept socket is always created as PJ_AF_INET,
     * regardless of the listening socket's address family -- confirm this
     * is intended.
     */
    status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0,
                            &op_key_rec->accept.newsock);
    if (status != PJ_SUCCESS) {
        return status;
    }

    /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket
     * addresses can be obtained with getsockname() and getpeername().
     * The option is for WinXP or later, so the result is deliberately
     * ignored.
     *
     * NOTE(review): this is applied before AcceptEx() has completed;
     * Microsoft's AcceptEx documentation describes setting it on the
     * accepted socket after completion -- verify it is effective here.
     */
    (void)setsockopt(op_key_rec->accept.newsock, SOL_SOCKET,
                     SO_UPDATE_ACCEPT_CONTEXT,
                     (char*)&key->hnd, sizeof(SOCKET));

    /* Remember where the completion handler must deliver its results. */
    op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT;
    op_key_rec->accept.addrlen = addrlen;
    op_key_rec->accept.local = local;
    op_key_rec->accept.remote = remote;
    op_key_rec->accept.newsock_ptr = new_sock;
    pj_bzero( &op_key_rec->accept.overlapped,
              sizeof(op_key_rec->accept.overlapped));

    rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)op_key_rec->accept.newsock,
                   op_key_rec->accept.accept_buf,
                   0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN,
                   &bytesReceived,
                   &op_key_rec->accept.overlapped );

    if (rc == TRUE) {
        ioqueue_on_accept_complete(&op_key_rec->accept);
        return PJ_SUCCESS;
    } else {
        DWORD dwStatus = WSAGetLastError();
        if (dwStatus != WSA_IO_PENDING) {
            /* Nothing is outstanding on the pre-created socket, and the
             * caller never receives it, so release it here instead of
             * leaking the handle.
             */
            closesocket((SOCKET)op_key_rec->accept.newsock);
            op_key_rec->accept.newsock = PJ_INVALID_SOCKET;
            return PJ_STATUS_FROM_OS(dwStatus);
        }
    }

    /* Asynchronous Accept() has been submitted. */
    return PJ_EPENDING;
}
/*
 * pj_ioqueue_connect()
 *
 * Initiate overlapped connect() operation (well, it's non-blocking actually,
 * since there's no overlapped version of connect()).
 *
 * connect() is called directly. If it reports WSAEWOULDBLOCK, the key is
 * marked as connecting, an event object (reused from the ioqueue's event
 * pool when one is available, otherwise newly created) is bound to the
 * socket's FD_CONNECT notification, and the key/event pair is appended to
 * the ioqueue's connecting arrays under the ioqueue lock.
 *
 * Returns:
 *  - PJ_SUCCESS        connect() completed immediately.
 *  - PJ_EPENDING       the connection attempt is in progress.
 *  - PJ_EINVAL         key or addr is NULL, or addrlen is 0.
 *  - PJ_ECANCELLED     the key is closing (PJ_IOQUEUE_HAS_SAFE_UNREG builds).
 *  - PJ_ETOOMANYCONN   MAXIMUM_WAIT_OBJECTS connects are already pending.
 *  - otherwise the status mapped from the Winsock/Win32 error.
 *
 * NOTE(review): the PJ_ETOOMANYCONN check happens after connect() has
 * already been started on the socket.
 */
PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
                                        const pj_sockaddr_t *addr,
                                        int addrlen )
{
    HANDLE hEvent;
    pj_ioqueue_t *ioqueue;

    PJ_CHECK_STACK();
    PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Check key is not closing */
    if (key->closing) {
        return PJ_ECANCELLED;
    }
#endif

    /* Initiate connect() */
    if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) {
        DWORD dwStatus;
        dwStatus = WSAGetLastError();
        if (dwStatus != WSAEWOULDBLOCK) {
            return PJ_RETURN_OS_ERROR(dwStatus);
        }
    } else {
        /* Connect has completed immediately! */
        return PJ_SUCCESS;
    }

    ioqueue = key->ioqueue;

    /* Add to the array of connecting socket to be polled */
    pj_lock_acquire(ioqueue->lock);

    if (ioqueue->connecting_count >= MAXIMUM_WAIT_OBJECTS) {
        pj_lock_release(ioqueue->lock);
        return PJ_ETOOMANYCONN;
    }

    /* Get or create event object. */
    if (ioqueue->event_count) {
        hEvent = ioqueue->event_pool[ioqueue->event_count - 1];
        --ioqueue->event_count;
    } else {
        hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
        if (hEvent == NULL) {
            DWORD dwStatus = GetLastError();
            pj_lock_release(ioqueue->lock);
            return PJ_STATUS_FROM_OS(dwStatus);
        }
    }

    /* Mark key as connecting.
     * We can't use array index since key can be removed dynamically.
     */
    key->connecting = 1;

    /* Associate socket events to the event object. */
    if (WSAEventSelect((pj_sock_t)key->hnd, hEvent, FD_CONNECT) != 0) {
        /* Capture the error code first: the cleanup calls below may
         * overwrite the thread's last-error value.
         */
        DWORD dwStatus = WSAGetLastError();

        /* The key never made it into the connecting array, so undo the
         * mark set above.
         */
        key->connecting = 0;
        CloseHandle(hEvent);
        pj_lock_release(ioqueue->lock);
        return PJ_RETURN_OS_ERROR(dwStatus);
    }

    /* Add to array. */
    ioqueue->connecting_keys[ ioqueue->connecting_count ] = key;
    ioqueue->connecting_handles[ ioqueue->connecting_count ] = hEvent;
    ioqueue->connecting_count++;

    pj_lock_release(ioqueue->lock);

    return PJ_EPENDING;
}
#endif /* #if PJ_HAS_TCP */
/*
* pj_ioqueue_op_key_init()
*
* Reset an operation key by zero-filling 'size' bytes starting at 'op_key'.
* 'size' is supplied by the caller; this function does not check it against
* the actual size of the object.
*/
PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
pj_size_t size )
{
pj_bzero(op_key, size);
}
/*
 * pj_ioqueue_is_pending()
 *
 * Report whether the overlapped operation identified by 'op_key' is still
 * in progress on the handle behind 'key'.
 *
 * The status is polled with GetOverlappedResult() in non-waiting mode.
 * Non-zero is returned only when that call fails with ERROR_IO_INCOMPLETE;
 * a finished operation, or any other failure, yields FALSE.
 *
 * NOTE(review): 'op_key' is cast directly to LPOVERLAPPED, which presumably
 * relies on the OVERLAPPED structure sitting at the very start of the
 * operation key -- verify against the op key layout.
 */
PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
                                         pj_ioqueue_op_key_t *op_key )
{
    DWORD transferred;
    BOOL finished;

    /* bWait is FALSE: query only, never block. */
    finished = GetOverlappedResult( key->hnd, (LPOVERLAPPED)op_key,
                                    &transferred, FALSE );
    if (finished != FALSE) {
        return FALSE;
    }

    return GetLastError()==ERROR_IO_INCOMPLETE;
}
/*
 * pj_ioqueue_post_completion()
 *
 * Manually queue a completion packet on the I/O completion port of the
 * ioqueue that owns 'key'. The packet carries 'bytes_status' as the
 * transferred-bytes value, 'key' as the completion key and 'op_key' as the
 * OVERLAPPED pointer.
 *
 * Returns PJ_SUCCESS when the packet was posted, otherwise the status
 * mapped from GetLastError().
 */
PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
                                                pj_ioqueue_op_key_t *op_key,
                                                pj_ssize_t bytes_status )
{
    BOOL rc;

    /* The completion key is pointer sized (ULONG_PTR). Casting the key
     * pointer to 'long' would truncate it on 64-bit Windows, where long
     * is only 32 bits wide.
     */
    rc = PostQueuedCompletionStatus(key->ioqueue->iocp, (DWORD)bytes_status,
                                    (ULONG_PTR)key, (OVERLAPPED*)op_key );
    if (rc == FALSE) {
        return PJ_RETURN_OS_ERROR(GetLastError());
    }

    return PJ_SUCCESS;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -