📄 ch3_progress.c
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- *//* * (C) 2001 by Argonne National Laboratory. * See COPYRIGHT in top-level directory. */#include "mpidi_ch3_impl.h"#include "pmi.h"#include "mpidu_sock.h"#ifdef HAVE_STRING_H#include <string.h>#endifstatic MPIDI_CH3_PktHandler_Fcn *pktArray[MPIDI_CH3_PKT_END_CH3+1];static int ReadMoreData( MPIDI_CH3I_Connection_t *, MPID_Request * );static int MPIDI_CH3i_Progress_wait(MPID_Progress_state * );static int MPIDI_CH3i_Progress_test(void);/* FIXME: Move thread stuff into some set of abstractions in order to remove ifdefs */volatile unsigned int MPIDI_CH3I_progress_completion_count = 0;#ifdef MPICH_IS_THREADED volatile int MPIDI_CH3I_progress_blocked = FALSE; volatile int MPIDI_CH3I_progress_wakeup_signalled = FALSE;# if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX)/* This value must be static so that it isn't an uninitialized common symbol */static MPID_Thread_cond_t MPIDI_CH3I_progress_completion_cond;# endif static int MPIDI_CH3I_Progress_delay(unsigned int completion_count); static int MPIDI_CH3I_Progress_continue(unsigned int completion_count);#endifMPIDU_Sock_set_t MPIDI_CH3I_sock_set = NULL; static int MPIDI_CH3I_Progress_handle_sock_event(MPIDU_Sock_event_t * event);static inline int connection_pop_sendq_req(MPIDI_CH3I_Connection_t * conn);static inline int connection_post_recv_pkt(MPIDI_CH3I_Connection_t * conn);static int adjust_iov(MPID_IOV ** iovp, int * countp, MPIU_Size_t nb);#undef FUNCNAME#define FUNCNAME MPIDI_CH3i_Progress_test#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static int MPIDI_CH3i_Progress_test(void){ MPIDU_Sock_event_t event; int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_TEST); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_TEST);# ifdef MPICH_IS_THREADED { /* We don't bother testing whether threads are enabled in the runtime-checking case because this simple test will always be false if threads are not enabled. */ if (MPIDI_CH3I_progress_blocked == TRUE) { /* * Another thread is already blocking in the progress engine. * We are not going to block waiting for progress, so we * simply return. It might make sense to yield before * returning, * giving the PE thread a change to make progress. * * MT: Another thread is already blocking in poll. Right now, * calls to the progress routines are effectively * serialized by the device. The only way another thread may * enter this function is if MPIDU_Sock_wait() blocks. If * this changes, a flag other than MPIDI_CH3I_Progress_blocked * may be required to determine if another thread is in * the progress engine. */ goto fn_exit; } }# endif mpi_errno = MPIDU_Sock_wait(MPIDI_CH3I_sock_set, 0, &event); if (mpi_errno == MPI_SUCCESS) { mpi_errno = MPIDI_CH3I_Progress_handle_sock_event(&event); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**ch3|sock|handle_sock_event"); } } else if (MPIR_ERR_GET_CLASS(mpi_errno) == MPIDU_SOCK_ERR_TIMEOUT) { mpi_errno = MPI_SUCCESS; goto fn_exit; } else { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**progress_sock_wait"); } fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_TEST); return mpi_errno; fn_fail: goto fn_exit;}/* end MPIDI_CH3_Progress_test() */#undef FUNCNAME#define FUNCNAME MPIDI_CH3i_Progress_wait#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static int MPIDI_CH3i_Progress_wait(MPID_Progress_state * progress_state){ MPIDU_Sock_event_t event; int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_WAIT); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_WAIT); /* * MT: the following code will be needed if progress can occur between * MPIDI_CH3_Progress_start() and * MPIDI_CH3_Progress_wait(), or iterations of MPIDI_CH3_Progress_wait(). * * This is presently not possible, and thus the code is commented out. */# if 0 /* FIXME: Was (USE_THREAD_IMPL == MPICH_THREAD_IMPL_NOT_IMPLEMENTED), which really meant not-using-global-mutex-thread model . This was true for the single threaded case, but was probably not intended for that case*/ { if (progress_state->ch.completion_count != MPIDI_CH3I_progress_completion_count) { goto fn_exit; } }# endif # ifdef MPICH_IS_THREADED MPIU_THREAD_CHECK_BEGIN { if (MPIDI_CH3I_progress_blocked == TRUE) { /* * Another thread is already blocking in the progress engine. * * MT: Another thread is already blocking in poll. Right now, * calls to MPIDI_CH3_Progress_wait() are effectively * serialized by the device. The only way another thread may * enter this function is if MPIDU_Sock_wait() blocks. If * this changes, a flag other than MPIDI_CH3I_Progress_blocked * may be required to determine if another thread is in * the progress engine. */ MPIDI_CH3I_Progress_delay(MPIDI_CH3I_progress_completion_count); goto fn_exit; } } MPIU_THREAD_CHECK_END# endif do {# ifdef MPICH_IS_THREADED /* The logic for this case is just complicated enough that we write separate code for each possibility */# ifdef HAVE_RUNTIME_THREADCHECK if (MPIR_ThreadInfo.isThreaded) { MPIDI_CH3I_progress_blocked = TRUE; mpi_errno = MPIDU_Sock_wait(MPIDI_CH3I_sock_set, MPIDU_SOCK_INFINITE_TIME, &event); MPIDI_CH3I_progress_blocked = FALSE; MPIDI_CH3I_progress_wakeup_signalled = FALSE; } else { mpi_errno = MPIDU_Sock_wait(MPIDI_CH3I_sock_set, MPIDU_SOCK_INFINITE_TIME, &event); }# else MPIDI_CH3I_progress_blocked = TRUE; mpi_errno = MPIDU_Sock_wait(MPIDI_CH3I_sock_set, MPIDU_SOCK_INFINITE_TIME, &event); MPIDI_CH3I_progress_blocked = FALSE; MPIDI_CH3I_progress_wakeup_signalled = FALSE;# endif /* HAVE_RUNTIME_THREADCHECK */# else mpi_errno = MPIDU_Sock_wait(MPIDI_CH3I_sock_set, MPIDU_SOCK_INFINITE_TIME, &event);# endif /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { MPIU_Assert(MPIR_ERR_GET_CLASS(mpi_errno) != MPIDU_SOCK_ERR_TIMEOUT); MPIU_ERR_SET(mpi_errno,MPI_ERR_OTHER,"**progress_sock_wait"); goto fn_fail; } /* --END ERROR HANDLING-- */ mpi_errno = MPIDI_CH3I_Progress_handle_sock_event(&event); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**ch3|sock|handle_sock_event"); } } while (progress_state->ch.completion_count == MPIDI_CH3I_progress_completion_count); /* * We could continue to call MPIU_Sock_wait in a non-blocking fashion * and process any other events; however, this would not * give the application a chance to post new receives, and thus could * result in an increased number of unexpected messages * that would need to be buffered. */ # if MPICH_IS_THREADED { /* * Awaken any threads which are waiting for the progress that just * occurred */ MPIDI_CH3I_Progress_continue(MPIDI_CH3I_progress_completion_count); }# endif fn_exit: /* * Reset the progress state so it is fresh for the next iteration */ progress_state->ch.completion_count = MPIDI_CH3I_progress_completion_count; MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_WAIT); return mpi_errno; fn_fail: goto fn_exit;}/* end MPIDI_CH3_Progress_wait() */#undef FUNCNAME#define FUNCNAME MPIDI_CH3_Connection_terminate#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_Connection_terminate(MPIDI_VC_t * vc){ int mpi_errno = MPI_SUCCESS; MPIDI_CH3I_VC *vcch = (MPIDI_CH3I_VC *)vc->channel_private; MPIU_DBG_CONNSTATECHANGE(vc,vcch->conn,CONN_STATE_CLOSING); vcch->conn->state = CONN_STATE_CLOSING; MPIU_DBG_MSG(CH3_DISCONNECT,TYPICAL,"Closing sock (Post_close)"); mpi_errno = MPIDU_Sock_post_close(vcch->sock); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } fn_exit: return mpi_errno; fn_fail: goto fn_exit;}/* end MPIDI_CH3_Connection_terminate() */#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress_init#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_Progress_init(void){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_INIT); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_INIT); MPIU_THREAD_CHECK_BEGIN# if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX) { MPID_Thread_cond_create(&MPIDI_CH3I_progress_completion_cond, NULL); }# endif MPIU_THREAD_CHECK_END mpi_errno = MPIDU_Sock_init(); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } /* create sock set */ mpi_errno = MPIDU_Sock_create_set(&MPIDI_CH3I_sock_set); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } /* establish non-blocking listener */ mpi_errno = MPIDU_CH3I_SetupListener( MPIDI_CH3I_sock_set ); if (mpi_errno) { MPIU_ERR_POP(mpi_errno); } /* Initialize the code to handle incoming packets */ mpi_errno = MPIDI_CH3_PktHandler_Init( pktArray, MPIDI_CH3_PKT_END_CH3+1 ); if (mpi_errno) { MPIU_ERR_POP(mpi_errno); } fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_INIT); return mpi_errno; fn_fail: goto fn_exit;}/* end MIPDI_CH3I_Progress_init() */#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress_finalize#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_Progress_finalize(void){ int mpi_errno; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_FINALIZE); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_FINALIZE); /* Shut down the listener */ mpi_errno = MPIDU_CH3I_ShutdownListener(); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } /* FIXME: Cleanly shutdown other socks and free connection structures. (close protocol?) */ /* * MT: in a multi-threaded environment, finalize() should signal any * thread(s) blocking on MPIDU_Sock_wait() and wait for * those * threads to complete before destroying the progress engine * data structures. */ MPIDU_Sock_destroy_set(MPIDI_CH3I_sock_set); MPIDU_Sock_finalize(); MPIU_THREAD_CHECK_BEGIN# if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX) { MPID_Thread_cond_destroy(&MPIDI_CH3I_progress_completion_cond, NULL); }# endif MPIU_THREAD_CHECK_END fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_FINALIZE); return mpi_errno; fn_fail: goto fn_exit;}/* end MPIDI_CH3I_Progress_finalize() */#ifdef MPICH_IS_THREADED#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress_wakeup#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)void MPIDI_CH3I_Progress_wakeup(void){ MPIDU_Sock_wakeup(MPIDI_CH3I_sock_set);}#endif#undef FUNCNAME#define FUNCNAME MPIDI_CH3_Get_business_card#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_Get_business_card(int myRank, char *value, int length){ return MPIDI_CH3U_Get_business_card_sock(myRank, &value, &length);}#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress_handle_sock_event#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static int MPIDI_CH3I_Progress_handle_sock_event(MPIDU_Sock_event_t * event){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_HANDLE_SOCK_EVENT); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_HANDLE_SOCK_EVENT); MPIU_DBG_MSG_D(CH3_OTHER,VERBOSE,"Socket event of type %d", event->op_type ); switch (event->op_type) { case MPIDU_SOCK_OP_READ: { MPIDI_CH3I_Connection_t * conn = (MPIDI_CH3I_Connection_t *) event->user_ptr; MPID_Request * rreq = conn->recv_active; /* --BEGIN ERROR HANDLING-- */ if (event->error != MPI_SUCCESS) { /* FIXME: the following should be handled by the close protocol */ if (MPIR_ERR_GET_CLASS(event->error) != MPIDU_SOCK_ERR_CONN_CLOSED) { mpi_errno = event->error; MPIU_ERR_POP(mpi_errno); } break; } /* --END ERROR HANDLING-- */ if (conn->state == CONN_STATE_CONNECTED) { if (conn->recv_active == NULL) { MPIDI_msg_sz_t buflen = sizeof (MPIDI_CH3_Pkt_t); MPIU_Assert(conn->pkt.type < MPIDI_CH3_PKT_END_CH3); mpi_errno = pktArray[conn->pkt.type]( conn->vc, &conn->pkt, &buflen, &rreq ); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } MPIU_Assert(buflen == sizeof (MPIDI_CH3_Pkt_t)); if (rreq == NULL) { if (conn->state != CONN_STATE_CLOSING) { /* conn->recv_active = NULL; -- already set to NULL */ mpi_errno = connection_post_recv_pkt(conn); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } } } else {#if 1 mpi_errno = ReadMoreData( conn, rreq ); if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }#else for(;;) { MPID_IOV * iovp; MPIU_Size_t nb; iovp = rreq->dev.iov; mpi_errno = MPIDU_Sock_readv(conn->sock, iovp, rreq->dev.iov_count, &nb); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|sock|immedread", "ch3|sock|immedread %p %p %p", rreq, conn, conn->vc); goto fn_fail; } /* --END ERROR HANDLING-- */ MPIU_DBG_MSG_FMT(CH3_CHANNEL,VERBOSE, (MPIU_DBG_FDEST,"immediate readv, vc=%p nb=%d, rreq=0x%08x", conn->vc, nb, rreq->handle)); if (nb > 0 && adjust_iov(&iovp, &rreq->dev.iov_count, nb)) { int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *); int complete; reqFn = rreq->dev.OnDataAvail; if (!reqFn) { MPIU_Assert(MPIDI_Request_get_type(rreq)!=MPIDI_REQUEST_TYPE_GET_RESP); MPIDI_CH3U_Request_complete(rreq); complete = TRUE; } else { mpi_errno = reqFn( conn->vc, rreq, &complete ); if (mpi_errno) MPIU_ERR_POP(mpi_errno); } if (complete) { /* conn->recv_active = NULL; -- already set to NULL */ mpi_errno = connection_post_recv_pkt(conn); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } break; } } else { MPIU_DBG_MSG_FMT(CH3_CHANNEL,VERBOSE, (MPIU_DBG_FDEST,"posting readv, vc=%p, rreq=0x%08x", conn->vc, rreq->handle)); conn->recv_active = rreq; mpi_errno = MPIDU_Sock_post_readv(conn->sock, iovp, rreq->dev.iov_count, NULL); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code( mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|sock|postread", "ch3|sock|postread %p %p %p", rreq, conn, conn->vc); goto fn_fail;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -