📄 sock_wait.i
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- *//* * (C) 2001 by Argonne National Laboratory. * See COPYRIGHT in top-level directory. */static int MPIDU_Socki_handle_pollhup(struct pollfd * const pollfd, struct pollinfo * const pollinfo);static int MPIDU_Socki_handle_pollerr(struct pollfd * const pollfd, struct pollinfo * const pollinfo);static int MPIDU_Socki_handle_read(struct pollfd * const pollfd, struct pollinfo * const pollinfo);static int MPIDU_Socki_handle_write(struct pollfd * const pollfd, struct pollinfo * const pollinfo);static int MPIDU_Socki_handle_connect(struct pollfd * const pollfd, struct pollinfo * const pollinfo);/* * MPIDU_Sock_wait() * * NOTES: * * For fatal errors, the state of the connection progresses directly to the failed state and the connection is marked inactive in * the poll array. Under normal conditions, the fatal error should result in the termination of the process; but, if that * doesn't happen, we try to leave the implementation in a somewhat sane state. */#undef FUNCNAME#define FUNCNAME MPIDU_Sock_wait#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)int MPIDU_Sock_wait(struct MPIDU_Sock_set * sock_set, int millisecond_timeout, struct MPIDU_Sock_event * eventp){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCK_WAIT); MPIDI_STATE_DECL(MPID_STATE_POLL); MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCK_WAIT); for (;;) { int elem; int n_fds; int n_elems; int found_active_elem = FALSE; if (MPIDU_Socki_event_dequeue(sock_set, &elem, eventp) == MPI_SUCCESS) { struct pollinfo * pollinfo; int flags; if (eventp->op_type != MPIDU_SOCK_OP_CLOSE) { break; } pollinfo = &sock_set->pollinfos[elem]; /* * Attempt to set socket back to blocking. This *should* prevent any data in the socket send buffer from being * discarded. Instead close() will block until the buffer is flushed or the connection timeouts and is considered * lost. Theoretically, this could cause the MPIDU_Sock_wait() to hang indefinitely; however, the calling code * should ensure this will not happen by going through a shutdown protocol before posting a close operation. * * FIXME: If the attempt to set the socket back to blocking fails, we presently ignore it. Should we return an * error? We need to define acceptible data loss at close time. MS Windows has worse problems with this, so it * may not be possible to make any guarantees. */ flags = fcntl(pollinfo->fd, F_GETFL, 0); if (flags != -1) { fcntl(pollinfo->fd, F_SETFL, flags & ~O_NONBLOCK); } /* FIXME: return code? If an error occurs do we return it instead of the error specified in the event? */ close(pollinfo->fd); MPIDU_Socki_sock_free(pollinfo->sock); break; } for(;;) {# if (MPICH_THREAD_LEVEL < MPI_THREAD_MULTIPLE) { MPIDI_FUNC_ENTER(MPID_STATE_POLL); n_fds = poll(sock_set->pollfds, sock_set->poll_array_elems, millisecond_timeout); MPIDI_FUNC_EXIT(MPID_STATE_POLL); }# else /* (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE) */ { /* * First try a non-blocking poll to see if any immediate progress can be made. This avoids the lock manipulation * overhead. */ MPIDI_FUNC_ENTER(MPID_STATE_POLL); n_fds = poll(sock_set->pollfds, sock_set->poll_array_elems, 0); MPIDI_FUNC_EXIT(MPID_STATE_POLL); if (n_fds == 0 && millisecond_timeout != 0) { int pollfds_active_elems = sock_set->poll_array_elems; sock_set->pollfds_active = sock_set->pollfds; # if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX) { /* Release the lock so that other threads may make progress while this thread waits for something to do */ MPIU_DBG_MSG(THREAD,TYPICAL,"Exit global critical section"); MPID_Thread_mutex_unlock(&MPIR_Process.global_mutex); }# elif (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MONITOR) { /* FIXME: this code is an experiment and is not even close to correct. */ if (MPIU_Monitor_closet_get_occupany_count(MPIR_Process.global_closet) == 0) { MPIU_Monitor_exit(&MPIR_Process.global_monitor); } else { MPIU_Monitor_continue(&MPIR_Process.global_monitor, &MPIR_Process.global_closet); } }# else# error selected multi-threaded implementation is not supported# endif MPIDI_FUNC_ENTER(MPID_STATE_POLL); n_fds = poll(sock_set->pollfds_active, pollfds_active_elems, millisecond_timeout); MPIDI_FUNC_EXIT(MPID_STATE_POLL); # if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX) { /* Reaquire the lock before processing any of the information returned from poll */ MPIU_DBG_MSG(THREAD,TYPICAL,"Enter global critical section"); MPID_Thread_mutex_lock(&MPIR_Process.global_mutex); }# elif (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MONITOR) { MPIU_Monitor_enter(&MPIR_Process.global_monitor); }# else# error selected multi-threaded implementation is not supported# endif /* * Update pollfds array if changes were posted while we were blocked in poll */ if (sock_set->pollfds_updated) { for (elem = 0; elem < sock_set->poll_array_elems; elem++) { sock_set->pollfds[elem].events = sock_set->pollinfos[elem].pollfd_events; if ((sock_set->pollfds[elem].events & (POLLIN | POLLOUT)) != 0) { sock_set->pollfds[elem].fd = sock_set->pollinfos[elem].fd; } else { sock_set->pollfds[elem].fd = -1; } if (elem < pollfds_active_elems) { if (sock_set->pollfds_active == sock_set->pollfds) { sock_set->pollfds[elem].revents &= ~(POLLIN | POLLOUT) | sock_set->pollfds[elem].events; } else { sock_set->pollfds[elem].revents = sock_set->pollfds_active[elem].revents & (~(POLLIN | POLLOUT) | sock_set->pollfds[elem].events); } } else { sock_set->pollfds[elem].revents = 0; } } if (sock_set->pollfds_active != sock_set->pollfds) { MPIU_Free(sock_set->pollfds_active); } sock_set->pollfds_updated = FALSE; } sock_set->pollfds_active = NULL; sock_set->wakeup_posted = FALSE; } }# endif /* (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE) */ if (n_fds > 0) { break; } else if (n_fds == 0) { mpi_errno = MPIDU_SOCK_ERR_TIMEOUT; goto fn_exit; } else if (errno == EINTR) { if (millisecond_timeout != MPIDU_SOCK_INFINITE_TIME) { mpi_errno = MPIDU_SOCK_ERR_TIMEOUT; goto fn_exit; } continue; } /* --BEGIN ERROR HANDLING-- */ else if (errno == ENOMEM || errno == EAGAIN) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_NOMEM, "**sock|osnomem", NULL); goto fn_exit; } else { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|oserror", "**sock|poll|oserror %d %s", errno, MPIU_Strerror(errno)); goto fn_exit; } /* --END ERROR HANDLING-- */ } elem = sock_set->starting_elem; n_elems = sock_set->poll_array_elems; while (n_fds > 0 && n_elems > 0) { /* * Acquire pointers to the pollfd and pollinfo structures for the next element * * NOTE: These pointers could become stale, if a new sock were to be allocated during the processing of the element. * At present, none of the handler routines allocate a sock, so the issue does not arise. */ struct pollfd * const pollfd = &sock_set->pollfds[elem]; struct pollinfo * const pollinfo = &sock_set->pollinfos[elem]; MPIU_Assert((pollfd->events & (POLLIN | POLLOUT)) || pollfd->fd == -1); MPIU_Assert(pollfd->fd >= 0 || pollfd->fd == -1); if (pollfd->fd < 0 || pollfd->revents == 0) { /* This optimization assumes that most FDs will not have a pending event. */ n_elems -= 1; elem = (elem + 1 < sock_set->poll_array_elems) ? elem + 1 : 0; continue; } if (found_active_elem == FALSE) { found_active_elem = TRUE; sock_set->starting_elem = (elem + 1 < sock_set->poll_array_elems) ? elem + 1 : 0; } if (pollfd->revents & POLLNVAL) { mpi_errno = MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|badhandle", "**sock|poll|badhandle %d %d %d %d", pollinfo->sock_set->id, pollinfo->sock_id, pollfd->fd, pollinfo->fd); goto fn_exit; } /* --BEGIN ERROR HANDLING-- */ if (pollfd->revents & POLLHUP) { mpi_errno = MPIDU_Socki_handle_pollhup(pollfd, pollinfo); if (MPIR_Err_is_fatal(mpi_errno)) { goto fn_exit; } } /* According to Stevens, some errors are reported as normal data (POLLIN) and some are reported with POLLERR. */ if (pollfd->revents & POLLERR) { mpi_errno = MPIDU_Socki_handle_pollerr(pollfd, pollinfo); if (MPIR_Err_is_fatal(mpi_errno)) { goto fn_exit; } } /* --END ERROR HANDLING-- */ if (pollfd->revents & POLLIN) { if (pollinfo->type == MPIDU_SOCKI_TYPE_COMMUNICATION) { if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RW || pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RO) { mpi_errno = MPIDU_Socki_handle_read(pollfd, pollinfo); /* --BEGIN ERROR HANDLING-- */ if (MPIR_Err_is_fatal(mpi_errno)) { goto fn_exit; } /* --END ERROR HANDLING-- */ } /* --BEGIN ERROR HANDLING-- */ else { mpi_errno = MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledstate", "**sock|poll|unhandledstate %d", pollinfo->state); goto fn_exit; } /* --END ERROR HANDLING-- */ } else if (pollinfo->type == MPIDU_SOCKI_TYPE_LISTENER) { pollfd->events &= ~POLLIN; MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_ACCEPT, 0, pollinfo->user_ptr, MPI_SUCCESS, mpi_errno, fn_exit); } else if ((MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE) && pollinfo->type == MPIDU_SOCKI_TYPE_INTERRUPTER) { char c[16]; int nb; do { nb = read(pollfd->fd, c, 16); } while (nb > 0 || (nb < 0 && errno == EINTR)); } /* --BEGIN ERROR HANDLING-- */ else { mpi_errno = MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledtype", "**sock|poll|unhandledtype %d", pollinfo->type); goto fn_exit; } /* --END ERROR HANDLING-- */ } if (pollfd->revents & POLLOUT) { if (pollinfo->type == MPIDU_SOCKI_TYPE_COMMUNICATION) { if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RW) { mpi_errno = MPIDU_Socki_handle_write(pollfd, pollinfo); /* --BEGIN ERROR HANDLING-- */ if (MPIR_Err_is_fatal(mpi_errno)) { goto fn_exit; } /* --END ERROR HANDLING-- */ } else if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTING) { mpi_errno = MPIDU_Socki_handle_connect(pollfd, pollinfo); /* --BEGIN ERROR HANDLING-- */ if (MPIR_Err_is_fatal(mpi_errno)) { goto fn_exit; } /* --END ERROR HANDLING-- */ } /* --BEGIN ERROR HANDLING-- */ else { mpi_errno = MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledstate", "**sock|poll|unhandledstate %d", pollinfo->state); goto fn_exit; } /* --END ERROR HANDLING-- */ } /* --BEGIN ERROR HANDLING-- */ else { mpi_errno = MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledtype", "**sock|poll|unhandledtype %d", pollinfo->type); goto fn_exit; } /* --END ERROR HANDLING-- */ } n_fds -= 1; n_elems -= 1; elem = (elem + 1 < sock_set->poll_array_elems) ? elem + 1 : 0; } } fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_WAIT); return mpi_errno;}#undef FUNCNAME#define FUNCNAME MPIDU_Socki_handle_pollhup#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)static int MPIDU_Socki_handle_pollhup(struct pollfd * const pollfd, struct pollinfo * const pollinfo){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCKI_HANDLE_POLLHUP); MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCKI_HANDLE_POLLHUP); if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RW) { /* * If a write was posted then cancel it and generate an connection closed event. If a read is posted, it will be handled * by the POLLIN handler. */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -