📄 sock.c
字号:
sockAddr.sin_family = AF_INET; sockAddr.sin_addr.s_addr = inet_addr(host); if (sockAddr.sin_addr.s_addr == INADDR_NONE || sockAddr.sin_addr.s_addr == 0) { lphost = gethostbyname(host); if (lphost != NULL) sockAddr.sin_addr.s_addr = ((struct in_addr *)lphost->h_addr)->s_addr; else { mpi_errno = WSAGetLastError(); connect_errno = MPIR_Err_create_code(connect_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**gethostbyname", "**gethostbyname %s %d", get_error_string(mpi_errno), mpi_errno); /* MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CONNECT); return mpi_errno; */ continue; } } /* if a subnet was specified, make sure the currently extracted ip falls in the subnet */ if (use_subnet) { if ((sockAddr.sin_addr.s_addr & nNicMask) != nNicNet) { /* this ip does not match, move to the next */ continue; } } sockAddr.sin_port = htons((u_short)port); /* connect */ for (i=0; i<5; i++) { /*printf("connecting to %s\n", host);fflush(stdout);*/ if (connect(connect_state->sock, (SOCKADDR*)&sockAddr, sizeof(sockAddr)) == SOCKET_ERROR) { int random_time; int error = WSAGetLastError(); if (error != WSAECONNREFUSED || i == 4) { connect_errno = MPIR_Err_create_code(connect_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_INIT, "**sock_connect", "**sock_connect %s %d %s %d", host, port, get_error_string(error), error); /* MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CONNECT); return mpi_errno; */ break; } random_time = (int)((double)rand() / (double)RAND_MAX * 250.0); Sleep(random_time); } else { /*printf("connect to %s:%d succeeded.\n", host, port);fflush(stdout);*/ connected = 1; break; } } } /* set the socket to non-blocking */ optval = TRUE; ioctlsocket(connect_state->sock, FIONBIO, &optval); connect_state->user_ptr = user_ptr; connect_state->type = SOCKI_SOCKET; connect_state->state = SOCKI_CONNECTING; connect_state->set = set; /* associate the socket with the completion port */ if (CreateIoCompletionPort((HANDLE)connect_state->sock, set, (ULONG_PTR)connect_state, g_num_cp_threads) == NULL) { mpi_errno = GetLastError(); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_INIT, "**iocp", "**iocp %s %d", get_error_string(mpi_errno), mpi_errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CONNECT); return mpi_errno; } connect_state->pending_operations++; /* post a completion event so the sock_post_connect can be notified through sock_wait */ PostQueuedCompletionStatus(set, 0, (ULONG_PTR)connect_state, &connect_state->write.ovl); *sock = connect_state; MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CONNECT); return MPI_SUCCESS;}#undef FUNCNAME#define FUNCNAME MPIDU_Sock_set_user_ptr#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDU_Sock_set_user_ptr(MPIDU_Sock_t sock, void * user_ptr){ int mpi_errno; MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCK_SET_USER_PTR); MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCK_SET_USER_PTR); if (!g_init_called) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_INIT, "**sock_init", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_SET_USER_PTR); return mpi_errno; } if (sock == NULL) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_BAD_SOCK, "**bad_sock", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_SET_USER_PTR); return mpi_errno; } sock->user_ptr = user_ptr; MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_SET_USER_PTR); return MPI_SUCCESS;}#undef FUNCNAME#define FUNCNAME MPIDU_Sock_post_close#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDU_Sock_post_close(MPIDU_Sock_t sock){ int mpi_errno; SOCKET s, *sp; MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCK_POST_CLOSE); MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCK_POST_CLOSE); if (!g_init_called) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_INIT, "**sock_init", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CLOSE); return mpi_errno; } if (sock == NULL) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_BAD_SOCK, "**nullptr", "**nullptr %s", "sock"); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CLOSE); return mpi_errno; } if (sock->closing) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**pctwice", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CLOSE); return mpi_errno; } if (sock->type == SOCKI_LISTENER) { s = sock->listen_sock; sp = &sock->listen_sock; } else { s = sock->sock; sp = &sock->sock; } if (s == INVALID_SOCKET) { if (sock->type == SOCKI_LISTENER) mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_BAD_SOCK, "**bad_listenersock", 0); else mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_BAD_SOCK, "**bad_sock", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CLOSE); return mpi_errno; } if (sock->pending_operations != 0) { /*MPIU_Assert(sock->state != 0);*/ /* The state can be 0 if the operation was aborted */#ifdef MPICH_DBG_OUTPUT if (sock->state & SOCKI_CONNECTING) MPIU_DBG_PRINTF(("sock_post_close(%d) called while sock is connecting.\n", MPIDU_Sock_get_sock_id(sock))); if (sock->state & SOCKI_READING) { int i, n = 0; for (i=0; i<sock->read.iovlen; i++) n += sock->read.iov[i].MPID_IOV_LEN; MPIU_DBG_PRINTF(("sock_post_close(%d) called while sock is reading: %d bytes out of %d, index %d, iovlen %d.\n", MPIDU_Sock_get_sock_id(sock), sock->read.total, n, sock->read.index, sock->read.iovlen)); } if (sock->state & SOCKI_WRITING) { int i, n = 0; for (i=0; i<sock->write.iovlen; i++) n += sock->write.iov[i].MPID_IOV_LEN; MPIU_DBG_PRINTF(("sock_post_close(%d) called while sock is writing: %d bytes out of %d, index %d, iovlen %d.\n", MPIDU_Sock_get_sock_id(sock), sock->write.total, n, sock->write.index, sock->write.iovlen)); } fflush(stdout);#endif /* MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CLOSE); return SOCK_ERR_OP_IN_PROGRESS; */ /* posting a close cancels all outstanding operations */ /* It would be nice to cancel the outstanding reads or writes and then close the socket after handling the cancelled operations */ /* But it cannot be done because CancelIo only cancels operations started by the current thread. There is no way to cancel all operations. */ /*CancelIo(sock->sock);*/ } sock->closing = TRUE; /*sock->ct1 = PMPI_Wtime();*/ if (sock->type != SOCKI_LISTENER) /* calling shutdown for a listening socket is not valid */ { /* Mark the socket as non-writable */ if (shutdown(s, SD_SEND) == SOCKET_ERROR) { sock->pending_operations = 0; if (closesocket(s) == SOCKET_ERROR) { mpi_errno = WSAGetLastError(); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**closesocket", "**closesocket %s %d", get_error_string(mpi_errno), mpi_errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CLOSE); return mpi_errno; } *sp = INVALID_SOCKET; if (!PostQueuedCompletionStatus(sock->set, 0, (ULONG_PTR)sock, NULL)) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_BAD_SOCK, "**fail", "**fail %d", GetLastError()); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CLOSE); return mpi_errno; } MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CLOSE); return MPI_SUCCESS; } } /* Cancel any outstanding operations */ sock->pending_operations = 0; if (sock->type == SOCKI_SOCKET) { static char ch; mpi_errno = MPI_SUCCESS; if (sock->state ^ SOCKI_READING) { /* If a read is not already posted, post a bogus one here. */ mpi_errno = MPIDU_Sock_post_read(sock, &ch, 1, 1, NULL); /* ignore this posted read so wait will return an op_close */ sock->pending_operations = 0; } if (mpi_errno == MPI_SUCCESS) { MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CLOSE); return MPI_SUCCESS; } } if (sock->type != SOCKI_LISTENER) /* calling shutdown for a listening socket is not valid */ { /* Mark the socket as non-readable */ if (shutdown(s, SD_RECEIVE) == SOCKET_ERROR) { mpi_errno = WSAGetLastError(); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**shutdown", "**shutdown %s %d", get_error_string(mpi_errno), mpi_errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CLOSE); return mpi_errno; } } /* Close the socket and insert a completion status so wait will return an op_close */ if (closesocket(s) == SOCKET_ERROR) { mpi_errno = WSAGetLastError(); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**closesocket", "**closesocket %s %d", get_error_string(mpi_errno), mpi_errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CLOSE); return mpi_errno; } *sp = INVALID_SOCKET; if (!PostQueuedCompletionStatus(sock->set, 0, (ULONG_PTR)sock, NULL)) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_BAD_SOCK, "**fail", "**fail %d", GetLastError()); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CLOSE); return mpi_errno; } MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CLOSE); return MPI_SUCCESS;}#undef FUNCNAME#define FUNCNAME MPIDU_Sock_post_read#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDU_Sock_post_read(MPIDU_Sock_t sock, void * buf, MPIU_Size_t minbr, MPIU_Size_t maxbr, MPIDU_Sock_progress_update_func_t fn){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCK_POST_READ); MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCK_POST_READ); MPIU_UNREFERENCED_ARG(maxbr); sock->read.tiov.MPID_IOV_BUF = (MPID_IOV_BUF_CAST)buf; sock->read.tiov.MPID_IOV_LEN = minbr; mpi_errno = MPIDU_Sock_post_readv(sock, &sock->read.tiov, 1, fn); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_READ); return mpi_errno;}#undef FUNCNAME#define FUNCNAME MPIDU_Sock_post_readv#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDU_Sock_post_readv(MPIDU_Sock_t sock, MPID_IOV * iov, int iov_n, MPIDU_Sock_progress_update_func_t fn){ int iter; int mpi_errno = MPI_SUCCESS; int result;#ifdef MPICH_DBG_OUTPUT int i;#endif DWORD flags = 0; MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCK_POST_READV);#ifdef USE_SOCK_IOV_COPY MPIDI_STATE_DECL(MPID_STATE_MEMCPY);#endif MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCK_POST_READV); if (!g_init_called) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_INIT, "**sock_init", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_READV); return mpi_errno; } /*sock->rt1 = PMPI_Wtime();*/ /* strip any trailing empty buffers */ while (iov_n && iov[iov_n-1].MPID_IOV_LEN == 0) iov_n--; sock->read.total = 0;#ifdef USE_SOCK_IOV_COPY MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY); memcpy(sock->read.iov, iov, sizeof(MPID_IOV) * n); MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);#else sock->read.iov = iov;#endif sock->read.iovlen = iov_n; sock->read.index = 0; sock->read.progress_update = fn; sock->state |= SOCKI_READING;#ifdef MPICH_DBG_OUTPUT for (i=0; i<iov_n; i++) { MPIU_DBG_PRINTF(("sock_post_readv - iov[%d].len = %d\n", i, iov[i].MPID_IOV_LEN)); }#endif for (iter=0; iter<10; iter++) { if ((result = WSARecv(sock->sock, sock->read.iov, iov_n, &sock->read.num_bytes, &flags, &sock->read.ovl, NULL)) != SOCKET_ERROR) { break; } mpi_errno = WSAGetLastError(); if (mpi_errno == WSA_IO_PENDING) { mpi_errno = MPI_SUCCESS; break; } if (mpi_errno == WSAENOBUFS) { WSABUF tmp; tmp.buf = sock->read.iov[0].buf; tmp.len = sock->read.iov[0].len; MPIU_Assert(tmp.len > 0); while (mpi_errno == WSAENOBUFS) { /*printf("[%d] receiving %d bytes\n", __LINE__, tmp.len);fflush(stdout);*/ if ((result = WSARecv(sock->sock, &tmp, 1, &sock->read.num_bytes, &flags, &sock->read.ovl, NULL)) != SOCKET_ERROR) { mpi_errno = MPI_SUCCESS; break; } mpi_errno = WSAGetLastError(); if (mpi_errno == WSA_IO_PENDING) { mpi_errno = MPI_SUCCESS; break; } /*printf("[%d] reducing recv length from %d to %d\n", __LINE__, tmp.len, tmp.len / 2);fflush(stdout);*/ tmp.len = tmp.len / 2; if (tmp.len == 0 && mpi_errno == WSAENOBUFS) { break; } } if (mpi_errno == MPI_SUCCESS) { break; } } if (mpi_errno != WSAEWOULDBLOCK) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**fail", "**fail %s %d", get_error_string(mpi_errno), mpi_errno); break; } Sleep(200); } if (mpi_errno == MPI_SUCCESS) sock->pending_operations++;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -