📄 socki_util.i
字号:
/* * MPIDU_Socki_wakeup() */#undef FUNCNAME#define FUNCNAME MPIDU_Socki_wakeup#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)static int MPIDU_Socki_wakeup(struct MPIDU_Sock_set * sock_set){ MPIU_THREAD_CHECK_BEGIN if (sock_set->wakeup_posted == FALSE) { for(;;) { int nb; char c = 0; nb = write(sock_set->intr_fds[1], &c, 1); if (nb == 1) { break; } MPIU_Assertp(nb == 0 || errno == EINTR); } sock_set->wakeup_posted = TRUE; } MPIU_THREAD_CHECK_END return MPIDU_SOCK_SUCCESS;}/* end MPIDU_Socki_wakeup() */#undef FUNCNAME#define FUNCNAME MPIDI_Sock_update_sock_set#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)int MPIDI_Sock_update_sock_set( struct MPIDU_Sock_set *sock_set, int pollfds_active_elems ){ int mpi_errno = MPI_SUCCESS; int elem; MPIDI_STATE_DECL(MPID_STATE_MPIDI_SOCK_UPDATE_SOCK_SET); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_SOCK_UPDATE_SOCK_SET); for (elem = 0; elem < sock_set->poll_array_elems; elem++) { sock_set->pollfds[elem].events = sock_set->pollinfos[elem].pollfd_events; if ((sock_set->pollfds[elem].events & (POLLIN | POLLOUT)) != 0) { sock_set->pollfds[elem].fd = sock_set->pollinfos[elem].fd; } else { sock_set->pollfds[elem].fd = -1; } if (elem < pollfds_active_elems) { if (sock_set->pollfds_active == sock_set->pollfds) { sock_set->pollfds[elem].revents &= ~(POLLIN | POLLOUT) | sock_set->pollfds[elem].events; } else { sock_set->pollfds[elem].revents = sock_set->pollfds_active[elem].revents & (~(POLLIN | POLLOUT) | sock_set->pollfds[elem].events); } } else { sock_set->pollfds[elem].revents = 0; } } if (sock_set->pollfds_active != sock_set->pollfds) { MPIU_Free(sock_set->pollfds_active); } sock_set->pollfds_updated = FALSE; MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_SOCK_UPDATE_SOCK_SET); return mpi_errno;}#endif /* (MPICH_IS_THREADED) */ /* * MPIDU_Socki_os_to_mpi_errno() * * This routine assumes that no thread can change the state between state check before the nonblocking OS operation and the call * to this routine. */#undef FUNCNAME#define FUNCNAME MPIDU_Socki_os_to_mpi_errno#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)/* --BEGIN ERROR HANDLING-- */static int MPIDU_Socki_os_to_mpi_errno(struct pollinfo * pollinfo, int os_errno, char * fcname, int line, int * disconnected){ int mpi_errno; if (os_errno == ENOMEM || os_errno == ENOBUFS) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, fcname, line, MPIDU_SOCK_ERR_NOMEM, "**sock|osnomem", "**sock|poll|osnomem %d %d %d %s", pollinfo->sock_set->id, pollinfo->sock_id, os_errno, MPIU_Strerror(os_errno)); *disconnected = FALSE; } else if (os_errno == EFAULT || os_errno == EINVAL) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, fcname, line, MPIDU_SOCK_ERR_BAD_BUF, "**sock|badbuf", "**sock|poll|badbuf %d %d %d %s", pollinfo->sock_set->id, pollinfo->sock_id, os_errno, MPIU_Strerror(os_errno)); *disconnected = FALSE; } else if (os_errno == EPIPE) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, fcname, line, MPIDU_SOCK_ERR_CONN_CLOSED, "**sock|connclosed", "**sock|poll|connclosed %d %d %d %s", pollinfo->sock_set->id, pollinfo->sock_id, os_errno, MPIU_Strerror(os_errno)); *disconnected = TRUE; } else if (os_errno == ECONNRESET || os_errno == ENOTCONN || os_errno == ETIMEDOUT) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, fcname, line, MPIDU_SOCK_ERR_CONN_FAILED, "**sock|connfailed", "**sock|poll|connfailed %d %d %d %s", pollinfo->sock_set->id, pollinfo->sock_id, os_errno, MPIU_Strerror(os_errno)); pollinfo->os_errno = os_errno; *disconnected = TRUE; } else if (os_errno == EBADF) { /* * If we have a bad file descriptor, then either the sock was bad to * start with and we didn't catch it in the preliminary * checks, or a sock closure was finalized after the preliminary * checks were performed. The latter should not happen if * the thread safety code is correctly implemented. In any case, * the data structures associated with the sock are no * longer valid and should not be modified. We indicate this by * returning a fatal error. */ mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, fcname, line, MPIDU_SOCK_ERR_BAD_SOCK, "**sock|badsock", NULL); *disconnected = FALSE; } else { /* * Unexpected OS error. * * FIXME: technically we should never reach this section of code. * What's the right way to handle this situation? Should * we print an immediate message asking the user to report the errno * so that we can plug the hole? */ mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, fcname, line, MPIDU_SOCK_ERR_CONN_FAILED, "**sock|oserror", "**sock|poll|oserror %d %d %d %s", pollinfo->sock_set->id, pollinfo->sock_id, os_errno, MPIU_Strerror(os_errno)); pollinfo->os_errno = os_errno; *disconnected = TRUE; } return mpi_errno;}/* --END ERROR HANDLING-- *//* end MPIDU_Socki_os_to_mpi_errno() *//* * MPIDU_Socki_adjust_iov() * * Use the specified number of bytes (nb) to adjust the iovec and associated * values. If the iovec has been consumed, return * true; otherwise return false. * * The input is an iov (MPID_IOV is just an iov) and the offset into which * to start (start with entry iov[*offsetp]) and remove nb bytes from the iov. * The use of the offsetp term allows use to remove values from the iov without * making a copy to shift down elements when only part of the iov is * consumed. */#undef FUNCNAME#define FUNCNAME MPIDU_Socki_adjust_iov#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)static int MPIDU_Socki_adjust_iov(ssize_t nb, MPID_IOV * const iov, const int count, int * const offsetp){ int offset = *offsetp; while (offset < count) { if (iov[offset].MPID_IOV_LEN <= nb) { nb -= iov[offset].MPID_IOV_LEN; offset++; } else { iov[offset].MPID_IOV_BUF = (char *) iov[offset].MPID_IOV_BUF + nb; iov[offset].MPID_IOV_LEN -= nb; *offsetp = offset; return FALSE; } } *offsetp = offset; return TRUE;}/* end MPIDU_Socki_adjust_iov() */#undef FUNCNAME#define FUNCNAME MPIDU_Socki_sock_alloc#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)static int MPIDU_Socki_sock_alloc(struct MPIDU_Sock_set * sock_set, struct MPIDU_Sock ** sockp){ struct MPIDU_Sock * sock = NULL; int avail_elem; struct pollfd * pollfds = NULL; struct pollinfo * pollinfos = NULL; int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCKI_SOCK_ALLOC); MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCKI_SOCK_ALLOC); /* FIXME: Should this use the CHKPMEM macros (perm malloc)? */ sock = MPIU_Malloc(sizeof(struct MPIDU_Sock)); /* --BEGIN ERROR HANDLING-- */ if (sock == NULL) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_NOMEM, "**nomem", 0); goto fn_fail; } /* --END ERROR HANDLING-- */ /* * Check existing poll structures for a free element. */ for (avail_elem = 0; avail_elem < sock_set->poll_array_sz; avail_elem++) { if (sock_set->pollinfos[avail_elem].sock_id == -1) { if (avail_elem >= sock_set->poll_array_elems) { sock_set->poll_array_elems = avail_elem + 1; } break; } } /* * No free elements were found. Larger pollfd and pollinfo arrays need to * be allocated and the existing data transfered over. */ if (avail_elem == sock_set->poll_array_sz) { int elem; pollfds = MPIU_Malloc((sock_set->poll_array_sz + MPIDU_SOCK_SET_DEFAULT_SIZE) * sizeof(struct pollfd)); /* --BEGIN ERROR HANDLING-- */ if (pollfds == NULL) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_NOMEM, "**nomem", 0); goto fn_fail; } /* --END ERROR HANDLING-- */ pollinfos = MPIU_Malloc((sock_set->poll_array_sz + MPIDU_SOCK_SET_DEFAULT_SIZE) * sizeof(struct pollinfo)); /* --BEGIN ERROR HANDLING-- */ if (pollinfos == NULL) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_NOMEM, "**nomem", 0); goto fn_fail; } /* --END ERROR HANDLING-- */ if (sock_set->poll_array_sz > 0) { /* * Copy information from the old arrays and then free them. * * In the multi-threaded case, the pollfd array can only be copied * if another thread is not already blocking in poll() * and thus potentially modifying the array. Furthermore, the * pollfd array must not be freed if it is the one * actively being used by pol(). */# ifndef MPICH_IS_THREADED { memcpy(pollfds, sock_set->pollfds, sock_set->poll_array_sz * sizeof(struct pollfd)); MPIU_Free(sock_set->pollfds); }# else { if (sock_set->pollfds_active == NULL) { memcpy(pollfds, sock_set->pollfds, sock_set->poll_array_sz * sizeof(struct pollfd)); } if (sock_set->pollfds_active != sock_set->pollfds) { MPIU_Free(sock_set->pollfds); } }# endif memcpy(pollinfos, sock_set->pollinfos, sock_set->poll_array_sz * sizeof(struct pollinfo)); MPIU_Free(sock_set->pollinfos); } sock_set->poll_array_elems = avail_elem + 1; sock_set->poll_array_sz += MPIDU_SOCK_SET_DEFAULT_SIZE; sock_set->pollfds = pollfds; sock_set->pollinfos = pollinfos; /* * Initialize new elements */ for (elem = avail_elem; elem < sock_set->poll_array_sz; elem++) { pollfds[elem].fd = -1; pollfds[elem].events = 0; pollfds[elem].revents = 0; } for (elem = avail_elem; elem < sock_set->poll_array_sz; elem++) { pollinfos[elem].fd = -1; pollinfos[elem].sock_set = sock_set; pollinfos[elem].elem = elem; pollinfos[elem].sock = NULL; pollinfos[elem].sock_id = -1; pollinfos[elem].type = MPIDU_SOCKI_TYPE_FIRST; pollinfos[elem].state = MPIDU_SOCKI_STATE_FIRST;# ifdef MPICH_IS_THREADED { pollinfos[elem].pollfd_events = 0; }# endif } } /* * Verify that memory hasn't been messed up. */ MPIU_Assert(sock_set->pollinfos[avail_elem].sock_set == sock_set); MPIU_Assert(sock_set->pollinfos[avail_elem].elem == avail_elem); MPIU_Assert(sock_set->pollinfos[avail_elem].fd == -1); MPIU_Assert(sock_set->pollinfos[avail_elem].sock == NULL); MPIU_Assert(sock_set->pollinfos[avail_elem].sock_id == -1); MPIU_Assert(sock_set->pollinfos[avail_elem].type == MPIDU_SOCKI_TYPE_FIRST); MPIU_Assert(sock_set->pollinfos[avail_elem].state == MPIDU_SOCKI_STATE_FIRST);# ifdef MPICH_IS_THREADED { MPIU_Assert(sock_set->pollinfos[avail_elem].pollfd_events == 0); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -