📄 sock_wait.i
字号:
MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_WRITE, pollinfo->write_nb, pollinfo->user_ptr, event_mpi_errno, mpi_errno, fn_exit); MPIDU_SOCKI_POLLFD_OP_CLEAR(pollfd, pollinfo, POLLOUT); pollinfo->state = MPIDU_SOCKI_STATE_CONNECTED_RO; } /* --END ERROR HANDLING-- */ } else if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RO) { /* * If we are in the read-only state, then we should only get an error if we are looking to read data. If we are not * reading data, then pollfd->fd should be set to -1 and we should not be getting a POLLHUP event. * * There may still be data in the socket buffer, so we will let the POLLIN handler deal with the error. Once all of the * data has been read, the POLLIN handler will change the connection state and remove the connection from the active poll * list. */ MPIU_Assert(pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RO && (pollfd->events & POLLIN) && (pollfd->revents & POLLIN)); } else if (pollinfo->state == MPIDU_SOCKI_STATE_DISCONNECTED) { /* * We should never reach this state because pollfd->fd should be set to -1 if we are in the disconnected state. */ MPIU_Assert(pollinfo->state == MPIDU_SOCKI_STATE_DISCONNECTED && pollfd->fd == -1); } else if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTING) { /* * The process we were connecting to died. Let the POLLOUT handler deal with the error. */ MPIU_Assert(pollinfo->state == MPIDU_SOCKI_STATE_CONNECTING && (pollfd->events & POLLOUT)); pollfd->revents = POLLOUT; } /* --BEGIN ERROR HANDLING-- */ else { mpi_errno = MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledstate", "**sock|poll|unhandledstate %d", pollinfo->state); goto fn_exit; } /* --END ERROR HANDLING-- */ fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCKI_HANDLE_POLLHUP); return mpi_errno;}/* end MPIDU_Socki_handle_pollhup() */#undef FUNCNAME#define FUNCNAME MPIDU_Socki_handle_pollerr#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)static int MPIDU_Socki_handle_pollerr(struct pollfd * const pollfd, struct pollinfo * const pollinfo){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCKI_HANDLE_POLLERR); MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCKI_HANDLE_POLLERR); /* --BEGIN ERROR HANDLING-- */ if (pollinfo->type != MPIDU_SOCKI_TYPE_COMMUNICATION) { mpi_errno = MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledtype", "**sock|poll|unhandledtype %d", pollinfo->type); goto fn_exit; } /* --END ERROR HANDLING-- */ if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RW) { /* * Stevens suggests that some older version of UNIX did not properly reset so_error, which could allow POLLERR to be * continuously triggered. We remove the socket from the poll list (pollfd->fd = 1) in order to prevent this issue. * Here, we simple check that things are as we expect them to be. */ MPIU_Assert((pollfd->events & (POLLIN | POLLOUT)) || pollfd->fd == -1); /* If a write was posted then cancel it and generate an write completion event */ if (pollfd->events & POLLOUT) { int disconnected; int os_errno; int event_mpi_errno; MPIDU_SOCKI_GET_SOCKET_ERROR(pollinfo, os_errno, mpi_errno, fn_exit); event_mpi_errno = MPIDU_Socki_os_to_mpi_errno(pollinfo, os_errno, FCNAME, __LINE__, &disconnected); /* --BEGIN ERROR HANDLING-- */ if (MPIR_Err_is_fatal(event_mpi_errno)) { mpi_errno = event_mpi_errno; goto fn_exit; } /* --END ERROR HANDLING-- */ MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_WRITE, pollinfo->write_nb, pollinfo->user_ptr, event_mpi_errno, mpi_errno, fn_exit); MPIDU_SOCKI_POLLFD_OP_CLEAR(pollfd, pollinfo, POLLOUT); pollinfo->state = MPIDU_SOCKI_STATE_CONNECTED_RO; } } else if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RO) { /* * If we are in the read-only state, then we should only get an error if we are looking to read data. If we are not * reading data, then pollfd->fd should be set to -1 and we should not be getting a POLLERR event. * * There may still be data in the socket buffer, so we will let the POLLIN handler deal with the error. Once all of the * data has been read, the POLLIN handler will change the connection state and remove the connection from the active poll * list. */ MPIU_Assert(pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RO && (pollfd->events & POLLIN) && (pollfd->revents & POLLIN)); } else if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTING) { /* * The process we were connecting to died. Let the POLLOUT handler deal with the error. */ MPIU_Assert(pollinfo->state == MPIDU_SOCKI_STATE_CONNECTING && (pollfd->events & POLLOUT)); pollfd->revents = POLLOUT; } else if (pollinfo->state == MPIDU_SOCKI_STATE_DISCONNECTED) { /* We are already disconnected! Why are we handling an error? */ MPIU_Assert(pollfd->fd == -1); } /* --BEGIN ERROR HANDLING-- */ else { mpi_errno = MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledstate", "**sock|poll|unhandledstate %d", pollinfo->state); goto fn_exit; } /* --END ERROR HANDLING-- */ fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCKI_HANDLE_POLLERR); return mpi_errno;}/* end MPIDU_Socki_handle_pollerr() */#undef FUNCNAME#define FUNCNAME MPIDU_Socki_handle_read#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)static int MPIDU_Socki_handle_read(struct pollfd * const pollfd, struct pollinfo * const pollinfo){ int nb; int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_READ); MPIDI_STATE_DECL(MPID_STATE_READV); MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCKI_HANDLE_READ); MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCKI_HANDLE_READ); do { if (pollinfo->read_iov_flag) { MPIDI_FUNC_ENTER(MPID_STATE_READV); nb = readv(pollinfo->fd, pollinfo->read.iov.ptr + pollinfo->read.iov.offset, pollinfo->read.iov.count - pollinfo->read.iov.offset); MPIDI_FUNC_EXIT(MPID_STATE_READV); } else { MPIDI_FUNC_ENTER(MPID_STATE_READ); nb = read(pollinfo->fd, pollinfo->read.buf.ptr + pollinfo->read_nb, pollinfo->read.buf.max - pollinfo->read_nb); MPIDI_FUNC_EXIT(MPID_STATE_READ); } } while (nb < 0 && errno == EINTR); if (nb > 0) { int done; pollinfo->read_nb += nb; done = pollinfo->read_iov_flag ? MPIDU_Socki_adjust_iov(nb, pollinfo->read.iov.ptr, pollinfo->read.iov.count, &pollinfo->read.iov.offset) : (pollinfo->read_nb >= pollinfo->read.buf.min); if (done) { MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_READ, pollinfo->read_nb, pollinfo->user_ptr, MPI_SUCCESS, mpi_errno, fn_exit); MPIDU_SOCKI_POLLFD_OP_CLEAR(pollfd, pollinfo, POLLIN); } } /* --BEGIN ERROR HANDLING-- */ else if (nb == 0) { int event_mpi_errno; event_mpi_errno = MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_CONN_CLOSED, "**sock|connclosed", "**sock|connclosed %d %d", pollinfo->sock_set->id, pollinfo->sock_id); if (MPIDU_SOCKI_POLLFD_OP_ISSET(pollfd, pollinfo, POLLOUT)) { MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_WRITE, pollinfo->write_nb, pollinfo->user_ptr, event_mpi_errno, mpi_errno, fn_exit); } MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_READ, pollinfo->read_nb, pollinfo->user_ptr, event_mpi_errno, mpi_errno, fn_exit); MPIDU_SOCKI_POLLFD_OP_CLEAR(pollfd, pollinfo, POLLIN | POLLOUT); pollinfo->state = MPIDU_SOCKI_STATE_DISCONNECTED; } /* --END ERROR HANDLING-- */ else if (errno == EAGAIN && errno == EWOULDBLOCK) { /* do nothing... */ goto fn_exit; } /* --BEGIN ERROR HANDLING-- */ else { int disconnected; int event_mpi_errno; event_mpi_errno = MPIDU_Socki_os_to_mpi_errno(pollinfo, errno, FCNAME, __LINE__, &disconnected); if (MPIR_Err_is_fatal(event_mpi_errno)) { /* * A serious error occurred. There is no guarantee that the data * structures are still intact. Therefore, we avoid * modifying them. */ mpi_errno = event_mpi_errno; goto fn_exit; } if (disconnected) { if (MPIDU_SOCKI_POLLFD_OP_ISSET(pollfd, pollinfo, POLLOUT)) { MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_WRITE, pollinfo->write_nb, pollinfo->user_ptr, event_mpi_errno, mpi_errno, fn_exit); MPIDU_SOCKI_POLLFD_OP_CLEAR(pollfd, pollinfo, POLLOUT); } pollinfo->state = MPIDU_SOCKI_STATE_DISCONNECTED; } MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_READ, pollinfo->read_nb, pollinfo->user_ptr, event_mpi_errno, mpi_errno, fn_exit); MPIDU_SOCKI_POLLFD_OP_CLEAR(pollfd, pollinfo, POLLIN); } /* --END ERROR HANDLING-- */ fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCKI_HANDLE_READ); return mpi_errno;}/* end MPIDU_Socki_handle_read() */#undef FUNCNAME#define FUNCNAME MPIDU_Socki_handle_write#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)static int MPIDU_Socki_handle_write(struct pollfd * const pollfd, struct pollinfo * const pollinfo){ int nb; int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_WRITE); MPIDI_STATE_DECL(MPID_STATE_WRITEV); MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCKI_HANDLE_WRITE); MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCKI_HANDLE_WRITE); do { if (pollinfo->write_iov_flag) { MPIDI_FUNC_ENTER(MPID_STATE_WRITEV); nb = writev(pollinfo->fd, pollinfo->write.iov.ptr + pollinfo->write.iov.offset, pollinfo->write.iov.count - pollinfo->write.iov.offset); MPIDI_FUNC_EXIT(MPID_STATE_WRITEV); } else { MPIDI_FUNC_ENTER(MPID_STATE_WRITE); nb = write(pollinfo->fd, pollinfo->write.buf.ptr + pollinfo->write_nb, pollinfo->write.buf.max - pollinfo->write_nb); MPIDI_FUNC_EXIT(MPID_STATE_WRITE); } } while (nb < 0 && errno == EINTR); if (nb >= 0) { int done; pollinfo->write_nb += nb; done = pollinfo->write_iov_flag ? MPIDU_Socki_adjust_iov(nb, pollinfo->write.iov.ptr, pollinfo->write.iov.count, &pollinfo->write.iov.offset) : (pollinfo->write_nb >= pollinfo->write.buf.min); if (done) { MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_WRITE, pollinfo->write_nb, pollinfo->user_ptr, MPI_SUCCESS, mpi_errno, fn_exit); MPIDU_SOCKI_POLLFD_OP_CLEAR(pollfd, pollinfo, POLLOUT); } } else if (errno == EAGAIN || errno == EWOULDBLOCK) { /* do nothing... */ goto fn_exit; } /* --BEGIN ERROR HANDLING-- */ else { int disconnected; int event_mpi_errno; event_mpi_errno = MPIDU_Socki_os_to_mpi_errno(pollinfo, errno, FCNAME, __LINE__, &disconnected); if (MPIR_Err_is_fatal(event_mpi_errno)) { /* * A serious error occurred. There is no guarantee that the data structures are still intact. Therefore, we avoid * modifying them. */ mpi_errno = event_mpi_errno; goto fn_exit; } MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_WRITE, pollinfo->write_nb, pollinfo->user_ptr, event_mpi_errno, mpi_errno, fn_exit); MPIDU_SOCKI_POLLFD_OP_CLEAR(pollfd, pollinfo, POLLOUT); if (disconnected) { /* * The connection is dead but data may still be in the socket buffer; thus, we change the state and let * MPIDU_Sock_wait() clean things up. */ pollinfo->state = MPIDU_SOCKI_STATE_CONNECTED_RO; } } /* --END ERROR HANDLING-- */ fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCKI_HANDLE_WRITE); return mpi_errno;}/* end MPIDU_Socki_handle_write() */#undef FUNCNAME#define FUNCNAME MPIDU_Socki_handle_connect#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)static int MPIDU_Socki_handle_connect(struct pollfd * const pollfd, struct pollinfo * const pollinfo){ struct sockaddr_in addr; socklen_t addr_len; int rc; int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCKI_HANDLE_CONNECT); MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCKI_HANDLE_CONNECT); addr_len = sizeof(struct sockaddr_in); rc = getpeername(pollfd->fd, (struct sockaddr *) &addr, &addr_len); if (rc == 0) { MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_CONNECT, 0, pollinfo->user_ptr, MPI_SUCCESS, mpi_errno, fn_exit); pollinfo->state = MPIDU_SOCKI_STATE_CONNECTED_RW; } /* --BEGIN ERROR HANDLING-- */ else { int event_mpi_errno; MPIDU_SOCKI_GET_SOCKET_ERROR(pollinfo, pollinfo->os_errno, mpi_errno, fn_exit); event_mpi_errno = MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_CONN_FAILED, "**sock|connfailed", "**sock|poll|connfailed %d %d %d %s", pollinfo->sock_set->id, pollinfo->sock_id, pollinfo->os_errno, MPIU_Strerror(pollinfo->os_errno)); MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_CONNECT, 0, pollinfo->user_ptr, event_mpi_errno, mpi_errno, fn_exit); pollinfo->state = MPIDU_SOCKI_STATE_DISCONNECTED; } /* --END ERROR HANDLING-- */ MPIDU_SOCKI_POLLFD_OP_CLEAR(pollfd, pollinfo, POLLOUT); fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCKI_HANDLE_CONNECT); return mpi_errno;}/* end MPIDU_Socki_handle_connect() */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -