allgather.c
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpiimpl.h"

/* -- Begin Profiling Symbol Block for routine MPI_Allgather */
#if defined(HAVE_PRAGMA_WEAK)
#pragma weak MPI_Allgather = PMPI_Allgather
#elif defined(HAVE_PRAGMA_HP_SEC_DEF)
#pragma _HP_SECONDARY_DEF PMPI_Allgather  MPI_Allgather
#elif defined(HAVE_PRAGMA_CRI_DUP)
#pragma _CRI duplicate MPI_Allgather as PMPI_Allgather
#endif
/* -- End Profiling Symbol Block */

/* Define MPICH_MPI_FROM_PMPI if weak symbols are not supported to build
   the MPI routines */
#ifndef MPICH_MPI_FROM_PMPI
#define MPI_Allgather PMPI_Allgather

/* This is the default implementation of allgather. The algorithm is:

   Algorithm: MPI_Allgather

   For short messages and non-power-of-two no. of processes, we use
   the algorithm from the Jehoshua Bruck et al IEEE TPDS Nov 97
   paper. It is a variant of the dissemination algorithm for
   barrier. It takes ceiling(lg p) steps.

   Cost = lgp.alpha + n.((p-1)/p).beta
   where n is total size of data gathered on each process.

   For short or medium-size messages and power-of-two no. of
   processes, we use the recursive doubling algorithm.

   Cost = lgp.alpha + n.((p-1)/p).beta

   TODO: On TCP, we may want to use recursive doubling instead of the
   Bruck algorithm in all cases because of the pairwise-exchange
   property of recursive doubling (see Benson et al paper in Euro
   PVM/MPI 2003).

   It is interesting to note that either of the above algorithms for
   MPI_Allgather has the same cost as the tree algorithm for
   MPI_Gather!

   For long messages or medium-size messages and non-power-of-two
   no. of processes, we use a ring algorithm. In the first step, each
   process i sends its contribution to process i+1 and receives the
   contribution from process i-1 (with wrap-around). From the second
   step onwards, each process i forwards to process i+1 the data it
   received from process i-1 in the previous step. This takes a total
   of p-1 steps.

   Cost = (p-1).alpha + n.((p-1)/p).beta

   We use this algorithm instead of recursive doubling for long
   messages because we find that this communication pattern (nearest
   neighbor) performs twice as fast as recursive doubling for long
   messages (on Myrinet and IBM SP).

   Possible improvements:

   End Algorithm: MPI_Allgather
*/
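/* A minimal, self-contained sketch (not part of the MPICH source in this
   file) of the ring allgather described in the comment above: p-1
   nearest-neighbor steps, each forwarding the block received in the previous
   step, for a bandwidth term of n.((p-1)/p).beta.  It uses only the public
   MPI point-to-point API rather than the internal MPIC_/MPIR_ helpers, uses
   tag 0 instead of MPIR_ALLGATHER_TAG, and assumes recvbuf already holds this
   rank's own contribution at block index 'rank'. */
static int ring_allgather_sketch(void *recvbuf, int recvcount,
                                 MPI_Datatype recvtype, MPI_Comm comm)
{
    int rank, size, i, left, right, j, jnext;
    MPI_Aint lb, extent;

    MPI_Comm_rank(comm, &rank);
    MPI_Comm_size(comm, &size);
    MPI_Type_get_extent(recvtype, &lb, &extent);

    left  = (rank - 1 + size) % size;   /* neighbor we receive from */
    right = (rank + 1) % size;          /* neighbor we send to      */

    j     = rank;   /* block sent in the current step     */
    jnext = left;   /* block received in the current step */
    for (i = 1; i < size; i++) {
        MPI_Sendrecv((char *) recvbuf + j * recvcount * extent,
                     recvcount, recvtype, right, 0,
                     (char *) recvbuf + jnext * recvcount * extent,
                     recvcount, recvtype, left, 0,
                     comm, MPI_STATUS_IGNORE);
        j     = jnext;
        jnext = (jnext - 1 + size) % size;
    }
    return MPI_SUCCESS;
}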
/* begin:nested */
/* not declared static because a machine-specific function may call this one
   in some cases */
int MPIR_Allgather (
    void *sendbuf,
    int sendcount,
    MPI_Datatype sendtype,
    void *recvbuf,
    int recvcount,
    MPI_Datatype recvtype,
    MPID_Comm *comm_ptr )
{
    int        comm_size, rank;
    int        mpi_errno = MPI_SUCCESS;
    MPI_Aint   recvtype_extent;
    MPI_Aint   recvtype_true_extent, recvbuf_extent, recvtype_true_lb;
    int        j, i, pof2, src, rem;
    static const char FCNAME[] = "MPIR_Allgather";
    void      *tmp_buf;
    int        curr_cnt, dst, type_size, left, right, jnext, comm_size_is_pof2;
    MPI_Comm   comm;
    MPI_Status status;
    int        mask, dst_tree_root, my_tree_root, is_homogeneous,
               send_offset, recv_offset, last_recv_cnt = 0,
               nprocs_completed, k, offset, tmp_mask, tree_root;
#ifdef MPID_HAS_HETERO
    int        position, tmp_buf_size, nbytes;
#endif

    if (((sendcount == 0) && (sendbuf != MPI_IN_PLACE)) || (recvcount == 0))
        return MPI_SUCCESS;

    comm = comm_ptr->handle;
    comm_size = comm_ptr->local_size;
    rank = comm_ptr->rank;

    MPID_Datatype_get_extent_macro( recvtype, recvtype_extent );
    MPID_Datatype_get_size_macro( recvtype, type_size );

    /* check if comm_size is a power of two */
    pof2 = 1;
    while (pof2 < comm_size)
        pof2 *= 2;
    if (pof2 == comm_size)
        comm_size_is_pof2 = 1;
    else
        comm_size_is_pof2 = 0;

    /* check if multiple threads are calling this collective function */
    MPIDU_ERR_CHECK_MULTIPLE_THREADS_ENTER( comm_ptr );

    if ((recvcount*comm_size*type_size < MPIR_ALLGATHER_LONG_MSG) &&
        (comm_size_is_pof2 == 1)) {

        /* Short or medium size message and power-of-two no. of processes. Use
         * recursive doubling algorithm */

        is_homogeneous = 1;
#ifdef MPID_HAS_HETERO
        if (comm_ptr->is_hetero)
            is_homogeneous = 0;
#endif

        if (is_homogeneous) {
            /* homogeneous. no need to pack into tmp_buf on each node.
               copy local data into recvbuf */
            if (sendbuf != MPI_IN_PLACE) {
                mpi_errno = MPIR_Localcopy (sendbuf, sendcount, sendtype,
                                            ((char *)recvbuf +
                                             rank*recvcount*recvtype_extent),
                                            recvcount, recvtype);
                /* --BEGIN ERROR HANDLING-- */
                if (mpi_errno) {
                    mpi_errno = MPIR_Err_create_code(mpi_errno,
                                                     MPIR_ERR_RECOVERABLE,
                                                     FCNAME, __LINE__,
                                                     MPI_ERR_OTHER,
                                                     "**fail", 0);
                    return mpi_errno;
                }
                /* --END ERROR HANDLING-- */
            }

            curr_cnt = recvcount;

            mask = 0x1;
            i = 0;
            while (mask < comm_size) {
                dst = rank ^ mask;

                /* find offset into send and recv buffers. zero out
                   the least significant "i" bits of rank and dst to
                   find root of src and dst subtrees. Use ranks of
                   roots as index to send from and recv into buffer */

                dst_tree_root = dst >> i;
                dst_tree_root <<= i;

                my_tree_root = rank >> i;
                my_tree_root <<= i;

                /* FIXME: saving an MPI_Aint into an int */
                send_offset = my_tree_root * recvcount * recvtype_extent;
                recv_offset = dst_tree_root * recvcount * recvtype_extent;

                if (dst < comm_size) {
                    mpi_errno = MPIC_Sendrecv(((char *)recvbuf + send_offset),
                                              curr_cnt, recvtype, dst,
                                              MPIR_ALLGATHER_TAG,
                                              ((char *)recvbuf + recv_offset),
                                              recvcount*mask, recvtype, dst,
                                              MPIR_ALLGATHER_TAG, comm,
                                              &status);
                    /* --BEGIN ERROR HANDLING-- */
                    if (mpi_errno) {
                        mpi_errno = MPIR_Err_create_code(mpi_errno,
                                                         MPIR_ERR_RECOVERABLE,
                                                         FCNAME, __LINE__,
                                                         MPI_ERR_OTHER,
                                                         "**fail", 0);
                        return mpi_errno;
                    }
                    /* --END ERROR HANDLING-- */

                    NMPI_Get_count(&status, recvtype, &last_recv_cnt);
                    curr_cnt += last_recv_cnt;
                }

                /* if some processes in this process's subtree in this step
                   did not have any destination process to communicate with
                   because of non-power-of-two, we need to send them the
                   data that they would normally have received from those
                   processes.
                   That is, the haves in this subtree must send to the
                   have-nots. We use a logarithmic recursive-halving
                   algorithm for this. */

                /* This part of the code will not currently be executed
                   because we are not using recursive doubling for non
                   power of two. Mark it as experimental so that it
                   doesn't show up as red in the coverage tests. */

                /* --BEGIN EXPERIMENTAL-- */
                if (dst_tree_root + mask > comm_size) {
                    nprocs_completed = comm_size - my_tree_root - mask;
                    /* nprocs_completed is the number of processes in this
                       subtree that have all the data. Send data to others
                       in a tree fashion. First find root of current tree
                       that is being divided into two. k is the number of
                       least-significant bits in this process's rank that
                       must be zeroed out to find the rank of the root */
                    j = mask;
                    k = 0;
                    while (j) {
                        j >>= 1;
                        k++;
                    }
                    k--;

                    /* FIXME: saving an MPI_Aint into an int */
                    offset = recvcount * (my_tree_root + mask) *
                             recvtype_extent;
                    tmp_mask = mask >> 1;

                    while (tmp_mask) {
                        dst = rank ^ tmp_mask;

                        tree_root = rank >> k;
                        tree_root <<= k;

                        /* send only if this proc has data and destination
                           doesn't have data. at any step, multiple
                           processes can send if they have the data */
                        if ((dst > rank) &&
                            (rank < tree_root + nprocs_completed) &&
                            (dst >= tree_root + nprocs_completed)) {
                            mpi_errno = MPIC_Send(((char *)recvbuf + offset),
                                                  last_recv_cnt, recvtype,
                                                  dst, MPIR_ALLGATHER_TAG,
                                                  comm);
                            /* last_recv_cnt was set in the previous receive.
                               that's the amount of data to be sent now. */
                            /* --BEGIN ERROR HANDLING-- */
                            if (mpi_errno) {
                                mpi_errno = MPIR_Err_create_code(mpi_errno,
                                                    MPIR_ERR_RECOVERABLE,
                                                    FCNAME, __LINE__,
                                                    MPI_ERR_OTHER,
                                                    "**fail", 0);
                                return mpi_errno;
                            }
                            /* --END ERROR HANDLING-- */
                        }
                        /* recv only if this proc. doesn't have data and
                           sender has data */
                        else if ((dst < rank) &&
                                 (dst < tree_root + nprocs_completed) &&
                                 (rank >= tree_root + nprocs_completed)) {
                            mpi_errno = MPIC_Recv(((char *)recvbuf + offset),
                                                  recvcount*nprocs_completed,
                                                  recvtype, dst,
                                                  MPIR_ALLGATHER_TAG,
                                                  comm, &status);
                            /* nprocs_completed is also equal to the no. of
                               processes whose data we don't have */
                            /* --BEGIN ERROR HANDLING-- */
                            if (mpi_errno) {
                                mpi_errno = MPIR_Err_create_code(mpi_errno,
                                                    MPIR_ERR_RECOVERABLE,
                                                    FCNAME, __LINE__,
                                                    MPI_ERR_OTHER,
                                                    "**fail", 0);
                                return mpi_errno;
                            }
                            /* --END ERROR HANDLING-- */
                            NMPI_Get_count(&status, recvtype, &last_recv_cnt);
                            curr_cnt += last_recv_cnt;
                        }
                        tmp_mask >>= 1;
                        k--;
                    }
                }
                /* --END EXPERIMENTAL-- */

                mask <<= 1;
                i++;
            }
        }
#ifdef MPID_HAS_HETERO
        else {
            /* heterogeneous. need to use temp. buffer. */

            NMPI_Pack_size(recvcount*comm_size, recvtype, comm,
                           &tmp_buf_size);

            tmp_buf = MPIU_Malloc(tmp_buf_size);
            /* --BEGIN ERROR HANDLING-- */
            if (!tmp_buf) {
                mpi_errno = MPIR_Err_create_code( MPI_SUCCESS,
                                                  MPIR_ERR_RECOVERABLE,
                                                  FCNAME, __LINE__,
                                                  MPI_ERR_OTHER,
                                                  "**nomem", 0 );
                return mpi_errno;
            }
            /* --END ERROR HANDLING-- */

            /* calculate the value of nbytes, the number of bytes in packed
               representation that each process contributes. We can't
               simply divide tmp_buf_size by comm_size because tmp_buf_size
               is an upper bound on the amount of memory required. (For
               example, for a single integer, MPICH-1 returns pack_size=12.)
               Therefore, we actually pack some data into tmp_buf and see
               by how much 'position' is incremented.
            */
            position = 0;
            NMPI_Pack(recvbuf, 1, recvtype, tmp_buf, tmp_buf_size,
                      &position, comm);
            nbytes = position*recvcount;

            /* pack local data into right location in tmp_buf */
            position = rank * nbytes;
            if (sendbuf != MPI_IN_PLACE) {
                NMPI_Pack(sendbuf, sendcount, sendtype, tmp_buf,
                          tmp_buf_size, &position, comm);
            }
            else {
                /* if in_place specified, local data is found in recvbuf */
                NMPI_Pack(((char *)recvbuf + recvtype_extent*rank), recvcount,
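/* The listing breaks off above.  As a separate illustration (not taken from
   the source), the following stand-alone sketch shows the technique the
   heterogeneous path relies on: MPI_Pack_size only returns an upper bound on
   the packed size, so the true per-element size is measured by packing one
   element and observing how far 'position' advances.  The function name and
   the use of malloc/free (assumes <stdlib.h>) are illustrative assumptions;
   the real code packs directly into its tmp_buf via the internal allocator
   and the NMPI_ wrappers. */
static int measure_packed_size(void *one_elem, MPI_Datatype type,
                               MPI_Comm comm, int *nbytes)
{
    int upper_bound, position = 0;
    void *scratch;

    MPI_Pack_size(1, type, comm, &upper_bound);   /* upper bound only */
    scratch = malloc(upper_bound);
    if (!scratch)
        return MPI_ERR_OTHER;

    /* pack a single element; the advance of 'position' is the actual
       packed size of one element on this communicator */
    MPI_Pack(one_elem, 1, type, scratch, upper_bound, &position, comm);
    *nbytes = position;

    free(scratch);
    return MPI_SUCCESS;
}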