coll_tuned_allreduce.c
来自「MPI stands for the Message Passing Inter」· C语言 代码 · 共 680 行 · 第 1/2 页
C
680 行
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2006 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"

#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/datatype/datatype.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/coll/coll.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/op/op.h"
#include "coll_tuned.h"
#include "coll_tuned_topo.h"
#include "coll_tuned_util.h"

/*
 *   ompi_coll_tuned_allreduce_intra_nonoverlapping
 *
 *   Function:   Allreduce built from a reduce to rank 0 followed by a
 *               broadcast from rank 0.
 *   Accepts:    Same as MPI_Allreduce()
 *   Returns:    The error code of the reduce if it fails, otherwise the
 *               return value of the broadcast.
 *
 *   The two collectives are invoked through the communicator's collective
 *   table and run strictly one after the other: the broadcast is not
 *   started until the reduce call has returned, so a segment that is
 *   already fully reduced is not broadcast while the reduce is still in
 *   progress (cost = cost-reduce + cost-bcast + decision x 3).
 */
int
ompi_coll_tuned_allreduce_intra_nonoverlapping(void *sbuf, void *rbuf, int count,
                                               struct ompi_datatype_t *dtype,
                                               struct ompi_op_t *op,
                                               struct ompi_communicator_t *comm)
{
    int err;
    int rank;
    /* Buffers handed to the reduce; the defaults cover both the regular case
       and the in-place root (where sbuf already is MPI_IN_PLACE). */
    void *reduce_src = sbuf;
    void *reduce_dst = rbuf;

    rank = ompi_comm_rank(comm);

    OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:allreduce_intra_nonoverlapping rank %d", rank));

    /* In-place on a non-root rank: the contribution lives in rbuf, and no
       receive buffer is passed to the reduce. */
    if (MPI_IN_PLACE == sbuf && 0 != ompi_comm_rank(comm)) {
        reduce_src = rbuf;
        reduce_dst = NULL;
    }

    /* Reduce to 0 and broadcast. */
    err = comm->c_coll.coll_reduce (reduce_src, reduce_dst, count, dtype, op, 0, comm);
    if (MPI_SUCCESS != err) {
        return err;
    }

    return comm->c_coll.coll_bcast (rbuf, count, dtype, 0, comm);
}

/*
 *   ompi_coll_tuned_allreduce_intra_recursivedoubling
 *
 *   Function:       Recursive doubling algorithm for allreduce operation
 *   Accepts:        Same as MPI_Allreduce()
 *   Returns:        MPI_SUCCESS or error code
 *
 *   Description:    Implements recursive doubling algorithm for allreduce.
 *                   Original (non-segmented) implementation is used in MPICH-2
 *                   for small and intermediate size messages.
 *                   The algorithm preserves order of operations so it can
 *                   be used both by commutative and non-commutative operations.
 *
 *         Example on 7 nodes:
 *         Initial state
 *         #      0       1      2       3      4       5      6
 *               [0]     [1]    [2]     [3]    [4]     [5]    [6]
 *         Initial adjustment step for non-power of two nodes.
*          old rank      1              3              5      6
 *         new rank      0              1              2      3
 *                     [0+1]          [2+3]          [4+5]   [6]
 *         Step 1
 *         old rank      1              3              5      6
 *         new rank      0              1              2      3
 *                     [0+1+]         [0+1+]         [4+5+]  [4+5+]
 *                     [2+3+]         [2+3+]         [6   ]  [6   ]
 *         Step 2
 *         old rank      1              3              5      6
 *         new rank      0              1              2      3
 *                     [0+1+]         [0+1+]         [0+1+]  [0+1+]
 *                     [2+3+]         [2+3+]         [2+3+]  [2+3+]
 *                     [4+5+]         [4+5+]         [4+5+]  [4+5+]
 *                     [6   ]         [6   ]         [6   ]  [6   ]
 *         Final adjustment step for non-power of two nodes
 *         #      0       1      2       3      4       5      6
 *              [0+1+] [0+1+] [0+1+]  [0+1+] [0+1+]  [0+1+] [0+1+]
 *              [2+3+] [2+3+] [2+3+]  [2+3+] [2+3+]  [2+3+] [2+3+]
 *              [4+5+] [4+5+] [4+5+]  [4+5+] [4+5+]  [4+5+] [4+5+]
 *              [6   ] [6   ] [6   ]  [6   ] [6   ]  [6   ] [6   ]
 *
 */
int
ompi_coll_tuned_allreduce_intra_recursivedoubling(void *sbuf, void *rbuf,
                                                  int count,
                                                  struct ompi_datatype_t *dtype,
                                                  struct ompi_op_t *op,
                                                  struct ompi_communicator_t *comm)
{
    int ret, line;
    int rank, size;
    int pow2_size, extra_ranks;          /* largest 2^k <= size, and the surplus */
    int in_fold_zone, is_even_rank;      /* role in the non-power-of-two steps */
    int vrank, vpeer, peer, mask;        /* v* are ranks in the 2^k-sized group */
    char *send_ptr = NULL, *recv_ptr = NULL, *scratch = NULL;
    char *initial_data;
    ptrdiff_t lb, extent, true_lb, true_extent;
    ompi_request_t *reqs[2] = {NULL, NULL};

    size = ompi_comm_size(comm);
    rank = ompi_comm_rank(comm);

    OPAL_OUTPUT((ompi_coll_tuned_stream,
                 "coll:tuned:allreduce_intra_recursivedoubling rank %d", rank));

    /* A single process has nothing to combine: at most copy sbuf to rbuf. */
    if (1 == size) {
        if (MPI_IN_PLACE != sbuf) {
            ret = ompi_ddt_copy_content_same_ddt(dtype, count,
                                                 (char*)rbuf, (char*)sbuf);
            if (ret < 0) { line = __LINE__; goto error_hndl; }
        }
        return MPI_SUCCESS;
    }

    /* Size the scratch buffer from the datatype extents. */
    ret = ompi_ddt_get_extent(dtype, &lb, &extent);
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

    ret = ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent);
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

    scratch = (char*) malloc(true_extent + (count - 1) * extent);
    if (NULL == scratch) { ret = -1; line = __LINE__; goto error_hndl; }

    /* Seed the scratch buffer with this rank's contribution, which lives in
       rbuf for the in-place variant and in sbuf otherwise. */
    initial_data = (MPI_IN_PLACE == sbuf) ? (char*)rbuf : (char*)sbuf;
    ret = ompi_ddt_copy_content_same_ddt(dtype, count, scratch, initial_data);
    if (ret < 0) { line = __LINE__; goto error_hndl; }

    send_ptr = scratch;
    recv_ptr = (char*) rbuf;

    /* Determine nearest power of two less than or equal to size */
    pow2_size = 0x1;
    while (pow2_size <= size) {
        pow2_size <<= 1;
    }
    pow2_size >>= 1;

    /* Handle non-power-of-two case:
       - Ranks at or above 2 * extra_ranks simply shift down by extra_ranks.
       - Even ranks below 2 * extra_ranks hand their data to (rank + 1) and
         sit out the exchange loop (virtual rank -1).
       - Odd ranks below 2 * extra_ranks absorb the data of (rank - 1) and
         take virtual rank rank/2.
    */
    extra_ranks  = size - pow2_size;
    in_fold_zone = (rank < (2 * extra_ranks));
    is_even_rank = (0 == (rank % 2));

    if (!in_fold_zone) {
        vrank = rank - extra_ranks;
    } else if (is_even_rank) {
        ret = MCA_PML_CALL(send(send_ptr, count, dtype, (rank + 1),
                                MCA_COLL_BASE_TAG_ALLREDUCE,
                                MCA_PML_BASE_SEND_STANDARD, comm));
        if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
        vrank = -1;
    } else {
        ret = MCA_PML_CALL(recv(recv_ptr, count, dtype, (rank - 1),
                                MCA_COLL_BASE_TAG_ALLREDUCE, comm,
                                MPI_STATUS_IGNORE));
        if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
        /* send_ptr = recv_ptr (op) send_ptr */
        ompi_op_reduce(op, recv_ptr, send_ptr, count, dtype);
        vrank = rank >> 1;
    }

    /* Communication/Computation loop (skipped by ranks that were folded away)
       - Exchange message with remote node.
       - Perform appropriate operation taking in account order of operations:
         result = value (op) result
    */
    if (vrank >= 0) {
        for (mask = 0x1; mask < pow2_size; mask <<= 1) {
            /* Partner in the virtual group, mapped back to a real rank */
            vpeer = vrank ^ mask;
            if (vpeer < extra_ranks) {
                peer = vpeer * 2 + 1;
            } else {
                peer = vpeer + extra_ranks;
            }

            /* Exchange the data */
            ret = MCA_PML_CALL(irecv(recv_ptr, count, dtype, peer,
                                     MCA_COLL_BASE_TAG_ALLREDUCE, comm,
                                     &reqs[0]));
            if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

            ret = MCA_PML_CALL(isend(send_ptr, count, dtype, peer,
                                     MCA_COLL_BASE_TAG_ALLREDUCE,
                                     MCA_PML_BASE_SEND_STANDARD, comm,
                                     &reqs[1]));
            if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

            ret = ompi_request_wait_all(2, reqs, MPI_STATUSES_IGNORE);
            if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

            /* Apply operation */
            if (rank < peer) {
                char *swap;

                /* recv_ptr = send_ptr (op) recv_ptr; the result now sits in
                   the receive buffer, so the two buffers trade roles. */
                ompi_op_reduce(op, send_ptr, recv_ptr, count, dtype);
                swap     = recv_ptr;
                recv_ptr = send_ptr;
                send_ptr = swap;
            } else {
                /* send_ptr = recv_ptr (op) send_ptr */
                ompi_op_reduce(op, recv_ptr, send_ptr, count, dtype);
            }
        }
    }

    /* Handle non-power-of-two case:
       - Odd ranks less than 2 * extra_ranks send result from send_ptr to
         (rank - 1)
       - Even ranks less than 2 * extra_ranks receive result from (rank + 1)
    */
    if (in_fold_zone) {
        if (is_even_rank) {
            ret = MCA_PML_CALL(recv(rbuf, count, dtype, (rank + 1),
                                    MCA_COLL_BASE_TAG_ALLREDUCE, comm,
                                    MPI_STATUS_IGNORE));
            if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
            send_ptr = (char*)rbuf;
        } else {
            ret = MCA_PML_CALL(send(send_ptr, count, dtype, (rank - 1),
                                    MCA_COLL_BASE_TAG_ALLREDUCE,
                                    MCA_PML_BASE_SEND_STANDARD, comm));
            if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
        }
    }

    /* Ensure that the final result is in rbuf */
    if (send_ptr != rbuf) {
        ret = ompi_ddt_copy_content_same_ddt(dtype, count, (char*)rbuf, send_ptr);
        if (ret < 0) { line = __LINE__; goto error_hndl; }
    }

    free(scratch);
    return MPI_SUCCESS;

 error_hndl:
    OPAL_OUTPUT((ompi_coll_tuned_stream, "%s:%4d\tRank %d Error occurred %d\n",
                 __FILE__, line, rank, ret));
    /* scratch is still NULL when the failure happened before the allocation */
    free(scratch);
    return ret;
}

/*
 *   ompi_coll_tuned_allreduce_intra_ring
 *
 *   Function:       Ring algorithm for allreduce operation
* Accepts: Same as MPI_Allreduce() * Returns: MPI_SUCCESS or error code * * Description: Implements ring algorithm for allreduce: the message is * automatically segmented to segment of size M/N. * Algorithm requires 2*N - 1 steps. * * Limitations: The algorithm DOES NOT preserve order of operations so it * can be used only for commutative operations. * In addition, algorithm cannot work if the total count is * less than size. * Example on 5 nodes: * Initial state * # 0 1 2 3 4 * [00] [10] [20] [30] [40] * [01] [11] [21] [31] [41] * [02] [12] [22] [32] [42] * [03] [13] [23] [33] [43] * [04] [14] [24] [34] [44] * * COMPUTATION PHASE * Step 0: rank r sends block r to rank (r+1) and receives bloc (r-1) * from rank (r-1) [with wraparound]. * # 0 1 2 3 4 * [00] [00+10] [20] [30] [40] * [01] [11] [11+21] [31] [41] * [02] [12] [22] [22+32] [42] * [03] [13] [23] [33] [33+43] * [44+04] [14] [24] [34] [44] * * Step 1: rank r sends block (r-1) to rank (r+1) and receives bloc * (r-2) from rank (r-1) [with wraparound]. * # 0 1 2 3 4 * [00] [00+10] [01+10+20] [30] [40] * [01] [11] [11+21] [11+21+31] [41] * [02] [12] [22] [22+32] [22+32+42] * [33+43+03] [13] [23] [33] [33+43] * [44+04] [44+04+14] [24] [34] [44] * * Step 2: rank r sends block (r-2) to rank (r+1) and receives bloc * (r-2) from rank (r-1) [with wraparound]. * # 0 1 2 3 4 * [00] [00+10] [01+10+20] [01+10+20+30] [40] * [01] [11] [11+21] [11+21+31] [11+21+31+41] * [22+32+42+02] [12] [22] [22+32] [22+32+42] * [33+43+03] [33+43+03+13] [23] [33] [33+43] * [44+04] [44+04+14] [44+04+14+24] [34] [44] * * Step 3: rank r sends block (r-3) to rank (r+1) and receives bloc * (r-3) from rank (r-1) [with wraparound]. * # 0 1 2 3 4 * [00] [00+10] [01+10+20] [01+10+20+30] [FULL] * [FULL] [11] [11+21] [11+21+31] [11+21+31+41] * [22+32+42+02] [FULL] [22] [22+32] [22+32+42] * [33+43+03] [33+43+03+13] [FULL] [33] [33+43] * [44+04] [44+04+14] [44+04+14+24] [FULL] [44] * * DISTRIBUTION PHASE: ring ALLGATHER with ranks shifted by 1. 
* */int ompi_coll_tuned_allreduce_intra_ring(void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype, struct ompi_op_t *op, struct ompi_communicator_t *comm) { int ret, line;
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?