📄 sock.c
字号:
MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY); memcpy(sock->read.iov, iov, sizeof(MPID_IOV) * n); MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);#else sock->read.iov = iov;#endif sock->read.iovlen = iov_n; sock->read.index = 0; sock->read.progress_update = fn; sock->state |= SOCKI_READING;#ifdef MPICH_DBG_OUTPUT for (i=0; i<iov_n; i++) { MPIU_DBG_PRINTF(("sock_post_readv - iov[%d].len = %d\n", i, iov[i].MPID_IOV_LEN)); }#endif for (iter=0; iter<10; iter++) { if ((result = WSARecv(sock->sock, sock->read.iov, iov_n, &sock->read.num_bytes, &flags, &sock->read.ovl, NULL)) != SOCKET_ERROR) { break; } mpi_errno = WSAGetLastError(); if (mpi_errno == WSA_IO_PENDING) { mpi_errno = MPI_SUCCESS; break; } if (mpi_errno == WSAENOBUFS) { WSABUF tmp; tmp.buf = sock->read.iov[0].buf; tmp.len = sock->read.iov[0].len; MPIU_Assert(tmp.len > 0); while (mpi_errno == WSAENOBUFS) { /*printf("[%d] receiving %d bytes\n", __LINE__, tmp.len);fflush(stdout);*/ if ((result = WSARecv(sock->sock, &tmp, 1, &sock->read.num_bytes, &flags, &sock->read.ovl, NULL)) != SOCKET_ERROR) { mpi_errno = MPI_SUCCESS; break; } mpi_errno = WSAGetLastError(); if (mpi_errno == WSA_IO_PENDING) { mpi_errno = MPI_SUCCESS; break; } /*printf("[%d] reducing recv length from %d to %d\n", __LINE__, tmp.len, tmp.len / 2);fflush(stdout);*/ tmp.len = tmp.len / 2; if (tmp.len == 0 && mpi_errno == WSAENOBUFS) { break; } } if (mpi_errno == MPI_SUCCESS) { break; } } if (mpi_errno != WSAEWOULDBLOCK) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**fail", "**fail %s %d", get_error_string(mpi_errno), mpi_errno); break; } Sleep(200); } if (mpi_errno == MPI_SUCCESS) sock->pending_operations++; else sock->state &= ~SOCKI_READING; MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_READV); return mpi_errno;}#undef FUNCNAME#define FUNCNAME MPIDU_Sock_post_write#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDU_Sock_post_write(MPIDU_Sock_t sock, void * buf, MPIU_Size_t min, MPIU_Size_t max, MPIDU_Sock_progress_update_func_t fn){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCK_POST_WRITE); MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCK_POST_WRITE); MPIU_UNREFERENCED_ARG(max); sock->write.tiov.MPID_IOV_BUF = (MPID_IOV_BUF_CAST)buf; sock->write.tiov.MPID_IOV_LEN = min; mpi_errno = MPIDU_Sock_post_writev(sock, &sock->write.tiov, 1, fn); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_WRITE); return mpi_errno;}#undef FUNCNAME#define FUNCNAME MPIDU_Sock_post_writev#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDU_Sock_post_writev(MPIDU_Sock_t sock, MPID_IOV * iov, int iov_n, MPIDU_Sock_progress_update_func_t fn){ int mpi_errno = MPI_SUCCESS; int iter;#ifdef MPICH_DBG_OUTPUT int i;#endif MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCK_POST_WRITEV);#ifdef USE_SOCK_IOV_COPY MPIDI_STATE_DECL(MPID_STATE_MEMCPY);#endif MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCK_POST_WRITEV); /*sock->wt1 = PMPI_Wtime();*/ sock->write.total = 0;#ifdef USE_SOCK_IOV_COPY MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY); memcpy(sock->write.iov, iov, sizeof(MPID_IOV) * iov_n); MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);#else sock->write.iov = iov;#endif sock->write.iovlen = iov_n; sock->write.index = 0; sock->write.progress_update = fn; sock->state |= SOCKI_WRITING;#ifdef MPICH_DBG_OUTPUT for (i=0; i<iov_n; i++) { MPIU_DBG_PRINTF(("sock_post_writev - iov[%d].len = %d\n", i, iov[i].MPID_IOV_LEN)); }#endif for (iter=0; iter<10; iter++) { if (WSASend(sock->sock, sock->write.iov, iov_n, &sock->write.num_bytes, 0, &sock->write.ovl, NULL) != SOCKET_ERROR) break; mpi_errno = WSAGetLastError(); if (mpi_errno == WSA_IO_PENDING) { mpi_errno = MPI_SUCCESS; break; } if (mpi_errno == WSAENOBUFS) { WSABUF tmp; tmp.buf = sock->write.iov[0].buf; tmp.len = sock->write.iov[0].len; while (mpi_errno == WSAENOBUFS) { /*printf("[%d] sending %d bytes\n", __LINE__, tmp.len);fflush(stdout);*/ if (WSASend(sock->sock, &tmp, 1, &sock->write.num_bytes, 0, &sock->write.ovl, NULL) != SOCKET_ERROR) { mpi_errno = MPI_SUCCESS; break; } mpi_errno = WSAGetLastError(); if (mpi_errno == WSA_IO_PENDING) { mpi_errno = MPI_SUCCESS; break; } /*printf("[%d] reducing send length from %d to %d\n", __LINE__, tmp.len, tmp.len / 2);fflush(stdout);*/ tmp.len = tmp.len / 2; if (tmp.len == 0) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**fail", 0); break; } } if (mpi_errno == MPI_SUCCESS) { break; } } if (mpi_errno != WSAEWOULDBLOCK) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**fail", "**fail %s %d", get_error_string(mpi_errno), mpi_errno); break; } Sleep(200); } if (mpi_errno == MPI_SUCCESS) sock->pending_operations++; else sock->state &= ~SOCKI_WRITING; MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_WRITEV); return mpi_errno;}#undef FUNCNAME#define FUNCNAME MPIDU_Sock_wait#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDU_Sock_wait(MPIDU_Sock_set_t set, int timeout, MPIDU_Sock_event_t * out){ /*double t1, t2;*/ int mpi_errno; DWORD num_bytes; sock_state_t *sock, *iter; OVERLAPPED *ovl; DWORD dwFlags = 0; char error_msg[1024]; MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCK_WAIT); MPIDI_STATE_DECL(MPID_STATE_GETQUEUEDCOMPLETIONSTATUS); MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCK_WAIT); for (;;) {#if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE)# if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX) { /* Release the lock so that other threads may make progress while this thread waits for something to do */ MPIU_DBG_MSG(THREAD,TYPICAL,"Exit global critical section"); MPIU_THREAD_CHECK_BEGIN MPID_Thread_mutex_unlock(&MPIR_ThreadInfo.global_mutex); MPIU_THREAD_CHECK_END }# else# error selected multi-threaded implementation is not supported# endif#endif MPIDI_FUNC_ENTER(MPID_STATE_GETQUEUEDCOMPLETIONSTATUS); /* initialize to NULL so we can compare the output of GetQueuedCompletionStatus */ sock = NULL; ovl = NULL; num_bytes = 0; /*t1 = PMPI_Wtime();*/ if (GetQueuedCompletionStatus(set, &num_bytes, (PULONG_PTR)&sock, &ovl, timeout)) { /*t2 = PMPI_Wtime();*/ /*printf("[%d] GetQueuedCompletionStatus took %.3f seconds for sock: %d\n", getpid(), t2-t1, sock->sock);*/ MPIDI_FUNC_EXIT(MPID_STATE_GETQUEUEDCOMPLETIONSTATUS);#if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE)# if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX) { /* Reaquire the lock before processing any of the information returned from GetQueuedCompletionStatus */ MPIU_DBG_MSG(THREAD,TYPICAL,"Enter global critical section"); MPIU_THREAD_CHECK_BEGIN MPID_Thread_mutex_lock(&MPIR_ThreadInfo.global_mutex); MPIU_THREAD_CHECK_END }# else# error selected multi-threaded implementation is not supported# endif#endif if (sock->type == SOCKI_SOCKET) { if (sock->closing && sock->pending_operations == 0) { /*sock->ct2 = PMPI_Wtime();*/ /*printf("[%d] time from post_close to op_close: %.3f - sock %d\n", getpid(), sock->ct2 - sock->ct1, sock->sock);*/ /*printf("<1>");fflush(stdout);*/ out->op_type = MPIDU_SOCK_OP_CLOSE; out->num_bytes = 0; out->error = MPI_SUCCESS; out->user_ptr = sock->user_ptr; CloseHandle(sock->read.ovl.hEvent); CloseHandle(sock->write.ovl.hEvent); sock->read.ovl.hEvent = NULL; sock->write.ovl.hEvent = NULL;#if 0 MPIU_Free(sock); /* will this cause future io completion port errors since sock is the iocp user pointer? */#endif if (sock->sock != INVALID_SOCKET) { /*printf("closing socket %d\n", sock->sock);fflush(stdout);*/ if (closesocket(sock->sock) == SOCKET_ERROR) { mpi_errno = WSAGetLastError(); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**closesocket", "**closesocket %s %d", get_error_string(mpi_errno), mpi_errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_WAIT); return mpi_errno; } sock->sock = INVALID_SOCKET; } MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_WAIT); return MPI_SUCCESS; } if (ovl == &sock->read.ovl) { if (num_bytes == 0) { /*sock->rt2 = PMPI_Wtime();*/ /*printf("[%d] time from post_read to op_read : %.3f - sock %d\n", getpid(), sock->rt2 - sock->rt1, sock->sock);*/ /* socket closed */ MPIU_DBG_PRINTF(("sock_wait readv returning %d bytes and EOF\n", sock->read.total)); /*printf("sock_wait readv returning %d bytes and EOF\n", sock->read.total);*/ out->op_type = MPIDU_SOCK_OP_READ; out->num_bytes = sock->read.total; out->error = MPIDU_SOCK_ERR_CONN_CLOSED; out->user_ptr = sock->user_ptr; sock->pending_operations--; sock->state &= ~SOCKI_READING; /* remove the SOCKI_READING bit */ if (sock->closing && sock->pending_operations == 0) { MPIU_DBG_PRINTF(("sock_wait: closing socket(%d) after iov read completed.\n", MPIDU_Sock_get_sock_id(sock))); FlushFileBuffers((HANDLE)sock->sock); if (shutdown(sock->sock, SD_BOTH) == SOCKET_ERROR) { mpi_errno = WSAGetLastError(); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**shutdown", "**shutdown %s %d", get_error_string(mpi_errno), mpi_errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_WAIT); return mpi_errno; } /*printf("closing socket %d\n", sock->sock);fflush(stdout);*/ if (closesocket(sock->sock) == SOCKET_ERROR) { mpi_errno = WSAGetLastError(); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**closesocket", "**closesocket %s %d", get_error_string(mpi_errno), mpi_errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_WAIT); return mpi_errno; } sock->sock = INVALID_SOCKET; } MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_WAIT); return MPI_SUCCESS; } MPIU_DBG_PRINTF(("sock_wait readv update: %d bytes\n", num_bytes)); sock->read.total += num_bytes; while (num_bytes) { if (sock->read.iov[sock->read.index].MPID_IOV_LEN <= num_bytes) { num_bytes -= sock->read.iov[sock->read.index].MPID_IOV_LEN; sock->read.index++; sock->read.iovlen--; } else { sock->read.iov[sock->read.index].MPID_IOV_LEN -= num_bytes; sock->read.iov[sock->read.index].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)( (char*)(sock->read.iov[sock->read.index].MPID_IOV_BUF) + num_bytes); num_bytes = 0; } } if (sock->read.iovlen == 0) { /*sock->rt2 = PMPI_Wtime();*/ /*printf("[%d] time from post_read to op_read : %.3f - sock %d\n", getpid(), sock->rt2 - sock->rt1, sock->sock);*/ MPIU_DBG_PRINTF(("sock_wait readv %d bytes\n", sock->read.total)); out->op_type = MPIDU_SOCK_OP_READ; out->num_bytes = sock->read.total; out->error = MPI_SUCCESS; out->user_ptr = sock->user_ptr; sock->pending_operations--; sock->state &= ~SOCKI_READING; /* remove the SOCKI_READING bit */ if (sock->closing && sock->pending_operations == 0) { MPIU_DBG_PRINTF(("sock_wait: closing socket(%d) after iov read completed.\n", MPIDU_Sock_get_sock_id(sock))); FlushFileBuffers((HANDLE)sock->sock); if (shutdown(sock->sock, SD_BOTH) == SOCKET_ERROR) { mpi_errno = WSAGetLastError(); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**shutdown", "**shutdown %s %d", get_error_string(mpi_errno), mpi_errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_WAIT); return mpi_errno; } /*printf("closing socket %d\n", sock->sock);fflush(stdout);*/ if (closesocket(sock->sock) == SOCKET_ERROR) { mpi_errno = WSAGetLastError(); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**closesocket", "**closesocket %s %d", get_error_string(mpi_errno), mpi_errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_WAIT); return mpi_errno; } sock->sock = INVALID_SOCKET; } MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_WAIT); return MPI_SUCCESS; } /* make the user upcall */ if (sock->read.progress_update != NULL) sock->read.progress_update(num_bytes, sock->user_ptr); /* post a read of the remaining data */ /*WSARecv(sock->sock, sock->read.iov, sock->read.iovlen, &sock->read.num_bytes, &dwFlags, &sock->read.ovl, NULL);*/ if (WSARecv(sock->sock, &sock->read.iov[sock->read.index], sock->read.iovlen, &sock->read.num_bytes, &dwFlags, &sock->read.ovl, NULL) == SOCKET_ERROR) { mpi_errno = WSAGetLastError(); if (mpi_errno == 0) { /*sock->rt2 = PMPI_Wtime();*/ /*printf("[%d] time from post_read to op_read : %.3f - sock %d\n", getpid(), sock->rt2 - sock->rt1, sock->sock);*/ out->op_type = MPIDU_SOCK_OP_READ; out->num_bytes = sock->read.total; /*printf("sock_wait returning %d bytes and socket closed\n", sock->read.total);*/ out->error = MPIDU_SOCK_ERR_CONN_CLOSED; out->user_ptr = sock->user_ptr; sock->pending_operations--; sock->state &= ~SOCKI_READING; if (sock->closing && sock->pending_operations == 0) { MPIU_DBG_PRINTF(("sock_wait: closing socket(%d) after iov read completed.\n", MPIDU_Sock_get_sock_id(sock))); FlushFileBuffers((HANDLE)sock->sock); if (shutdown(sock->sock, SD_BOTH) == SOCKET_ERROR) { mpi_errno = WSAGetLastError(); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**shutdown", "**shutdown %s %d", get_error_string(mpi_errno),
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -