/*
 * coll_tuned_allgather.c -- allgather algorithms for the Open MPI "tuned"
 * collective component.
 */
/* * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana *                         University Research and Technology *                         Corporation.  All rights reserved. * Copyright (c) 2004-2006 The University of Tennessee and The University *                         of Tennessee Research Foundation.  All rights *                         reserved. * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, *                         University of Stuttgart.  All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. *                         All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow * * $HEADER$ */#include "ompi_config.h"#include "mpi.h"#include "ompi/constants.h"#include "ompi/datatype/datatype.h"#include "ompi/communicator/communicator.h"#include "ompi/mca/coll/coll.h"#include "ompi/mca/coll/base/coll_tags.h"#include "ompi/mca/pml/pml.h"#include "ompi/op/op.h"#include "coll_tuned.h"#include "coll_tuned_topo.h"#include "coll_tuned_util.h"/* * ompi_coll_tuned_allgather_intra_bruck * * Function:     allgather using O(log(N)) steps. * Accepts:      Same arguments as MPI_Allgather * Returns:      MPI_SUCCESS or error code * * Description:  Variation to All-to-all algorithm described by Bruck et al.in *               "Efficient Algorithms for All-to-all Communications *                in Multiport Message-Passing Systems" * Memory requirements:  non-zero ranks require shift buffer to perform final *               step in the algorithm. *  * Example on 6 nodes: *   Initialization: everyone has its own buffer at location 0 in rbuf *                   This means if user specified MPI_IN_PLACE for sendbuf *                   we must copy our block from recvbuf to begining! 
*    #     0      1      2      3      4      5 *         [0]    [1]    [2]    [3]    [4]    [5] *   Step 0: send message to (rank - 2^0), receive message from (rank + 2^0) *    #     0      1      2      3      4      5 *         [0]    [1]    [2]    [3]    [4]    [5] *         [1]    [2]    [3]    [4]    [5]    [0] *   Step 1: send message to (rank - 2^1), receive message from (rank + 2^1) *           message contains all blocks from location 0 to 2^1*block size *    #     0      1      2      3      4      5 *         [0]    [1]    [2]    [3]    [4]    [5] *         [1]    [2]    [3]    [4]    [5]    [0] *         [2]    [3]    [4]    [5]    [0]    [1] *         [3]    [4]    [5]    [0]    [1]    [2] *   Step 2: send message to (rank - 2^2), receive message from (rank + 2^2) *           message size is "all remaining blocks"  *    #     0      1      2      3      4      5 *         [0]    [1]    [2]    [3]    [4]    [5] *         [1]    [2]    [3]    [4]    [5]    [0] *         [2]    [3]    [4]    [5]    [0]    [1] *         [3]    [4]    [5]    [0]    [1]    [2] *         [4]    [5]    [0]    [1]    [2]    [3] *         [5]    [0]    [1]    [2]    [3]    [4] *    Finalization: Do a local shift to get data in correct place *    #     0      1      2      3      4      5 *         [0]    [0]    [0]    [0]    [0]    [0] *         [1]    [1]    [1]    [1]    [1]    [1] *         [2]    [2]    [2]    [2]    [2]    [2] *         [3]    [3]    [3]    [3]    [3]    [3] *         [4]    [4]    [4]    [4]    [4]    [4] *         [5]    [5]    [5]    [5]    [5]    [5] */int ompi_coll_tuned_allgather_intra_bruck(void *sbuf, int scount,                                          struct ompi_datatype_t *sdtype,                                          void* rbuf, int rcount,                                          struct ompi_datatype_t *rdtype,                                          struct ompi_communicator_t *comm){   int line = -1;   int rank, size;   int sendto, 
recvfrom, distance, blockcount;   int err = 0;   ptrdiff_t slb, rlb, sext, rext;   char *tmpsend = NULL, *tmprecv = NULL;   size = ompi_comm_size(comm);   rank = ompi_comm_rank(comm);   OPAL_OUTPUT((ompi_coll_tuned_stream,                "coll:tuned:allgather_intra_bruck rank %d", rank));   err = ompi_ddt_get_extent (sdtype, &slb, &sext);   if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }   err = ompi_ddt_get_extent (rdtype, &rlb, &rext);   if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }   /* Initialization step:      - if send buffer is not MPI_IN_PLACE, copy send buffer to block 0 of       receive buffer, else      - if rank r != 0, copy r^th block from receive buffer to block 0.   */   tmprecv = (char*) rbuf;   if (MPI_IN_PLACE != sbuf) {      tmpsend = (char*) sbuf;      err = ompi_ddt_sndrcv(tmpsend, scount, sdtype, tmprecv, rcount, rdtype);      if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl;  }   } else if (0 != rank) {      tmpsend = ((char*)rbuf) + rank * rcount * rext;      err = ompi_ddt_copy_content_same_ddt(rdtype, rcount, tmprecv, tmpsend);      if (err < 0) { line = __LINE__; goto err_hndl; }   }      /* Communication step:      At every step i, rank r:      - doubles the distance      - sends message which starts at begining of rbuf and has size       (blockcount * rcount) to rank (r - distance)      - receives message of size blockcount * rcount from rank (r + distance)      at location (rbuf + distance * rcount * rext)      - blockcount doubles until last step when only the remaining data is       exchanged.   
*/   blockcount = 1;   tmpsend = (char*) rbuf;   for (distance = 1; distance < size; distance<<=1) {      recvfrom = (rank + distance) % size;      sendto = (rank - distance + size) % size;      tmprecv = tmpsend + distance * rcount * rext;      if (distance <= (size >> 1)) {         blockcount = distance;      } else {          blockcount = size - distance;      }      /* Sendreceive */      err = ompi_coll_tuned_sendrecv(tmpsend, blockcount * rcount, rdtype,                                      sendto, MCA_COLL_BASE_TAG_ALLGATHER,                                     tmprecv, blockcount * rcount, rdtype,                                      recvfrom, MCA_COLL_BASE_TAG_ALLGATHER,                                     comm, MPI_STATUS_IGNORE, rank);      if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }   }   /* Finalization step:      On all nodes except 0, data needs to be shifted locally:      - create temprary shift buffer,       see discussion in coll_basic_reduce.c about the size and begining       of temporary buffer.      - copy blocks [0 .. (size - rank - 1)] in rbuf to shift buffer      - move blocks [(size - rank) .. size] in rbuf to begining of rbuf      - copy blocks from shift buffer starting at block [rank] in rbuf.   
*/   if (0 != rank) {      ptrdiff_t true_extent, true_lb;      char *free_buf = NULL, *shift_buf = NULL;      err = ompi_ddt_get_true_extent(rdtype, &true_lb, &true_extent);      if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }      free_buf = (char*) calloc(((true_extent +                                   ((size - rank) * rcount - 1) * rext)),                                sizeof(char));      if (NULL == free_buf) {          line = __LINE__; err = OMPI_ERR_OUT_OF_RESOURCE; goto err_hndl;       }      shift_buf = free_buf - rlb;            tmpsend = (char*) rbuf;      err = ompi_ddt_copy_content_same_ddt(rdtype, ((size - rank) * rcount),                                           shift_buf, tmpsend);      if (err < 0) { line = __LINE__; goto err_hndl;  }      tmprecv = (char*) rbuf;      tmpsend = (char*) rbuf + (size - rank) * rcount * rext;      err = ompi_ddt_copy_content_same_ddt(rdtype, rank * rcount,                                            tmprecv, tmpsend);      if (err < 0) { line = __LINE__; goto err_hndl;  }      tmprecv = (char*) rbuf + rank * rcount * rext;      err = ompi_ddt_copy_content_same_ddt(rdtype, (size - rank) * rcount,                                            tmprecv, shift_buf);      if (err < 0) { line = __LINE__; goto err_hndl;  }      free(free_buf);   }   return OMPI_SUCCESS; err_hndl:   OPAL_OUTPUT((ompi_coll_tuned_stream,  "%s:%4d\tError occurred %d, rank %2d",                __FILE__, line, err, rank));   return err;}/* * ompi_coll_tuned_allgather_intra_recursivedoubling * * Function:     allgather using O(log(N)) steps. * Accepts:      Same arguments as MPI_Allgather * Returns:      MPI_SUCCESS or error code * * Description:  Recursive doubling algorithm for MPI_Allgather implementation. *               This algorithm is used in MPICH-2 for small- and medium-sized *               messages on power-of-two processes. 
* * Limitation:   Current implementation only works on power-of-two number of  *               processes.   *               In case this algorithm is invoked on non-power-of-two *               processes, Bruck algorithm will be invoked. *  * Memory requirements: *               No additional memory requirements beyond user-supplied buffers. *  * Example on 4 nodes: *   Initialization: everyone has its own buffer at location rank in rbuf *    #     0      1      2      3  *         [0]    [ ]    [ ]    [ ] *         [ ]    [1]    [ ]    [ ] *         [ ]    [ ]    [2]    [ ] *         [ ]    [ ]    [ ]    [3] *   Step 0: exchange data with (rank ^ 2^0) *    #     0      1      2      3  *         [0]    [0]    [ ]    [ ] *         [1]    [1]    [ ]    [ ] *         [ ]    [ ]    [2]    [2] *         [ ]    [ ]    [3]    [3] *   Step 1: exchange data with (rank ^ 2^1) (if you can) *    #     0      1      2      3  *         [0]    [0]    [0]    [0] *         [1]    [1]    [1]    [1] *         [2]    [2]    [2]    [2] *         [3]    [3]    [3]    [3] * *  TODO: Modify the algorithm to work with any number of nodes. *        We can modify code to use identical implementation like MPICH-2: *        - using recursive-halving algorith, at the end of each step,  *          determine if there are nodes who did not exchange their data in that *          step, and send them appropriate messages. 
*/int ompi_coll_tuned_allgather_intra_recursivedoubling(void *sbuf, int scount,                                                  struct ompi_datatype_t *sdtype,                                                  void* rbuf, int rcount,                                                  struct ompi_datatype_t *rdtype,                                                  struct ompi_communicator_t *comm){   int line = -1;   int rank, size, pow2size;   int remote, distance, sendblocklocation;   int err = 0;   ptrdiff_t slb, rlb, sext, rext;   char *tmpsend = NULL, *tmprecv = NULL;   size = ompi_comm_size(comm);   rank = ompi_comm_rank(comm);   for (pow2size  = 1; pow2size <= size; pow2size <<=1);   pow2size >>=1;   /* Current implementation only handles power-of-two number of processes.      If the function was called on non-power-of-two number of processes,       print warning and call bruck allgather algorithm with same parameters.    */   if (pow2size != size) {      OPAL_OUTPUT((ompi_coll_tuned_stream,                   "coll:tuned:allgather_intra_recursivedoubling WARNING: non-pow-2 size %d, switching to bruck algorithm",                    size));      return ompi_coll_tuned_allgather_intra_bruck(sbuf, scount, sdtype,                                                    rbuf, rcount, rdtype, comm);   }   OPAL_OUTPUT((ompi_coll_tuned_stream,                "coll:tuned:allgather_intra_recursivedoubling rank %d, size %d",                 rank, size));   err = ompi_ddt_get_extent (sdtype, &slb, &sext);   if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }   err = ompi_ddt_get_extent (rdtype, &rlb, &rext);   if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }   /* Initialization step:      - if send buffer is not MPI_IN_PLACE, copy send buffer to block 0 of       receive buffer   */   if (MPI_IN_PLACE != sbuf) {      tmpsend = (char*) sbuf;      tmprecv = (char*) rbuf + rank * rcount * rext;      err = ompi_ddt_sndrcv(tmpsend, scount, sdtype, tmprecv, rcount, 
rdtype);
