📄 ch3_progress.c
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- *//* * (C) 2001 by Argonne National Laboratory. * See COPYRIGHT in top-level directory. */#include "mpidi_ch3_impl.h"#include "pmi.h"#ifdef HAVE_STRING_H#include <string.h>#endif#include "sctp_common.h"volatile unsigned int MPIDI_CH3I_progress_completion_count = 0;#if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE) volatile int MPIDI_CH3I_progress_blocked = FALSE; volatile int MPIDI_CH3I_progress_wakeup_signalled = FALSE;# if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX)MPID_Thread_cond_t MPIDI_CH3I_progress_completion_cond;# endif static int MPIDI_CH3I_Progress_delay(unsigned int completion_count); static int MPIDI_CH3I_Progress_continue(unsigned int completion_count);#endifstatic inline void connection_free(MPIDI_VC_t * vc, int stream);static inline int stream_post_sendq_req(MPIDI_VC_t * vc, int stream);static inline int connection_post_send_pkt(MPIDI_VC_t * vc, int stream);static inline int connection_post_recv_pkt(MPIDI_VC_t * vc, int stream);static inline void connection_post_send_pkt_and_pgid(MPIDI_VC_t * vc, int stream);static inline int adjust_posted_iov(SCTP_IOV* post_ptr, MPIU_Size_t nb);/* sri contains association ID and stream number, etc. */static sctp_rcvinfo sctp_sri;/* may be moved to mpid/common later (if used elsewhere) */static int MPIDU_Sctpi_socket_bufsz = 0;GLB_SendQ_Head Global_SendQ;BufferNode_t FirstBufferNode;static int MPIDU_Sctp_init(void);/* these are used in more than one place in ch3:sctp but not promoted to util yet *//* static int MPIDU_Sctp_wait(int fd, int timeout, MPIDU_Sctp_event_t * event); *//* static int MPIDI_CH3I_Progress_handle_sctp_event(MPIDU_Sctp_event_t * event); */static int MPIDU_Sctp_post_close(MPIDI_VC_t * vc);static int MPIDU_Sctp_finalize(void);inline static int read_from_advbuf_and_adjust(MPIDI_VC_t* vc, int stream, int amount, char* src, MPID_Request* rreq);/* extern in mpidi_ch3_impl.h */int MPIDI_CH3I_listener_port;#undef FUNCNAME#define FUNCNAME MPIDI_CH3_Progress_test#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_Progress_test(void){ MPIDU_Sctp_event_t event; int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PROGRESS_TEST); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PROGRESS_TEST);# if (MPICH_THREAD_LEVEL >= MPI_THREAD_MULTIPLE)/* this is not supported for ch3:sctp at the moment but here as a reminder */ { if (MPIDI_CH3I_progress_blocked == TRUE) { /* * Another thread is already blocking in the progress engine. We are not going to block waiting for progress, so we * simply return. It might make sense to yield before * returning, giving the PE thread a change to make progress. * * MT: Another thread is already blocking in poll. Right now, calls to the progress routines are effectively * serialized by the device. The only way another thread may enter this function is if MPIDU_Sock_wait() blocks. If * this changes, a flag other than MPIDI_CH3I_Progress_blocked may be required to determine if another thread is in * the progress engine. */ goto fn_exit; } }# endif mpi_errno = MPIDU_Sctp_wait(MPIDI_CH3I_onetomany_fd , 0, &event); if (mpi_errno == MPI_SUCCESS) { mpi_errno = MPIDI_CH3I_Progress_handle_sctp_event(&event); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**ch3|sock|handle_sock_event"); } } else { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**progress_sock_wait"); } fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PROGRESS_TEST); return mpi_errno; fn_fail: goto fn_exit;}/* end MPIDI_CH3_Progress_test() */#undef FUNCNAME#define FUNCNAME MPIDI_CH3_Progress_wait#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_Progress_wait(MPID_Progress_state * progress_state){ MPIDU_Sctp_event_t event; int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PROGRESS_WAIT); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PROGRESS_WAIT); MPIDI_DBG_PRINTF((50, FCNAME, "entering")); /* * MT: the following code will be needed if progress can occur between MPIDI_CH3_Progress_start() and * MPIDI_CH3_Progress_wait(), or iterations of MPIDI_CH3_Progress_wait(). * * This is presently not possible, and thus the code is commented out. */# if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_NOT_IMPLEMENTED) { if (progress_state->ch.completion_count != MPIDI_CH3I_progress_completion_count) { goto fn_exit; } }# endif # if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE)/* this is not supported for ch3:sctp at the moment but here as a reminder */ { if (MPIDI_CH3I_progress_blocked == TRUE) { /* * Another thread is already blocking in the progress engine. * * MT: Another thread is already blocking in poll. Right now, calls to MPIDI_CH3_Progress_wait() are effectively * serialized by the device. The only way another thread may enter this function is if MPIDU_Sock_wait() blocks. If * this changes, a flag other than MPIDI_CH3I_Progress_blocked may be required to determine if another thread is in * the progress engine. */ MPIDI_CH3I_Progress_delay(MPIDI_CH3I_progress_completion_count); goto fn_exit; } }# endif do {# if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE) { MPIDI_CH3I_progress_blocked = TRUE; }# endif mpi_errno = MPIDU_Sctp_wait(MPIDI_CH3I_onetomany_fd, MPIDU_SCTP_INFINITE_TIME, &event);# if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE) { MPIDI_CH3I_progress_blocked = FALSE; MPIDI_CH3I_progress_wakeup_signalled = FALSE; }# endif /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { MPIU_Assert(MPIR_ERR_GET_CLASS(mpi_errno) != MPIDU_SOCK_ERR_TIMEOUT); MPIU_ERR_SET(mpi_errno,MPI_ERR_OTHER,"**progress_sock_wait"); goto fn_fail; } /* --END ERROR HANDLING-- */ mpi_errno = MPIDI_CH3I_Progress_handle_sctp_event(&event); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**ch3|sock|handle_sock_event"); } } while (progress_state->ch.completion_count == MPIDI_CH3I_progress_completion_count); /* * We could continue to call MPIU_Sctp_wait in a non-blocking fashion * and process any other events; however, this would not * give the application a chance to post new receives, and thus could * result in an increased number of unexpected messages * that would need to be buffered. */ # if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE) { /* * Awaken any threads which are waiting for the progress that just occurred */ MPIDI_CH3I_Progress_continue(MPIDI_CH3I_progress_completion_count); }# endif fn_exit: /* * Reset the progress state so it is fresh for the next iteration */ progress_state->ch.completion_count = MPIDI_CH3I_progress_completion_count; MPIDI_DBG_PRINTF((50, FCNAME, "exiting")); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PROGRESS_WAIT); return mpi_errno; fn_fail: goto fn_exit;}/* end MPIDI_CH3_Progress_wait() */#undef FUNCNAME#define FUNCNAME MPIDI_CH3_Connection_terminate#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_Connection_terminate(MPIDI_VC_t * vc){ int mpi_errno = MPI_SUCCESS; MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting state to CONN_STATE_CLOSING"); mpi_errno = MPIDU_Sctp_post_close(vc); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } fn_exit: return mpi_errno; fn_fail: goto fn_exit;}/* end MPIDI_CH3_Connection_terminate() */#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress_init#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_Progress_init(int pg_size){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_INIT); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_INIT); MPIDI_DBG_PRINTF((60, FCNAME, "entering"));# if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX) { MPID_Thread_cond_create(&MPIDI_CH3I_progress_completion_cond, NULL); }# endif MPIDI_CH3I_onetomany_fd = -1; MPIDI_CH3I_dynamic_tmp_vc = NULL; MPIDI_CH3I_dynamic_tmp_fd = -1; mpi_errno = MPIDU_Sctp_init(); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } /* initialize eventq */ eventq_head = NULL; eventq_tail = NULL; /* initialize hash table */ MPIDI_CH3I_assocID_table = hash_init(pg_size, sizeof(MPIDI_CH3I_Hash_entry), INT4_MAX, 0); /* initialize global sendQ */ Global_SendQ_init(); fn_exit: MPIDI_DBG_PRINTF((60, FCNAME, "exiting")); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_INIT); return mpi_errno; fn_fail: goto fn_exit;}/* end MIPDI_CH3I_Progress_init() */#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress_finalize#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_Progress_finalize(void){ int mpi_errno = MPI_SUCCESS; MPID_Progress_state progress_state; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_FINALIZE); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_FINALIZE); MPIDI_DBG_PRINTF((60, FCNAME, "entering")); /* added code to close one-to-many socket here to simplify MPIDI_CH3_Channel_close */ if (close(MPIDI_CH3I_onetomany_fd) == -1) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); } /* destroy eventq */ MPIDU_Sctp_finalize(); /* finalize hash table */ hash_free(MPIDI_CH3I_assocID_table);# if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX) { MPID_Thread_cond_destroy(&MPIDI_CH3I_progress_completion_cond, NULL); }# endif fn_exit: MPIDI_DBG_PRINTF((60, FCNAME, "exiting")); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_FINALIZE); return mpi_errno; fn_fail: goto fn_exit;}/* end MPIDI_CH3I_Progress_finalize() */#if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE)/* not supported in ch3:sctp for this initial release but here as * a reminder for the future. */#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress_wakeup#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)void MPIDI_CH3I_Progress_wakeup(void){ /* MPIDU_Sock_wakeup(MPIDI_CH3I_sock_set); */}#endif#undef FUNCNAME#define FUNCNAME MPIDI_CH3_Get_business_card#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_Get_business_card(int myRank, char *value, int length){ return MPIDI_CH3U_Get_business_card_sctp(&value, &length);}#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress_handle_sctp_event#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_Progress_handle_sctp_event(MPIDU_Sctp_event_t * event){ int complete = TRUE; int mpi_errno = MPI_SUCCESS; int pmi_errno; MPIDI_CH3I_Hash_entry * result;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -