📄 mpid_accumulate.c
字号:
* \e origin_datatype at \e origin_addr * to node \e target_rank into \e target_count number of \e target_datatype * into window location \e target_disp offset (window displacement units) * * According to the MPI Specification: * * Each datatype argument must be a predefined datatype or * a derived datatype, where all basic components are of the * same predefined datatype. Both datatype arguments must be * constructed from the same predefined datatype. * * \param[in] origin_addr Source buffer * \param[in] origin_count Number of datatype elements * \param[in] origin_datatype Source datatype * \param[in] target_rank Destination rank (target) * \param[in] target_disp Displacement factor in target buffer * \param[in] target_count Number of target datatype elements * \param[in] target_datatype Destination datatype * \param[in] op Operand to perform * \param[in] win_ptr Window * \return MPI_SUCCESS, MPI_ERR_RMA_SYNC, MPI_ERR_OP, * or error returned from MPIR_Localcopy, MPID_Segment_init, * mpid_queue_datatype, or DCMF_Send. * * \ref msginfo_usage\n * \ref accum_design */int MPID_Accumulate(void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPID_Win *win_ptr){ int mpi_errno = MPI_SUCCESS; int dt_contig, rank; MPID_Datatype *dtp; MPI_Aint dt_true_lb; MPIDI_msg_sz_t data_sz; int lpid; mpid_dt_info dti = {0}; int i, j; char *buf; int sent = 0; DCQuad xtra = {0}; int *refp = NULL; DCMF_Callback_t cb_send; char *s, *dd; DCMF_Request_t *reqp; MPIDU_Onesided_info_t *info; DCMF_Consistency consistency = win_ptr->_dev.my_cstcy; MPIU_THREADPRIV_DECL; MPID_MPI_STATE_DECL(MPID_STATE_MPID_WIN_ACCUMULATE); MPID_MPI_FUNC_ENTER(MPID_STATE_MPID_WIN_ACCUMULATE); MPIU_THREADPRIV_GET; MPIR_Nest_incr(); if (win_ptr->_dev.epoch_type == MPID_EPOTYPE_NONE || win_ptr->_dev.epoch_type == MPID_EPOTYPE_POST || !MPIDU_VALID_RMA_TARGET(win_ptr, target_rank)) { /* --BEGIN ERROR HANDLING-- */ MPIU_ERR_SETANDSTMT(mpi_errno, MPI_ERR_RMA_SYNC, goto fn_fail, "**rmasync"); /* --END ERROR HANDLING-- */ } if (op == MPI_REPLACE) { /* Just do a PUT instead... */ mpi_errno = MPID_Put(origin_addr, origin_count, origin_datatype, target_rank, target_disp, target_count, target_datatype, win_ptr); if (mpi_errno) { MPIU_ERR_POP(mpi_errno); } goto fn_exit; } MPIDI_Datatype_get_info(origin_count, origin_datatype, dt_contig, data_sz, dtp, dt_true_lb); if ((data_sz == 0) || (target_rank == MPI_PROC_NULL)) { goto fn_exit; } if (DATATYPE_ADDITIONAL(origin_datatype)) { dt_contig = 1; // treat MINLOC/MAXLOC types as contig data_sz = origin_count * dtp->extent; } rank = win_ptr->_dev.comm_ptr->rank; lpid = MPIDU_world_rank(win_ptr, target_rank); if (!DATATYPE_PREDEFINED(target_datatype)) { /* force map to get built but don't assume it was sent (use our lpid) */ (void)MPIDU_check_dt(mpid_my_lpid, target_datatype, &dti); MPID_assert(dti.map != NULL); MPID_assert(dti.map[0].dt != 0); } if (target_rank == rank) { /* * We still must have acquired the lock, unless * we specified NOCHECK. */ if (win_ptr->_dev.epoch_type == MPID_EPOTYPE_LOCK && !(win_ptr->_dev.epoch_assert & MPI_MODE_NOCHECK) && MPIDU_is_lock_free(win_ptr)) { /* --BEGIN ERROR HANDLING-- */ MPIU_ERR_SETANDSTMT(mpi_errno, MPI_ERR_RMA_SYNC, goto fn_fail, "**rmasync"); /* --END ERROR HANDLING-- */ } } else { MPIU_ERR_CHKANDJUMP1( (HANDLE_GET_KIND(op) != HANDLE_KIND_BUILTIN), mpi_errno, MPI_ERR_OP, "**opnotpredefined", "**opnotpredefined %d", op ); } if (dt_contig) { /* all builtin datatypes are also contig */ buf = origin_addr; cb_send.function = done_rqc_cb; } else { MPID_Segment segment; DLOOP_Offset last = data_sz; MPIDU_MALLOC(buf, char, data_sz + sizeof(int), mpi_errno, "MPID_Accumulate"); if (buf == NULL) { MPID_Abort(NULL, MPI_ERR_NO_SPACE, -1, "Unable to allocate non-" "contiguous buffer"); } refp = (int *)buf; xtra.w1 = (unsigned)refp; xtra.w2 = (unsigned)buf; cb_send.function = done_reffree_rqc_cb; buf += sizeof(int); *refp = 0; mpi_errno = MPID_Segment_init(origin_addr, origin_count, origin_datatype, &segment, 0); if (mpi_errno) { MPIU_ERR_POP(mpi_errno); } MPID_Segment_pack(&segment, 0, &last, buf); MPID_assert_debug(last == data_sz); } dd = win_ptr->_dev.coll_info[target_rank].base_addr + win_ptr->_dev.coll_info[target_rank].disp_unit * target_disp; if (DATATYPE_PREDEFINED(target_datatype)) { if (target_rank == rank) { /* local accumulate */ local_accumulate(win_ptr, dd, buf, lpid, target_datatype, op, target_count); } else { /* ! local accumulate */ if (DATATYPE_ADDITIONAL(target_datatype)) { MPID_Datatype_get_ptr(target_datatype, dtp); data_sz = dtp->extent; } else { data_sz = MPID_Datatype_get_basic_size(target_datatype); } xtra.w0 = (unsigned)&win_ptr->_dev.my_rma_pends; reqp = MPIDU_get_req(&xtra, &info); info->mpid_info_w0 = MPID_MSGTYPE_ACC; info->mpid_info_w1 = win_ptr->_dev.coll_info[target_rank].win_handle; info->mpid_info_w2 = rank; info->mpid_info_w3 = (unsigned)dd; info->mpid_info_w4 = target_datatype; info->mpid_info_w5 = op; info->mpid_info_w6 = target_count; info->mpid_info_w7 = 0; ++win_ptr->_dev.my_rma_pends; if (refp) { ++*refp; } ++sent; cb_send.clientdata = reqp; mpi_errno = DCMF_Send(&bg1s_sn_proto, reqp, cb_send, consistency, lpid, target_count * data_sz, buf, info->info, 2); if (mpi_errno) { MPIU_ERR_POP(mpi_errno); } ++win_ptr->_dev.coll_info[target_rank].rma_sends; } /* ! local accumulate */ } else { s = buf; if (refp) *refp = target_count * dti.map_len; for (j = 0; j < target_count; ++j) { for (i = 0; i < dti.map_len; ++i) { if (target_rank == rank) { /* local accumulate */ local_accumulate(win_ptr, dd + dti.map[i].off, s, lpid, dti.map[i].dt, op, dti.map[i].num); } else { /* ! local accumulate */ MPIDU_Progress_spin(win_ptr->_dev.my_rma_pends > MPIDI_Process.rma_pending); xtra.w0 = (unsigned)&win_ptr->_dev.my_rma_pends; reqp = MPIDU_get_req(&xtra, &info); info->mpid_info_w0 = MPID_MSGTYPE_ACC; info->mpid_info_w1 = win_ptr->_dev.coll_info[target_rank].win_handle; info->mpid_info_w2 = rank; info->mpid_info_w3 = (unsigned)dd + dti.map[i].off; info->mpid_info_w4 = dti.map[i].dt; info->mpid_info_w5 = op; info->mpid_info_w6 = dti.map[i].num; info->mpid_info_w7 = 0; ++win_ptr->_dev.my_rma_pends; ++sent; cb_send.clientdata = reqp; mpi_errno = DCMF_Send(&bg1s_sn_proto, reqp, cb_send, consistency, lpid, dti.map[i].len, s, info->info, 2); if (mpi_errno) { MPIU_ERR_POP(mpi_errno); } ++win_ptr->_dev.coll_info[target_rank].rma_sends; } /* ! local accumulate */ s += dti.map[i].len; } /* for map_len */ dd += dti.dtp->extent; } /* for target_count */ } /* * TBD: Could return without waiting for sends... */ MPIDU_Progress_spin(win_ptr->_dev.my_rma_pends > 0); if (sent == 0 && xtra.w2) { MPIDU_FREE(xtra.w2, mpi_errno, "MPID_Accumulate"); }fn_exit: MPIR_Nest_decr(); MPID_MPI_FUNC_EXIT(MPID_STATE_MPID_WIN_ACCUMULATE); return mpi_errno; /* --BEGIN ERROR HANDLING-- */fn_fail: goto fn_exit; /* --END ERROR HANDLING-- */}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -