📄 sock.c
字号:
pEnv = getenv("MPICH_NETMASK"); if (pEnv != NULL) { MPIU_Strncpy(pszNetMask, pEnv, 50); token = strtok(pszNetMask, "/"); if (token != NULL) { token = strtok(NULL, "\n"); if (token != NULL) { nNicNet = GetIP(pszNetMask); nNicMask = GetMask(token); use_subnet = 1; } } } while (!connected) { host[0] = '\0'; mpi_errno = MPIU_Str_get_string(&connect_state->cur_host, host, 100); /*printf("got <%s> out of <%s>\n", host, connect_state->host_description);fflush(stdout);*/ if (mpi_errno != MPIU_STR_SUCCESS) { if (mpi_errno == MPIU_STR_NOMEM) mpi_errno = MPIR_Err_create_code(connect_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_NOMEM, "**nomem", 0); else mpi_errno = MPIR_Err_create_code(connect_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**fail", "**fail %d", mpi_errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CONNECT); return mpi_errno; } if (host[0] == '\0') { mpi_errno = MPIR_Err_create_code(connect_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock_connect", "**sock_connect %s %d %s %d", connect_state->host_description, port, "exhausted all endpoints", -1); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CONNECT); return mpi_errno; } sockAddr.sin_family = AF_INET; sockAddr.sin_addr.s_addr = inet_addr(host); if (sockAddr.sin_addr.s_addr == INADDR_NONE || sockAddr.sin_addr.s_addr == 0) { lphost = gethostbyname(host); if (lphost != NULL) sockAddr.sin_addr.s_addr = ((struct in_addr *)lphost->h_addr)->s_addr; else { mpi_errno = WSAGetLastError(); connect_errno = MPIR_Err_create_code(connect_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**gethostbyname", "**gethostbyname %s %d", get_error_string(mpi_errno), mpi_errno); /* MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CONNECT); return mpi_errno; */ continue; } } /* if a subnet was specified, make sure the currently extracted ip falls in the subnet */ if (use_subnet) { if ((sockAddr.sin_addr.s_addr & nNicMask) != nNicNet) { /* this ip does not match, move to the next */ continue; } } sockAddr.sin_port = htons((u_short)port); /* connect */ for (i=0; i<5; i++) { /*printf("connecting to %s\n", host);fflush(stdout);*/ if (connect(connect_state->sock, (SOCKADDR*)&sockAddr, sizeof(sockAddr)) == SOCKET_ERROR) { int random_time; int error = WSAGetLastError(); if ((error != WSAECONNREFUSED && error != WSAETIMEDOUT) || i == 4) { connect_errno = MPIR_Err_create_code(connect_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_INIT, "**sock_connect", "**sock_connect %s %d %s %d", host, port, get_error_string(error), error); /* MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CONNECT); return mpi_errno; */ /* This code assumes that all errors other than WSAECONNREFUSED and WSAETIMEDOUT should not cause a connection retry */ /* FIXME: Is this correct for resource errors like WSAENOBUFS or an interrupted operation? */ /* Should all errors cause a retry? or just WSAECONNREFUSED? or a subset of the possible errors? */ /* The reason for not retrying on all errors is that it slows down connection time for multi-nic /* hosts that cannot be contacted on the first address listed. */ break; } /* Close the socket with an error and create a new one */ if (closesocket(connect_state->sock) == SOCKET_ERROR) { error = WSAGetLastError(); connect_errno = MPIR_Err_create_code(connect_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_INIT, "**sock_connect", "**sock_connect %s %d %s %d", host, port, get_error_string(error), error); /* MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CONNECT); return mpi_errno; */ break; } connect_state->sock = INVALID_SOCKET; mpi_errno = easy_create(&connect_state->sock, ADDR_ANY, INADDR_ANY); if (mpi_errno != MPI_SUCCESS) { /* Warning: Loss of information. We have two error stacks, one in connect_errno and the other in mpi_errno, that cannot be joined given the current error code interface. */ /*connect_errno = MPIR_Err_create_code(connect_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_INIT, "**sock_create", 0);*/ mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_INIT, "**sock_create", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CONNECT); return mpi_errno; } random_time = (int)((double)rand() / (double)RAND_MAX * 250.0); Sleep(random_time); } else { /*printf("connect to %s:%d succeeded.\n", host, port);fflush(stdout);*/ connected = 1; break; } } } /* set the socket to non-blocking */ optval = TRUE; ioctlsocket(connect_state->sock, FIONBIO, &optval); connect_state->user_ptr = user_ptr; connect_state->type = SOCKI_SOCKET; connect_state->state = SOCKI_CONNECTING; connect_state->set = set; /* associate the socket with the completion port */ if (CreateIoCompletionPort((HANDLE)connect_state->sock, set, (ULONG_PTR)connect_state, g_num_cp_threads) == NULL) { mpi_errno = GetLastError(); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_INIT, "**iocp", "**iocp %s %d", get_error_string(mpi_errno), mpi_errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CONNECT); return mpi_errno; } connect_state->pending_operations++; /* post a completion event so the sock_post_connect can be notified through sock_wait */ PostQueuedCompletionStatus(set, 0, (ULONG_PTR)connect_state, &connect_state->write.ovl); *sock = connect_state; /*printf("connected socket %d\n", connect_state->sock);fflush(stdout);*/ MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CONNECT); return MPI_SUCCESS;}#undef FUNCNAME#define FUNCNAME MPIDU_Sock_set_user_ptr#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDU_Sock_set_user_ptr(MPIDU_Sock_t sock, void * user_ptr){ int mpi_errno; MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCK_SET_USER_PTR); MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCK_SET_USER_PTR); if (!g_init_called) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_INIT, "**sock_init", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_SET_USER_PTR); return mpi_errno; } if (sock == NULL) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_BAD_SOCK, "**bad_sock", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_SET_USER_PTR); return mpi_errno; } sock->user_ptr = user_ptr; MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_SET_USER_PTR); return MPI_SUCCESS;}#undef FUNCNAME#define FUNCNAME MPIDU_Sock_post_close#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDU_Sock_post_close(MPIDU_Sock_t sock){ int mpi_errno; SOCKET s, *sp; MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCK_POST_CLOSE); MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCK_POST_CLOSE); if (!g_init_called) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_INIT, "**sock_init", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CLOSE); return mpi_errno; } if (sock == NULL) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_BAD_SOCK, "**nullptr", "**nullptr %s", "sock"); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CLOSE); return mpi_errno; } if (sock->closing) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**pctwice", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CLOSE); return mpi_errno; } if (sock->type == SOCKI_LISTENER) { s = sock->listen_sock; sp = &sock->listen_sock; } else { s = sock->sock; sp = &sock->sock; } if (s == INVALID_SOCKET) { if (sock->type == SOCKI_LISTENER) mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_BAD_SOCK, "**bad_listenersock", 0); else mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_BAD_SOCK, "**bad_sock", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CLOSE); return mpi_errno; } if (sock->pending_operations != 0) { /*MPIU_Assert(sock->state != 0);*/ /* The state can be 0 if the operation was aborted */#ifdef MPICH_DBG_OUTPUT if (sock->state & SOCKI_CONNECTING) MPIU_DBG_PRINTF(("sock_post_close(%d) called while sock is connecting.\n", MPIDU_Sock_get_sock_id(sock))); if (sock->state & SOCKI_READING) { int i, n = 0; for (i=0; i<sock->read.iovlen; i++) n += sock->read.iov[i].MPID_IOV_LEN; MPIU_DBG_PRINTF(("sock_post_close(%d) called while sock is reading: %d bytes out of %d, index %d, iovlen %d.\n", MPIDU_Sock_get_sock_id(sock), sock->read.total, n, sock->read.index, sock->read.iovlen)); } if (sock->state & SOCKI_WRITING) { int i, n = 0; for (i=0; i<sock->write.iovlen; i++) n += sock->write.iov[i].MPID_IOV_LEN; MPIU_DBG_PRINTF(("sock_post_close(%d) called while sock is writing: %d bytes out of %d, index %d, iovlen %d.\n", MPIDU_Sock_get_sock_id(sock), sock->write.total, n, sock->write.index, sock->write.iovlen)); } fflush(stdout);#endif /* MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CLOSE); return SOCK_ERR_OP_IN_PROGRESS; */ /* posting a close cancels all outstanding operations */ /* It would be nice to cancel the outstanding reads or writes and then close the socket after handling the cancelled operations */ /* But it cannot be done because CancelIo only cancels operations started by the current thread. There is no way to cancel all operations. */ /*CancelIo(sock->sock);*/ } sock->closing = TRUE; /*sock->ct1 = PMPI_Wtime();*/ if (sock->type != SOCKI_LISTENER) /* calling shutdown for a listening socket is not valid */ { /* Mark the socket as non-writable */ if (shutdown(s, SD_SEND) == SOCKET_ERROR) { sock->pending_operations = 0; /*printf("closing socket %d\n", s);fflush(stdout);*/ if (closesocket(s) == SOCKET_ERROR) { mpi_errno = WSAGetLastError(); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**closesocket", "**closesocket %s %d", get_error_string(mpi_errno), mpi_errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CLOSE); return mpi_errno; } *sp = INVALID_SOCKET; if (!PostQueuedCompletionStatus(sock->set, 0, (ULONG_PTR)sock, NULL)) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_BAD_SOCK, "**fail", "**fail %d", GetLastError()); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CLOSE); return mpi_errno; } MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CLOSE); return MPI_SUCCESS; } } /* Cancel any outstanding operations */ sock->pending_operations = 0; if (sock->type == SOCKI_SOCKET) { static char ch; mpi_errno = MPI_SUCCESS; if (sock->state ^ SOCKI_READING) { /* If a read is not already posted, post a bogus one here. */ mpi_errno = MPIDU_Sock_post_read(sock, &ch, 1, 1, NULL); /* ignore this posted read so wait will return an op_close */ sock->pending_operations = 0; } if (mpi_errno == MPI_SUCCESS) { MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CLOSE); return MPI_SUCCESS; } } if (sock->type != SOCKI_LISTENER) /* calling shutdown for a listening socket is not valid */ { /* Mark the socket as non-readable */ if (shutdown(s, SD_RECEIVE) == SOCKET_ERROR) { mpi_errno = WSAGetLastError(); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**shutdown", "**shutdown %s %d", get_error_string(mpi_errno), mpi_errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CLOSE); return mpi_errno; } } /* Close the socket and insert a completion status so wait will return an op_close */ /*printf("closing socket %d\n", s);fflush(stdout);*/ if (closesocket(s) == SOCKET_ERROR) { mpi_errno = WSAGetLastError(); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**closesocket", "**closesocket %s %d", get_error_string(mpi_errno), mpi_errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CLOSE); return mpi_errno; } *sp = INVALID_SOCKET; if (!PostQueuedCompletionStatus(sock->set, 0, (ULONG_PTR)sock, NULL)) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_BAD_SOCK, "**fail", "**fail %d", GetLastError()); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CLOSE); return mpi_errno; } MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_CLOSE); return MPI_SUCCESS;}#undef FUNCNAME#define FUNCNAME MPIDU_Sock_post_read#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDU_Sock_post_read(MPIDU_Sock_t sock, void * buf, MPIU_Size_t minbr, MPIU_Size_t maxbr, MPIDU_Sock_progress_update_func_t fn){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCK_POST_READ); MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCK_POST_READ); MPIU_UNREFERENCED_ARG(maxbr); sock->read.tiov.MPID_IOV_BUF = (MPID_IOV_BUF_CAST)buf; sock->read.tiov.MPID_IOV_LEN = minbr; mpi_errno = MPIDU_Sock_post_readv(sock, &sock->read.tiov, 1, fn); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_READ); return mpi_errno;}#undef FUNCNAME#define FUNCNAME MPIDU_Sock_post_readv#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDU_Sock_post_readv(MPIDU_Sock_t sock, MPID_IOV * iov, int iov_n, MPIDU_Sock_progress_update_func_t fn){ int iter; int mpi_errno = MPI_SUCCESS; int result;#ifdef MPICH_DBG_OUTPUT int i;#endif DWORD flags = 0; MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCK_POST_READV);#ifdef USE_SOCK_IOV_COPY MPIDI_STATE_DECL(MPID_STATE_MEMCPY);#endif MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCK_POST_READV); if (!g_init_called) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_INIT, "**sock_init", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_POST_READV); return mpi_errno; } /*sock->rt1 = PMPI_Wtime();*/ /* strip any trailing empty buffers */ while (iov_n && iov[iov_n-1].MPID_IOV_LEN == 0) iov_n--; sock->read.total = 0;#ifdef USE_SOCK_IOV_COPY
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -