gather.c
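MPICH's default implementation of MPI_Gather: a binomial-tree MPIR_Gather, with a packed (MPI_Pack) fallback path for heterogeneous communicators.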
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpiimpl.h"

/* -- Begin Profiling Symbol Block for routine MPI_Gather */
#if defined(HAVE_PRAGMA_WEAK)
#pragma weak MPI_Gather = PMPI_Gather
#elif defined(HAVE_PRAGMA_HP_SEC_DEF)
#pragma _HP_SECONDARY_DEF PMPI_Gather  MPI_Gather
#elif defined(HAVE_PRAGMA_CRI_DUP)
#pragma _CRI duplicate MPI_Gather as PMPI_Gather
#endif
/* -- End Profiling Symbol Block */

/* Define MPICH_MPI_FROM_PMPI if weak symbols are not supported to build
   the MPI routines */
#ifndef MPICH_MPI_FROM_PMPI
#undef MPI_Gather
#define MPI_Gather PMPI_Gather

/* This is the default implementation of gather. The algorithm is:

   Algorithm: MPI_Gather

   We use a binomial tree algorithm for both short and long messages.
   At nodes other than leaf nodes we need to allocate a temporary
   buffer to store the incoming message. If the root is not rank 0,
   we receive data in a temporary buffer on the root and then reorder
   it into the right order. In the heterogeneous case we first pack
   the buffers by using MPI_Pack and then do the gather.

   Cost = lgp.alpha + n.((p-1)/p).beta
   where n is the total size of the data gathered at the root.

   Possible improvements:

   End Algorithm: MPI_Gather
*/
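/* Illustration (added; not part of the original comment): with p = 8
   and root = 0, the receive pattern by relative rank is

       round 1 (mask = 1):  0<-1, 2<-3, 4<-5, 6<-7   (1 block each)
       round 2 (mask = 2):  0<-2, 4<-6               (2 blocks each)
       round 3 (mask = 4):  0<-4                     (4 blocks)

   i.e. lg(p) rounds in which the root receives n.((p-1)/p) bytes in
   total, matching the cost estimate above. */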
/* not declared static because it is called in intercomm. allgather */
/* begin:nested */
int MPIR_Gather (
    void *sendbuf,
    int sendcnt,
    MPI_Datatype sendtype,
    void *recvbuf,
    int recvcnt,
    MPI_Datatype recvtype,
    int root,
    MPID_Comm *comm_ptr )
{
    static const char FCNAME[] = "MPIR_Gather";
    int comm_size, rank;
    int mpi_errno = MPI_SUCCESS;
    int curr_cnt=0, relative_rank, nbytes, is_homogeneous;
    int mask, sendtype_size, recvtype_size, src, dst, relative_src;
    int recvblks;
    int tmp_buf_size, missing;
    void *tmp_buf=NULL;
    MPI_Status status;
    MPI_Aint extent=0;            /* Datatype extent */
    MPI_Comm comm;
    int blocks[2];
    int displs[2];
    MPI_Aint struct_displs[2];
    MPI_Datatype types[2], tmp_type;
    int copy_offset = 0, copy_blks = 0;
#ifdef MPID_HAS_HETERO
    int position, recv_size;
#endif

    comm = comm_ptr->handle;
    comm_size = comm_ptr->local_size;
    rank = comm_ptr->rank;

    if ( ((rank == root) && (recvcnt == 0)) ||
         ((rank != root) && (sendcnt == 0)) )
        return MPI_SUCCESS;

    is_homogeneous = 1;
#ifdef MPID_HAS_HETERO
    if (comm_ptr->is_hetero)
        is_homogeneous = 0;
#endif

    /* check if multiple threads are calling this collective function */
    MPIDU_ERR_CHECK_MULTIPLE_THREADS_ENTER( comm_ptr );

    /* Use binomial tree algorithm. */

    relative_rank = (rank >= root) ? rank - root : rank - root + comm_size;

    if (rank == root)
        MPID_Datatype_get_extent_macro(recvtype, extent);

    if (is_homogeneous)
    {
        /* communicator is homogeneous. no need to pack buffer. */

        if (rank == root)
        {
            MPID_Datatype_get_size_macro(recvtype, recvtype_size);
            nbytes = recvtype_size * recvcnt;
        }
        else
        {
            MPID_Datatype_get_size_macro(sendtype, sendtype_size);
            nbytes = sendtype_size * sendcnt;
        }

        /* Find the number of missing nodes in my sub-tree compared to
         * a balanced tree */
        for (mask = 1; mask < comm_size; mask <<= 1);
        --mask;
        while (relative_rank & mask) mask >>= 1;
        missing = (relative_rank | mask) - comm_size + 1;
        if (missing < 0) missing = 0;
        tmp_buf_size = (mask - missing);

        /* If the message is smaller than the threshold, we will copy
         * our message in there too */
        if (nbytes < MPIR_GATHER_VSMALL_MSG) tmp_buf_size++;

        tmp_buf_size *= nbytes;

        /* For zero-ranked root, we don't need any temporary buffer */
        if ((rank == root) && (!root || (nbytes >= MPIR_GATHER_VSMALL_MSG)))
            tmp_buf_size = 0;

        if (tmp_buf_size)
        {
            tmp_buf = MPIU_Malloc(tmp_buf_size);
            /* --BEGIN ERROR HANDLING-- */
            if (!tmp_buf)
            {
                mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
                                                 FCNAME, __LINE__, MPI_ERR_OTHER,
                                                 "**nomem", 0);
                return mpi_errno;
            }
            /* --END ERROR HANDLING-- */
        }

        if (rank == root)
        {
            if (sendbuf != MPI_IN_PLACE)
            {
                mpi_errno = MPIR_Localcopy(sendbuf, sendcnt, sendtype,
                                           ((char *) recvbuf + extent*recvcnt*rank),
                                           recvcnt, recvtype);
                if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
            }
        }
        else if (tmp_buf_size && (nbytes < MPIR_GATHER_VSMALL_MSG))
        {
            /* copy from sendbuf into tmp_buf */
            mpi_errno = MPIR_Localcopy(sendbuf, sendcnt, sendtype,
                                       tmp_buf, nbytes, MPI_BYTE);
            if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
        }
        curr_cnt = nbytes;
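        /* Worked example (added; hypothetical numbers, not from the
         * original source): with comm_size = 6, the balanced tree has
         * 8 slots, so relative rank 4 finds mask = 3 after the scan
         * above, and missing = (4|3) - 6 + 1 = 2 members of its
         * subtree {4,5,6,7} are absent; tmp_buf therefore holds
         * mask - missing = 1 incoming block, plus rank 4's own block
         * when nbytes < MPIR_GATHER_VSMALL_MSG.  In the loop below,
         * curr_cnt tracks how many of those bytes are filled in. */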
        mask = 0x1;
        while (mask < comm_size)
        {
            if ((mask & relative_rank) == 0)
            {
                src = relative_rank | mask;
                if (src < comm_size)
                {
                    src = (src + root) % comm_size;

                    if (rank == root)
                    {
                        recvblks = mask;
                        if ((2 * recvblks) > comm_size)
                            recvblks = comm_size - recvblks;

                        if ((rank + mask + recvblks == comm_size) ||
                            (((rank + mask) % comm_size) <
                             ((rank + mask + recvblks) % comm_size)))
                        {
                            /* If the data contiguously fits into the
                             * receive buffer, place it directly. This
                             * should cover the case where the root is
                             * rank 0. */
                            mpi_errno = MPIC_Recv(((char *)recvbuf +
                                                   (((rank + mask) % comm_size)*recvcnt*extent)),
                                                  recvblks * recvcnt, recvtype, src,
                                                  MPIR_GATHER_TAG, comm, &status);
                        }
                        else if (nbytes < MPIR_GATHER_VSMALL_MSG)
                        {
                            mpi_errno = MPIC_Recv(tmp_buf, recvblks * nbytes, MPI_BYTE,
                                                  src, MPIR_GATHER_TAG, comm, &status);
                            copy_offset = rank + mask;
                            copy_blks = recvblks;
                        }
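                        /* Illustration (added; hypothetical numbers, not
                         * from the original source): with comm_size = 8,
                         * root = 2 and mask = 4, the incoming 4 blocks
                         * belong to ranks 6, 7, 0, 1.  The indexed type
                         * built below therefore places blocks[0] =
                         * 2*recvcnt elements at displacement 6*recvcnt
                         * and blocks[1] = 2*recvcnt elements at
                         * displacement 0, handling the wrap-around past
                         * rank p-1 in a single receive. */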
                        else
                        {
                            blocks[0] = recvcnt * (comm_size - root - mask);
                            displs[0] = recvcnt * (root + mask);
                            blocks[1] = (recvcnt * recvblks) - blocks[0];
                            displs[1] = 0;

                            NMPI_Type_indexed(2, blocks, displs, recvtype, &tmp_type);
                            NMPI_Type_commit(&tmp_type);

                            mpi_errno = MPIC_Recv(recvbuf, 1, tmp_type, src,
                                                  MPIR_GATHER_TAG, comm, &status);

                            NMPI_Type_free(&tmp_type);
                        }
                    }
                    else /* Intermediate nodes store in temporary buffer */
                    {
                        int offset;

                        /* Estimate the amount of data that is going to come in */
                        recvblks = mask;
                        relative_src = ((src - root) < 0) ?
                            (src - root + comm_size) : (src - root);
                        if (relative_src + mask > comm_size)
                            recvblks -= (relative_src + mask - comm_size);

                        if (nbytes < MPIR_GATHER_VSMALL_MSG)
                            offset = mask * nbytes;
                        else
                            offset = (mask - 1) * nbytes;
                        mpi_errno = MPIC_Recv(((char *)tmp_buf + offset),
                                              recvblks * nbytes, MPI_BYTE, src,
                                              MPIR_GATHER_TAG, comm, &status);
                        curr_cnt += (recvblks * nbytes);
                    }
                    if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
                }
            }
            else
            {
                dst = relative_rank ^ mask;
                dst = (dst + root) % comm_size;

                if (!tmp_buf_size)
                {
                    /* leaf nodes send directly from sendbuf */
                    mpi_errno = MPIC_Send(sendbuf, sendcnt, sendtype, dst,
                                          MPIR_GATHER_TAG, comm);
                }
                else if (nbytes < MPIR_GATHER_VSMALL_MSG)
                {
                    mpi_errno = MPIC_Send(tmp_buf, curr_cnt, MPI_BYTE, dst,
                                          MPIR_GATHER_TAG, comm);
                }
                else
                {
                    blocks[0] = sendcnt;
                    struct_displs[0] = (MPI_Aint) sendbuf;
                    types[0] = sendtype;
                    blocks[1] = curr_cnt - nbytes;
                    struct_displs[1] = (MPI_Aint) tmp_buf;
                    types[1] = MPI_BYTE;

                    NMPI_Type_create_struct(2, blocks, struct_displs, types, &tmp_type);
                    NMPI_Type_commit(&tmp_type);

                    mpi_errno = MPIC_Send(MPI_BOTTOM, 1, tmp_type, dst,
                                          MPIR_GATHER_TAG, comm);

                    NMPI_Type_free(&tmp_type);
                }
                if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }

                break;
            }
            mask <<= 1;
        }

        if ((rank == root) && root && (nbytes < MPIR_GATHER_VSMALL_MSG) && copy_blks)
        {
            /* reorder and copy from tmp_buf into recvbuf */
            MPIR_Localcopy(tmp_buf,
                           nbytes * (comm_size - copy_offset), MPI_BYTE,
                           ((char *) recvbuf + extent * recvcnt * copy_offset),
                           recvcnt * (comm_size - copy_offset), recvtype);
            MPIR_Localcopy((char *) tmp_buf + nbytes * (comm_size - copy_offset),
                           nbytes * (copy_blks - comm_size + copy_offset), MPI_BYTE,
                           recvbuf,
                           recvcnt * (copy_blks - comm_size + copy_offset), recvtype);
        }

        if (tmp_buf) MPIU_Free(tmp_buf);
    }

#ifdef MPID_HAS_HETERO
    else
    {   /* communicator is heterogeneous. pack data into tmp_buf. */
        if (rank == root)
            NMPI_Pack_size(recvcnt*comm_size, recvtype, comm, &tmp_buf_size);
        else
            NMPI_Pack_size(sendcnt*(comm_size/2), sendtype, comm, &tmp_buf_size);

        tmp_buf = MPIU_Malloc(tmp_buf_size);
        /* --BEGIN ERROR HANDLING-- */
        if (!tmp_buf)
        {
            mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
                                             FCNAME, __LINE__, MPI_ERR_OTHER,
                                             "**nomem", 0);
            return mpi_errno;
        }
        /* --END ERROR HANDLING-- */

        position = 0;
        if (sendbuf != MPI_IN_PLACE)
        {
            NMPI_Pack(sendbuf, sendcnt, sendtype, tmp_buf,
                      tmp_buf_size, &position, comm);
            nbytes = position;
        }
        else
        {
            /* do a dummy pack just to calculate nbytes */
            NMPI_Pack(recvbuf, 1, recvtype, tmp_buf,
                      tmp_buf_size, &position, comm);
            nbytes = position*recvcnt;
        }

        curr_cnt = nbytes;

        mask = 0x1;
        while (mask < comm_size)
        {
            if ((mask & relative_rank) == 0)
            {
                src = relative_rank | mask;
                if (src < comm_size)
                {
                    src = (src + root) % comm_size;
                    mpi_errno = MPIC_Recv(((char *)tmp_buf + curr_cnt),
                                          tmp_buf_size-curr_cnt, MPI_BYTE, src,
                                          MPIR_GATHER_TAG, comm, &status);
                    /* --BEGIN ERROR HANDLING-- */
                    if (mpi_errno)
                    {
                        mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE,
                                                         FCNAME, __LINE__, MPI_ERR_OTHER,
                                                         "**fail", 0);
                        return mpi_errno;
                    }
                    /* --END ERROR HANDLING-- */

                    /* the recv size is larger than what may be sent in some
                       cases. query amount of data actually received */
                    NMPI_Get_count(&status, MPI_BYTE, &recv_size);
                    curr_cnt += recv_size;
                }
            }
            else
            {
                dst = relative_rank ^ mask;
                dst = (dst + root) % comm_size;
                mpi_errno = MPIC_Send(tmp_buf, curr_cnt, MPI_BYTE, dst,
                                      MPIR_GATHER_TAG, comm);
                /* --BEGIN ERROR HANDLING-- */
                if (mpi_errno)
                {
                    mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE,
                                                     FCNAME, __LINE__, MPI_ERR_OTHER,
                                                     "**fail", 0);
                    return mpi_errno;
                }
                /* --END ERROR HANDLING-- */
                break;
            }
            mask <<= 1;
        }

        if (rank == root)
        {
            /* reorder and copy from tmp_buf into recvbuf */
            if (sendbuf != MPI_IN_PLACE)
            {
                position = 0;
                NMPI_Unpack(tmp_buf, tmp_buf_size, &position,
                            ((char *) recvbuf + extent*recvcnt*rank),
                            recvcnt*(comm_size-rank), recvtype, comm);
            }
            else
            {
                position = nbytes;
                NMPI_Unpack(tmp_buf, tmp_buf_size, &position,
                            ((char *) recvbuf + extent*recvcnt*(rank+1)),
                            recvcnt*(comm_size-rank-1), recvtype, comm);
            }
            if (root != 0)
                NMPI_Unpack(tmp_buf, tmp_buf_size, &position, recvbuf,
                            recvcnt*rank, recvtype, comm);
        }

        MPIU_Free(tmp_buf);
    }
#endif /* MPID_HAS_HETERO */

 fn_fail:
    /* check if multiple threads are calling this collective function */
    MPIDU_ERR_CHECK_MULTIPLE_THREADS_EXIT( comm_ptr );
    return (mpi_errno);
}
/* end:nested */
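/* Usage sketch (added; a minimal example of the interface this file
 * implements, assuming a communicator of `size` ranks):
 *
 *     int val = rank, *all = NULL;
 *     if (rank == 0)
 *         all = malloc(size * sizeof(int));
 *     MPI_Gather(&val, 1, MPI_INT, all, 1, MPI_INT, 0, MPI_COMM_WORLD);
 *
 * Every rank contributes sendcnt (= 1 here) elements; the recvbuf,
 * recvcnt and recvtype arguments are significant only at the root,
 * which receives the contributions in rank order. */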