📄 sock_iocp.c
字号:
sock->pending_operations--; if (sock->closing && sock->pending_operations == 0) { MPIU_dbg_printf("sock_wait: closing socket(%d) after simple read completed.\n", sock_getid(sock)); shutdown(sock->sock, SD_BOTH); closesocket(sock->sock); sock->sock = SOCK_INVALID_SOCKET; } MPIDI_FUNC_EXIT(MPID_STATE_SOCK_WAIT); return SOCK_SUCCESS; } /* make the user upcall */ if (sock->read.progress_update != NULL) sock->read.progress_update(num_bytes, sock->user_ptr); /* post a read of the remaining data */ ReadFile((HANDLE)(sock->sock), sock->read.buffer, sock->read.bufflen, &sock->read.num_bytes, &sock->read.ovl); } } else if (ovl == &sock->write.ovl) { if (sock->state & SOCK_CONNECTING) { /* insert code here to determine that the connect succeeded */ /* ... */ sock->state ^= SOCK_CONNECTING; /* remove the SOCK_CONNECTING bit */ out->op_type = SOCK_OP_CONNECT; out->user_ptr = sock->user_ptr; sock->pending_operations--; if (sock->closing && sock->pending_operations == 0) { MPIU_dbg_printf("sock_wait: closing socket(%d) after connect completed.\n", sock_getid(sock)); shutdown(sock->sock, SD_BOTH); closesocket(sock->sock); sock->sock = SOCK_INVALID_SOCKET; } MPIDI_FUNC_EXIT(MPID_STATE_SOCK_WAIT); return SOCK_SUCCESS; } else { /*MPIU_dbg_printf("sock_wait: write update, total = %d + %d = %d\n", sock->write.total, num_bytes, sock->write.total + num_bytes);*/ sock->write.total += num_bytes; if (sock->write.use_iov) { while (num_bytes) { if (sock->write.iov[sock->write.index].SOCK_IOV_LEN <= num_bytes) { /*MPIU_dbg_printf("sock_wait: write.index %d, len %d\n", sock->write.index, sock->write.iov[sock->write.index].SOCK_IOV_LEN);*/ num_bytes -= sock->write.iov[sock->write.index].SOCK_IOV_LEN; sock->write.index++; sock->write.iovlen--; } else { /*MPIU_dbg_printf("sock_wait: partial data written [%d].len = %d, num_bytes = %d\n", sock->write.index, sock->write.iov[sock->write.index].SOCK_IOV_LEN, num_bytes);*/ sock->write.iov[sock->write.index].SOCK_IOV_LEN -= num_bytes; 
sock->write.iov[sock->write.index].SOCK_IOV_BUF = (char*)(sock->write.iov[sock->write.index].SOCK_IOV_BUF) + num_bytes; num_bytes = 0; } } if (sock->write.iovlen == 0) { out->num_bytes = sock->write.total; out->op_type = SOCK_OP_WRITE; out->user_ptr = sock->user_ptr; sock->pending_operations--; if (sock->closing && sock->pending_operations == 0) { MPIU_dbg_printf("sock_wait: closing socket(%d) after iov write completed.\n", sock_getid(sock)); shutdown(sock->sock, SD_BOTH); closesocket(sock->sock); sock->sock = SOCK_INVALID_SOCKET; } MPIDI_FUNC_EXIT(MPID_STATE_SOCK_WAIT); return SOCK_SUCCESS; } /* make the user upcall */ if (sock->write.progress_update != NULL) sock->write.progress_update(num_bytes, sock->user_ptr); /* post a write of the remaining data */ MPIU_dbg_printf("sock_wait: posting write of the remaining data, vec size %d\n", sock->write.iovlen); WSASend(sock->sock, sock->write.iov, sock->write.iovlen, &sock->write.num_bytes, 0, &sock->write.ovl, NULL); } else { sock->write.buffer = (char*)(sock->write.buffer) + num_bytes; sock->write.bufflen -= num_bytes; if (sock->write.bufflen == 0) { out->num_bytes = sock->write.total; out->op_type = SOCK_OP_WRITE; out->user_ptr = sock->user_ptr; sock->pending_operations--; if (sock->closing && sock->pending_operations == 0) { MPIU_dbg_printf("sock_wait: closing socket(%d) after simple write completed.\n", sock_getid(sock)); shutdown(sock->sock, SD_BOTH); closesocket(sock->sock); sock->sock = SOCK_INVALID_SOCKET; } MPIDI_FUNC_EXIT(MPID_STATE_SOCK_WAIT); return SOCK_SUCCESS; } /* make the user upcall */ if (sock->write.progress_update != NULL) sock->write.progress_update(num_bytes, sock->user_ptr); /* post a write of the remaining data */ WriteFile((HANDLE)(sock->sock), sock->write.buffer, sock->write.bufflen, &sock->write.num_bytes, &sock->write.ovl); } } } else { MPIDI_FUNC_EXIT(MPID_STATE_SOCK_WAIT); return SOCK_FAIL; } } else if (sock->type == SOCK_LISTENER) { sock->state |= SOCK_ACCEPTED; out->num_bytes = 
num_bytes; out->op_type = SOCK_OP_ACCEPT; out->user_ptr = sock->user_ptr; MPIDI_FUNC_EXIT(MPID_STATE_SOCK_WAIT); return SOCK_SUCCESS; } else { MPIDI_FUNC_EXIT(MPID_STATE_SOCK_WAIT); return SOCK_FAIL; } } else { error = GetLastError(); /* interpret error, return appropriate SOCK_ERR_... macro */ if (error == WAIT_TIMEOUT) { out->op_type = SOCK_OP_TIMEOUT; out->error = SOCK_ERR_TIMEOUT; MPIDI_FUNC_EXIT(MPID_STATE_SOCK_WAIT); return SOCK_SUCCESS; } MPIDI_FUNC_EXIT(MPID_STATE_SOCK_WAIT); return SOCK_FAIL; } } MPIDI_FUNC_EXIT(MPID_STATE_SOCK_WAIT);}

/*
 * sock_set_user_ptr - attach an opaque caller pointer to a socket.
 *
 * sock_wait copies this pointer into out->user_ptr when it reports an
 * event for the socket, and passes it to the progress callbacks.
 *
 * Returns SOCK_FAIL if sock is SOCK_INVALID_SOCKET, SOCK_SUCCESS otherwise.
 */
int sock_set_user_ptr(sock_t sock, void *user_ptr)
{
    MPIDI_STATE_DECL(MPID_STATE_SOCK_SET_USER_PTR);

    MPIDI_FUNC_ENTER(MPID_STATE_SOCK_SET_USER_PTR);
    if (sock == SOCK_INVALID_SOCKET)
    {
        MPIDI_FUNC_EXIT(MPID_STATE_SOCK_SET_USER_PTR);
        return SOCK_FAIL;
    }
    sock->user_ptr = user_ptr;
    MPIDI_FUNC_EXIT(MPID_STATE_SOCK_SET_USER_PTR);
    return SOCK_SUCCESS;
}

/* immediate functions */

/*
 * sock_read - read up to len bytes into buf with a single recv() call.
 *
 * On SOCK_SUCCESS, *num_read holds the byte count returned by recv(); it is
 * 0 when recv() failed with WSAEWOULDBLOCK.  Any other recv() failure
 * returns SOCKET_ERROR, the same convention sock_readv uses.
 */
int sock_read(sock_t sock, void *buf, int len, int *num_read)
{
    MPIDI_STATE_DECL(MPID_STATE_SOCK_READ);

    MPIDI_FUNC_ENTER(MPID_STATE_SOCK_READ);
    *num_read = recv(sock->sock, buf, len, 0);
    if (*num_read == SOCKET_ERROR)
    {
        /* query the error before any other call can overwrite it */
        if (WSAGetLastError() != WSAEWOULDBLOCK)
        {
            MPIDI_FUNC_EXIT(MPID_STATE_SOCK_READ);
            return SOCKET_ERROR;
        }
        /* nothing available right now is not an error: report 0 bytes */
        *num_read = 0;
    }
    MPIDI_FUNC_EXIT(MPID_STATE_SOCK_READ);
    return SOCK_SUCCESS;
}

/*
 * sock_readv - scatter-read into the n buffers described by iov with a
 * single non-overlapped WSARecv() call.
 *
 * On SOCK_SUCCESS, *num_read holds the number of bytes received; it is 0
 * when WSARecv() failed with WSAEWOULDBLOCK.  Any other failure returns
 * SOCKET_ERROR.
 */
int sock_readv(sock_t sock, SOCK_IOV *iov, int n, int *num_read)
{
    DWORD nFlags = 0;
    MPIDI_STATE_DECL(MPID_STATE_SOCK_READV);

    MPIDI_FUNC_ENTER(MPID_STATE_SOCK_READV);
    if (WSARecv(sock->sock, iov, n, num_read, &nFlags, NULL/*overlapped*/, NULL/*completion routine*/) == SOCKET_ERROR)
    {
        if (WSAGetLastError() != WSAEWOULDBLOCK)
        {
            MPIDI_FUNC_EXIT(MPID_STATE_SOCK_READV);
            return SOCKET_ERROR;
        }
        *num_read = 0;
    }
    MPIDI_FUNC_EXIT(MPID_STATE_SOCK_READV);
    return SOCK_SUCCESS;
}

/*
 * sock_write - write up to len bytes from buf with a single send() call.
 *
 * On SOCK_SUCCESS, *num_written holds the byte count returned by send(); it
 * is 0 when send() failed with WSAEWOULDBLOCK.  Any other send() failure
 * returns SOCKET_ERROR, the same convention sock_writev uses.
 */
int sock_write(sock_t sock, void *buf, int len, int *num_written)
{
    MPIDI_STATE_DECL(MPID_STATE_SOCK_WRITE);

    MPIDI_FUNC_ENTER(MPID_STATE_SOCK_WRITE);
    *num_written = send(sock->sock, buf, len, 0);
    if (*num_written == SOCKET_ERROR)
    {
        /* query the error before any other call can overwrite it */
        if (WSAGetLastError() != WSAEWOULDBLOCK)
        {
            MPIDI_FUNC_EXIT(MPID_STATE_SOCK_WRITE);
            return SOCKET_ERROR;
        }
        /* the socket cannot take data right now: report 0 bytes */
        *num_written = 0;
    }
    MPIDI_FUNC_EXIT(MPID_STATE_SOCK_WRITE);
    return SOCK_SUCCESS;
}

/*
 * sock_writev - gather-write the n buffers described by iov with a single
 * non-overlapped WSASend() call.
 *
 * An empty vector (n == 0) is logged and returns 0 with *num_written set
 * to 0.  Otherwise, on SOCK_SUCCESS *num_written holds the number of bytes
 * sent; it is 0 when WSASend() failed with WSAEWOULDBLOCK.  Any other
 * failure returns SOCKET_ERROR.
 */
int sock_writev(sock_t sock, SOCK_IOV *iov, int n, int *num_written)
{
    MPIDI_STATE_DECL(MPID_STATE_SOCK_WRITEV);

    MPIDI_FUNC_ENTER(MPID_STATE_SOCK_WRITEV);
    if (n == 0)
    {
        MPIU_dbg_printf("empty vector passed into sock_writev\n");
        /* give the caller a defined count even though nothing was sent */
        *num_written = 0;
        MPIDI_FUNC_EXIT(MPID_STATE_SOCK_WRITEV);
        return 0;
    }
    if (WSASend(sock->sock, iov, n, num_written, 0, NULL/*overlapped*/, NULL/*completion routine*/) == SOCKET_ERROR)
    {
        if (WSAGetLastError() != WSAEWOULDBLOCK)
        {
            MPIDI_FUNC_EXIT(MPID_STATE_SOCK_WRITEV);
            return SOCKET_ERROR;
        }
        /* mirror sock_readv: would-block means 0 bytes transferred */
        *num_written = 0;
    }
    MPIDI_FUNC_EXIT(MPID_STATE_SOCK_WRITEV);
    return SOCK_SUCCESS;
}

/* non-blocking functions */

/*
 * sock_post_read - start an overlapped read of len bytes into buf.
 *
 * Records the request in sock->read, sets SOCK_READING, counts one pending
 * operation and issues ReadFile() on the socket handle with sock->read.ovl.
 * rfn may be NULL; when it is not, sock_wait calls it after a partial read
 * before re-posting the remainder.
 *
 * Returns SOCK_SUCCESS when the read completed immediately or is pending.
 * If ReadFile() fails with anything other than ERROR_IO_PENDING, no
 * completion will be delivered, so the pending count is rolled back and
 * SOCK_FAIL is returned.
 */
int sock_post_read(sock_t sock, void *buf, int len, int (*rfn)(int, void*))
{
    MPIDI_STATE_DECL(MPID_STATE_SOCK_POST_READ);

    MPIDI_FUNC_ENTER(MPID_STATE_SOCK_POST_READ);
    sock->read.total = 0;
    sock->read.buffer = buf;
    sock->read.bufflen = len;
    sock->read.use_iov = FALSE;
    sock->read.progress_update = rfn;
    sock->state |= SOCK_READING;
    sock->pending_operations++;
    if (!ReadFile((HANDLE)(sock->sock), buf, len, &sock->read.num_bytes, &sock->read.ovl))
    {
        if (GetLastError() != ERROR_IO_PENDING)
        {
            /* the operation never started; undo the bookkeeping so a
               later close is not held up waiting for it */
            /* NOTE(review): SOCK_READING is left set, as before - confirm
               whether callers expect it cleared on failure */
            sock->pending_operations--;
            MPIDI_FUNC_EXIT(MPID_STATE_SOCK_POST_READ);
            return SOCK_FAIL;
        }
    }
    MPIDI_FUNC_EXIT(MPID_STATE_SOCK_POST_READ);
    return SOCK_SUCCESS;
}

/*
 * sock_post_readv - start an overlapped scatter-read into n buffers.
 *
 * The caller's vector is copied into sock->read.iov and the copy is handed
 * to WSARecv() together with sock->read.ovl.  rfn is stored as the read
 * progress callback and may be NULL.
 *
 * Returns SOCK_SUCCESS when the receive completed immediately or is
 * pending.  If WSARecv() fails with anything other than WSA_IO_PENDING, the
 * pending count is rolled back and SOCK_FAIL is returned.
 *
 * NOTE(review): n is not checked against the capacity of sock->read.iov,
 * which is not visible from here - confirm callers never exceed it.
 */
int sock_post_readv(sock_t sock, SOCK_IOV *iov, int n, int (*rfn)(int, void*))
{
    DWORD flags = 0;
    MPIDI_STATE_DECL(MPID_STATE_SOCK_POST_READV);

    MPIDI_FUNC_ENTER(MPID_STATE_SOCK_POST_READV);
    sock->read.total = 0;
    /* keep a private copy rather than pointing at the caller's vector */
    memcpy(sock->read.iov, iov, sizeof(SOCK_IOV) * n);
    sock->read.iovlen = n;
    sock->read.index = 0;
    sock->read.use_iov = TRUE;
    sock->read.progress_update = rfn;
    sock->state |= SOCK_READING;
    sock->pending_operations++;
    if (WSARecv(sock->sock, sock->read.iov, n, &sock->read.num_bytes, &flags, &sock->read.ovl, NULL) == SOCKET_ERROR)
    {
        if (WSAGetLastError() != WSA_IO_PENDING)
        {
            /* the operation never started; no completion will arrive */
            sock->pending_operations--;
            MPIDI_FUNC_EXIT(MPID_STATE_SOCK_POST_READV);
            return SOCK_FAIL;
        }
    }
    MPIDI_FUNC_EXIT(MPID_STATE_SOCK_POST_READV);
    return SOCK_SUCCESS;
}

/*
 * sock_post_write - start an overlapped write of len bytes from buf.
 *
 * Records the request in sock->write, sets SOCK_WRITING, counts one pending
 * operation and issues WriteFile() on the socket handle with
 * sock->write.ovl.  wfn may be NULL; when it is not, sock_wait calls it
 * after a partial write before re-posting the remainder.
 *
 * Returns SOCK_SUCCESS when the write completed immediately or is pending.
 * If WriteFile() fails with anything other than ERROR_IO_PENDING, the
 * pending count is rolled back and SOCK_FAIL is returned.
 */
int sock_post_write(sock_t sock, void *buf, int len, int (*wfn)(int, void*))
{
    MPIDI_STATE_DECL(MPID_STATE_SOCK_POST_WRITE);

    MPIDI_FUNC_ENTER(MPID_STATE_SOCK_POST_WRITE);
    sock->write.total = 0;
    sock->write.buffer = buf;
    sock->write.bufflen = len;
    sock->write.use_iov = FALSE;
    sock->write.progress_update = wfn;
    sock->state |= SOCK_WRITING;
    sock->pending_operations++;
    if (!WriteFile((HANDLE)(sock->sock), buf, len, &sock->write.num_bytes, &sock->write.ovl))
    {
        if (GetLastError() != ERROR_IO_PENDING)
        {
            /* the operation never started; no completion will arrive */
            sock->pending_operations--;
            MPIDI_FUNC_EXIT(MPID_STATE_SOCK_POST_WRITE);
            return SOCK_FAIL;
        }
    }
    MPIDI_FUNC_EXIT(MPID_STATE_SOCK_POST_WRITE);
    return SOCK_SUCCESS;
}

/*
 * sock_post_writev - start an overlapped gather-write of n buffers.
 *
 * The caller's vector is copied into sock->write.iov because sock_wait
 * adjusts the entries in place (length and buffer pointer) as partial
 * writes complete.  wfn is stored as the write progress callback and may be
 * NULL.
 *
 * Returns SOCK_SUCCESS when the send completed immediately or is pending.
 * If WSASend() fails with anything other than WSA_IO_PENDING, the pending
 * count is rolled back and SOCK_FAIL is returned.
 *
 * NOTE(review): n is not checked against the capacity of sock->write.iov,
 * which is not visible from here - confirm callers never exceed it.
 */
int sock_post_writev(sock_t sock, SOCK_IOV *iov, int n, int (*wfn)(int, void*))
{
    MPIDI_STATE_DECL(MPID_STATE_SOCK_POST_WRITEV);

    MPIDI_FUNC_ENTER(MPID_STATE_SOCK_POST_WRITEV);
    sock->write.total = 0;
    memcpy(sock->write.iov, iov, sizeof(SOCK_IOV) * n);
    sock->write.iovlen = n;
    sock->write.index = 0;
    sock->write.use_iov = TRUE;
    sock->write.progress_update = wfn;
    sock->state |= SOCK_WRITING;
    sock->pending_operations++;
    if (WSASend(sock->sock, sock->write.iov, n, &sock->write.num_bytes, 0, &sock->write.ovl, NULL) == SOCKET_ERROR)
    {
        if (WSAGetLastError() != WSA_IO_PENDING)
        {
            /* the operation never started; no completion will arrive */
            sock->pending_operations--;
            MPIDI_FUNC_EXIT(MPID_STATE_SOCK_POST_WRITEV);
            return SOCK_FAIL;
        }
    }
    MPIDI_FUNC_EXIT(MPID_STATE_SOCK_POST_WRITEV);
    return SOCK_SUCCESS;
}

/* extended functions */

/*
 * sock_getid - return the underlying socket handle converted to int.
 * sock_wait uses this value in its debug messages.
 */
int sock_getid(sock_t sock)
{
    return (int)sock->sock;
}

/*
 * sock_easy_receive - call sock_read repeatedly until len bytes have been
 * stored in buf.
 *
 * *num_read always receives the number of bytes actually read: len on
 * SOCK_SUCCESS, or the partial count when sock_read reports an error (that
 * error code is returned unchanged).
 *
 * NOTE(review): sock_read reports 0 bytes with SOCK_SUCCESS both for
 * "would block" and when recv() itself returns 0, so this loop keeps
 * retrying in either case - confirm how a closed peer should be handled.
 */
int sock_easy_receive(sock_t sock, void *buf, int len, int *num_read)
{
    int error;
    int n;
    int total = 0;
    MPIDI_STATE_DECL(MPID_STATE_SOCK_EASY_RECEIVE);

    MPIDI_FUNC_ENTER(MPID_STATE_SOCK_EASY_RECEIVE);
    /* "> 0" rather than "!= 0" so a negative length cannot loop forever */
    while (len > 0)
    {
        error = sock_read(sock, buf, len, &n);
        if (error != SOCK_SUCCESS)
        {
            *num_read = total;
            MPIDI_FUNC_EXIT(MPID_STATE_SOCK_EASY_RECEIVE);
            return error;
        }
        total += n;
        buf = (char*)buf + n;
        len -= n;
    }
    *num_read = total;
    MPIDI_FUNC_EXIT(MPID_STATE_SOCK_EASY_RECEIVE);
    return SOCK_SUCCESS;
}

/*
 * sock_easy_send - call sock_write repeatedly until len bytes from buf have
 * been sent.
 *
 * *num_written always receives the number of bytes actually sent: len on
 * SOCK_SUCCESS, or the partial count when sock_write reports an error (that
 * error code is returned unchanged).
 */
int sock_easy_send(sock_t sock, void *buf, int len, int *num_written)
{
    int error;
    int n;
    int total = 0;
    MPIDI_STATE_DECL(MPID_STATE_SOCK_EASY_SEND);

    MPIDI_FUNC_ENTER(MPID_STATE_SOCK_EASY_SEND);
    /* "> 0" rather than "!= 0" so a negative length cannot loop forever */
    while (len > 0)
    {
        error = sock_write(sock, buf, len, &n);
        if (error != SOCK_SUCCESS)
        {
            *num_written = total;
            MPIDI_FUNC_EXIT(MPID_STATE_SOCK_EASY_SEND);
            return error;
        }
        total += n;
        buf = (char*)buf + n;
        len -= n;
    }
    *num_written = total;
    MPIDI_FUNC_EXIT(MPID_STATE_SOCK_EASY_SEND);
    return SOCK_SUCCESS;
}

#endif /* WITH_SOCK_TYPE == SOCK_IOCP */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -