📄 sock_wait.i
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- *//* * (C) 2001 by Argonne National Laboratory. * See COPYRIGHT in top-level directory. *//* Make sure that we can properly ensure atomic access to the poll routine */#ifdef MPICH_IS_THREADED#if (USE_THREAD_IMPL != MPICH_THREAD_IMPL_GLOBAL_MUTEX)#error selected multi-threaded implementation is not supported#endif#endifstatic int MPIDU_Socki_handle_pollhup(struct pollfd * const pollfd, struct pollinfo * const pollinfo);static int MPIDU_Socki_handle_pollerr(struct pollfd * const pollfd, struct pollinfo * const pollinfo);static int MPIDU_Socki_handle_read(struct pollfd * const pollfd, struct pollinfo * const pollinfo);static int MPIDU_Socki_handle_write(struct pollfd * const pollfd, struct pollinfo * const pollinfo);static int MPIDU_Socki_handle_connect(struct pollfd * const pollfd, struct pollinfo * const pollinfo);/* * MPIDU_Sock_wait() * * NOTES: * * For fatal errors, the state of the connection progresses directly to the * failed state and the connection is marked inactive in * the poll array. Under normal conditions, the fatal error should result in * the termination of the process; but, if that * doesn't happen, we try to leave the implementation in a somewhat sane state. */#undef FUNCNAME#define FUNCNAME MPIDU_Sock_wait#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)int MPIDU_Sock_wait(struct MPIDU_Sock_set * sock_set, int millisecond_timeout, struct MPIDU_Sock_event * eventp){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCK_WAIT); MPIDI_STATE_DECL(MPID_STATE_POLL); MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCK_WAIT); for (;;) { int elem=0; /* Keep compiler happy */ int n_fds; int n_elems; int found_active_elem = FALSE; mpi_errno = MPIDU_Socki_event_dequeue(sock_set, &elem, eventp); if (mpi_errno == MPI_SUCCESS) { struct pollinfo * pollinfo; int flags; if (eventp->op_type != MPIDU_SOCK_OP_CLOSE) { break; } pollinfo = &sock_set->pollinfos[elem]; /* * Attempt to set socket back to blocking. This *should* prevent * any data in the socket send buffer from being * discarded. Instead close() will block until the buffer is * flushed or the connection timeouts and is considered * lost. Theoretically, this could cause the MPIDU_Sock_wait() to * hang indefinitely; however, the calling code * should ensure this will not happen by going through a shutdown * protocol before posting a close operation. * * FIXME: If the attempt to set the socket back to blocking fails, * we presently ignore it. Should we return an * error? We need to define acceptible data loss at close time. * MS Windows has worse problems with this, so it * may not be possible to make any guarantees. */ flags = fcntl(pollinfo->fd, F_GETFL, 0); if (flags != -1) { fcntl(pollinfo->fd, F_SETFL, flags & ~O_NONBLOCK); } /* FIXME: return code? If an error occurs do we return it instead of the error specified in the event? */ close(pollinfo->fd); MPIDU_Socki_sock_free(pollinfo->sock); break; } for(;;) {# ifndef MPICH_IS_THREADED { MPIDI_FUNC_ENTER(MPID_STATE_POLL); n_fds = poll(sock_set->pollfds, sock_set->poll_array_elems, millisecond_timeout); MPIDI_FUNC_EXIT(MPID_STATE_POLL); }# else /* MPICH_IS_THREADED */ { /* If we've enabled runtime checking of the thread level, then test for that and if we are *not* multithreaded, just use the same code as above. Otherwise, use multithreaded code (and we don't then need the MPIU_THREAD_CHECK_BEGIN/END macros) */#ifdef HAVE_RUNTIME_THREADCHECK if (!MPIR_ThreadInfo.isThreaded) { MPIDI_FUNC_ENTER(MPID_STATE_POLL); n_fds = poll(sock_set->pollfds, sock_set->poll_array_elems, millisecond_timeout); MPIDI_FUNC_EXIT(MPID_STATE_POLL); } else#endif { /* * First try a non-blocking poll to see if any immediate * progress can be made. This avoids the lock manipulation * overhead. */ MPIDI_FUNC_ENTER(MPID_STATE_POLL); n_fds = poll(sock_set->pollfds, sock_set->poll_array_elems, 0); MPIDI_FUNC_EXIT(MPID_STATE_POLL); if (n_fds == 0 && millisecond_timeout != 0) { int pollfds_active_elems = sock_set->poll_array_elems; sock_set->pollfds_active = sock_set->pollfds; /* Release the lock so that other threads may make progress while this thread waits for something to do */ MPIU_DBG_MSG(THREAD,TYPICAL,"Exit global critical section"); MPID_Thread_mutex_unlock(&MPIR_ThreadInfo.global_mutex); MPIDI_FUNC_ENTER(MPID_STATE_POLL); n_fds = poll(sock_set->pollfds_active, pollfds_active_elems, millisecond_timeout); MPIDI_FUNC_EXIT(MPID_STATE_POLL); /* Reaquire the lock before processing any of the information returned from poll */ MPIU_DBG_MSG(THREAD,TYPICAL,"Enter global critical section"); MPID_Thread_mutex_lock(&MPIR_ThreadInfo.global_mutex); /* * Update pollfds array if changes were posted while we * were blocked in poll */ if (sock_set->pollfds_updated) { mpi_errno = MPIDI_Sock_update_sock_set( sock_set, pollfds_active_elems ); } sock_set->pollfds_active = NULL; sock_set->wakeup_posted = FALSE; } } /* else !MPIR_ThreadInfo.isThreaded */ } # endif /* MPICH_IS_THREADED */ if (n_fds > 0) { break; } else if (n_fds == 0) { mpi_errno = MPIDU_SOCK_ERR_TIMEOUT; goto fn_exit; } else if (errno == EINTR) { if (millisecond_timeout != MPIDU_SOCK_INFINITE_TIME) { mpi_errno = MPIDU_SOCK_ERR_TIMEOUT; goto fn_exit; } continue; } /* --BEGIN ERROR HANDLING-- */ else if (errno == ENOMEM || errno == EAGAIN) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_NOMEM, "**sock|osnomem", NULL); goto fn_exit; } else { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|oserror", "**sock|poll|oserror %d %s", errno, MPIU_Strerror(errno)); goto fn_exit; } /* --END ERROR HANDLING-- */ } elem = sock_set->starting_elem; n_elems = sock_set->poll_array_elems; while (n_fds > 0 && n_elems > 0) { /* * Acquire pointers to the pollfd and pollinfo structures for the next element * * NOTE: These pointers could become stale, if a new sock were to be allocated during the processing of the element. * At present, none of the handler routines allocate a sock, so the issue does not arise. */ struct pollfd * const pollfd = &sock_set->pollfds[elem]; struct pollinfo * const pollinfo = &sock_set->pollinfos[elem]; MPIU_Assert((pollfd->events & (POLLIN | POLLOUT)) || pollfd->fd == -1); MPIU_Assert(pollfd->fd >= 0 || pollfd->fd == -1); if (pollfd->fd < 0 || pollfd->revents == 0) { /* This optimization assumes that most FDs will not have a pending event. */ n_elems -= 1; elem = (elem + 1 < sock_set->poll_array_elems) ? elem + 1 : 0; continue; } if (found_active_elem == FALSE) { found_active_elem = TRUE; sock_set->starting_elem = (elem + 1 < sock_set->poll_array_elems) ? elem + 1 : 0; } if (pollfd->revents & POLLNVAL) { mpi_errno = MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|badhandle", "**sock|poll|badhandle %d %d %d %d", pollinfo->sock_set->id, pollinfo->sock_id, pollfd->fd, pollinfo->fd); goto fn_exit; } /* --BEGIN ERROR HANDLING-- */ if (pollfd->revents & POLLHUP) { mpi_errno = MPIDU_Socki_handle_pollhup(pollfd, pollinfo); if (MPIR_Err_is_fatal(mpi_errno)) { goto fn_exit; } } /* According to Stevens, some errors are reported as normal data (POLLIN) and some are reported with POLLERR. */ if (pollfd->revents & POLLERR) { mpi_errno = MPIDU_Socki_handle_pollerr(pollfd, pollinfo); if (MPIR_Err_is_fatal(mpi_errno)) { goto fn_exit; } } /* --END ERROR HANDLING-- */ if (pollfd->revents & POLLIN) { if (pollinfo->type == MPIDU_SOCKI_TYPE_COMMUNICATION) { if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RW || pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RO) { mpi_errno = MPIDU_Socki_handle_read(pollfd, pollinfo); /* --BEGIN ERROR HANDLING-- */ if (MPIR_Err_is_fatal(mpi_errno)) { goto fn_exit; } /* --END ERROR HANDLING-- */ } /* --BEGIN ERROR HANDLING-- */ else { mpi_errno = MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledstate", "**sock|poll|unhandledstate %d", pollinfo->state); goto fn_exit; } /* --END ERROR HANDLING-- */ } else if (pollinfo->type == MPIDU_SOCKI_TYPE_LISTENER) { pollfd->events &= ~POLLIN; MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_ACCEPT, 0, pollinfo->user_ptr, MPI_SUCCESS, mpi_errno, fn_exit); } else if ((MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE) && pollinfo->type == MPIDU_SOCKI_TYPE_INTERRUPTER) { char c[16]; int nb; do { nb = read(pollfd->fd, c, 16); } while (nb > 0 || (nb < 0 && errno == EINTR)); } /* --BEGIN ERROR HANDLING-- */ else { mpi_errno = MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledtype", "**sock|poll|unhandledtype %d", pollinfo->type); goto fn_exit; } /* --END ERROR HANDLING-- */ } if (pollfd->revents & POLLOUT) { if (pollinfo->type == MPIDU_SOCKI_TYPE_COMMUNICATION) { if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RW) { mpi_errno = MPIDU_Socki_handle_write(pollfd, pollinfo); /* --BEGIN ERROR HANDLING-- */ if (MPIR_Err_is_fatal(mpi_errno)) { goto fn_exit; } /* --END ERROR HANDLING-- */ } else if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTING) { mpi_errno = MPIDU_Socki_handle_connect(pollfd, pollinfo); /* --BEGIN ERROR HANDLING-- */ if (MPIR_Err_is_fatal(mpi_errno)) { goto fn_exit; } /* --END ERROR HANDLING-- */ } /* --BEGIN ERROR HANDLING-- */ else { mpi_errno = MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledstate", "**sock|poll|unhandledstate %d", pollinfo->state); goto fn_exit; } /* --END ERROR HANDLING-- */ } /* --BEGIN ERROR HANDLING-- */ else { mpi_errno = MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledtype", "**sock|poll|unhandledtype %d", pollinfo->type); goto fn_exit; } /* --END ERROR HANDLING-- */ } n_fds -= 1; n_elems -= 1; elem = (elem + 1 < sock_set->poll_array_elems) ? elem + 1 : 0; } } fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_WAIT); return mpi_errno;}#undef FUNCNAME#define FUNCNAME MPIDU_Socki_handle_pollhup#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)static int MPIDU_Socki_handle_pollhup(struct pollfd * const pollfd, struct pollinfo * const pollinfo){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCKI_HANDLE_POLLHUP); MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCKI_HANDLE_POLLHUP); if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RW) { /* * If a write was posted then cancel it and generate an connection closed event. If a read is posted, it will be handled * by the POLLIN handler. */ /* --BEGIN ERROR HANDLING-- */ if (pollfd->events & POLLOUT) { int event_mpi_errno; event_mpi_errno = MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_CONN_CLOSED, "**sock|connclosed", "**sock|connclosed %d %d", pollinfo->sock_set->id, pollinfo->sock_id);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -