📄 red_scat.c
                return mpi_errno;
            }
            /* --END ERROR HANDLING-- */
        }
    }

    if (!is_commutative && (nbytes < MPIR_REDSCAT_NONCOMMUTATIVE_SHORT_MSG)) {
        /* noncommutative and short messages, use recursive doubling. */

        /* need to allocate temporary buffer to receive incoming data */
        tmp_recvbuf = MPIU_Malloc(total_count*(MPIR_MAX(true_extent,extent)));
        /* --BEGIN ERROR HANDLING-- */
        if (!tmp_recvbuf) {
            mpi_errno = MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
                                              FCNAME, __LINE__, MPI_ERR_OTHER,
                                              "**nomem", 0 );
            return mpi_errno;
        }
        /* --END ERROR HANDLING-- */
        /* adjust for potential negative lower bound in datatype */
        tmp_recvbuf = (void *)((char*)tmp_recvbuf - true_lb);

        /* need to allocate another temporary buffer to accumulate results */
        tmp_results = MPIU_Malloc(total_count*(MPIR_MAX(true_extent,extent)));
        /* --BEGIN ERROR HANDLING-- */
        if (!tmp_results) {
            mpi_errno = MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
                                              FCNAME, __LINE__, MPI_ERR_OTHER,
                                              "**nomem", 0 );
            return mpi_errno;
        }
        /* --END ERROR HANDLING-- */
        /* adjust for potential negative lower bound in datatype */
        tmp_results = (void *)((char*)tmp_results - true_lb);

        /* copy sendbuf into tmp_results */
        if (sendbuf != MPI_IN_PLACE)
            mpi_errno = MPIR_Localcopy(sendbuf, total_count, datatype,
                                       tmp_results, total_count, datatype);
        else
            mpi_errno = MPIR_Localcopy(recvbuf, total_count, datatype,
                                       tmp_results, total_count, datatype);
        /* --BEGIN ERROR HANDLING-- */
        if (mpi_errno) {
            mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE,
                                             FCNAME, __LINE__, MPI_ERR_OTHER,
                                             "**fail", 0);
            return mpi_errno;
        }
        /* --END ERROR HANDLING-- */

        mask = 0x1;
        i = 0;
        while (mask < comm_size) {
            dst = rank ^ mask;

            dst_tree_root = dst >> i;
            dst_tree_root <<= i;

            my_tree_root = rank >> i;
            my_tree_root <<= i;

            /* At step 1, processes exchange (n-n/p) amount of data;
               at step 2, (n-2n/p) amount of data; at step 3, (n-4n/p)
               amount of data, and so forth. We use derived datatypes
               for this.

               At each step, a process does not need to send data
               indexed from my_tree_root to my_tree_root+mask-1.
               Similarly, a process won't receive data indexed from
               dst_tree_root to dst_tree_root+mask-1. */

            /* calculate sendtype */
            blklens[0] = blklens[1] = 0;
            for (j=0; j<my_tree_root; j++)
                blklens[0] += recvcnts[j];
            for (j=my_tree_root+mask; j<comm_size; j++)
                blklens[1] += recvcnts[j];

            dis[0] = 0;
            dis[1] = blklens[0];
            for (j=my_tree_root; (j<my_tree_root+mask) && (j<comm_size); j++)
                dis[1] += recvcnts[j];

            NMPI_Type_indexed(2, blklens, dis, datatype, &sendtype);
            NMPI_Type_commit(&sendtype);

            /* calculate recvtype */
            blklens[0] = blklens[1] = 0;
            for (j=0; j<dst_tree_root && j<comm_size; j++)
                blklens[0] += recvcnts[j];
            for (j=dst_tree_root+mask; j<comm_size; j++)
                blklens[1] += recvcnts[j];

            dis[0] = 0;
            dis[1] = blklens[0];
            for (j=dst_tree_root; (j<dst_tree_root+mask) && (j<comm_size); j++)
                dis[1] += recvcnts[j];

            NMPI_Type_indexed(2, blklens, dis, datatype, &recvtype);
            NMPI_Type_commit(&recvtype);
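            /* Worked example of the datatype construction above: with
               comm_size = 4 and equal counts of n/4 per process, rank 0 at
               the first step has mask = 1, my_tree_root = 0, dst = 1 and
               dst_tree_root = 1.  The loops then produce
                   sendtype: blklens = {0, 3n/4}  -- skips chunk 0 (rank 0's own block)
                   recvtype: blklens = {n/4, n/2} -- skips chunk 1 (dst's block)
               so the exchange below moves n - n/4 elements in each direction,
               matching the (n - n/p) volume described in the comment above. */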
            received = 0;
            if (dst < comm_size) {
                /* tmp_results contains data to be sent in each step. Data is
                   received in tmp_recvbuf and then accumulated into
                   tmp_results. Accumulation is done later below. */
                mpi_errno = MPIC_Sendrecv(tmp_results, 1, sendtype, dst,
                                          MPIR_REDUCE_SCATTER_TAG,
                                          tmp_recvbuf, 1, recvtype, dst,
                                          MPIR_REDUCE_SCATTER_TAG, comm,
                                          MPI_STATUS_IGNORE);
                received = 1;
                /* --BEGIN ERROR HANDLING-- */
                if (mpi_errno) {
                    mpi_errno = MPIR_Err_create_code(mpi_errno,
                                                     MPIR_ERR_RECOVERABLE,
                                                     FCNAME, __LINE__,
                                                     MPI_ERR_OTHER, "**fail", 0);
                    return mpi_errno;
                }
                /* --END ERROR HANDLING-- */
            }

            /* if some processes in this process's subtree in this step did
               not have any destination process to communicate with because
               of non-power-of-two, we need to send them the result. We use
               a logarithmic recursive-halving algorithm for this. */
            if (dst_tree_root + mask > comm_size) {
                nprocs_completed = comm_size - my_tree_root - mask;
                /* nprocs_completed is the number of processes in this
                   subtree that have all the data. Send data to others in
                   a tree fashion. First find root of current tree that is
                   being divided into two. k is the number of
                   least-significant bits in this process's rank that must
                   be zeroed out to find the rank of the root */
                j = mask;
                k = 0;
                while (j) {
                    j >>= 1;
                    k++;
                }
                k--;

                tmp_mask = mask >> 1;
                while (tmp_mask) {
                    dst = rank ^ tmp_mask;

                    tree_root = rank >> k;
                    tree_root <<= k;

                    /* send only if this proc has data and destination
                       doesn't have data. at any step, multiple processes
                       can send if they have the data */
                    if ((dst > rank) &&
                        (rank < tree_root + nprocs_completed) &&
                        (dst >= tree_root + nprocs_completed)) {
                        /* send the current result */
                        mpi_errno = MPIC_Send(tmp_recvbuf, 1, recvtype, dst,
                                              MPIR_REDUCE_SCATTER_TAG, comm);
                        /* --BEGIN ERROR HANDLING-- */
                        if (mpi_errno) {
                            mpi_errno = MPIR_Err_create_code(mpi_errno,
                                            MPIR_ERR_RECOVERABLE, FCNAME,
                                            __LINE__, MPI_ERR_OTHER,
                                            "**fail", 0);
                            return mpi_errno;
                        }
                        /* --END ERROR HANDLING-- */
                    }
                    /* recv only if this proc. doesn't have data and sender
                       has data */
                    else if ((dst < rank) &&
                             (dst < tree_root + nprocs_completed) &&
                             (rank >= tree_root + nprocs_completed)) {
                        mpi_errno = MPIC_Recv(tmp_recvbuf, 1, recvtype, dst,
                                              MPIR_REDUCE_SCATTER_TAG, comm,
                                              MPI_STATUS_IGNORE);
                        received = 1;
                        /* --BEGIN ERROR HANDLING-- */
                        if (mpi_errno) {
                            mpi_errno = MPIR_Err_create_code(mpi_errno,
                                            MPIR_ERR_RECOVERABLE, FCNAME,
                                            __LINE__, MPI_ERR_OTHER,
                                            "**fail", 0);
                            return mpi_errno;
                        }
                        /* --END ERROR HANDLING-- */
                    }
                    tmp_mask >>= 1;
                    k--;
                }
            }
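            /* Worked example for the fix-up above: with comm_size = 6, at the
               step where mask = 4 ranks 2 and 3 have dst = 6 and 7
               (>= comm_size) and therefore skip the Sendrecv, while ranks 0
               and 1 exchange with ranks 4 and 5.  In the recursive-halving
               loop (tmp_mask = 2), rank 0 then forwards its tmp_recvbuf to
               rank 2 and rank 1 forwards to rank 3, so every process in the
               lower subtree still obtains the data sent by ranks 4 and 5. */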
            /* The following reduction is done here instead of after the
               MPIC_Sendrecv or MPIC_Recv above. This is because to do it
               above, in the noncommutative case, we would need an extra temp
               buffer so as not to overwrite tmp_recvbuf, because tmp_recvbuf
               may have to be communicated to other processes in the
               non-power-of-two case. To avoid that extra allocation, we do
               the reduce here. */
            if (received) {
                if (is_commutative || (dst_tree_root < my_tree_root)) {
#ifdef HAVE_CXX_BINDING
                    if (is_cxx_uop) {
                        (*MPIR_Process.cxx_call_op_fn)( tmp_recvbuf, tmp_results,
                                                        blklens[0], datatype, uop );
                        (*MPIR_Process.cxx_call_op_fn)(
                            ((char *)tmp_recvbuf + dis[1]*extent),
                            ((char *)tmp_results + dis[1]*extent),
                            blklens[1], datatype, uop );
                    }
                    else
#endif
                    {
                        (*uop)(tmp_recvbuf, tmp_results, &blklens[0], &datatype);
                        (*uop)(((char *)tmp_recvbuf + dis[1]*extent),
                               ((char *)tmp_results + dis[1]*extent),
                               &blklens[1], &datatype);
                    }
                }
                else {
#ifdef HAVE_CXX_BINDING
                    if (is_cxx_uop) {
                        (*MPIR_Process.cxx_call_op_fn)( tmp_results, tmp_recvbuf,
                                                        blklens[0], datatype, uop );
                        (*MPIR_Process.cxx_call_op_fn)(
                            ((char *)tmp_results + dis[1]*extent),
                            ((char *)tmp_recvbuf + dis[1]*extent),
                            blklens[1], datatype, uop );
                    }
                    else
#endif
                    {
                        (*uop)(tmp_results, tmp_recvbuf, &blklens[0], &datatype);
                        (*uop)(((char *)tmp_results + dis[1]*extent),
                               ((char *)tmp_recvbuf + dis[1]*extent),
                               &blklens[1], &datatype);
                    }
                    /* copy result back into tmp_results */
                    mpi_errno = MPIR_Localcopy(tmp_recvbuf, 1, recvtype,
                                               tmp_results, 1, recvtype);
                    /* --BEGIN ERROR HANDLING-- */
                    if (mpi_errno) {
                        mpi_errno = MPIR_Err_create_code(mpi_errno,
                                        MPIR_ERR_RECOVERABLE, FCNAME, __LINE__,
                                        MPI_ERR_OTHER, "**fail", 0);
                        return mpi_errno;
                    }
                    /* --END ERROR HANDLING-- */
                }
            }

            NMPI_Type_free(&sendtype);
            NMPI_Type_free(&recvtype);

            mask <<= 1;
            i++;
        }

        /* now copy final results from tmp_results to recvbuf */
        mpi_errno = MPIR_Localcopy(((char *)tmp_results+disps[rank]*extent),
                                   recvcnts[rank], datatype, recvbuf,
                                   recvcnts[rank], datatype);
        /* --BEGIN ERROR HANDLING-- */
        if (mpi_errno) {
            mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE,
                                             FCNAME, __LINE__, MPI_ERR_OTHER,
                                             "**fail", 0);
            return mpi_errno;
        }
        /* --END ERROR HANDLING-- */

        MPIU_Free((char *)tmp_recvbuf+true_lb);
        MPIU_Free((char *)tmp_results+true_lb);
    }

    MPIU_Free(disps);

    MPIR_Nest_decr();

    /* check if multiple threads are calling this collective function */
    MPIDU_ERR_CHECK_MULTIPLE_THREADS_EXIT( comm_ptr );

    if (p->op_errno)
        mpi_errno = p->op_errno;

    return (mpi_errno);
}
/* end:nested */

/* begin:nested */
/* not declared static because a machine-specific function may call this
   one in some cases */
int MPIR_Reduce_scatter_inter ( void *sendbuf, void *recvbuf, int *recvcnts,
                                MPI_Datatype datatype, MPI_Op op,
                                MPID_Comm *comm_ptr )
{
/* Intercommunicator Reduce_scatter.
   We first do an intercommunicator reduce to rank 0 on left group,
   then an intercommunicator reduce to rank 0 on right group, followed
   by local intracommunicator scattervs in each group.
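Usage sketch (an editorial illustration, not part of red_scat.c): the internal routines above back the public MPI_Reduce_scatter call. The minimal program below assumes only the standard MPI interface (MPI_Reduce_scatter with MPI_SUM on MPI_INT); the data values and variable names are arbitrary. Each of the p ranks contributes a vector of p integers; after the call, rank r holds the element-wise sum of entry r across all ranks.

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char **argv)
{
    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    int *sendbuf  = malloc(size * sizeof(int));
    int *recvcnts = malloc(size * sizeof(int));
    int  result;

    for (int i = 0; i < size; i++) {
        sendbuf[i]  = rank + i;   /* arbitrary per-rank contribution     */
        recvcnts[i] = 1;          /* each rank keeps one reduced element */
    }

    /* Reduce element-wise across all ranks, then scatter the reduced
       vector according to recvcnts: rank r receives reduced element r. */
    MPI_Reduce_scatter(sendbuf, &result, recvcnts, MPI_INT, MPI_SUM,
                       MPI_COMM_WORLD);

    printf("rank %d: reduced element = %d\n", rank, result);

    free(sendbuf);
    free(recvcnts);
    MPI_Finalize();
    return 0;
}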