⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ch3_progress.c

📁 fortran并行计算包
💻 C
📖 第 1 页 / 共 5 页
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpidi_ch3_impl.h"
/*#include "mpidpre.h"*/
#include "mpid_nem_impl.h"
#if defined (MPID_NEM_INLINE) && MPID_NEM_INLINE
#include "mpid_nem_inline.h"
#endif
#include "pmi.h"

/* Packet handler table; indexed by pkt->type when dispatching packets that
 * arrive through a fastbox.  It is filled in elsewhere (not in this section
 * of the file). */
#define PKTARRAY_SIZE (MPIDI_NEM_PKT_END+1)
static MPIDI_CH3_PktHandler_Fcn *pktArray[PKTARRAY_SIZE];

/* Default the posted-receive hooks to no-ops when nobody else defined them. */
#ifndef MPIDI_POSTED_RECV_ENQUEUE_HOOK
#define MPIDI_POSTED_RECV_ENQUEUE_HOOK(x) do {} while (0)
#endif
#ifndef MPIDI_POSTED_RECV_DEQUEUE_HOOK
#define MPIDI_POSTED_RECV_DEQUEUE_HOOK(x) do {} while (0)
#endif

#ifdef BY_PASS_PROGRESS
extern MPID_Request ** const MPID_Recvq_posted_head_ptr;
extern MPID_Request ** const MPID_Recvq_unexpected_head_ptr;
extern MPID_Request ** const MPID_Recvq_posted_tail_ptr;
extern MPID_Request ** const MPID_Recvq_unexpected_tail_ptr;
#endif

/* Compared against a snapshot taken on entry to MPIDI_CH3I_Progress() to
 * decide when a blocking call may return. */
volatile unsigned int MPIDI_CH3I_progress_completion_count = 0;

/* NEMESIS MULTITHREADING: Extra Data Structures Added */
#ifdef MPICH_IS_THREADED
volatile int MPIDI_CH3I_progress_blocked = FALSE;
volatile int MPIDI_CH3I_progress_wakeup_signalled = FALSE;
static MPID_Thread_cond_t MPIDI_CH3I_progress_completion_cond;
static int MPIDI_CH3I_Progress_delay(unsigned int completion_count);
static int MPIDI_CH3I_Progress_continue(unsigned int completion_count);
#endif /* MPICH_IS_THREADED */
/* NEMESIS MULTITHREADING - End block*/

/* Per-queue send state: queue head, queue tail, and the request whose
 * transmission is currently in progress (NULL when none). */
struct MPID_Request *MPIDI_CH3I_sendq_head[CH3_NUM_QUEUES] = {0};
struct MPID_Request *MPIDI_CH3I_sendq_tail[CH3_NUM_QUEUES] = {0};
struct MPID_Request *MPIDI_CH3I_active_send[CH3_NUM_QUEUES] = {0};

/*
 * MPIDI_CH3I_Progress - run the progress engine.
 *
 * Each pass of the outer loop does, in order:
 *   1. (optional) yield the CPU after MPID_NEM_POLLS_BEFORE_YIELD passes;
 *   2. receive progress: fetch at most one cell and hand it to the packet
 *      handling code;
 *   3. send progress: continue the active send on CH3_NORMAL_QUEUE, or start
 *      the request at the head of that queue;
 *   4. LMT progress, when MPID_nem_lmt_shm_pending is set.
 *
 * Parameters:
 *   progress_state - if non-NULL, its ch.completion_count is set to the
 *                    current MPIDI_CH3I_progress_completion_count on return.
 *   is_blocking    - if nonzero, keep looping until
 *                    MPIDI_CH3I_progress_completion_count differs from the
 *                    value it had on entry; if zero, make a single pass.
 *
 * Returns MPI_SUCCESS, or the error code reported by a callee.
 * NOTE(review): MPIU_ERR_POP is expected to jump to fn_fail; its definition
 * is not visible in this file.
 *
 * All thread-only sections in this function are guarded uniformly with
 * #ifdef MPICH_IS_THREADED, matching the declarations above.
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Progress
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_Progress (MPID_Progress_state *progress_state, int is_blocking)
{
    /* Snapshot used by the outer loop condition to detect a completion. */
    unsigned completions = MPIDI_CH3I_progress_completion_count;
    int mpi_errno = MPI_SUCCESS;
    int complete;
#if !defined(ENABLE_NO_SCHED_YIELD) || defined(MPICH_IS_THREADED)
    int pollcount = 0;
#endif
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS);

    do
    {
        MPID_Request        *sreq;
        MPID_Request        *rreq;
        MPID_nem_cell_ptr_t  cell;
        int                  in_fbox = 0;
        MPIDI_VC_t          *vc;

#ifdef MPICH_IS_THREADED
        MPIU_THREAD_CHECK_BEGIN;
        {
            if (pollcount >= MPID_NEM_POLLS_BEFORE_YIELD)
            {
                /* Drop the global mutex around the yield; the blocked flag is
                 * set for exactly the span during which the lock is released. */
                pollcount = 0;
                MPIDI_CH3I_progress_blocked = TRUE;
                MPID_Thread_mutex_unlock(&MPIR_ThreadInfo.global_mutex);
                MPID_Thread_yield();
                MPID_Thread_mutex_lock(&MPIR_ThreadInfo.global_mutex);
                MPIDI_CH3I_progress_blocked = FALSE;
                MPIDI_CH3I_progress_wakeup_signalled = FALSE;
            }
            ++pollcount;
        }
        MPIU_THREAD_CHECK_END;
#elif !defined(ENABLE_NO_SCHED_YIELD)
        if (pollcount >= MPID_NEM_POLLS_BEFORE_YIELD)
        {
            pollcount = 0;
            sched_yield();
        }
        ++pollcount;
#endif

        do /* receive progress */
        {
#ifdef MPICH_IS_THREADED
            MPIU_THREAD_CHECK_BEGIN;
            {
                if (MPIDI_CH3I_progress_blocked == TRUE)
                {
                    /* another thread is already blocking in the progress engine.*/
                    break; /* break out of receive block */
                }
            }
            MPIU_THREAD_CHECK_END;
#endif

            /* make progress receiving */
            /* check queue */
            /* The blocking receive is used only when there is no LMT work, no
             * active or queued send, and the caller asked to block.  In
             * threaded builds it is additionally restricted to the case where
             * the runtime check reports the process is not threaded, and is
             * compiled out (&& 0) when no runtime check is available. */
            if (!MPID_nem_lmt_shm_pending && !MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE]
                && !MPIDI_CH3I_SendQ_head(CH3_NORMAL_QUEUE) && is_blocking
#ifdef MPICH_IS_THREADED
#ifdef HAVE_RUNTIME_THREADCHECK
                && !MPIR_ThreadInfo.isThreaded
#else
                && 0
#endif
#endif
                )
            {
                mpi_errno = MPID_nem_mpich2_blocking_recv(&cell, &in_fbox);
            }
            else
            {
                mpi_errno = MPID_nem_mpich2_test_recv(&cell, &in_fbox);
            }
            if (mpi_errno) MPIU_ERR_POP (mpi_errno);

            if (cell)
            {
                char            *cell_buf    = (char *)cell->pkt.mpich2.payload;
                MPIDI_msg_sz_t   payload_len = cell->pkt.mpich2.datalen;
                MPIDI_CH3_Pkt_t *pkt         = (MPIDI_CH3_Pkt_t *)cell_buf;

                /* Empty packets are not allowed */
                MPIU_Assert(payload_len >= 0);

                if (in_fbox)
                {
                    MPIDI_CH3I_VC *vc_ch;
                    MPIDI_msg_sz_t buflen = payload_len;

                    /* This packet must be the first packet of a new message */
                    MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "Recv pkt from fbox");
                    MPIU_Assert(payload_len >= sizeof (MPIDI_CH3_Pkt_t));

                    MPIDI_PG_Get_vc(MPIDI_Process.my_pg, MPID_NEM_FBOX_SOURCE(cell), &vc);
                    MPIU_Assert(((MPIDI_CH3I_VC *)vc->channel_private)->recv_active == NULL &&
                                ((MPIDI_CH3I_VC *)vc->channel_private)->pending_pkt_len == 0);
                    vc_ch = (MPIDI_CH3I_VC *)vc->channel_private;

                    /* Dispatch directly through the handler table. */
                    mpi_errno = pktArray[pkt->type](vc, pkt, &buflen, &rreq);
                    if (mpi_errno) MPIU_ERR_POP(mpi_errno);

                    if (!rreq)
                    {
                        /* The handler left no request to continue with. */
                        MPID_nem_mpich2_release_fbox(cell);
                        break; /* break out of recv progress block */
                    }

                    /* we received a truncated packet, handle it with handle_pkt */
                    /* buflen is presumably updated by the handler to the number
                     * of bytes it consumed; the remainder is passed on below. */
                    vc_ch->recv_active = rreq;
                    cell_buf    += buflen;
                    payload_len -= buflen;

                    mpi_errno = MPID_nem_handle_pkt(vc, cell_buf, payload_len);
                    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
                    MPID_nem_mpich2_release_fbox(cell);

                    /* the whole message should have been handled */
                    MPIU_Assert(!vc_ch->recv_active);

                    break; /* break out of recv progress block */
                }

                /* Cell came from the receive queue rather than a fastbox. */
                MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "Recv pkt from queue");

                MPIDI_PG_Get_vc(MPIDI_Process.my_pg, MPID_NEM_CELL_SOURCE(cell), &vc);

                mpi_errno = MPID_nem_handle_pkt(vc, cell_buf, payload_len);
                if (mpi_errno) MPIU_ERR_POP(mpi_errno);
                MPID_nem_mpich2_release_cell(cell, vc);

                break; /* break out of recv progress block */
            }
        }
        while(0);  /* do the loop exactly once.  Used so we can jump out of recv progress using break. */

        /* make progress sending */
        do
        {
            MPID_IOV *iov;
            int n_iov;
            int again = 0;

            if (MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] == NULL && MPIDI_CH3I_SendQ_head(CH3_NORMAL_QUEUE) == NULL)
            {
#ifdef MPICH_IS_THREADED
                MPIU_THREAD_CHECK_BEGIN;
                {
                    if (MPIDI_CH3I_progress_blocked == TRUE && is_blocking && !MPID_nem_lmt_shm_pending)
                    {
                        /* There's nothing to send and there's another thread already blocking in the progress engine.*/
                        MPIDI_CH3I_Progress_delay(MPIDI_CH3I_progress_completion_count);
                    }
                }
                MPIU_THREAD_CHECK_END;
#endif
                /* there are no pending sends */
                break; /* break out of send progress */
            }

            sreq = MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE];
            MPIU_DBG_STMT(CH3_CHANNEL, VERBOSE, {if (sreq) MPIU_DBG_MSG (CH3_CHANNEL, VERBOSE, "Send: cont sreq");});
            if (sreq)
            {
                /* Continue a send that was started on an earlier pass. */
                if (!sreq->ch.noncontig)
                {
                    MPIU_Assert(sreq->dev.iov_count > 0 && sreq->dev.iov[sreq->dev.iov_offset].MPID_IOV_LEN > 0);

                    iov = &sreq->dev.iov[sreq->dev.iov_offset];
                    n_iov = sreq->dev.iov_count;

                    do
                    {
                        mpi_errno = MPID_nem_mpich2_sendv(&iov, &n_iov, sreq->ch.vc, &again);
                        if (mpi_errno) MPIU_ERR_POP (mpi_errno);
                    }
                    while (!again && n_iov > 0);

                    if (again) /* not finished sending */
                    {
                        /* Record how far we got so the next pass resumes here. */
                        sreq->dev.iov_offset = iov - sreq->dev.iov;
                        sreq->dev.iov_count = n_iov;
                        break; /* break out of send progress */
                    }
                    else
                        sreq->dev.iov_offset = 0;
                }
                else
                {
                    do
                    {
                        MPID_nem_mpich2_send_seg(sreq->dev.segment_ptr, &sreq->dev.segment_first, sreq->dev.segment_size,
                                                 sreq->ch.vc, &again);
                    }
                    while (!again && sreq->dev.segment_first < sreq->dev.segment_size);

                    if (again) /* not finished sending */
                        break; /* break out of send progress */
                }
            }
            else
            {
                /* No active send: start the request at the head of the queue.
                 * The head is non-NULL here because the "nothing to send"
                 * check above would otherwise have broken out. */
                sreq = MPIDI_CH3I_SendQ_head (CH3_NORMAL_QUEUE);
                MPIU_DBG_STMT (CH3_CHANNEL, VERBOSE, {if (sreq) MPIU_DBG_MSG (CH3_CHANNEL, VERBOSE, "Send: new sreq ");});

                if (!sreq->ch.noncontig)
                {
                    MPIU_Assert(sreq->dev.iov_count > 0 && sreq->dev.iov[sreq->dev.iov_offset].MPID_IOV_LEN > 0);

                    iov = &sreq->dev.iov[sreq->dev.iov_offset];
                    n_iov = sreq->dev.iov_count;

                    mpi_errno = MPID_nem_mpich2_sendv_header(&iov, &n_iov, sreq->ch.vc, &again);
                    if (mpi_errno) MPIU_ERR_POP (mpi_errno);
                    if (!again)
                    {
                        /* The header call did not ask for a retry, so this
                         * request becomes the active send. */
                        MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] = sreq;
                        while (!again && n_iov > 0)
                        {
                            mpi_errno = MPID_nem_mpich2_sendv(&iov, &n_iov, sreq->ch.vc, &again);
                            if (mpi_errno) MPIU_ERR_POP (mpi_errno);
                        }
                    }

                    if (again) /* not finished sending */
                    {
                        sreq->dev.iov_offset = iov - sreq->dev.iov;
                        sreq->dev.iov_count = n_iov;
                        break; /* break out of send progress */
                    }
                    else
                        sreq->dev.iov_offset = 0;
                }
                else
                {
                    MPID_nem_mpich2_send_seg_header(sreq->dev.segment_ptr, &sreq->dev.segment_first, sreq->dev.segment_size,
                                                    &sreq->dev.pending_pkt, sreq->ch.header_sz, sreq->ch.vc, &again);
                    if (!again)
                    {
                        MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] = sreq;
                        while (!again && sreq->dev.segment_first < sreq->dev.segment_size)
                        {
                            MPID_nem_mpich2_send_seg(sreq->dev.segment_ptr, &sreq->dev.segment_first, sreq->dev.segment_size,
                                                     sreq->ch.vc, &again);
                        }
                    }

                    if (again) /* not finished sending */
                        break; /* break out of send progress */
                }
            }

            /* finished sending sreq */
            MPIU_Assert(!again);

            if (!sreq->dev.OnDataAvail)
            {
                /* No continuation handler: complete the request and retire it
                 * from the queue. */
                MPIU_Assert(MPIDI_Request_get_type(sreq) != MPIDI_REQUEST_TYPE_GET_RESP);
                MPIDI_CH3U_Request_complete(sreq);

                MPIDI_CH3I_SendQ_dequeue(CH3_NORMAL_QUEUE);
                MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] = NULL;
                MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
            }
            else
            {
                /* Let the request's handler decide whether it is done; the
                 * request stays queued and active unless it reports complete. */
                complete = 0;
                mpi_errno = sreq->dev.OnDataAvail(sreq->ch.vc, sreq, &complete);
                if (mpi_errno) MPIU_ERR_POP(mpi_errno);

                if (complete)
                {
                    MPIDI_CH3I_SendQ_dequeue(CH3_NORMAL_QUEUE);
                    MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] = NULL;
                    MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
                }
            }
        }
        while (0); /* do the loop exactly once.  Used so we can jump out of send progress using break. */

        /* make progress on LMTs */
        if (MPID_nem_lmt_shm_pending)
        {
            mpi_errno = MPID_nem_lmt_shm_progress();
            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
        }
    }
    while (completions == MPIDI_CH3I_progress_completion_count && is_blocking);

#ifdef MPICH_IS_THREADED
    MPIU_THREAD_CHECK_BEGIN;
    {
        if (is_blocking)
        {
            MPIDI_CH3I_Progress_continue(MPIDI_CH3I_progress_completion_count);
        }
    }
    MPIU_THREAD_CHECK_END;
#endif

 fn_exit:
    /* Reset the progress state so it is fresh for the next iteration */
    if (progress_state)
        progress_state->ch.completion_count = MPIDI_CH3I_progress_completion_count;

    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}

#ifdef MPICH_IS_THREADED

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -