coll_tuned_allgather.c
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2006 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"

#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/datatype/datatype.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/coll/coll.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/op/op.h"
#include "coll_tuned.h"
#include "coll_tuned_topo.h"
#include "coll_tuned_util.h"

/*
 * ompi_coll_tuned_allgather_intra_bruck
 *
 * Function:     allgather using O(log(N)) steps.
 * Accepts:      Same arguments as MPI_Allgather
 * Returns:      MPI_SUCCESS or error code
 *
 * Description:  Variation of the all-to-all algorithm described by Bruck
 *               et al. in "Efficient Algorithms for All-to-all Communications
 *               in Multiport Message-Passing Systems".
 * Memory requirements:  non-zero ranks require a shift buffer to perform the
 *               final step of the algorithm.
 *
 * Example on 6 nodes:
 *   Initialization: everyone has its own buffer at location 0 in rbuf.
 *                   This means that if the user specified MPI_IN_PLACE for
 *                   sendbuf we must copy our block from recvbuf to the
 *                   beginning!
 *    #     0      1      2      3      4      5
 *         [0]    [1]    [2]    [3]    [4]    [5]
 *   Step 0: send message to (rank - 2^0), receive message from (rank + 2^0)
 *    #     0      1      2      3      4      5
 *         [0]    [1]    [2]    [3]    [4]    [5]
 *         [1]    [2]    [3]    [4]    [5]    [0]
 *   Step 1: send message to (rank - 2^1), receive message from (rank + 2^1);
 *           the message contains all blocks from location 0 to 2^1 * block size
 *    #     0      1      2      3      4      5
 *         [0]    [1]    [2]    [3]    [4]    [5]
 *         [1]    [2]    [3]    [4]    [5]    [0]
 *         [2]    [3]    [4]    [5]    [0]    [1]
 *         [3]    [4]    [5]    [0]    [1]    [2]
 *   Step 2: send message to (rank - 2^2), receive message from (rank + 2^2);
 *           the message size is "all remaining blocks"
 *    #     0      1      2      3      4      5
 *         [0]    [1]    [2]    [3]    [4]    [5]
 *         [1]    [2]    [3]    [4]    [5]    [0]
 *         [2]    [3]    [4]    [5]    [0]    [1]
 *         [3]    [4]    [5]    [0]    [1]    [2]
 *         [4]    [5]    [0]    [1]    [2]    [3]
 *         [5]    [0]    [1]    [2]    [3]    [4]
 *   Finalization: do a local shift to get the data into the correct place
 *    #     0      1      2      3      4      5
 *         [0]    [0]    [0]    [0]    [0]    [0]
 *         [1]    [1]    [1]    [1]    [1]    [1]
 *         [2]    [2]    [2]    [2]    [2]    [2]
 *         [3]    [3]    [3]    [3]    [3]    [3]
 *         [4]    [4]    [4]    [4]    [4]    [4]
 *         [5]    [5]    [5]    [5]    [5]    [5]
 */
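/*
 * Worked step schedule for the 6-node example above, as produced by the
 * communication loop in the function below: the distance doubles each step,
 * and blockcount equals distance until distance exceeds size/2, after which
 * only the remaining (size - distance) blocks are exchanged.
 *
 *   step 0: distance = 1, blockcount = 1          (each rank holds 2 blocks)
 *   step 1: distance = 2, blockcount = 2          (each rank holds 4 blocks)
 *   step 2: distance = 4, blockcount = 6 - 4 = 2  (each rank holds all 6)
 *
 * In total each rank receives size - 1 = 5 remote blocks in
 * ceil(log2(6)) = 3 steps.
 */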
int ompi_coll_tuned_allgather_intra_bruck(void *sbuf, int scount,
                                          struct ompi_datatype_t *sdtype,
                                          void* rbuf, int rcount,
                                          struct ompi_datatype_t *rdtype,
                                          struct ompi_communicator_t *comm)
{
    int line = -1;
    int rank, size;
    int sendto, recvfrom, distance, blockcount;
    int err = 0;
    ptrdiff_t slb, rlb, sext, rext;
    char *tmpsend = NULL, *tmprecv = NULL;

    size = ompi_comm_size(comm);
    rank = ompi_comm_rank(comm);

    OPAL_OUTPUT((ompi_coll_tuned_stream,
                 "coll:tuned:allgather_intra_bruck rank %d", rank));

    err = ompi_ddt_get_extent (sdtype, &slb, &sext);
    if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

    err = ompi_ddt_get_extent (rdtype, &rlb, &rext);
    if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

    /* Initialization step:
       - if the send buffer is not MPI_IN_PLACE, copy the send buffer to
         block 0 of the receive buffer, else
       - if rank r != 0, copy the r-th block from the receive buffer to
         block 0.
    */
    tmprecv = (char*) rbuf;
    if (MPI_IN_PLACE != sbuf) {
        tmpsend = (char*) sbuf;
        err = ompi_ddt_sndrcv(tmpsend, scount, sdtype, tmprecv, rcount, rdtype);
        if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

    } else if (0 != rank) {
        tmpsend = ((char*) rbuf) + rank * rcount * rext;
        err = ompi_ddt_copy_content_same_ddt(rdtype, rcount, tmprecv, tmpsend);
        if (err < 0) { line = __LINE__; goto err_hndl; }
    }

    /* Communication step:
       At every step i, rank r:
       - doubles the distance,
       - sends a message which starts at the beginning of rbuf and has size
         (blockcount * rcount) to rank (r - distance),
       - receives a message of size blockcount * rcount from rank
         (r + distance) at location (rbuf + distance * rcount * rext),
       - blockcount doubles until the last step, when only the remaining data
         is exchanged.
    */
    blockcount = 1;
    tmpsend = (char*) rbuf;
    for (distance = 1; distance < size; distance <<= 1) {

        recvfrom = (rank + distance) % size;
        sendto = (rank - distance + size) % size;

        tmprecv = tmpsend + distance * rcount * rext;

        if (distance <= (size >> 1)) {
            blockcount = distance;
        } else {
            blockcount = size - distance;
        }

        /* Sendreceive */
        err = ompi_coll_tuned_sendrecv(tmpsend, blockcount * rcount, rdtype,
                                       sendto, MCA_COLL_BASE_TAG_ALLGATHER,
                                       tmprecv, blockcount * rcount, rdtype,
                                       recvfrom, MCA_COLL_BASE_TAG_ALLGATHER,
                                       comm, MPI_STATUS_IGNORE, rank);
        if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
    }

    /* Finalization step:
       On all nodes except 0, the data needs to be shifted locally:
       - create a temporary shift buffer; see the discussion in
         coll_basic_reduce.c about the size and beginning of the temporary
         buffer,
       - copy blocks [0 .. (size - rank - 1)] in rbuf to the shift buffer,
       - move blocks [(size - rank) .. size] in rbuf to the beginning of rbuf,
       - copy blocks from the shift buffer starting at block [rank] in rbuf.
    */
    if (0 != rank) {
        ptrdiff_t true_extent, true_lb;
        char *free_buf = NULL, *shift_buf = NULL;

        err = ompi_ddt_get_true_extent(rdtype, &true_lb, &true_extent);
        if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

        free_buf = (char*) calloc(((true_extent +
                                    ((size - rank) * rcount - 1) * rext)),
                                  sizeof(char));
        if (NULL == free_buf) {
            line = __LINE__; err = OMPI_ERR_OUT_OF_RESOURCE; goto err_hndl;
        }
        shift_buf = free_buf - rlb;

        /* Copy blocks [0 .. (size - rank - 1)] in rbuf to the shift buffer. */
        tmpsend = (char*) rbuf;
        err = ompi_ddt_copy_content_same_ddt(rdtype, ((size - rank) * rcount),
                                             shift_buf, tmpsend);
        if (err < 0) { line = __LINE__; goto err_hndl; }

        /* Move blocks [(size - rank) .. size] in rbuf to the beginning of rbuf. */
        tmprecv = (char*) rbuf;
        tmpsend = (char*) rbuf + (size - rank) * rcount * rext;
        err = ompi_ddt_copy_content_same_ddt(rdtype, rank * rcount,
                                             tmprecv, tmpsend);
        if (err < 0) { line = __LINE__; goto err_hndl; }

        /* Copy blocks from the shift buffer starting at block [rank] in rbuf. */
        tmprecv = (char*) rbuf + rank * rcount * rext;
        err = ompi_ddt_copy_content_same_ddt(rdtype, (size - rank) * rcount,
                                             tmprecv, shift_buf);
        if (err < 0) { line = __LINE__; goto err_hndl; }

        free(free_buf);
    }

    return OMPI_SUCCESS;

 err_hndl:
    OPAL_OUTPUT((ompi_coll_tuned_stream, "%s:%4d\tError occurred %d, rank %2d",
                 __FILE__, line, err, rank));
    return err;
}
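/*
 * Concrete example of the finalization shift above, for rank 2 of 6: after
 * the communication steps rbuf holds the blocks [2 3 4 5 0 1].  The code
 * copies the first (size - rank) = 4 blocks [2 3 4 5] into the shift buffer,
 * moves the trailing rank = 2 blocks [0 1] to the beginning of rbuf, and then
 * copies the saved blocks back starting at block index rank, leaving
 * [0 1 2 3 4 5].
 */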
/*
 * ompi_coll_tuned_allgather_intra_recursivedoubling
 *
 * Function:     allgather using O(log(N)) steps.
 * Accepts:      Same arguments as MPI_Allgather
 * Returns:      MPI_SUCCESS or error code
 *
 * Description:  Recursive doubling algorithm for MPI_Allgather implementation.
 *               This algorithm is used in MPICH-2 for small- and medium-sized
 *               messages on power-of-two processes.
 *
 * Limitation:   The current implementation only works on a power-of-two
 *               number of processes.
 *               In case this algorithm is invoked on a non-power-of-two
 *               number of processes, the Bruck algorithm will be invoked.
 *
 * Memory requirements:
 *               No additional memory requirements beyond user-supplied buffers.
 *
 * Example on 4 nodes:
 *   Initialization: everyone has its own buffer at location rank in rbuf
 *    #     0      1      2      3
 *         [0]    [ ]    [ ]    [ ]
 *         [ ]    [1]    [ ]    [ ]
 *         [ ]    [ ]    [2]    [ ]
 *         [ ]    [ ]    [ ]    [3]
 *   Step 0: exchange data with (rank ^ 2^0)
 *    #     0      1      2      3
 *         [0]    [0]    [ ]    [ ]
 *         [1]    [1]    [ ]    [ ]
 *         [ ]    [ ]    [2]    [2]
 *         [ ]    [ ]    [3]    [3]
 *   Step 1: exchange data with (rank ^ 2^1) (if you can)
 *    #     0      1      2      3
 *         [0]    [0]    [0]    [0]
 *         [1]    [1]    [1]    [1]
 *         [2]    [2]    [2]    [2]
 *         [3]    [3]    [3]    [3]
 *
 *  TODO: Modify the algorithm to work with any number of nodes.
 *        We can modify the code to use an implementation identical to
 *        MPICH-2's:
 *        - using the recursive-halving algorithm, at the end of each step
 *          determine if there are nodes that did not exchange their data in
 *          that step, and send them the appropriate messages.
 */
int
ompi_coll_tuned_allgather_intra_recursivedoubling(void *sbuf, int scount,
                                                  struct ompi_datatype_t *sdtype,
                                                  void* rbuf, int rcount,
                                                  struct ompi_datatype_t *rdtype,
                                                  struct ompi_communicator_t *comm)
{
    int line = -1;
    int rank, size, pow2size;
    int remote, distance, sendblocklocation;
    int err = 0;
    ptrdiff_t slb, rlb, sext, rext;
    char *tmpsend = NULL, *tmprecv = NULL;

    size = ompi_comm_size(comm);
    rank = ompi_comm_rank(comm);

    /* Compute the largest power of two that is <= size. */
    for (pow2size = 1; pow2size <= size; pow2size <<= 1);
    pow2size >>= 1;

    /* The current implementation only handles a power-of-two number of
       processes.  If the function was called on a non-power-of-two number of
       processes, print a warning and call the Bruck allgather algorithm with
       the same parameters.
    */
    if (pow2size != size) {
        OPAL_OUTPUT((ompi_coll_tuned_stream,
                     "coll:tuned:allgather_intra_recursivedoubling WARNING: non-pow-2 size %d, switching to bruck algorithm",
                     size));

        return ompi_coll_tuned_allgather_intra_bruck(sbuf, scount, sdtype,
                                                     rbuf, rcount, rdtype,
                                                     comm);
    }

    OPAL_OUTPUT((ompi_coll_tuned_stream,
                 "coll:tuned:allgather_intra_recursivedoubling rank %d, size %d",
                 rank, size));

    err = ompi_ddt_get_extent (sdtype, &slb, &sext);
    if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

    err = ompi_ddt_get_extent (rdtype, &rlb, &rext);
    if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

    /* Initialization step:
       - if the send buffer is not MPI_IN_PLACE, copy the send buffer to
         block rank of the receive buffer
    */
    if (MPI_IN_PLACE != sbuf) {
        tmpsend = (char*) sbuf;
        tmprecv = (char*) rbuf + rank * rcount * rext;
        err = ompi_ddt_sndrcv(tmpsend, scount, sdtype, tmprecv, rcount, rdtype);
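/*
 * A minimal, self-contained sketch of the MPI-level call that the routines in
 * this file back, assuming a separate standalone test program (every name
 * below other than the MPI API itself is hypothetical).  Each rank
 * contributes one int and every rank ends up with the complete gathered
 * array, matching the block diagrams in the comments above.
 */
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

int main(int argc, char **argv)
{
    int i, rank, size, sendval;
    int *recvbuf;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* Each rank contributes one int; the receive buffer holds one int per rank. */
    sendval = rank;
    recvbuf = (int*) malloc(size * sizeof(int));

    MPI_Allgather(&sendval, 1, MPI_INT, recvbuf, 1, MPI_INT, MPI_COMM_WORLD);

    if (0 == rank) {
        for (i = 0; i < size; ++i) {
            printf("block %d = %d\n", i, recvbuf[i]);
        }
    }

    free(recvbuf);
    MPI_Finalize();
    return 0;
}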