📄 allreduce.c
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/* $Id: allreduce.c,v 1.28 2002/11/12 21:44:24 thakur Exp $
 *
 * (C) 2001 by Argonne National Laboratory.
 * See COPYRIGHT in top-level directory.
 */

#include "mpiimpl.h"

/* -- Begin Profiling Symbol Block for routine MPI_Allreduce */
#if defined(HAVE_PRAGMA_WEAK)
#pragma weak MPI_Allreduce = PMPI_Allreduce
#elif defined(HAVE_PRAGMA_HP_SEC_DEF)
#pragma _HP_SECONDARY_DEF PMPI_Allreduce  MPI_Allreduce
#elif defined(HAVE_PRAGMA_CRI_DUP)
#pragma _CRI duplicate MPI_Allreduce as PMPI_Allreduce
#endif
/* -- End Profiling Symbol Block */

/* Define MPICH_MPI_FROM_PMPI if weak symbols are not supported to build
   the MPI routines */
#ifndef MPICH_MPI_FROM_PMPI
#define MPI_Allreduce PMPI_Allreduce

MPI_User_function *MPIR_Op_table[] = { MPIR_MAXF, MPIR_MINF, MPIR_SUM,
                                       MPIR_PROD, MPIR_LAND, MPIR_BAND,
                                       MPIR_LOR, MPIR_BOR, MPIR_LXOR,
                                       MPIR_BXOR, MPIR_MINLOC,
                                       MPIR_MAXLOC, };

/* This is the default implementation of allreduce. The algorithm is:

   Algorithm: MPI_Allreduce

   For the homogeneous case, we use a recursive doubling algorithm (similar
   to the one in MPI_Allgather) for both short and long messages.

   Cost = lgp.alpha + n.lgp.beta + n.lgp.gamma

   For the heterogeneous case, we call MPI_Reduce followed by MPI_Bcast
   in order to meet the requirement that all processes must have the
   same result.

   Possible improvements:

   End Algorithm: MPI_Allreduce
*/

PMPI_LOCAL int MPIR_Allreduce (
    void *sendbuf,
    void *recvbuf,
    int count,
    MPI_Datatype datatype,
    MPI_Op op,
    MPID_Comm *comm_ptr )
{
    int rc, is_homogeneous;
    int comm_size, rank;
    int mpi_errno = MPI_SUCCESS;
    MPI_Status status;
    int mask, dst, dst_tree_root, my_tree_root, nprocs_completed, k, i, j,
        tmp_mask, tree_root, is_commutative;
    MPI_Aint true_extent, true_lb;
    void *tmp_buf;
    MPI_User_function *uop;
    MPID_Op *op_ptr;
    MPI_Comm comm;
    MPICH_PerThread_t *p;

    if (count == 0) return MPI_SUCCESS;

    comm = comm_ptr->handle;

    is_homogeneous = 1;
#ifdef MPID_HAS_HETERO
    if (comm_ptr->is_hetero)
        is_homogeneous = 0;
#endif

    if (!is_homogeneous) {
        /* heterogeneous. To get the same result on all processes, we
           do a reduce to 0 and then broadcast. */
        MPIR_Nest_incr();
        mpi_errno = NMPI_Reduce ( sendbuf, recvbuf, count, datatype,
                                  op, 0, comm );
        if (mpi_errno == MPI_ERR_OP || mpi_errno == MPI_SUCCESS) {
            /* Allow MPI_ERR_OP since we can continue from this error */
            rc = NMPI_Bcast ( recvbuf, count, datatype, 0, comm );
            if (rc) mpi_errno = rc;
        }
        MPIR_Nest_decr();
    }
    else {
        /* homogeneous. Use recursive doubling algorithm similar to the
           one used in all_gather */

        /* set op_errno to 0. stored in perthread structure */
        MPID_GetPerThread(p);
        p->op_errno = 0;

        comm_size = comm_ptr->local_size;
        rank = comm_ptr->rank;

        if (HANDLE_GET_KIND(op) == HANDLE_KIND_BUILTIN) {
            is_commutative = 1;
            /* get the function by indexing into the op table */
            uop = MPIR_Op_table[op%16 - 1];
        }
        else {
            MPID_Op_get_ptr(op, op_ptr);
            if (op_ptr->kind == MPID_OP_USER_NONCOMMUTE)
                is_commutative = 0;
            else
                is_commutative = 1;
#ifdef HAVE_CXX_BINDING
            if ((op_ptr->language == MPID_LANG_C) ||
                (op_ptr->language == MPID_LANG_CXX))
#else
            if ((op_ptr->language == MPID_LANG_C))
#endif
                uop = (MPI_User_function *) op_ptr->function.c_function;
            else
                uop = (MPI_User_function *) op_ptr->function.f77_function;
        }

        /* need to allocate temporary buffer to store incoming data */
        mpi_errno = NMPI_Type_get_true_extent(datatype, &true_lb,
                                              &true_extent);
        if (mpi_errno) return mpi_errno;

        tmp_buf = MPIU_Malloc(count*true_extent);
        if (!tmp_buf) {
            mpi_errno = MPIR_Err_create_code( MPI_ERR_OTHER, "**nomem", 0 );
            return mpi_errno;
        }
        /* adjust for potential negative lower bound in datatype */
        tmp_buf = (void *)((char*)tmp_buf - true_lb);

        /* copy local data into recvbuf */
        if (sendbuf != MPI_IN_PLACE) {
            mpi_errno = MPIR_Localcopy(sendbuf, count, datatype,
                                       recvbuf, count, datatype);
            if (mpi_errno) return mpi_errno;
        }

        /* Lock for collective operation */
        MPID_Comm_thread_lock( comm_ptr );

        mask = 0x1;
        i = 0;
        while (mask < comm_size) {
            dst = rank ^ mask;

            dst_tree_root = dst >> i;
            dst_tree_root <<= i;

            my_tree_root = rank >> i;
            my_tree_root <<= i;

            if (dst < comm_size) {
                /* Send most current data, which is in recvbuf. Recv
                   into tmp_buf */
                mpi_errno = MPIC_Sendrecv(recvbuf, count, datatype, dst,
                                          MPIR_ALLREDUCE_TAG, tmp_buf,
                                          count, datatype, dst,
                                          MPIR_ALLREDUCE_TAG, comm,
                                          &status);
                if (mpi_errno) return mpi_errno;

                /* tmp_buf contains data received in this step.
                   recvbuf contains data accumulated so far */

                if (is_commutative || (dst_tree_root < my_tree_root)) {
                    /* op is commutative OR the order is already right */
                    (*uop)(tmp_buf, recvbuf, &count, &datatype);
                }
                else {
                    /* op is noncommutative and the order is not right */
                    (*uop)(recvbuf, tmp_buf, &count, &datatype);

                    /* copy result back into recvbuf */
                    mpi_errno = MPIR_Localcopy(tmp_buf, count, datatype,
                                               recvbuf, count, datatype);
                    if (mpi_errno) return mpi_errno;
                }
            }

            /* if some processes in this process's subtree in this step
               did not have any destination process to communicate with
               because of non-power-of-two, we need to send them the
               result. We use a logarithmic recursive-halfing algorithm
               for this. */

            if (dst_tree_root + mask > comm_size) {
                nprocs_completed = comm_size - my_tree_root - mask;
                /* nprocs_completed is the number of processes in this
                   subtree that have all the data. Send data to others
                   in a tree fashion. First find root of current tree
                   that is being divided into two. k is the number of
                   least-significant bits in this process's rank that
                   must be zeroed out to find the rank of the root */
                j = mask;
                k = 0;
                while (j) {
                    j >>= 1;
                    k++;
                }
                k--;

                tmp_mask = mask >> 1;
                while (tmp_mask) {
                    dst = rank ^ tmp_mask;

                    tree_root = rank >> k;
                    tree_root <<= k;

                    /* send only if this proc has data and destination
                       doesn't have data. at any step, multiple processes
                       can send if they have the data */
                    if ((dst > rank) &&
                        (rank < tree_root + nprocs_completed) &&
                        (dst >= tree_root + nprocs_completed)) {
                        /* send the current result */
                        mpi_errno = MPIC_Send(recvbuf, count, datatype,
                                              dst, MPIR_ALLREDUCE_TAG,
                                              comm);
                        if (mpi_errno) return mpi_errno;
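(The listing above is truncated here, partway through the non-power-of-two fix-up phase of the recursive doubling loop.)

For orientation, below is a minimal, hypothetical example of calling the public MPI_Allreduce interface that the routine above implements: each rank contributes one integer and every rank receives the global sum. It is a usage sketch only, not part of allreduce.c, and it relies solely on standard MPI calls.

/* Hypothetical usage sketch -- not part of allreduce.c.
   Each rank contributes its rank number; after the call every rank
   holds the sum 0 + 1 + ... + (size-1). */
#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv)
{
    int rank, size, sum;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* Every rank reduces its local value with MPI_SUM, and every rank
       receives the identical result (the "all" in allreduce). */
    MPI_Allreduce(&rank, &sum, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);

    printf("rank %d of %d: sum = %d\n", rank, size, sum);

    MPI_Finalize();
    return 0;
}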