⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ch3_progress.c

📁 fortran并行计算包
💻 C
📖 第 1 页 / 共 2 页
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- *//* *  (C) 2001 by Argonne National Laboratory. *      See COPYRIGHT in top-level directory. */#include "mpidi_ch3_impl.h"#include "pmi.h"#include "mpidu_sock.h"#ifdef HAVE_STRING_H#include <string.h>#endifstatic MPIDI_CH3_PktHandler_Fcn *pktArray[MPIDI_CH3_PKT_END_CH3+1];static int ReadMoreData( MPIDI_CH3I_Connection_t *, MPID_Request * );static int MPIDI_CH3i_Progress_wait(MPID_Progress_state * );static int MPIDI_CH3i_Progress_test(void);/* FIXME: Move thread stuff into some set of abstractions in order to remove   ifdefs */volatile unsigned int MPIDI_CH3I_progress_completion_count = 0;#ifdef MPICH_IS_THREADED    volatile int MPIDI_CH3I_progress_blocked = FALSE;    volatile int MPIDI_CH3I_progress_wakeup_signalled = FALSE;#   if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX)/* This value must be static so that it isn't an uninitialized   common symbol */static MPID_Thread_cond_t MPIDI_CH3I_progress_completion_cond;#   endif    static int MPIDI_CH3I_Progress_delay(unsigned int completion_count);    static int MPIDI_CH3I_Progress_continue(unsigned int completion_count);#endifMPIDU_Sock_set_t MPIDI_CH3I_sock_set = NULL; static int MPIDI_CH3I_Progress_handle_sock_event(MPIDU_Sock_event_t * event);static inline int connection_pop_sendq_req(MPIDI_CH3I_Connection_t * conn);static inline int connection_post_recv_pkt(MPIDI_CH3I_Connection_t * conn);static int adjust_iov(MPID_IOV ** iovp, int * countp, MPIU_Size_t nb);#undef FUNCNAME#define FUNCNAME MPIDI_CH3i_Progress_test#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static int MPIDI_CH3i_Progress_test(void){    MPIDU_Sock_event_t event;    int mpi_errno = MPI_SUCCESS;    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_TEST);    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_TEST);#   ifdef MPICH_IS_THREADED    {	/* We don't bother testing whether threads are enabled in the 	   runtime-checking case because this simple test will always be false	   if threads are not enabled. */	if (MPIDI_CH3I_progress_blocked == TRUE) 	{	    /*	     * Another thread is already blocking in the progress engine.  	     * We are not going to block waiting for progress, so we	     * simply return.  It might make sense to yield before * returning,	     * giving the PE thread a change to make progress.	     *	     * MT: Another thread is already blocking in poll.  Right now, 	     * calls to the progress routines are effectively	     * serialized by the device.  The only way another thread may 	     * enter this function is if MPIDU_Sock_wait() blocks.  If	     * this changes, a flag other than MPIDI_CH3I_Progress_blocked 	     * may be required to determine if another thread is in	     * the progress engine.	     */	    	    goto fn_exit;	}    }#   endif        mpi_errno = MPIDU_Sock_wait(MPIDI_CH3I_sock_set, 0, &event);    if (mpi_errno == MPI_SUCCESS)    {	mpi_errno = MPIDI_CH3I_Progress_handle_sock_event(&event);	if (mpi_errno != MPI_SUCCESS) {	    MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,				"**ch3|sock|handle_sock_event");	}    }    else if (MPIR_ERR_GET_CLASS(mpi_errno) == MPIDU_SOCK_ERR_TIMEOUT)    {	mpi_errno = MPI_SUCCESS;	goto fn_exit;    }    else {	MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**progress_sock_wait");    }  fn_exit:    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_TEST);    return mpi_errno; fn_fail:    goto fn_exit;}/* end MPIDI_CH3_Progress_test() */#undef FUNCNAME#define FUNCNAME MPIDI_CH3i_Progress_wait#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static int MPIDI_CH3i_Progress_wait(MPID_Progress_state * progress_state){    MPIDU_Sock_event_t event;    int mpi_errno = MPI_SUCCESS;    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_WAIT);    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_WAIT);    /*     * MT: the following code will be needed if progress can occur between      * MPIDI_CH3_Progress_start() and     * MPIDI_CH3_Progress_wait(), or iterations of MPIDI_CH3_Progress_wait().     *     * This is presently not possible, and thus the code is commented out.     */#   if 0    /* FIXME: Was (USE_THREAD_IMPL == MPICH_THREAD_IMPL_NOT_IMPLEMENTED),       which really meant not-using-global-mutex-thread model .  This       was true for the single threaded case, but was probably not intended       for that case*/    {	if (progress_state->ch.completion_count != MPIDI_CH3I_progress_completion_count)	{	    goto fn_exit;	}    }#   endif	#   ifdef MPICH_IS_THREADED    MPIU_THREAD_CHECK_BEGIN    {	if (MPIDI_CH3I_progress_blocked == TRUE) 	{	    /*	     * Another thread is already blocking in the progress engine.	     *	     * MT: Another thread is already blocking in poll.  Right now, 	     * calls to MPIDI_CH3_Progress_wait() are effectively	     * serialized by the device.  The only way another thread may 	     * enter this function is if MPIDU_Sock_wait() blocks.  If	     * this changes, a flag other than MPIDI_CH3I_Progress_blocked 	     * may be required to determine if another thread is in	     * the progress engine.	     */	    MPIDI_CH3I_Progress_delay(MPIDI_CH3I_progress_completion_count);			    goto fn_exit;	}    }    MPIU_THREAD_CHECK_END#   endif        do    {#       ifdef MPICH_IS_THREADED	/* The logic for this case is just complicated enough that	   we write separate code for each possibility */#       ifdef HAVE_RUNTIME_THREADCHECK	if (MPIR_ThreadInfo.isThreaded) {	    MPIDI_CH3I_progress_blocked = TRUE;	    mpi_errno = MPIDU_Sock_wait(MPIDI_CH3I_sock_set, 				    MPIDU_SOCK_INFINITE_TIME, &event);	    MPIDI_CH3I_progress_blocked = FALSE;	    MPIDI_CH3I_progress_wakeup_signalled = FALSE;	}	else {	    mpi_errno = MPIDU_Sock_wait(MPIDI_CH3I_sock_set, 				    MPIDU_SOCK_INFINITE_TIME, &event);	}#       else	MPIDI_CH3I_progress_blocked = TRUE;	mpi_errno = MPIDU_Sock_wait(MPIDI_CH3I_sock_set, 				    MPIDU_SOCK_INFINITE_TIME, &event);	MPIDI_CH3I_progress_blocked = FALSE;	MPIDI_CH3I_progress_wakeup_signalled = FALSE;#       endif /* HAVE_RUNTIME_THREADCHECK */#       else	mpi_errno = MPIDU_Sock_wait(MPIDI_CH3I_sock_set, 				    MPIDU_SOCK_INFINITE_TIME, &event);#	endif	/* --BEGIN ERROR HANDLING-- */	if (mpi_errno != MPI_SUCCESS)	{	    MPIU_Assert(MPIR_ERR_GET_CLASS(mpi_errno) != MPIDU_SOCK_ERR_TIMEOUT);	    MPIU_ERR_SET(mpi_errno,MPI_ERR_OTHER,"**progress_sock_wait");	    goto fn_fail;	}	/* --END ERROR HANDLING-- */	mpi_errno = MPIDI_CH3I_Progress_handle_sock_event(&event);	if (mpi_errno != MPI_SUCCESS) {	    MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,				"**ch3|sock|handle_sock_event");	}    }    while (progress_state->ch.completion_count == MPIDI_CH3I_progress_completion_count);    /*     * We could continue to call MPIU_Sock_wait in a non-blocking fashion      * and process any other events; however, this would not     * give the application a chance to post new receives, and thus could      * result in an increased number of unexpected messages     * that would need to be buffered.     */    #   if MPICH_IS_THREADED    {	/*	 * Awaken any threads which are waiting for the progress that just 	 * occurred	 */	MPIDI_CH3I_Progress_continue(MPIDI_CH3I_progress_completion_count);    }#   endif     fn_exit:    /*     * Reset the progress state so it is fresh for the next iteration     */    progress_state->ch.completion_count = MPIDI_CH3I_progress_completion_count;        MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_WAIT);    return mpi_errno; fn_fail:    goto fn_exit;}/* end MPIDI_CH3_Progress_wait() */#undef FUNCNAME#define FUNCNAME MPIDI_CH3_Connection_terminate#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_Connection_terminate(MPIDI_VC_t * vc){    int mpi_errno = MPI_SUCCESS;    MPIDI_CH3I_VC *vcch = (MPIDI_CH3I_VC *)vc->channel_private;    MPIU_DBG_CONNSTATECHANGE(vc,vcch->conn,CONN_STATE_CLOSING);    vcch->conn->state = CONN_STATE_CLOSING;    MPIU_DBG_MSG(CH3_DISCONNECT,TYPICAL,"Closing sock (Post_close)");    mpi_errno = MPIDU_Sock_post_close(vcch->sock);    if (mpi_errno != MPI_SUCCESS) {	MPIU_ERR_POP(mpi_errno);    } fn_exit:    return mpi_errno; fn_fail:    goto fn_exit;}/* end MPIDI_CH3_Connection_terminate() */#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress_init#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_Progress_init(void){    int mpi_errno = MPI_SUCCESS;    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_INIT);    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_INIT);    MPIU_THREAD_CHECK_BEGIN#   if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX)    {	MPID_Thread_cond_create(&MPIDI_CH3I_progress_completion_cond, NULL);    }#   endif    MPIU_THREAD_CHECK_END	    mpi_errno = MPIDU_Sock_init();    if (mpi_errno != MPI_SUCCESS) {	MPIU_ERR_POP(mpi_errno);    }        /* create sock set */    mpi_errno = MPIDU_Sock_create_set(&MPIDI_CH3I_sock_set);    if (mpi_errno != MPI_SUCCESS) {	MPIU_ERR_POP(mpi_errno);    }        /* establish non-blocking listener */    mpi_errno = MPIDU_CH3I_SetupListener( MPIDI_CH3I_sock_set );    if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }    /* Initialize the code to handle incoming packets */    mpi_errno = MPIDI_CH3_PktHandler_Init( pktArray, MPIDI_CH3_PKT_END_CH3+1 );    if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }      fn_exit:    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_INIT);    return mpi_errno; fn_fail:    goto fn_exit;}/* end MIPDI_CH3I_Progress_init() */#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress_finalize#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_Progress_finalize(void){    int mpi_errno;    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_FINALIZE);    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_FINALIZE);    /* Shut down the listener */    mpi_errno = MPIDU_CH3I_ShutdownListener();    if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }        /* FIXME: Cleanly shutdown other socks and free connection structures.        (close protocol?) */    /*     * MT: in a multi-threaded environment, finalize() should signal any      * thread(s) blocking on MPIDU_Sock_wait() and wait for     * those * threads to complete before destroying the progress engine      * data structures.     */    MPIDU_Sock_destroy_set(MPIDI_CH3I_sock_set);    MPIDU_Sock_finalize();    MPIU_THREAD_CHECK_BEGIN#   if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX)    {	MPID_Thread_cond_destroy(&MPIDI_CH3I_progress_completion_cond, NULL);    }#   endif    MPIU_THREAD_CHECK_END  fn_exit:    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_FINALIZE);    return mpi_errno; fn_fail:    goto fn_exit;}/* end MPIDI_CH3I_Progress_finalize() */#ifdef MPICH_IS_THREADED#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress_wakeup#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)void MPIDI_CH3I_Progress_wakeup(void){    MPIDU_Sock_wakeup(MPIDI_CH3I_sock_set);}#endif#undef FUNCNAME#define FUNCNAME MPIDI_CH3_Get_business_card#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_Get_business_card(int myRank, char *value, int length){    return MPIDI_CH3U_Get_business_card_sock(myRank, &value, &length);}#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress_handle_sock_event#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static int MPIDI_CH3I_Progress_handle_sock_event(MPIDU_Sock_event_t * event){    int mpi_errno = MPI_SUCCESS;    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_HANDLE_SOCK_EVENT);    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_HANDLE_SOCK_EVENT);    MPIU_DBG_MSG_D(CH3_OTHER,VERBOSE,"Socket event of type %d", event->op_type );    switch (event->op_type)    {	case MPIDU_SOCK_OP_READ:	{	    MPIDI_CH3I_Connection_t * conn = 		(MPIDI_CH3I_Connection_t *) event->user_ptr;			    MPID_Request * rreq = conn->recv_active;	    /* --BEGIN ERROR HANDLING-- */	    if (event->error != MPI_SUCCESS)	    {		/* FIXME: the following should be handled by the close 		   protocol */		if (MPIR_ERR_GET_CLASS(event->error) != MPIDU_SOCK_ERR_CONN_CLOSED) {		    mpi_errno = event->error;		    MPIU_ERR_POP(mpi_errno);		}		    		break;	    }	    /* --END ERROR HANDLING-- */			    if (conn->state == CONN_STATE_CONNECTED)	    {		if (conn->recv_active == NULL)		{                    MPIDI_msg_sz_t buflen = sizeof (MPIDI_CH3_Pkt_t);		    MPIU_Assert(conn->pkt.type < MPIDI_CH3_PKT_END_CH3);                    		    mpi_errno = pktArray[conn->pkt.type]( conn->vc, &conn->pkt,							  &buflen, &rreq );		    if (mpi_errno != MPI_SUCCESS) {			MPIU_ERR_POP(mpi_errno);		    }                    MPIU_Assert(buflen == sizeof (MPIDI_CH3_Pkt_t));		    if (rreq == NULL)		    {			if (conn->state != CONN_STATE_CLOSING)			{			    /* conn->recv_active = NULL;  -- 			       already set to NULL */			    mpi_errno = connection_post_recv_pkt(conn);			    if (mpi_errno != MPI_SUCCESS) {				MPIU_ERR_POP(mpi_errno);			    }			}		    }		    else		    {#if 1			mpi_errno = ReadMoreData( conn, rreq );			if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }#else			for(;;)			{			    MPID_IOV * iovp;			    MPIU_Size_t nb;							    iovp = rreq->dev.iov;			    			    mpi_errno = MPIDU_Sock_readv(conn->sock, iovp, 						   rreq->dev.iov_count, &nb);			    /* --BEGIN ERROR HANDLING-- */			    if (mpi_errno != MPI_SUCCESS)			    {				mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER,								 "**ch3|sock|immedread", "ch3|sock|immedread %p %p %p",								 rreq, conn, conn->vc);				goto fn_fail;			    }			    /* --END ERROR HANDLING-- */			    MPIU_DBG_MSG_FMT(CH3_CHANNEL,VERBOSE,    (MPIU_DBG_FDEST,"immediate readv, vc=%p nb=%d, rreq=0x%08x",     conn->vc, nb, rreq->handle));							    if (nb > 0 && adjust_iov(&iovp, &rreq->dev.iov_count, nb))			    {				int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);				int complete;				reqFn = rreq->dev.OnDataAvail;				if (!reqFn) {				    MPIU_Assert(MPIDI_Request_get_type(rreq)!=MPIDI_REQUEST_TYPE_GET_RESP);				    MPIDI_CH3U_Request_complete(rreq);				    complete = TRUE;				}				else {				    mpi_errno = reqFn( conn->vc, rreq, &complete );				    if (mpi_errno) MPIU_ERR_POP(mpi_errno);				}				if (complete)				{				    /* conn->recv_active = NULL; -- 				       already set to NULL */				    mpi_errno = connection_post_recv_pkt(conn);				    if (mpi_errno != MPI_SUCCESS) {					MPIU_ERR_POP(mpi_errno);				    }				    break;				}			    }			    else			    {				MPIU_DBG_MSG_FMT(CH3_CHANNEL,VERBOSE,        (MPIU_DBG_FDEST,"posting readv, vc=%p, rreq=0x%08x", 	 conn->vc, rreq->handle));				conn->recv_active = rreq;				mpi_errno = MPIDU_Sock_post_readv(conn->sock, iovp, rreq->dev.iov_count, NULL);				/* --BEGIN ERROR HANDLING-- */				if (mpi_errno != MPI_SUCCESS)				{				    mpi_errno = MPIR_Err_create_code(					mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|sock|postread",					"ch3|sock|postread %p %p %p", rreq, conn, conn->vc);				    goto fn_fail;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -