⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ch3_progress.c

📁 fortran并行计算包
💻 C
📖 第 1 页 / 共 4 页
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpidi_ch3_impl.h"
#include "pmi.h"

#ifdef HAVE_STRING_H
#include <string.h>
#endif

#include "sctp_common.h"

volatile unsigned int MPIDI_CH3I_progress_completion_count = 0;

#if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE)
    volatile int MPIDI_CH3I_progress_blocked = FALSE;
    volatile int MPIDI_CH3I_progress_wakeup_signalled = FALSE;
#   if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX)
    MPID_Thread_cond_t MPIDI_CH3I_progress_completion_cond;
#   endif
    static int MPIDI_CH3I_Progress_delay(unsigned int completion_count);
    static int MPIDI_CH3I_Progress_continue(unsigned int completion_count);
#endif

static inline void connection_free(MPIDI_VC_t * vc, int stream);
static inline int stream_post_sendq_req(MPIDI_VC_t * vc, int stream);
static inline int connection_post_send_pkt(MPIDI_VC_t * vc, int stream);
static inline int connection_post_recv_pkt(MPIDI_VC_t * vc, int stream);
static inline void connection_post_send_pkt_and_pgid(MPIDI_VC_t * vc, int stream);
static inline int adjust_posted_iov(SCTP_IOV* post_ptr, MPIU_Size_t nb);

/* sri contains association ID and stream number, etc.
 */
static sctp_rcvinfo sctp_sri;

/* may be moved to mpid/common later (if used elsewhere) */
static int MPIDU_Sctpi_socket_bufsz = 0;

GLB_SendQ_Head Global_SendQ;
BufferNode_t FirstBufferNode;

static int MPIDU_Sctp_init(void);

/* these are used in more than one place in ch3:sctp but not promoted to util yet */
/* static int MPIDU_Sctp_wait(int fd, int timeout, MPIDU_Sctp_event_t * event); */
/* static int MPIDI_CH3I_Progress_handle_sctp_event(MPIDU_Sctp_event_t * event); */
static int MPIDU_Sctp_post_close(MPIDI_VC_t * vc);
static int MPIDU_Sctp_finalize(void);
inline static int read_from_advbuf_and_adjust(MPIDI_VC_t* vc, int stream, int amount,
                                              char* src, MPID_Request* rreq);

/* extern in mpidi_ch3_impl.h */
int MPIDI_CH3I_listener_port;

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_Progress_test
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Make progress without blocking: poll the one-to-many SCTP socket once
 * (timeout 0) and dispatch any event found. */
int MPIDI_CH3_Progress_test(void)
{
    MPIDU_Sctp_event_t event;
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PROGRESS_TEST);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PROGRESS_TEST);

#   if (MPICH_THREAD_LEVEL >= MPI_THREAD_MULTIPLE)
    /* this is not supported for ch3:sctp at the moment but here as a reminder */
    {
        if (MPIDI_CH3I_progress_blocked == TRUE)
        {
            /*
             * Another thread is already blocking in the progress engine.  We
             * are not going to block waiting for progress, so we simply
             * return.  It might make sense to yield before returning, giving
             * the PE thread a chance to make progress.
             *
             * MT: Another thread is already blocking in poll.  Right now,
             * calls to the progress routines are effectively serialized by
             * the device.  The only way another thread may enter this
             * function is if MPIDU_Sock_wait() blocks.  If this changes, a
             * flag other than MPIDI_CH3I_Progress_blocked may be required to
             * determine if another thread is in the progress engine.
             */
            goto fn_exit;
        }
    }
#   endif

    mpi_errno = MPIDU_Sctp_wait(MPIDI_CH3I_onetomany_fd , 0, &event);
    if (mpi_errno == MPI_SUCCESS)
    {
        mpi_errno = MPIDI_CH3I_Progress_handle_sctp_event(&event);
        if (mpi_errno != MPI_SUCCESS) {
            MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,
                                "**ch3|sock|handle_sock_event");
        }
    }
    else {
        MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**progress_sock_wait");
    }

  fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PROGRESS_TEST);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}
/* end MPIDI_CH3_Progress_test() */

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_Progress_wait
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Block in the progress engine until at least one operation completes, i.e.
 * until the global completion counter advances past the count captured in
 * progress_state. */
int MPIDI_CH3_Progress_wait(MPID_Progress_state * progress_state)
{
    MPIDU_Sctp_event_t event;
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PROGRESS_WAIT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PROGRESS_WAIT);

    MPIDI_DBG_PRINTF((50, FCNAME, "entering"));

    /*
     * MT: the following code will be needed if progress can occur between
     * MPIDI_CH3_Progress_start() and MPIDI_CH3_Progress_wait(), or
     * iterations of MPIDI_CH3_Progress_wait().
     *
     * This is presently not possible, and thus the code is commented out.
     */
#   if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_NOT_IMPLEMENTED)
    {
        if (progress_state->ch.completion_count != MPIDI_CH3I_progress_completion_count)
        {
            goto fn_exit;
        }
    }
#   endif

#   if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE)
    /* this is not supported for ch3:sctp at the moment but here as a reminder */
    {
        if (MPIDI_CH3I_progress_blocked == TRUE)
        {
            /*
             * Another thread is already blocking in the progress engine.
             *
             * MT: Another thread is already blocking in poll.  Right now,
             * calls to MPIDI_CH3_Progress_wait() are effectively serialized
             * by the device.  The only way another thread may enter this
             * function is if MPIDU_Sock_wait() blocks.  If this changes, a
             * flag other than MPIDI_CH3I_Progress_blocked may be required to
             * determine if another thread is in the progress engine.
             */
            MPIDI_CH3I_Progress_delay(MPIDI_CH3I_progress_completion_count);
            goto fn_exit;
        }
    }
#   endif

    do
    {
#       if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE)
        {
            MPIDI_CH3I_progress_blocked = TRUE;
        }
#       endif

        mpi_errno = MPIDU_Sctp_wait(MPIDI_CH3I_onetomany_fd, MPIDU_SCTP_INFINITE_TIME, &event);

#       if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE)
        {
            MPIDI_CH3I_progress_blocked = FALSE;
            MPIDI_CH3I_progress_wakeup_signalled = FALSE;
        }
#       endif

        /* --BEGIN ERROR HANDLING-- */
        if (mpi_errno != MPI_SUCCESS)
        {
            MPIU_Assert(MPIR_ERR_GET_CLASS(mpi_errno) != MPIDU_SOCK_ERR_TIMEOUT);
            MPIU_ERR_SET(mpi_errno,MPI_ERR_OTHER,"**progress_sock_wait");
            goto fn_fail;
        }
        /* --END ERROR HANDLING-- */

        mpi_errno = MPIDI_CH3I_Progress_handle_sctp_event(&event);
        if (mpi_errno != MPI_SUCCESS) {
            MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,
                                "**ch3|sock|handle_sock_event");
        }
    }
    while (progress_state->ch.completion_count == MPIDI_CH3I_progress_completion_count);

    /*
     * We could continue to call MPIU_Sctp_wait in a non-blocking fashion
     * and process any other events; however, this would not
     * give the application a chance to post new receives, and thus could
     * result in an increased number of unexpected messages
     * that would need to be buffered.
     */

#   if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE)
    {
        /*
         * Awaken any threads which are waiting for the progress that just
         * occurred
         */
        MPIDI_CH3I_Progress_continue(MPIDI_CH3I_progress_completion_count);
    }
#   endif

 fn_exit:
    /*
     * Reset the progress state so it is fresh for the next iteration
     */
    progress_state->ch.completion_count = MPIDI_CH3I_progress_completion_count;

    MPIDI_DBG_PRINTF((50, FCNAME, "exiting"));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PROGRESS_WAIT);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}
/* end MPIDI_CH3_Progress_wait() */

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_Connection_terminate
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Begin shutdown of the connection associated with vc by posting a close
 * request to the SCTP layer. */
int MPIDI_CH3_Connection_terminate(MPIDI_VC_t * vc)
{
    int mpi_errno = MPI_SUCCESS;

    MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting state to CONN_STATE_CLOSING");
    mpi_errno = MPIDU_Sctp_post_close(vc);
    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_POP(mpi_errno);
    }

  fn_exit:
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}
/* end MPIDI_CH3_Connection_terminate() */

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Progress_init
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* One-time initialization of the progress engine: SCTP layer, event queue,
 * association-ID hash table (sized by pg_size), and the global send queue. */
int MPIDI_CH3I_Progress_init(int pg_size)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_INIT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_INIT);
    MPIDI_DBG_PRINTF((60, FCNAME, "entering"));

#   if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX)
    {
        MPID_Thread_cond_create(&MPIDI_CH3I_progress_completion_cond, NULL);
    }
#   endif

    MPIDI_CH3I_onetomany_fd = -1;
    MPIDI_CH3I_dynamic_tmp_vc = NULL;
    MPIDI_CH3I_dynamic_tmp_fd = -1;

    mpi_errno = MPIDU_Sctp_init();
    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_POP(mpi_errno);
    }

    /* initialize eventq */
    eventq_head = NULL;
    eventq_tail = NULL;

    /* initialize hash table */
    MPIDI_CH3I_assocID_table = hash_init(pg_size, sizeof(MPIDI_CH3I_Hash_entry), INT4_MAX, 0);

    /* initialize global sendQ
     */
    Global_SendQ_init();

  fn_exit:
    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_INIT);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}
/* end MIPDI_CH3I_Progress_init() */

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Progress_finalize
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Tear down everything set up by MPIDI_CH3I_Progress_init: the one-to-many
 * socket, the SCTP event queue, and the association-ID hash table. */
int MPIDI_CH3I_Progress_finalize(void)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Progress_state progress_state;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_FINALIZE);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_FINALIZE);
    MPIDI_DBG_PRINTF((60, FCNAME, "entering"));

    /* added code to close one-to-many socket here to simplify
     * MPIDI_CH3_Channel_close */
    if (close(MPIDI_CH3I_onetomany_fd) == -1) {
        mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0);
    }

    /* destroy eventq */
    MPIDU_Sctp_finalize();

    /* finalize hash table */
    hash_free(MPIDI_CH3I_assocID_table);

#   if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX)
    {
        MPID_Thread_cond_destroy(&MPIDI_CH3I_progress_completion_cond, NULL);
    }
#   endif

  fn_exit:
    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_FINALIZE);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}
/* end MPIDI_CH3I_Progress_finalize() */

#if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE)
/* not supported in ch3:sctp for this initial release but here as
 * a reminder for the future.
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Progress_wakeup
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
void MPIDI_CH3I_Progress_wakeup(void)
{
  /*  MPIDU_Sock_wakeup(MPIDI_CH3I_sock_set); */
}
#endif

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_Get_business_card
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Fill value (of at most length bytes) with this process's sctp business
 * card; delegates to the ch3 sctp utility routine. */
int MPIDI_CH3_Get_business_card(int myRank, char *value, int length)
{
    return MPIDI_CH3U_Get_business_card_sctp(&value, &length);
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Progress_handle_sctp_event
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_Progress_handle_sctp_event(MPIDU_Sctp_event_t * event)
{
    int complete = TRUE;
    int mpi_errno = MPI_SUCCESS;
    int pmi_errno;

    MPIDI_CH3I_Hash_entry * result;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -