📄 ch3_progress.c
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

/*
 * Progress engine for the CH3 channel built on the MPIDU_Sock interface.
 *
 * The routines in this file wait on a single sock set (MPIDI_CH3I_sock_set)
 * and hand each returned event to MPIDI_CH3I_Progress_handle_sock_event().
 */

#include "mpidi_ch3_impl.h"
#include "pmi.h"
#include "mpidu_sock.h"

#ifdef HAVE_STRING_H
#include <string.h>
#endif

#undef USE_CH3I_PROGRESS_DELAY_QUEUE

/*
 * Counter compared against MPID_Progress_state.ch.completion_count by
 * MPIDI_CH3_Progress_wait() to decide when to stop waiting.  It is not
 * modified by any code visible in this portion of the file.
 */
volatile unsigned int MPIDI_CH3I_progress_completion_count = 0;

#if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE)
    /* TRUE while a thread is inside MPIDU_Sock_wait() in MPIDI_CH3_Progress_wait() */
    volatile int MPIDI_CH3I_progress_blocked = FALSE;
    /* Cleared by MPIDI_CH3_Progress_wait() each time MPIDU_Sock_wait() returns */
    volatile int MPIDI_CH3I_progress_wakeup_signalled = FALSE;

#   if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX)
#       if defined(USE_CH3I_PROGRESS_DELAY_QUEUE)
            struct MPIDI_CH3I_Progress_delay_queue_elem
            {
                unsigned int count;
                volatile int flag;
                MPID_Thread_cond_t cond;
                struct MPIDI_CH3I_Progress_delay_queue_elem * next;
            };

            static struct MPIDI_CH3I_Progress_delay_queue_elem * MPIDI_CH3I_Progress_delay_queue_head = NULL;
            static struct MPIDI_CH3I_Progress_delay_queue_elem * MPIDI_CH3I_Progress_delay_queue_tail = NULL;
#       else
            MPID_Thread_cond_t MPIDI_CH3I_progress_completion_cond;
#       endif
#   endif
#endif

#if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE)
    static int MPIDI_CH3I_Progress_delay(unsigned int completion_count);
    static int MPIDI_CH3I_Progress_continue(unsigned int completion_count);
#endif

/* The sock set every progress routine in this file waits on */
MPIDU_Sock_set_t MPIDI_CH3I_sock_set = NULL;
/* Connection object used for the listening sock; allocated in MPIDI_CH3I_Progress_init() */
static MPIDI_CH3I_Connection_t * MPIDI_CH3I_listener_conn = NULL;

static int MPIDI_CH3I_Progress_handle_sock_event(MPIDU_Sock_event_t * event);

/* FIXME: move this prototype */
int MPIDI_CH3I_Connection_alloc(MPIDI_CH3I_Connection_t **);

static inline void connection_free(MPIDI_CH3I_Connection_t * conn);
static inline int connection_post_sendq_req(MPIDI_CH3I_Connection_t * conn);
static inline int connection_post_send_pkt(MPIDI_CH3I_Connection_t * conn);
static inline int connection_post_recv_pkt(MPIDI_CH3I_Connection_t * conn);
static inline void connection_post_send_pkt_and_pgid(MPIDI_CH3I_Connection_t * conn);

static int adjust_iov(MPID_IOV ** iovp, int * countp, MPIU_Size_t nb);


/*
 * MPIDI_CH3_Progress_test - make one non-blocking pass through the progress engine.
 *
 * Calls MPIDU_Sock_wait() on MPIDI_CH3I_sock_set with 0 as the timeout argument
 * and, if an event is returned, dispatches it to
 * MPIDI_CH3I_Progress_handle_sock_event().
 *
 * When MPICH_THREAD_LEVEL is MPI_THREAD_MULTIPLE and MPIDI_CH3I_progress_blocked
 * is TRUE, the function returns MPI_SUCCESS immediately without touching the
 * sock set.
 *
 * Returns MPI_SUCCESS if an event was handled or if the wait failed with error
 * class MPIDU_SOCK_ERR_TIMEOUT; otherwise returns the error code produced by
 * MPIU_ERR_SETANDJUMP.
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_Progress_test
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3_Progress_test(void)
{
    MPIDU_Sock_event_t event;
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PROGRESS_TEST);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PROGRESS_TEST);

#   if (MPICH_THREAD_LEVEL >= MPI_THREAD_MULTIPLE)
    {
        if (MPIDI_CH3I_progress_blocked == TRUE)
        {
            /*
             * Another thread is already blocking in the progress engine.  We are not going to block waiting for progress, so we
             * simply return.  It might make sense to yield before returning, giving the PE thread a chance to make progress.
             *
             * MT: Another thread is already blocking in poll.  Right now, calls to the progress routines are effectively
             * serialized by the device.  The only way another thread may enter this function is if MPIDU_Sock_wait() blocks.  If
             * this changes, a flag other than MPIDI_CH3I_progress_blocked may be required to determine if another thread is in
             * the progress engine.
             */

            goto fn_exit;
        }
    }
#   endif

    mpi_errno = MPIDU_Sock_wait(MPIDI_CH3I_sock_set, 0, &event);

    if (mpi_errno == MPI_SUCCESS)
    {
        mpi_errno = MPIDI_CH3I_Progress_handle_sock_event(&event);
        if (mpi_errno != MPI_SUCCESS) {
            MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,
                                "**ch3|sock|handle_sock_event");
        }
    }
    else if (MPIR_ERR_GET_CLASS(mpi_errno) == MPIDU_SOCK_ERR_TIMEOUT)
    {
        /* Nothing was ready; for a test call that is not an error. */
        mpi_errno = MPI_SUCCESS;
        goto fn_exit;
    }
    else {
        MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**progress_sock_wait");
    }

  fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PROGRESS_TEST);
    return mpi_errno;

  fn_fail:
    goto fn_exit;
}
/* end MPIDI_CH3_Progress_test() */


/*
 * MPIDI_CH3_Progress_wait - block in the progress engine until the completion
 * count changes.
 *
 * progress_state - caller's progress state; its ch.completion_count is compared
 *                  against MPIDI_CH3I_progress_completion_count and is
 *                  overwritten with the current global count on every exit
 *                  path, including failures.
 *
 * Repeatedly calls MPIDU_Sock_wait() with MPIDU_SOCK_INFINITE_TIME and
 * dispatches each event until the global completion count differs from the
 * value stored in progress_state.
 *
 * When MPICH_THREAD_LEVEL is MPI_THREAD_MULTIPLE and another thread already has
 * MPIDI_CH3I_progress_blocked set, this thread calls
 * MPIDI_CH3I_Progress_delay() instead of waiting on the sock set and then
 * returns.
 *
 * Returns MPI_SUCCESS, or an error code if MPIDU_Sock_wait() or the event
 * handler fails.
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_Progress_wait
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3_Progress_wait(MPID_Progress_state * progress_state)
{
    MPIDU_Sock_event_t event;
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PROGRESS_WAIT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PROGRESS_WAIT);
    MPIDI_DBG_PRINTF((50, FCNAME, "entering"));

    /*
     * MT: the following code will be needed if progress can occur between MPIDI_CH3_Progress_start() and
     * MPIDI_CH3_Progress_wait(), or iterations of MPIDI_CH3_Progress_wait().
     *
     * This is presently not possible, and thus the code is compiled out by the preprocessor condition below.
     */
#   if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_NOT_IMPLEMENTED)
    {
        if (progress_state->ch.completion_count != MPIDI_CH3I_progress_completion_count)
        {
            goto fn_exit;
        }
    }
#   endif

#   if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE)
    {
        if (MPIDI_CH3I_progress_blocked == TRUE)
        {
            /*
             * Another thread is already blocking in the progress engine.
             *
             * MT: Another thread is already blocking in poll.  Right now, calls to MPIDI_CH3_Progress_wait() are effectively
             * serialized by the device.  The only way another thread may enter this function is if MPIDU_Sock_wait() blocks.  If
             * this changes, a flag other than MPIDI_CH3I_progress_blocked may be required to determine if another thread is in
             * the progress engine.
             */
            /* NOTE(review): the int returned by MPIDI_CH3I_Progress_delay() is discarded here -- confirm that is intended. */
            MPIDI_CH3I_Progress_delay(MPIDI_CH3I_progress_completion_count);

            goto fn_exit;
        }
    }
#   endif

    do
    {
#       if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE)
        {
            MPIDI_CH3I_progress_blocked = TRUE;
        }
#       endif

        mpi_errno = MPIDU_Sock_wait(MPIDI_CH3I_sock_set, MPIDU_SOCK_INFINITE_TIME, &event);

        /* The flags are reset before the error check so they are cleared on the failure path too. */
#       if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE)
        {
            MPIDI_CH3I_progress_blocked = FALSE;
            MPIDI_CH3I_progress_wakeup_signalled = FALSE;
        }
#       endif

        /* --BEGIN ERROR HANDLING-- */
        if (mpi_errno != MPI_SUCCESS)
        {
            MPIU_Assert(MPIR_ERR_GET_CLASS(mpi_errno) != MPIDU_SOCK_ERR_TIMEOUT);
            MPIU_ERR_SET(mpi_errno,MPI_ERR_OTHER,"**progress_sock_wait");
            goto fn_fail;
        }
        /* --END ERROR HANDLING-- */

        mpi_errno = MPIDI_CH3I_Progress_handle_sock_event(&event);
        if (mpi_errno != MPI_SUCCESS) {
            MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,
                                "**ch3|sock|handle_sock_event");
        }
    }
    while (progress_state->ch.completion_count == MPIDI_CH3I_progress_completion_count);

    /*
     * We could continue to call MPIDU_Sock_wait in a non-blocking fashion
     * and process any other events; however, this would not
     * give the application a chance to post new receives, and thus could
     * result in an increased number of unexpected messages
     * that would need to be buffered.
     */

#   if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE)
    {
        /*
         * Awaken any threads which are waiting for the progress that just occurred
         */
        MPIDI_CH3I_Progress_continue(MPIDI_CH3I_progress_completion_count);
    }
#   endif

  fn_exit:
    /*
     * Reset the progress state so it is fresh for the next iteration
     */
    progress_state->ch.completion_count = MPIDI_CH3I_progress_completion_count;

    MPIDI_DBG_PRINTF((50, FCNAME, "exiting"));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PROGRESS_WAIT);
    return mpi_errno;

  fn_fail:
    goto fn_exit;
}
/* end MPIDI_CH3_Progress_wait() */


/*
 * MPIDI_CH3_Connection_terminate - begin closing the connection behind a VC.
 *
 * vc - virtual connection whose ch.conn and ch.sock fields are used.
 *
 * Marks vc->ch.conn as CONN_STATE_CLOSING and posts a close on vc->ch.sock.
 * The state is changed before the close is posted and is not restored if the
 * post fails.
 *
 * Returns MPI_SUCCESS, or the error from MPIDU_Sock_post_close().
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_Connection_terminate
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3_Connection_terminate(MPIDI_VC_t * vc)
{
    int mpi_errno = MPI_SUCCESS;

    MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting state to CONN_STATE_CLOSING");
    vc->ch.conn->state = CONN_STATE_CLOSING;
    mpi_errno = MPIDU_Sock_post_close(vc->ch.sock);
    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_POP(mpi_errno);
    }

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
/* end MPIDI_CH3_Connection_terminate() */


/*
 * MPIDI_CH3I_Progress_init - set up the sock layer, the sock set and the listener.
 *
 * In order: creates MPIDI_CH3I_progress_completion_cond (only for the
 * global-mutex thread implementation without the delay queue), initializes the
 * sock layer, creates MPIDI_CH3I_sock_set, allocates the listener connection,
 * and starts listening.  The sock returned by MPIDU_Sock_listen() is stored in
 * the listener connection, and the port is written to MPIDI_CH3I_listener_port.
 *
 * Returns MPI_SUCCESS, or the first error encountered.  Steps that completed
 * before a failure are not undone here.
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Progress_init
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_Progress_init(void)
{
    MPIDU_Sock_t sock;
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_INIT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_INIT);
    MPIDI_DBG_PRINTF((60, FCNAME, "entering"));

#   if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX && !defined(USE_CH3I_PROGRESS_DELAY_QUEUE))
    {
        MPID_Thread_cond_create(&MPIDI_CH3I_progress_completion_cond, NULL);
    }
#   endif

    mpi_errno = MPIDU_Sock_init();
    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_POP(mpi_errno);
    }

    /* create sock set */
    mpi_errno = MPIDU_Sock_create_set(&MPIDI_CH3I_sock_set);
    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_POP(mpi_errno);
    }

    /* establish non-blocking listener */
    mpi_errno = MPIDI_CH3I_Connection_alloc(&MPIDI_CH3I_listener_conn);
    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_POP(mpi_errno);
    }

    MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting listener connect state to CONN_STATE_LISTENING");
    MPIDI_CH3I_listener_conn->sock = NULL;
    MPIDI_CH3I_listener_conn->vc = NULL;
    MPIDI_CH3I_listener_conn->state = CONN_STATE_LISTENING;
    MPIDI_CH3I_listener_conn->send_active = NULL;
    MPIDI_CH3I_listener_conn->recv_active = NULL;

    /* The listener connection is passed as the user pointer for events on the listening sock. */
    mpi_errno = MPIDU_Sock_listen(MPIDI_CH3I_sock_set, MPIDI_CH3I_listener_conn, &MPIDI_CH3I_listener_port, &sock);
    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_POP(mpi_errno);
    }

    MPIDI_CH3I_listener_conn->sock = sock;

  fn_exit:
    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_INIT);
    return mpi_errno;

  fn_fail:
    goto fn_exit;
}
/* end MPIDI_CH3I_Progress_init() */


/*
 * MPIDI_CH3I_Progress_finalize - shut down the listener and tear down the
 * progress engine.
 *
 * Posts a close on the listener sock, then runs MPID_Progress_wait() until
 * MPIDI_CH3I_listener_conn is NULL (it is not cleared in this function;
 * presumably the close event handler does so -- verify).  Afterwards destroys
 * the sock set, finalizes the sock layer and, for the global-mutex thread
 * implementation without the delay queue, destroys the completion condition
 * variable.
 *
 * Returns the error from MPIDU_Sock_post_close() if posting the close fails
 * (none of the teardown is performed in that case); otherwise returns the
 * result of the last MPID_Progress_wait() call.
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Progress_finalize
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_Progress_finalize(void)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Progress_state progress_state;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_FINALIZE);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_FINALIZE);
    MPIDI_DBG_PRINTF((60, FCNAME, "entering"));

    /* Shut down the listener */
    mpi_errno = MPIDU_Sock_post_close(MPIDI_CH3I_listener_conn->sock);
    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_POP(mpi_errno);
    }

    MPID_Progress_start(&progress_state);
    while(MPIDI_CH3I_listener_conn != NULL)
    {
        /*
         * NOTE(review): the result is overwritten on every iteration and never checked, so an error from an
         * earlier pass is lost and a persistent error would keep this loop spinning -- confirm intended.
         */
        mpi_errno = MPID_Progress_wait(&progress_state);

    }
    MPID_Progress_end(&progress_state);

    /* FIXME: Cleanly shutdown other socks and free connection structures. (close protocol?) */

    /*
     * MT: in a multi-threaded environment, finalize() should signal any
     * thread(s) blocking on MPIDU_Sock_wait() and wait for
     * those threads to complete before destroying the progress engine data structures.
     */

    MPIDU_Sock_destroy_set(MPIDI_CH3I_sock_set);
    MPIDU_Sock_finalize();

#   if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX && !defined(USE_CH3I_PROGRESS_DELAY_QUEUE))
    {
        MPID_Thread_cond_destroy(&MPIDI_CH3I_progress_completion_cond, NULL);
    }
#   endif

  fn_exit:
    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_FINALIZE);
    return mpi_errno;

  fn_fail:
    goto fn_exit;
}
/* end MPIDI_CH3I_Progress_finalize() */


#if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE)
/*
 * MPIDI_CH3I_Progress_wakeup - forward a wakeup request to the sock layer by
 * calling MPIDU_Sock_wakeup() on MPIDI_CH3I_sock_set.  Only compiled when
 * MPICH_THREAD_LEVEL is MPI_THREAD_MULTIPLE.
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Progress_wakeup
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
void MPIDI_CH3I_Progress_wakeup(void)
{
    MPIDU_Sock_wakeup(MPIDI_CH3I_sock_set);
}
#endif

/*
 * MPIDI_CH3I_Get_business_card - thin wrapper around
 * MPIDI_CH3U_Get_business_card_sock().
 *
 * value  - caller's buffer pointer
 * length - caller's buffer length
 *
 * The addresses of the local copies of both parameters are passed down, so any
 * update the callee makes to the pointer or the length is not visible to the
 * caller of this function.
 *
 * Returns whatever MPIDI_CH3U_Get_business_card_sock() returns.
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Get_business_card
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_Get_business_card(char *value, int length)
{
    return MPIDI_CH3U_Get_business_card_sock(&value, &length);
}

/*
 * MPIDI_CH3I_Progress_handle_sock_event - dispatch one sock event by
 * event->op_type.
 *
 * event - event returned by MPIDU_Sock_wait(); event->user_ptr is treated as
 *         the MPIDI_CH3I_Connection_t the event belongs to.
 *
 * NOTE(review): only the opening part of the MPIDU_SOCK_OP_READ case was
 * available when this header was written; the remaining cases and the exit
 * paths are not described here.
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Progress_handle_sock_event
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int MPIDI_CH3I_Progress_handle_sock_event(MPIDU_Sock_event_t * event)
{
    int complete;
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_HANDLE_SOCK_EVENT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_HANDLE_SOCK_EVENT);

    switch (event->op_type)
    {
        case MPIDU_SOCK_OP_READ:
        {
            MPIDI_CH3I_Connection_t * conn = (MPIDI_CH3I_Connection_t *) event->user_ptr;

            MPID_Request * rreq = conn->recv_active;

            /* --BEGIN ERROR HANDLING-- */
            if (event->error != MPI_SUCCESS)
            {
                /* FIXME: the following should be handled by the close protocol */
                if (MPIR_ERR_GET_CLASS(event->error) != MPIDU_SOCK_ERR_CONN_CLOSED) {
                    mpi_errno = event->error;
                    MPIU_ERR_POP(mpi_errno);
                }
                break;
            }
            /* --END ERROR HANDLING-- */

            if (conn->state == CONN_STATE_CONNECTED)
            {
                if (conn->recv_active == NULL)
                {
                    /* No receive request is active, so the data just read is a packet header in conn->pkt. */
                    MPIU_Assert(conn->pkt.type < MPIDI_CH3_PKT_END_CH3);

                    mpi_errno = MPIDI_CH3U_Handle_recv_pkt(conn->vc, &conn->pkt, &rreq);
                    if (mpi_errno != MPI_SUCCESS) {
                        MPIU_ERR_POP(mpi_errno);
                    }

                    if (rreq == NULL)
                    {
                        if (conn->state != CONN_STATE_CLOSING)
                        {
                            /* conn->recv_active = NULL;  -- already set to NULL */
                            mpi_errno = connection_post_recv_pkt(conn);
                            if (mpi_errno != MPI_SUCCESS) {
                                MPIU_ERR_POP(mpi_errno);
                            }
                        }
                    }
                    else
                    {
                        for(;;)
                        {
                            MPID_IOV * iovp;
                            MPIU_Size_t nb;

                            iovp = rreq->dev.iov;

                            mpi_errno = MPIDU_Sock_readv(conn->sock, iovp, rreq->dev.iov_count, &nb);
                            /* --BEGIN ERROR HANDLING-- */
                            if (mpi_errno != MPI_SUCCESS)
                            {
                                mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER,
                                                                 "**ch3|sock|immedread", "ch3|sock|immedread %p %p %p",
                                                                 rreq, conn, conn->vc);
                                goto fn_fail;
                            }
                            /* --END ERROR HANDLING-- */

                            /*
                             * NOTE(review): the format string consumes vc, nb, rreq in that order, but the arguments
                             * are passed as vc, rreq->handle, nb -- confirm and reorder.
                             */
                            MPIDI_DBG_PRINTF((55, FCNAME, "immediate readv, vc=0x%p nb=%d, rreq=0x%08x",
                                              conn->vc, rreq->handle, nb));

                            if (nb > 0 && adjust_iov(&iovp, &rreq->dev.iov_count, nb))
                            {
                                mpi_errno = MPIDI_CH3U_Handle_recv_req(conn->vc, rreq, &complete);
                                if (mpi_errno != MPI_SUCCESS) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -