📄 socki_util.i
字号:
/* * MPIDU_Socki_adjust_iov() * * Use the specified number of bytes (nb) to adjust the iovec and associated values. If the iovec has been consumed, return * true; otherwise return false. */#undef FUNCNAME#define FUNCNAME MPIDU_Socki_adjust_iov#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)static int MPIDU_Socki_adjust_iov(ssize_t nb, MPID_IOV * const iov, const int count, int * const offsetp){ int offset = *offsetp; while (offset < count) { if (iov[offset].MPID_IOV_LEN <= nb) { nb -= iov[offset].MPID_IOV_LEN; offset++; } else { iov[offset].MPID_IOV_BUF = (char *) iov[offset].MPID_IOV_BUF + nb; iov[offset].MPID_IOV_LEN -= nb; *offsetp = offset; return FALSE; } } *offsetp = offset; return TRUE;}/* end MPIDU_Socki_adjust_iov() */#undef FUNCNAME#define FUNCNAME MPIDU_Socki_sock_alloc#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)static int MPIDU_Socki_sock_alloc(struct MPIDU_Sock_set * sock_set, struct MPIDU_Sock ** sockp){ struct MPIDU_Sock * sock = NULL; int avail_elem; struct pollfd * pollfds = NULL; struct pollinfo * pollinfos = NULL; int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCKI_SOCK_ALLOC); MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCKI_SOCK_ALLOC); sock = MPIU_Malloc(sizeof(struct MPIDU_Sock)); /* --BEGIN ERROR HANDLING-- */ if (sock == NULL) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_NOMEM, "**nomem", 0); goto fn_fail; } /* --END ERROR HANDLING-- */ /* * Check existing poll structures for a free element. */ for (avail_elem = 0; avail_elem < sock_set->poll_array_sz; avail_elem++) { if (sock_set->pollinfos[avail_elem].sock_id == -1) { if (avail_elem >= sock_set->poll_array_elems) { sock_set->poll_array_elems = avail_elem + 1; } break; } } /* * No free elements were found. Larger pollfd and pollinfo arrays need to be allocated and the existing data transfered * over. */ if (avail_elem == sock_set->poll_array_sz) { int elem; pollfds = MPIU_Malloc((sock_set->poll_array_sz + MPIDU_SOCK_SET_DEFAULT_SIZE) * sizeof(struct pollfd)); /* --BEGIN ERROR HANDLING-- */ if (pollfds == NULL) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_NOMEM, "**nomem", 0); goto fn_fail; } /* --END ERROR HANDLING-- */ pollinfos = MPIU_Malloc((sock_set->poll_array_sz + MPIDU_SOCK_SET_DEFAULT_SIZE) * sizeof(struct pollinfo)); /* --BEGIN ERROR HANDLING-- */ if (pollinfos == NULL) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_NOMEM, "**nomem", 0); goto fn_fail; } /* --END ERROR HANDLING-- */ if (sock_set->poll_array_sz > 0) { /* * Copy information from the old arrays and then free them. * * In the multi-threaded case, the pollfd array can only be copied if another thread is not already blocking in poll() * and thus potentially modifying the array. Furthermore, the pollfd array must not be freed if it is the one * actively being used by pol(). */# if (MPICH_THREAD_LEVEL < MPI_THREAD_MULTIPLE) { memcpy(pollfds, sock_set->pollfds, sock_set->poll_array_sz * sizeof(struct pollfd)); MPIU_Free(sock_set->pollfds); }# else { if (sock_set->pollfds_active == NULL) { memcpy(pollfds, sock_set->pollfds, sock_set->poll_array_sz * sizeof(struct pollfd)); } if (sock_set->pollfds_active != sock_set->pollfds) { MPIU_Free(sock_set->pollfds); } }# endif memcpy(pollinfos, sock_set->pollinfos, sock_set->poll_array_sz * sizeof(struct pollinfo)); MPIU_Free(sock_set->pollinfos); } sock_set->poll_array_elems = avail_elem + 1; sock_set->poll_array_sz += MPIDU_SOCK_SET_DEFAULT_SIZE; sock_set->pollfds = pollfds; sock_set->pollinfos = pollinfos; /* * Initialize new elements */ for (elem = avail_elem; elem < sock_set->poll_array_sz; elem++) { pollfds[elem].fd = -1; pollfds[elem].events = 0; pollfds[elem].revents = 0; } for (elem = avail_elem; elem < sock_set->poll_array_sz; elem++) { pollinfos[elem].fd = -1; pollinfos[elem].sock_set = sock_set; pollinfos[elem].elem = elem; pollinfos[elem].sock = NULL; pollinfos[elem].sock_id = -1; pollinfos[elem].type = 0; pollinfos[elem].state = 0;# if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE) { pollinfos[elem].pollfd_events = 0; }# endif } } /* * Verify that memory hasn't been messed up. */ MPIU_Assert(sock_set->pollinfos[avail_elem].sock_set == sock_set); MPIU_Assert(sock_set->pollinfos[avail_elem].elem == avail_elem); MPIU_Assert(sock_set->pollinfos[avail_elem].fd == -1); MPIU_Assert(sock_set->pollinfos[avail_elem].sock == NULL); MPIU_Assert(sock_set->pollinfos[avail_elem].sock_id == -1); MPIU_Assert(sock_set->pollinfos[avail_elem].type == 0); MPIU_Assert(sock_set->pollinfos[avail_elem].state == 0);# if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE) { MPIU_Assert(sock_set->pollinfos[avail_elem].pollfd_events == 0); }# endif /* * Initialize newly allocated sock structure and associated poll structures */ sock_set->pollinfos[avail_elem].sock_id = (sock_set->id << 24) | avail_elem; sock_set->pollinfos[avail_elem].sock = sock; sock->sock_set = sock_set; sock->elem = avail_elem; sock_set->pollfds[avail_elem].fd = -1; sock_set->pollfds[avail_elem].events = 0; sock_set->pollfds[avail_elem].revents = 0;# if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE) { if (sock_set->pollfds_active != NULL) { sock_set->pollfds_updated = TRUE; } }# endif *sockp = sock; fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCKI_SOCK_ALLOC); return mpi_errno; /* --BEGIN ERROR HANDLING-- */ fn_fail: if (pollinfos != NULL) { MPIU_Free(pollinfos); } if (pollfds != NULL) { MPIU_Free(pollfds); } if (sock != NULL) { MPIU_Free(sock); } goto fn_exit; /* --END ERROR HANDLING-- */}/* end MPIDU_Socki_sock_alloc() */#undef FUNCNAME#define FUNCNAME MPIDU_Socki_sock_free#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)static void MPIDU_Socki_sock_free(struct MPIDU_Sock * sock){ struct pollfd * pollfd = MPIDU_Socki_sock_get_pollfd(sock); struct pollinfo * pollinfo = MPIDU_Socki_sock_get_pollinfo(sock); struct MPIDU_Sock_set * sock_set = sock->sock_set; MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCKI_SOCK_FREE); MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCKI_SOCK_FREE);# if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE) { /* * Freeing a sock while Sock_wait() is blocked in poll() is not supported */ MPIU_Assert(sock_set->pollfds_active == NULL); }# endif /* * Compress poll array * * TODO: move last element into current position and update sock associated with last element. */ if (sock->elem + 1 == sock_set->poll_array_elems) { sock_set->poll_array_elems -= 1; if (sock_set->starting_elem >= sock_set->poll_array_elems) { sock_set->starting_elem = 0; } } /* * Remove entry from the poll list and mark the entry as free */ pollinfo->fd = -1; pollinfo->sock = NULL; pollinfo->sock_id = -1; pollinfo->type = 0; pollinfo->state = 0;# if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE) { pollinfo->pollfd_events = 0; }# endif pollfd->fd = -1; pollfd->events = 0; pollfd->revents = 0; /* * Mark the sock as invalid so that any future use might be caught */ sock->sock_set = NULL; sock->elem = -1; MPIU_Free(sock); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCKI_SOCK_FREE);}/* end MPIDU_Socki_sock_free() */#undef FUNCNAME#define FUNCNAME MPIDU_Socki_event_enqueue#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)static int MPIDU_Socki_event_enqueue(struct pollinfo * pollinfo, MPIDU_Sock_op_t op, MPIU_Size_t num_bytes, void * user_ptr, int error){ struct MPIDU_Sock_set * sock_set = pollinfo->sock_set; struct MPIDU_Socki_eventq_elem * eventq_elem; int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_SOCKI_EVENT_ENQUEUE); MPIDI_FUNC_ENTER(MPID_STATE_SOCKI_EVENT_ENQUEUE); if (MPIDU_Socki_eventq_pool != NULL) { eventq_elem = MPIDU_Socki_eventq_pool; MPIDU_Socki_eventq_pool = MPIDU_Socki_eventq_pool->next; } else { int i; struct MPIDU_Socki_eventq_table *eventq_table; eventq_table = MPIU_Malloc(sizeof(struct MPIDU_Socki_eventq_table)); /* --BEGIN ERROR HANDLING-- */ if (eventq_table == NULL) { mpi_errno = MPIR_Err_create_code(errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**sock|poll|eqmalloc", 0); goto fn_exit; } /* --END ERROR HANDLING-- */ eventq_elem = eventq_table->elems; eventq_table->next = MPIDU_Socki_eventq_table_head; MPIDU_Socki_eventq_table_head = eventq_table; if (MPIDU_SOCK_EVENTQ_POOL_SIZE > 1) { MPIDU_Socki_eventq_pool = &eventq_elem[1]; for (i = 0; i < MPIDU_SOCK_EVENTQ_POOL_SIZE - 2; i++) { MPIDU_Socki_eventq_pool[i].next = &MPIDU_Socki_eventq_pool[i+1]; } MPIDU_Socki_eventq_pool[MPIDU_SOCK_EVENTQ_POOL_SIZE - 2].next = NULL; } } eventq_elem->event.op_type = op; eventq_elem->event.num_bytes = num_bytes; eventq_elem->event.user_ptr = user_ptr; eventq_elem->event.error = error; eventq_elem->set_elem = pollinfo->elem; eventq_elem->next = NULL; if (sock_set->eventq_head == NULL) { sock_set->eventq_head = eventq_elem; } else { sock_set->eventq_tail->next = eventq_elem; } sock_set->eventq_tail = eventq_elem;fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_SOCKI_EVENT_ENQUEUE); return mpi_errno;}/* end MPIDU_Socki_event_enqueue() */#undef FUNCNAME#define FUNCNAME MPIDU_Socki_event_dequeue#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)static inline int MPIDU_Socki_event_dequeue(struct MPIDU_Sock_set * sock_set, int * set_elem, struct MPIDU_Sock_event * eventp){ struct MPIDU_Socki_eventq_elem * eventq_elem; int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_SOCKI_EVENT_DEQUEUE); MPIDI_FUNC_ENTER(MPID_STATE_SOCKI_EVENT_DEQUEUE); if (sock_set->eventq_head != NULL) { eventq_elem = sock_set->eventq_head; sock_set->eventq_head = eventq_elem->next; if (eventq_elem->next == NULL) { sock_set->eventq_tail = NULL; } *eventp = eventq_elem->event; *set_elem = eventq_elem->set_elem; eventq_elem->next = MPIDU_Socki_eventq_pool; MPIDU_Socki_eventq_pool = eventq_elem; } /* --BEGIN ERROR HANDLING-- */ else { mpi_errno = MPIDU_SOCK_ERR_FAIL; } /* --END ERROR HANDLING-- */ MPIDI_FUNC_EXIT(MPID_STATE_SOCKI_EVENT_DEQUEUE); return mpi_errno;}/* end MPIDU_Socki_event_dequeue() */#undef FUNCNAME#define FUNCNAME MPIDU_Socki_free_eventq_mem#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static void MPIDU_Socki_free_eventq_mem(void){ struct MPIDU_Socki_eventq_table *eventq_table, *eventq_table_next; MPIDI_STATE_DECL(MPID_STATE_SOCKI_FREE_EVENTQ_MEM); MPIDI_FUNC_ENTER(MPID_STATE_SOCKI_FREE_EVENTQ_MEM); eventq_table = MPIDU_Socki_eventq_table_head; while (eventq_table) { eventq_table_next = eventq_table->next; MPIU_Free(eventq_table); eventq_table = eventq_table_next; } MPIDU_Socki_eventq_table_head = NULL; MPIDI_FUNC_EXIT(MPID_STATE_SOCKI_FREE_EVENTQ_MEM);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -