⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ch3_progress.c

📁 mpi并行计算的c++代码 可用vc或gcc编译通过 可以用来搭建并行计算试验环境
💻 C
📖 第 1 页 / 共 3 页
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- *//* *  (C) 2001 by Argonne National Laboratory. *      See COPYRIGHT in top-level directory. */#include "mpidi_ch3_impl.h"#include "pmi.h"#include "mpidu_sock.h"#ifdef HAVE_STRING_H#include <string.h>#endif#undef USE_CH3I_PROGRESS_DELAY_QUEUEvolatile unsigned int MPIDI_CH3I_progress_completion_count = 0;#if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE)    volatile int MPIDI_CH3I_progress_blocked = FALSE;    volatile int MPIDI_CH3I_progress_wakeup_signalled = FALSE;#   if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX)#       if defined(USE_CH3I_PROGRESS_DELAY_QUEUE)            struct MPIDI_CH3I_Progress_delay_queue_elem	    {		unsigned int count;		volatile int flag;		MPID_Thread_cond_t cond;		struct MPIDI_CH3I_Progress_delay_queue_elem * next;	    };            static struct MPIDI_CH3I_Progress_delay_queue_elem * MPIDI_CH3I_Progress_delay_queue_head = NULL;            static struct MPIDI_CH3I_Progress_delay_queue_elem * MPIDI_CH3I_Progress_delay_queue_tail = NULL;#       else            MPID_Thread_cond_t MPIDI_CH3I_progress_completion_cond;#       endif#   endif#endif#if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE)    static int MPIDI_CH3I_Progress_delay(unsigned int completion_count);    static int MPIDI_CH3I_Progress_continue(unsigned int completion_count);#endifMPIDU_Sock_set_t MPIDI_CH3I_sock_set = NULL; static MPIDI_CH3I_Connection_t * MPIDI_CH3I_listener_conn = NULL;static int MPIDI_CH3I_Progress_handle_sock_event(MPIDU_Sock_event_t * event);/* FIXME: move this prototype */int MPIDI_CH3I_Connection_alloc(MPIDI_CH3I_Connection_t **);static inline void connection_free(MPIDI_CH3I_Connection_t * conn);static inline int connection_post_sendq_req(MPIDI_CH3I_Connection_t * conn);static inline int connection_post_send_pkt(MPIDI_CH3I_Connection_t * conn);static inline int connection_post_recv_pkt(MPIDI_CH3I_Connection_t * conn);static inline void connection_post_send_pkt_and_pgid(MPIDI_CH3I_Connection_t * conn);static int adjust_iov(MPID_IOV ** iovp, int * countp, MPIU_Size_t nb);#undef FUNCNAME#define FUNCNAME MPIDI_CH3_Progress_test#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_Progress_test(void){    MPIDU_Sock_event_t event;    int mpi_errno = MPI_SUCCESS;    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PROGRESS_TEST);    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PROGRESS_TEST);#   if (MPICH_THREAD_LEVEL >= MPI_THREAD_MULTIPLE)    {	if (MPIDI_CH3I_progress_blocked == TRUE) 	{	    /*	     * Another thread is already blocking in the progress engine.  We are not going to block waiting for progress, so we	     * simply return.  It might make sense to yield before * returning, giving the PE thread a change to make progress.	     *	     * MT: Another thread is already blocking in poll.  Right now, calls to the progress routines are effectively	     * serialized by the device.  The only way another thread may enter this function is if MPIDU_Sock_wait() blocks.  If	     * this changes, a flag other than MPIDI_CH3I_Progress_blocked may be required to determine if another thread is in	     * the progress engine.	     */	    	    goto fn_exit;	}    }#   endif        mpi_errno = MPIDU_Sock_wait(MPIDI_CH3I_sock_set, 0, &event);    if (mpi_errno == MPI_SUCCESS)    {	mpi_errno = MPIDI_CH3I_Progress_handle_sock_event(&event);	if (mpi_errno != MPI_SUCCESS) {	    MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,				"**ch3|sock|handle_sock_event");	}    }    else if (MPIR_ERR_GET_CLASS(mpi_errno) == MPIDU_SOCK_ERR_TIMEOUT)    {	mpi_errno = MPI_SUCCESS;	goto fn_exit;    }    else {	MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**progress_sock_wait");    }  fn_exit:    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PROGRESS_TEST);    return mpi_errno; fn_fail:    goto fn_exit;}/* end MPIDI_CH3_Progress_test() */#undef FUNCNAME#define FUNCNAME MPIDI_CH3_Progress_wait#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_Progress_wait(MPID_Progress_state * progress_state){    MPIDU_Sock_event_t event;    int mpi_errno = MPI_SUCCESS;    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PROGRESS_WAIT);    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PROGRESS_WAIT);    MPIDI_DBG_PRINTF((50, FCNAME, "entering"));    /*     * MT: the following code will be needed if progress can occur between MPIDI_CH3_Progress_start() and     * MPIDI_CH3_Progress_wait(), or iterations of MPIDI_CH3_Progress_wait().     *     * This is presently not possible, and thus the code is commented out.     */#   if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_NOT_IMPLEMENTED)    {	if (progress_state->ch.completion_count != MPIDI_CH3I_progress_completion_count)	{	    goto fn_exit;	}    }#   endif	#   if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE)    {	if (MPIDI_CH3I_progress_blocked == TRUE) 	{	    /*	     * Another thread is already blocking in the progress engine.	     *	     * MT: Another thread is already blocking in poll.  Right now, calls to MPIDI_CH3_Progress_wait() are effectively	     * serialized by the device.  The only way another thread may enter this function is if MPIDU_Sock_wait() blocks.  If	     * this changes, a flag other than MPIDI_CH3I_Progress_blocked may be required to determine if another thread is in	     * the progress engine.	     */	    MPIDI_CH3I_Progress_delay(MPIDI_CH3I_progress_completion_count);			    goto fn_exit;	}    }#   endif        do    {#       if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE)	{	    MPIDI_CH3I_progress_blocked = TRUE;	}#	endif		mpi_errno = MPIDU_Sock_wait(MPIDI_CH3I_sock_set, MPIDU_SOCK_INFINITE_TIME, &event);#       if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE)	{	    MPIDI_CH3I_progress_blocked = FALSE;	    MPIDI_CH3I_progress_wakeup_signalled = FALSE;	}#	endif	/* --BEGIN ERROR HANDLING-- */	if (mpi_errno != MPI_SUCCESS)	{	    MPIU_Assert(MPIR_ERR_GET_CLASS(mpi_errno) != MPIDU_SOCK_ERR_TIMEOUT);	    MPIU_ERR_SET(mpi_errno,MPI_ERR_OTHER,"**progress_sock_wait");	    goto fn_fail;	}	/* --END ERROR HANDLING-- */	mpi_errno = MPIDI_CH3I_Progress_handle_sock_event(&event);	if (mpi_errno != MPI_SUCCESS) {	    MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,				"**ch3|sock|handle_sock_event");	}    }    while (progress_state->ch.completion_count == MPIDI_CH3I_progress_completion_count);    /*     * We could continue to call MPIU_Sock_wait in a non-blocking fashion      * and process any other events; however, this would not     * give the application a chance to post new receives, and thus could      * result in an increased number of unexpected messages     * that would need to be buffered.     */    #   if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE)    {	/*	 * Awaken any threads which are waiting for the progress that just occurred	 */	MPIDI_CH3I_Progress_continue(MPIDI_CH3I_progress_completion_count);    }#   endif     fn_exit:    /*     * Reset the progress state so it is fresh for the next iteration     */    progress_state->ch.completion_count = MPIDI_CH3I_progress_completion_count;        MPIDI_DBG_PRINTF((50, FCNAME, "exiting"));    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PROGRESS_WAIT);    return mpi_errno; fn_fail:    goto fn_exit;}/* end MPIDI_CH3_Progress_wait() */#undef FUNCNAME#define FUNCNAME MPIDI_CH3_Connection_terminate#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_Connection_terminate(MPIDI_VC_t * vc){    int mpi_errno = MPI_SUCCESS;        MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting state to CONN_STATE_CLOSING");    vc->ch.conn->state = CONN_STATE_CLOSING;    mpi_errno = MPIDU_Sock_post_close(vc->ch.sock);    if (mpi_errno != MPI_SUCCESS) {	MPIU_ERR_POP(mpi_errno);    }  fn_exit:    return mpi_errno; fn_fail:    goto fn_exit;}/* end MPIDI_CH3_Connection_terminate() */#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress_init#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_Progress_init(void){    MPIDU_Sock_t sock;    int mpi_errno = MPI_SUCCESS;    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_INIT);    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_INIT);    MPIDI_DBG_PRINTF((60, FCNAME, "entering"));#   if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX && !defined(USE_CH3I_PROGRESS_DELAY_QUEUE))    {	MPID_Thread_cond_create(&MPIDI_CH3I_progress_completion_cond, NULL);    }#   endif	    mpi_errno = MPIDU_Sock_init();    if (mpi_errno != MPI_SUCCESS) {	MPIU_ERR_POP(mpi_errno);    }        /* create sock set */    mpi_errno = MPIDU_Sock_create_set(&MPIDI_CH3I_sock_set);    if (mpi_errno != MPI_SUCCESS) {	MPIU_ERR_POP(mpi_errno);    }        /* establish non-blocking listener */    mpi_errno = MPIDI_CH3I_Connection_alloc(&MPIDI_CH3I_listener_conn);    if (mpi_errno != MPI_SUCCESS) {	MPIU_ERR_POP(mpi_errno);    }    MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting listener connect state to CONN_STATE_LISTENING");    MPIDI_CH3I_listener_conn->sock = NULL;    MPIDI_CH3I_listener_conn->vc = NULL;    MPIDI_CH3I_listener_conn->state = CONN_STATE_LISTENING;    MPIDI_CH3I_listener_conn->send_active = NULL;    MPIDI_CH3I_listener_conn->recv_active = NULL;        mpi_errno = MPIDU_Sock_listen(MPIDI_CH3I_sock_set, MPIDI_CH3I_listener_conn, &MPIDI_CH3I_listener_port, &sock);    if (mpi_errno != MPI_SUCCESS) {	MPIU_ERR_POP(mpi_errno);    }        MPIDI_CH3I_listener_conn->sock = sock;      fn_exit:    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_INIT);    return mpi_errno; fn_fail:    goto fn_exit;}/* end MIPDI_CH3I_Progress_init() */#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress_finalize#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_Progress_finalize(void){    int mpi_errno;    MPID_Progress_state progress_state;    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_FINALIZE);    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_FINALIZE);    MPIDI_DBG_PRINTF((60, FCNAME, "entering"));    /* Shut down the listener */    mpi_errno = MPIDU_Sock_post_close(MPIDI_CH3I_listener_conn->sock);    if (mpi_errno != MPI_SUCCESS) {	MPIU_ERR_POP(mpi_errno);    }        MPID_Progress_start(&progress_state);    while(MPIDI_CH3I_listener_conn != NULL)    {	mpi_errno = MPID_Progress_wait(&progress_state);	    }    MPID_Progress_end(&progress_state);        /* FIXME: Cleanly shutdown other socks and free connection structures. (close protocol?) */    /*     * MT: in a multi-threaded environment, finalize() should signal any      * thread(s) blocking on MPIDU_Sock_wait() and wait for     * those * threads to complete before destroying the progress engine data structures.     */    MPIDU_Sock_destroy_set(MPIDI_CH3I_sock_set);    MPIDU_Sock_finalize();#   if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX && !defined(USE_CH3I_PROGRESS_DELAY_QUEUE))    {	MPID_Thread_cond_destroy(&MPIDI_CH3I_progress_completion_cond, NULL);    }#   endif      fn_exit:    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_FINALIZE);    return mpi_errno; fn_fail:    goto fn_exit;}/* end MPIDI_CH3I_Progress_finalize() */#if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE)#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress_wakeup#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)void MPIDI_CH3I_Progress_wakeup(void){    MPIDU_Sock_wakeup(MPIDI_CH3I_sock_set);}#endif#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Get_business_card#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_Get_business_card(char *value, int length){    return MPIDI_CH3U_Get_business_card_sock(&value, &length);}#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress_handle_sock_event#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static int MPIDI_CH3I_Progress_handle_sock_event(MPIDU_Sock_event_t * event){    int complete;    int mpi_errno = MPI_SUCCESS;    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_HANDLE_SOCK_EVENT);    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_HANDLE_SOCK_EVENT);    switch (event->op_type)    {	case MPIDU_SOCK_OP_READ:	{	    MPIDI_CH3I_Connection_t * conn = (MPIDI_CH3I_Connection_t *) event->user_ptr;			    MPID_Request * rreq = conn->recv_active;	    /* --BEGIN ERROR HANDLING-- */	    if (event->error != MPI_SUCCESS)	    {		/* FIXME: the following should be handled by the close protocol */		if (MPIR_ERR_GET_CLASS(event->error) != MPIDU_SOCK_ERR_CONN_CLOSED) {		    mpi_errno = event->error;		    MPIU_ERR_POP(mpi_errno);		}		    		break;	    }	    /* --END ERROR HANDLING-- */			    if (conn->state == CONN_STATE_CONNECTED)	    {		if (conn->recv_active == NULL)		{		    MPIU_Assert(conn->pkt.type < MPIDI_CH3_PKT_END_CH3);					    mpi_errno = MPIDI_CH3U_Handle_recv_pkt(conn->vc, &conn->pkt, &rreq);		    if (mpi_errno != MPI_SUCCESS) {			MPIU_ERR_POP(mpi_errno);		    }		    if (rreq == NULL)		    {			if (conn->state != CONN_STATE_CLOSING)			{			    /* conn->recv_active = NULL;  -- already set to NULL */			    mpi_errno = connection_post_recv_pkt(conn);			    if (mpi_errno != MPI_SUCCESS) {				MPIU_ERR_POP(mpi_errno);			    }			}		    }		    else		    {			for(;;)			{			    MPID_IOV * iovp;			    MPIU_Size_t nb;							    iovp = rreq->dev.iov;			    			    mpi_errno = MPIDU_Sock_readv(conn->sock, iovp, rreq->dev.iov_count, &nb);			    /* --BEGIN ERROR HANDLING-- */			    if (mpi_errno != MPI_SUCCESS)			    {				mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER,								 "**ch3|sock|immedread", "ch3|sock|immedread %p %p %p",								 rreq, conn, conn->vc);				goto fn_fail;			    }			    /* --END ERROR HANDLING-- */			    MPIDI_DBG_PRINTF((55, FCNAME, "immediate readv, vc=0x%p nb=%d, rreq=0x%08x",					      conn->vc, rreq->handle, nb));							    if (nb > 0 && adjust_iov(&iovp, &rreq->dev.iov_count, nb))			    {				mpi_errno = MPIDI_CH3U_Handle_recv_req(conn->vc, rreq, &complete);				if (mpi_errno != MPI_SUCCESS) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -