📄 ch3_progress.c
字号:
/* Note that this routine is only called if threads are enabled; it does not need to check whether runtime threads are enabled */#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress_delay#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static int MPIDI_CH3I_Progress_delay(unsigned int completion_count){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_DELAY); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_DELAY);# if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX) { while (completion_count == MPIDI_CH3I_progress_completion_count && MPIDI_CH3I_progress_blocked == TRUE) { MPID_Thread_cond_wait(&MPIDI_CH3I_progress_completion_cond, &MPIR_ThreadInfo.global_mutex); } }# endif MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_DELAY); return mpi_errno;}/* end MPIDI_CH3I_Progress_delay() */#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress_continue#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static int MPIDI_CH3I_Progress_continue(unsigned int completion_count){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_CONTINUE); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_CONTINUE);# if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX) { MPID_Thread_cond_broadcast(&MPIDI_CH3I_progress_completion_cond); }# endif MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_CONTINUE); return mpi_errno;}/* end MPIDI_CH3I_Progress_continue() */#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress_wakeup#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)void MPIDI_CH3I_Progress_wakeup(void){ MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_WAKEUP); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_WAKEUP); /* no processes sleep in nemesis progress */ MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_WAKEUP); return;}#endif /* MPICH_IS_THREADED */#undef FUNCNAME#define FUNCNAME MPID_nem_handle_pkt#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPID_nem_handle_pkt(MPIDI_VC_t *vc, char *buf, MPIDI_msg_sz_t buflen){ int mpi_errno = MPI_SUCCESS; MPID_Request *rreq; int complete; MPIDI_CH3I_VC *vc_ch = (MPIDI_CH3I_VC *)vc->channel_private; MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_HANDLE_PKT); MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_HANDLE_PKT); do { if (!vc_ch->recv_active && vc_ch->pending_pkt_len == 0 && buflen >= sizeof(MPIDI_CH3_Pkt_t)) { /* handle fast-path first: received a new whole message */ do { MPIDI_msg_sz_t len = buflen; MPIDI_CH3_Pkt_t *pkt = (MPIDI_CH3_Pkt_t *)buf; MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "received new message"); mpi_errno = pktArray[pkt->type](vc, pkt, &len, &rreq); if (mpi_errno) MPIU_ERR_POP(mpi_errno); buflen -= len; buf += len; MPIU_DBG_STMT(CH3_CHANNEL, VERBOSE, if (!rreq) MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "...completed immediately")); } while (!rreq && buflen >= sizeof(MPIDI_CH3_Pkt_t)); if (!rreq) continue; /* Channel fields don't get initialized on request creation, init them here */ if (rreq) rreq->dev.iov_offset = 0; } else if (vc_ch->recv_active) { MPIU_Assert(vc_ch->pending_pkt_len == 0); MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "continuing recv"); rreq = vc_ch->recv_active; } else { /* collect header fragments in vc's pending_pkt */ MPIDI_msg_sz_t copylen; MPIDI_msg_sz_t pktlen; MPIDI_CH3_Pkt_t *pkt = (MPIDI_CH3_Pkt_t *)vc_ch->pending_pkt; MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "received header fragment"); copylen = ((vc_ch->pending_pkt_len + buflen <= sizeof(MPIDI_CH3_Pkt_t)) ? buflen : sizeof(MPIDI_CH3_Pkt_t) - vc_ch->pending_pkt_len); MPID_NEM_MEMCPY((char *)vc_ch->pending_pkt + vc_ch->pending_pkt_len, buf, copylen); vc_ch->pending_pkt_len += copylen; if (vc_ch->pending_pkt_len < sizeof(MPIDI_CH3_Pkt_t)) goto fn_exit; /* we have a whole header */ MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, " completed header"); MPIU_Assert(vc_ch->pending_pkt_len == sizeof(MPIDI_CH3_Pkt_t)); buflen -= copylen; buf += copylen; pktlen = sizeof(MPIDI_CH3_Pkt_t); mpi_errno = pktArray[pkt->type](vc, pkt, &pktlen, &rreq); if (mpi_errno) MPIU_ERR_POP(mpi_errno); MPIU_Assert(pktlen == sizeof(MPIDI_CH3_Pkt_t)); vc_ch->pending_pkt_len = 0; if (!rreq) { MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "...completed immediately"); continue; } /* Channel fields don't get initialized on request creation, init them here */ rreq->dev.iov_offset = 0; } /* copy data into user buffer described by iov in rreq */ MPIU_Assert(rreq); MPIU_Assert(rreq->dev.iov_count > 0 && rreq->dev.iov[rreq->dev.iov_offset].MPID_IOV_LEN > 0); MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, " copying into user buffer from IOV"); if (buflen == 0) { vc_ch->recv_active = rreq; goto fn_exit; } complete = 0; while (buflen && !complete) { MPID_IOV *iov; int n_iov; iov = &rreq->dev.iov[rreq->dev.iov_offset]; n_iov = rreq->dev.iov_count; while (n_iov && buflen >= iov->MPID_IOV_LEN) { int iov_len = iov->MPID_IOV_LEN; MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, " %d\n", iov_len); MPID_NEM_MEMCPY (iov->MPID_IOV_BUF, buf, iov_len); buflen -= iov_len; buf += iov_len; --n_iov; ++iov; } if (n_iov) { if (buflen > 0) { MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, " %d\n", buflen); MPID_NEM_MEMCPY (iov->MPID_IOV_BUF, buf, buflen); iov->MPID_IOV_BUF = (void *)((char *)iov->MPID_IOV_BUF + buflen); iov->MPID_IOV_LEN -= buflen; buflen = 0; } rreq->dev.iov_offset = iov - rreq->dev.iov; rreq->dev.iov_count = n_iov; vc_ch->recv_active = rreq; MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, " remaining: %d bytes + %d iov entries\n", iov->MPID_IOV_LEN, n_iov - rreq->dev.iov_offset - 1)); } else { int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *); reqFn = rreq->dev.OnDataAvail; if (!reqFn) { MPIU_Assert(MPIDI_Request_get_type(rreq) != MPIDI_REQUEST_TYPE_GET_RESP); MPIDI_CH3U_Request_complete(rreq); complete = TRUE; } else { mpi_errno = reqFn(vc, rreq, &complete); if (mpi_errno) MPIU_ERR_POP(mpi_errno); } if (!complete) { rreq->dev.iov_offset = 0; MPIU_Assert(rreq->dev.iov_count > 0 && rreq->dev.iov[rreq->dev.iov_offset].MPID_IOV_LEN > 0); vc_ch->recv_active = rreq; MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "...not complete"); } else { MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "...complete"); vc_ch->recv_active = NULL; } } } } while (buflen); fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_HANDLE_PKT); return mpi_errno; fn_fail: goto fn_exit;}#define set_request_info(rreq_, pkt_, msg_type_) \{ \ (rreq_)->status.MPI_SOURCE = (pkt_)->match.rank; \ (rreq_)->status.MPI_TAG = (pkt_)->match.tag; \ (rreq_)->status.count = (pkt_)->data_sz; \ (rreq_)->dev.sender_req_id = (pkt_)->sender_req_id; \ (rreq_)->dev.recv_data_sz = (pkt_)->data_sz; \ MPIDI_Request_set_seqnum((rreq_), (pkt_)->seqnum); \ MPIDI_Request_set_msg_type((rreq_), (msg_type_)); \}#ifdef BYPASS_PROGRESS#undef FUNCNAME#define FUNCNAME MPIDI_CH3_Progress_poke_with_matching#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)MPID_Request *MPIDI_CH3_Progress_poke_with_matching (int source, int tag, MPID_Comm *comm,int context_id,int *foundp, void *buf, int count, MPI_Datatype datatype,MPI_Status * status){ int mpi_errno = MPI_SUCCESS; MPID_Request *rreq = NULL; MPID_nem_cell_ptr_t cell = NULL; int in_fbox; int dt_contig; MPI_Aint dt_true_lb; MPIDI_msg_sz_t userbuf_sz; MPID_Datatype *dt_ptr; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_POKE_WITH_MATCHING); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_POKE_WITH_MATCHING); MPIDI_DBG_PRINTF((50, FCNAME, "entering, buf=%p, count=%d, dtype=%d", buf,count,datatype)); *foundp = FALSE ; MPIDI_Datatype_get_info(count, datatype, dt_contig, userbuf_sz, dt_ptr, dt_true_lb); /* handle only contiguous types (for now) and one-cell packets */ if((dt_contig) && (( userbuf_sz <= MPID_NEM__BYPASS_Q_MAX_VAL))) { /*PAPI_reset(PAPI_EventSet);*/ MPID_nem_mpich2_blocking_recv (&cell, &in_fbox); /*PAPI_accum(PAPI_EventSet, PAPI_values2); */ if (cell) { char *cell_buf = cell->pkt.mpich2.payload; switch(((MPIDI_CH3_Pkt_t *)cell_buf)->type) { case MPIDI_CH3_PKT_EAGER_SEND: { MPIDI_CH3_Pkt_eager_send_t *eager_pkt = &((MPIDI_CH3_Pkt_t *)cell_buf)->eager_send; int payload_len = eager_pkt->data_sz; cell_buf += sizeof (MPIDI_CH3_Pkt_t); if(((eager_pkt->match.tag == tag )||(tag == MPI_ANY_TAG )) && ((eager_pkt->match.rank == source)||(source == MPI_ANY_SOURCE)) && (eager_pkt->match.context_id == context_id)) { /* cell matches */ *foundp = TRUE; if (payload_len > 0) { if (payload_len <= userbuf_sz) { MPID_NEM_MEMCPY((char *)(buf+ dt_true_lb), cell_buf,payload_len); } else { /* error : truncate */ MPID_NEM_MEMCPY((char *)(buf+dt_true_lb),cell_buf, userbuf_sz); status->MPI_ERROR = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_TRUNCATE,"**truncate", "**truncate %d %d %d %d", status->MPI_SOURCE,status->MPI_TAG,payload_len, userbuf_sz ); mpi_errno = status->MPI_ERROR; goto exit_fn; } } } else { /* create a request for the cell, enqueue it on the unexpected queue */ rreq = MPID_Request_create(); if (rreq != NULL) { MPIU_Object_set_ref(rreq, 2); rreq->kind = MPID_REQUEST_RECV; rreq->dev.match.tag = eager_pkt->match.tag ; rreq->dev.match.rank = eager_pkt->match.rank; rreq->dev.match.context_id = eager_pkt->match.context_id; rreq->dev.tmpbuf = MPIU_Malloc(userbuf_sz); MPID_NEM_MEMCPY((char *)(rreq->dev.tmpbuf),cell_buf, userbuf_sz); rreq->dev.next = NULL; if (*MPID_Recvq_unexpected_tail_ptr != NULL) { (*MPID_Recvq_unexpected_tail_ptr)->dev.next = rreq; } else { *MPID_Recvq_unexpected_head_ptr = rreq; } *MPID_Recvq_unexpected_tail_ptr = rreq; } } } break; case MPIDI_CH3_PKT_READY_SEND: { MPIDI_CH3_Pkt_ready_send_t *ready_pkt = &((MPIDI_CH3_Pkt_t *)cell_buf)->ready_send; fprintf(stdout,"ERROR : MPIDI_CH3_PKT_READY_SEND not handled (yet) \n"); } break; case MPIDI_CH3_PKT_EAGER_SYNC_SEND: { MPIDI_CH3_Pkt_eager_send_t *es_pkt = &((MPIDI_CH3_Pkt_t *)cell_buf)->eager_send; int payload_len = es_pkt->data_sz; cell_buf += sizeof (MPIDI_CH3_Pkt_t); if(((es_pkt->match.tag == tag )||(tag == MPI_ANY_TAG )) && ((es_pkt->match.rank == source)||(source == MPI_ANY_SOURCE)) && (es_pkt->match.context_id == context_id)) { MPIDI_CH3_Pkt_t upkt; MPIDI_CH3_Pkt_eager_sync_ack_t * const esa_pkt = &upkt.eager_sync_ack; MPID_Request * esa_req = NULL; MPIDI_VC_t *vc; /* cell matches */ *foundp = TRUE; if (payload_len > 0)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -