📄 bcast.c
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- *//* * * (C) 2001 by Argonne National Laboratory. * See COPYRIGHT in top-level directory. */#include "mpiimpl.h"/* -- Begin Profiling Symbol Block for routine MPI_Bcast */#if defined(HAVE_PRAGMA_WEAK)#pragma weak MPI_Bcast = PMPI_Bcast#elif defined(HAVE_PRAGMA_HP_SEC_DEF)#pragma _HP_SECONDARY_DEF PMPI_Bcast MPI_Bcast#elif defined(HAVE_PRAGMA_CRI_DUP)#pragma _CRI duplicate MPI_Bcast as PMPI_Bcast#endif/* -- End Profiling Symbol Block *//* Define MPICH_MPI_FROM_PMPI if weak symbols are not supported to build the MPI routines */#ifndef MPICH_MPI_FROM_PMPI#define MPI_Bcast PMPI_Bcast/* This is the default implementation of broadcast. The algorithm is: Algorithm: MPI_Bcast For short messages, we use a binomial tree algorithm. Cost = lgp.alpha + n.lgp.beta For long messages, we do a scatter followed by an allgather. We first scatter the buffer using a binomial tree algorithm. This costs lgp.alpha + n.((p-1)/p).beta If the datatype is contiguous and the communicator is homogeneous, we treat the data as bytes and divide (scatter) it among processes by using ceiling division. For the noncontiguous or heterogeneous cases, we first pack the data into a temporary buffer by using MPI_Pack, scatter it as bytes, and unpack it after the allgather. For the allgather, we use a recursive doubling algorithm for medium-size messages and power-of-two number of processes. This takes lgp steps. In each step pairs of processes exchange all the data they have (we take care of non-power-of-two situations). This costs approximately lgp.alpha + n.((p-1)/p).beta. (Approximately because it may be slightly more in the non-power-of-two case, but it's still a logarithmic algorithm.) Therefore, for long messages Total Cost = 2.lgp.alpha + 2.n.((p-1)/p).beta Note that this algorithm has twice the latency as the tree algorithm we use for short messages, but requires lower bandwidth: 2.n.beta versus n.lgp.beta. Therefore, for long messages and when lgp > 2, this algorithm will perform better. For long messages and for medium-size messages and non-power-of-two processes, we use a ring algorithm for the allgather, which takes p-1 steps, because it performs better than recursive doubling. Total Cost = (lgp+p-1).alpha + 2.n.((p-1)/p).beta Possible improvements: For clusters of SMPs, we may want to do something differently to take advantage of shared memory on each node. End Algorithm: MPI_Bcast*//* begin:nested *//* not declared static because it is called in intercomm. allgatherv */int MPIR_Bcast ( void *buffer, int count, MPI_Datatype datatype, int root, MPID_Comm *comm_ptr ){ static const char FCNAME[] = "MPIR_Bcast"; MPI_Status status; int rank, comm_size, src, dst; int relative_rank, mask, tmp_buf_size; int mpi_errno = MPI_SUCCESS; int scatter_size, nbytes=0, curr_size, recv_size = 0, send_size; int type_size, j, k, i, tmp_mask, is_contig, is_homogeneous; int relative_dst, dst_tree_root, my_tree_root, send_offset; int recv_offset, tree_root, nprocs_completed, offset, position; int *recvcnts, *displs, left, right, jnext, pof2, comm_size_is_pof2; void *tmp_buf; MPI_Comm comm; MPID_Datatype *dtp; MPI_Aint true_extent, true_lb; if (count == 0) return MPI_SUCCESS; comm = comm_ptr->handle; comm_size = comm_ptr->local_size; rank = comm_ptr->rank; /* If there is only one process, return */ if (comm_size == 1) return MPI_SUCCESS; if (HANDLE_GET_KIND(datatype) == HANDLE_KIND_BUILTIN) is_contig = 1; else { MPID_Datatype_get_ptr(datatype, dtp); is_contig = dtp->is_contig; } is_homogeneous = 1;#ifdef MPID_HAS_HETERO if (comm_ptr->is_hetero) is_homogeneous = 0;#endif if (is_contig && is_homogeneous) { /* contiguous and homogeneous */ MPID_Datatype_get_size_macro(datatype, type_size); nbytes = type_size * count; } else { mpi_errno = NMPI_Pack_size(1, datatype, comm, &tmp_buf_size); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } /* calculate the value of nbytes, the size in packed representation of the buffer to be broadcasted. We can't simply multiply tmp_buf_size by count because tmp_buf_size is an upper bound on the amount of memory required. (For example, for a single integer, MPICH-1 returns pack_size=12.) Therefore, we actually pack some data into tmp_buf, see by how much 'position' is incremented, and multiply that by count. */ tmp_buf = MPIU_Malloc(tmp_buf_size); /* --BEGIN ERROR HANDLING-- */ if (!tmp_buf) { mpi_errno = MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", "**nomem %d", tmp_buf_size ); return mpi_errno; } /* --END ERROR HANDLING-- */ position = 0; NMPI_Pack(buffer, 1, datatype, tmp_buf, tmp_buf_size, &position, comm); MPIU_Free(tmp_buf); nbytes = position * count; } relative_rank = (rank >= root) ? rank - root : rank - root + comm_size; /* check if multiple threads are calling this collective function */ MPIDU_ERR_CHECK_MULTIPLE_THREADS_ENTER( comm_ptr ); if ((nbytes < MPIR_BCAST_SHORT_MSG) || (comm_size < MPIR_BCAST_MIN_PROCS)) { /* Use short message algorithm, namely, binomial tree */ /* Algorithm: This uses a fairly basic recursive subdivision algorithm. The root sends to the process comm_size/2 away; the receiver becomes a root for a subtree and applies the same process. So that the new root can easily identify the size of its subtree, the (subtree) roots are all powers of two (relative to the root) If m = the first power of 2 such that 2^m >= the size of the communicator, then the subtree at root at 2^(m-k) has size 2^k (with special handling for subtrees that aren't a power of two in size). Do subdivision. There are two phases: 1. Wait for arrival of data. Because of the power of two nature of the subtree roots, the source of this message is alwyas the process whose relative rank has the least significant 1 bit CLEARED. That is, process 4 (100) receives from process 0, process 7 (111) from process 6 (110), etc. 2. Forward to my subtree Note that the process that is the tree root is handled automatically by this code, since it has no bits set. */ mask = 0x1; while (mask < comm_size) { if (relative_rank & mask) { src = rank - mask; if (src < 0) src += comm_size; mpi_errno = MPIC_Recv(buffer,count,datatype,src, MPIR_BCAST_TAG,comm,MPI_STATUS_IGNORE); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } break; } mask <<= 1; } /* This process is responsible for all processes that have bits set from the LSB upto (but not including) mask. Because of the "not including", we start by shifting mask back down one. We can easily change to a different algorithm at any power of two by changing the test (mask > 1) to (mask > block_size) One such version would use non-blocking operations for the last 2-4 steps (this also bounds the number of MPI_Requests that would be needed). */ mask >>= 1; while (mask > 0) { if (relative_rank + mask < comm_size) { dst = rank + mask; if (dst >= comm_size) dst -= comm_size; mpi_errno = MPIC_Send (buffer,count,datatype,dst, MPIR_BCAST_TAG,comm); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } } mask >>= 1; } } else { /* use long message algorithm: binomial tree scatter followed by an allgather */ /* The scatter algorithm divides the buffer into nprocs pieces and scatters them among the processes. Root gets the first piece, root+1 gets the second piece, and so forth. Uses the same binomial tree algorithm as above. Ceiling division is used to compute the size of each piece. This means some processes may not get any data. For example if bufsize = 97 and nprocs = 16, ranks 15 and 16 will get 0 data. On each process, the scattered data is stored at the same offset in the buffer as it is on the root process. */ if (is_contig && is_homogeneous) { /* contiguous and homogeneous. no need to pack. */ mpi_errno = NMPI_Type_get_true_extent(datatype, &true_lb, &true_extent); if (mpi_errno) { MPIU_ERR_POP(mpi_errno); } tmp_buf = (char *) buffer + true_lb; } else { /* noncontiguous or heterogeneous. pack into temporary buffer. */ tmp_buf = MPIU_Malloc(nbytes); /* --BEGIN ERROR HANDLING-- */ if (!tmp_buf) { mpi_errno = MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", "**nomem %d", nbytes ); return mpi_errno; } /* --END ERROR HANDLING-- */ if (rank == root) { position = 0; NMPI_Pack(buffer, count, datatype, tmp_buf, nbytes, &position, comm); } } scatter_size = (nbytes + comm_size - 1)/comm_size; /* ceiling division */ curr_size = (rank == root) ? nbytes : 0; /* root starts with all the data */ mask = 0x1; while (mask < comm_size) { if (relative_rank & mask) { src = rank - mask; if (src < 0) src += comm_size; recv_size = nbytes - relative_rank*scatter_size; /* recv_size is larger than what might actually be sent by the sender. We don't need compute the exact value because MPI allows you to post a larger recv.*/ if (recv_size <= 0) { curr_size = 0; /* this process doesn't receive any data because of uneven division */ } else { mpi_errno = MPIC_Recv(((char *)tmp_buf + relative_rank*scatter_size), recv_size, MPI_BYTE, src, MPIR_BCAST_TAG, comm, &status); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } /* query actual size of data received */ NMPI_Get_count(&status, MPI_BYTE, &curr_size); } break; } mask <<= 1; } /* This process is responsible for all processes that have bits set from the LSB upto (but not including) mask. Because of the "not including", we start by shifting mask back down one. */ mask >>= 1; while (mask > 0) { if (relative_rank + mask < comm_size) { send_size = curr_size - scatter_size * mask; /* mask is also the size of this process's subtree */ if (send_size > 0) { dst = rank + mask; if (dst >= comm_size) dst -= comm_size; mpi_errno = MPIC_Send (((char *)tmp_buf + scatter_size*(relative_rank+mask)), send_size, MPI_BYTE, dst, MPIR_BCAST_TAG, comm); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } curr_size -= send_size; } } mask >>= 1; } /* Scatter complete. Now do an allgather . */ /* check if comm_size is a power of two */ pof2 = 1; while (pof2 < comm_size) pof2 *= 2; if (pof2 == comm_size) comm_size_is_pof2 = 1; else comm_size_is_pof2 = 0; if ((nbytes < MPIR_BCAST_LONG_MSG) && (comm_size_is_pof2)) { /* medium size allgather and pof2 comm_size. use recurive doubling. */ mask = 0x1; i = 0; while (mask < comm_size) { relative_dst = relative_rank ^ mask; dst = (relative_dst + root) % comm_size; /* find offset into send and recv buffers. zero out the least significant "i" bits of relative_rank and relative_dst to find root of src and dst subtrees. Use ranks of roots as index to send from and recv into buffer */ dst_tree_root = relative_dst >> i; dst_tree_root <<= i; my_tree_root = relative_rank >> i; my_tree_root <<= i; send_offset = my_tree_root * scatter_size; recv_offset = dst_tree_root * scatter_size; if (relative_dst < comm_size) { mpi_errno = MPIC_Sendrecv(((char *)tmp_buf + send_offset), curr_size, MPI_BYTE, dst, MPIR_BCAST_TAG, ((char *)tmp_buf + recv_offset), scatter_size*mask, MPI_BYTE, dst, MPIR_BCAST_TAG, comm, &status); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } NMPI_Get_count(&status, MPI_BYTE, &recv_size); curr_size += recv_size; } /* if some processes in this process's subtree in this step did not have any destination process to communicate with because of non-power-of-two, we need to send them the data that they would normally have received from those processes. That is, the haves in this subtree must send to the havenots. We use a logarithmic recursive-halfing algorithm for this. */ /* This part of the code will not currently be
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -