📄 ch3_progress.c
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- *//* * (C) 2001 by Argonne National Laboratory. * See COPYRIGHT in top-level directory. */#include "mpidi_ch3_impl.h"/*#include "mpidpre.h"*/#include "mpid_nem_impl.h"#if defined (MPID_NEM_INLINE) && MPID_NEM_INLINE#include "mpid_nem_inline.h"#endif#include "pmi.h"#define PKTARRAY_SIZE (MPIDI_NEM_PKT_END+1)static MPIDI_CH3_PktHandler_Fcn *pktArray[PKTARRAY_SIZE];#ifndef MPIDI_POSTED_RECV_ENQUEUE_HOOK#define MPIDI_POSTED_RECV_ENQUEUE_HOOK(x) do {} while (0)#endif#ifndef MPIDI_POSTED_RECV_DEQUEUE_HOOK#define MPIDI_POSTED_RECV_DEQUEUE_HOOK(x) do {} while (0)#endif#ifdef BY_PASS_PROGRESSextern MPID_Request ** const MPID_Recvq_posted_head_ptr;extern MPID_Request ** const MPID_Recvq_unexpected_head_ptr;extern MPID_Request ** const MPID_Recvq_posted_tail_ptr;extern MPID_Request ** const MPID_Recvq_unexpected_tail_ptr;#endifvolatile unsigned int MPIDI_CH3I_progress_completion_count = 0;/* NEMESIS MULTITHREADING: Extra Data Structures Added */#ifdef MPICH_IS_THREADEDvolatile int MPIDI_CH3I_progress_blocked = FALSE;volatile int MPIDI_CH3I_progress_wakeup_signalled = FALSE;static MPID_Thread_cond_t MPIDI_CH3I_progress_completion_cond;static int MPIDI_CH3I_Progress_delay(unsigned int completion_count);static int MPIDI_CH3I_Progress_continue(unsigned int completion_count);#endif /* MPICH_IS_THREADED *//* NEMESIS MULTITHREADING - End block*/struct MPID_Request *MPIDI_CH3I_sendq_head[CH3_NUM_QUEUES] = {0};struct MPID_Request *MPIDI_CH3I_sendq_tail[CH3_NUM_QUEUES] = {0};struct MPID_Request *MPIDI_CH3I_active_send[CH3_NUM_QUEUES] = {0};#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_Progress (MPID_Progress_state *progress_state, int is_blocking){ unsigned completions = MPIDI_CH3I_progress_completion_count; int mpi_errno = MPI_SUCCESS; int complete;#if !defined(ENABLE_NO_SCHED_YIELD) || defined(MPICH_IS_THREADED) int pollcount = 0;#endif MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS); do { MPID_Request *sreq; MPID_Request *rreq; MPID_nem_cell_ptr_t cell; int in_fbox = 0; MPIDI_VC_t *vc;#ifdef MPICH_IS_THREADED MPIU_THREAD_CHECK_BEGIN; { if (pollcount >= MPID_NEM_POLLS_BEFORE_YIELD) { pollcount = 0; MPIDI_CH3I_progress_blocked = TRUE; MPID_Thread_mutex_unlock(&MPIR_ThreadInfo.global_mutex); MPID_Thread_yield(); MPID_Thread_mutex_lock(&MPIR_ThreadInfo.global_mutex); MPIDI_CH3I_progress_blocked = FALSE; MPIDI_CH3I_progress_wakeup_signalled = FALSE; } ++pollcount; } MPIU_THREAD_CHECK_END;#elif !defined(ENABLE_NO_SCHED_YIELD) if (pollcount >= MPID_NEM_POLLS_BEFORE_YIELD) { pollcount = 0; sched_yield(); } ++pollcount;#endif do /* receive progress */ {#ifdef MPICH_IS_THREADED MPIU_THREAD_CHECK_BEGIN; { if (MPIDI_CH3I_progress_blocked == TRUE) { /* another thread is already blocking in the progress engine.*/ break; /* break out of receive block */ } } MPIU_THREAD_CHECK_END;#endif /* make progress receiving */ /* check queue */ if (!MPID_nem_lmt_shm_pending && !MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] && !MPIDI_CH3I_SendQ_head(CH3_NORMAL_QUEUE) && is_blocking#ifdef MPICH_IS_THREADED#ifdef HAVE_RUNTIME_THREADCHECK && !MPIR_ThreadInfo.isThreaded#else && 0#endif#endif ) { mpi_errno = MPID_nem_mpich2_blocking_recv(&cell, &in_fbox); } else { mpi_errno = MPID_nem_mpich2_test_recv(&cell, &in_fbox); } if (mpi_errno) MPIU_ERR_POP (mpi_errno); if (cell) { char *cell_buf = (char *)cell->pkt.mpich2.payload; MPIDI_msg_sz_t payload_len = cell->pkt.mpich2.datalen; MPIDI_CH3_Pkt_t *pkt = (MPIDI_CH3_Pkt_t *)cell_buf; /* Empty packets are not allowed */ MPIU_Assert(payload_len >= 0); if (in_fbox) { MPIDI_CH3I_VC *vc_ch; MPIDI_msg_sz_t buflen = payload_len; /* This packet must be the first packet of a new message */ MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "Recv pkt from fbox"); MPIU_Assert(payload_len >= sizeof (MPIDI_CH3_Pkt_t)); MPIDI_PG_Get_vc(MPIDI_Process.my_pg, MPID_NEM_FBOX_SOURCE(cell), &vc); MPIU_Assert(((MPIDI_CH3I_VC *)vc->channel_private)->recv_active == NULL && ((MPIDI_CH3I_VC *)vc->channel_private)->pending_pkt_len == 0); vc_ch = (MPIDI_CH3I_VC *)vc->channel_private; mpi_errno = pktArray[pkt->type](vc, pkt, &buflen, &rreq); if (mpi_errno) MPIU_ERR_POP(mpi_errno); if (!rreq) { MPID_nem_mpich2_release_fbox(cell); break; /* break out of recv progress block */ } /* we received a truncated packet, handle it with handle_pkt */ vc_ch->recv_active = rreq; cell_buf += buflen; payload_len -= buflen; mpi_errno = MPID_nem_handle_pkt(vc, cell_buf, payload_len); if (mpi_errno) MPIU_ERR_POP(mpi_errno); MPID_nem_mpich2_release_fbox(cell); /* the whole message should have been handled */ MPIU_Assert(!vc_ch->recv_active); break; /* break out of recv progress block */ } MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "Recv pkt from queue"); MPIDI_PG_Get_vc(MPIDI_Process.my_pg, MPID_NEM_CELL_SOURCE(cell), &vc); mpi_errno = MPID_nem_handle_pkt(vc, cell_buf, payload_len); if (mpi_errno) MPIU_ERR_POP(mpi_errno); MPID_nem_mpich2_release_cell(cell, vc); break; /* break out of recv progress block */ } } while(0); /* do the loop exactly once. Used so we can jump out of recv progress using break. */ /* make progress sending */ do { MPID_IOV *iov; int n_iov; int again = 0; if (MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] == NULL && MPIDI_CH3I_SendQ_head(CH3_NORMAL_QUEUE) == NULL) {#ifdef MPICH_IS_THREADED MPIU_THREAD_CHECK_BEGIN; { if (MPIDI_CH3I_progress_blocked == TRUE && is_blocking && !MPID_nem_lmt_shm_pending) { /* There's nothing to send and there's another thread already blocking in the progress engine.*/ MPIDI_CH3I_Progress_delay(MPIDI_CH3I_progress_completion_count); } } MPIU_THREAD_CHECK_END;#endif /* there are no pending sends */ break; /* break out of send progress */ } sreq = MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE]; MPIU_DBG_STMT(CH3_CHANNEL, VERBOSE, {if (sreq) MPIU_DBG_MSG (CH3_CHANNEL, VERBOSE, "Send: cont sreq");}); if (sreq) { if (!sreq->ch.noncontig) { MPIU_Assert(sreq->dev.iov_count > 0 && sreq->dev.iov[sreq->dev.iov_offset].MPID_IOV_LEN > 0); iov = &sreq->dev.iov[sreq->dev.iov_offset]; n_iov = sreq->dev.iov_count; do { mpi_errno = MPID_nem_mpich2_sendv(&iov, &n_iov, sreq->ch.vc, &again); if (mpi_errno) MPIU_ERR_POP (mpi_errno); } while (!again && n_iov > 0); if (again) /* not finished sending */ { sreq->dev.iov_offset = iov - sreq->dev.iov; sreq->dev.iov_count = n_iov; break; /* break out of send progress */ } else sreq->dev.iov_offset = 0; } else { do { MPID_nem_mpich2_send_seg(sreq->dev.segment_ptr, &sreq->dev.segment_first, sreq->dev.segment_size, sreq->ch.vc, &again); } while (!again && sreq->dev.segment_first < sreq->dev.segment_size); if (again) /* not finished sending */ break; /* break out of send progress */ } } else { sreq = MPIDI_CH3I_SendQ_head (CH3_NORMAL_QUEUE); MPIU_DBG_STMT (CH3_CHANNEL, VERBOSE, {if (sreq) MPIU_DBG_MSG (CH3_CHANNEL, VERBOSE, "Send: new sreq ");}); if (!sreq->ch.noncontig) { MPIU_Assert(sreq->dev.iov_count > 0 && sreq->dev.iov[sreq->dev.iov_offset].MPID_IOV_LEN > 0); iov = &sreq->dev.iov[sreq->dev.iov_offset]; n_iov = sreq->dev.iov_count; mpi_errno = MPID_nem_mpich2_sendv_header(&iov, &n_iov, sreq->ch.vc, &again); if (mpi_errno) MPIU_ERR_POP (mpi_errno); if (!again) { MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] = sreq; while (!again && n_iov > 0) { mpi_errno = MPID_nem_mpich2_sendv(&iov, &n_iov, sreq->ch.vc, &again); if (mpi_errno) MPIU_ERR_POP (mpi_errno); } } if (again) /* not finished sending */ { sreq->dev.iov_offset = iov - sreq->dev.iov; sreq->dev.iov_count = n_iov; break; /* break out of send progress */ } else sreq->dev.iov_offset = 0; } else { MPID_nem_mpich2_send_seg_header(sreq->dev.segment_ptr, &sreq->dev.segment_first, sreq->dev.segment_size, &sreq->dev.pending_pkt, sreq->ch.header_sz, sreq->ch.vc, &again); if (!again) { MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] = sreq; while (!again && sreq->dev.segment_first < sreq->dev.segment_size) { MPID_nem_mpich2_send_seg(sreq->dev.segment_ptr, &sreq->dev.segment_first, sreq->dev.segment_size, sreq->ch.vc, &again); } } if (again) /* not finished sending */ break; /* break out of send progress */ } } /* finished sending sreq */ MPIU_Assert(!again); if (!sreq->dev.OnDataAvail) { MPIU_Assert(MPIDI_Request_get_type(sreq) != MPIDI_REQUEST_TYPE_GET_RESP); MPIDI_CH3U_Request_complete(sreq); MPIDI_CH3I_SendQ_dequeue(CH3_NORMAL_QUEUE); MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] = NULL; MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete"); } else { complete = 0; mpi_errno = sreq->dev.OnDataAvail(sreq->ch.vc, sreq, &complete); if (mpi_errno) MPIU_ERR_POP(mpi_errno); if (complete) { MPIDI_CH3I_SendQ_dequeue(CH3_NORMAL_QUEUE); MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] = NULL; MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete"); } } } while (0); /* do the loop exactly once. Used so we can jump out of send progress using break. */ /* make progress on LMTs */ if (MPID_nem_lmt_shm_pending) { mpi_errno = MPID_nem_lmt_shm_progress(); if (mpi_errno) MPIU_ERR_POP(mpi_errno); } } while (completions == MPIDI_CH3I_progress_completion_count && is_blocking);#if MPICH_IS_THREADED MPIU_THREAD_CHECK_BEGIN; { if (is_blocking) { MPIDI_CH3I_Progress_continue(MPIDI_CH3I_progress_completion_count); } } MPIU_THREAD_CHECK_END;#endif fn_exit: /* Reset the progress state so it is fresh for the next iteration */ if (progress_state) progress_state->ch.completion_count = MPIDI_CH3I_progress_completion_count; MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS); return mpi_errno; fn_fail: goto fn_exit;}#ifdef MPICH_IS_THREADED
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -