/* coll_tuned_allreduce.c — Open MPI "tuned" collective component: allreduce algorithms. */
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2006 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"

#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/datatype/datatype.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/coll/coll.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/op/op.h"
#include "coll_tuned.h"
#include "coll_tuned_topo.h"
#include "coll_tuned_util.h"

/*
 * ompi_coll_tuned_allreduce_intra_nonoverlapping
 *
 * Allreduce implemented as a tuned reduce (to root 0) followed by a tuned
 * broadcast (from root 0).  The two phases run strictly sequentially — there
 * is no pipelining between them: even when the reduce is segmented, no
 * segment is broadcast while the reduce is still in progress
 * (cost = cost-reduce + cost-bcast + 3 x decision overhead).
 */
int
ompi_coll_tuned_allreduce_intra_nonoverlapping(void *sbuf, void *rbuf, int count,
                                               struct ompi_datatype_t *dtype,
                                               struct ompi_op_t *op,
                                               struct ompi_communicator_t *comm)
{
    int err;
    const int rank = ompi_comm_rank(comm);

    OPAL_OUTPUT((ompi_coll_tuned_stream,
                 "coll:tuned:allreduce_intra_nonoverlapping rank %d", rank));

    /* Phase 1: reduce everything onto rank 0.  MPI_IN_PLACE needs special
       handling because the reduce's input then lives in rbuf: only the root
       may pass MPI_IN_PLACE to reduce; non-root ranks send rbuf as their
       contribution (their receive buffer argument is ignored). */
    if (MPI_IN_PLACE != sbuf) {
        err = comm->c_coll.coll_reduce(sbuf, rbuf, count, dtype, op, 0, comm);
    } else if (0 == rank) {
        err = comm->c_coll.coll_reduce(MPI_IN_PLACE, rbuf, count, dtype,
                                       op, 0, comm);
    } else {
        err = comm->c_coll.coll_reduce(rbuf, NULL, count, dtype, op, 0, comm);
    }
    if (MPI_SUCCESS != err) {
        return err;
    }

    /* Phase 2: broadcast the reduced result from rank 0 to everyone. */
    return comm->c_coll.coll_bcast(rbuf, count, dtype, 0, comm);
}

/*
 *   ompi_coll_tuned_allreduce_intra_recursivedoubling
 *
 *   Function:       Recursive doubling algorithm for allreduce operation
 *   Accepts:        Same as MPI_Allreduce()
 *   Returns:        MPI_SUCCESS or error code
 *
 *   Description:    Implements recursive doubling algorithm for allreduce.
 *                   Original (non-segmented) implementation is used in MPICH-2
 *                   for small and intermediate size messages.
 *                   The algorithm preserves order of operations so it can
 *                   be used both by commutative and non-commutative operations.
 *
 *         Example on 7 nodes:
 *         Initial state
 *         #      0       1      2       3      4       5      6
 *               [0]     [1]    [2]     [3]    [4]     [5]    [6]
 *         Initial adjustment step for non-power of two nodes.
 *         old rank      1              3              5      6
 *         new rank      0              1              2      3
 *                     [0+1]          [2+3]          [4+5]   [6]
 *         Step 1
 *         old rank      1              3              5      6
 *         new rank      0              1              2      3
 *                     [0+1+]         [0+1+]         [4+5+]  [4+5+]
 *                     [2+3+]         [2+3+]         [6   ]  [6   ]
 *         Step 2
 *         old rank      1              3              5      6
 *         new rank      0              1              2      3
 *                     [0+1+]         [0+1+]         [0+1+]  [0+1+]
 *                     [2+3+]         [2+3+]         [2+3+]  [2+3+]
 *                     [4+5+]         [4+5+]         [4+5+]  [4+5+]
 *                     [6   ]         [6   ]         [6   ]  [6   ]
 *         Final adjustment step for non-power of two nodes
 *         #      0       1      2       3      4       5      6
 *              [0+1+] [0+1+] [0+1+]  [0+1+] [0+1+]  [0+1+] [0+1+]
 *              [2+3+] [2+3+] [2+3+]  [2+3+] [2+3+]  [2+3+] [2+3+]
 *              [4+5+] [4+5+] [4+5+]  [4+5+] [4+5+]  [4+5+] [4+5+]
 *              [6   ] [6   ] [6   ]  [6   ] [6   ]  [6   ] [6   ]
 *
 */
int
ompi_coll_tuned_allreduce_intra_recursivedoubling(void *sbuf, void *rbuf,
                                                  int count,
                                                  struct ompi_datatype_t *dtype,
                                                  struct ompi_op_t *op,
                                                  struct ompi_communicator_t *comm)
{
   int ret, line;
   int rank, size, adjsize, remote, distance;
   int newrank, newremote, extra_ranks;
   char *tmpsend = NULL, *tmprecv = NULL, *tmpswap = NULL, *inplacebuf = NULL;
   ptrdiff_t true_lb, true_extent, lb, extent;
   ompi_request_t *reqs[2] = {NULL, NULL};

   size = ompi_comm_size(comm);
   rank = ompi_comm_rank(comm);
   OPAL_OUTPUT((ompi_coll_tuned_stream,
                "coll:tuned:allreduce_intra_recursivedoubling rank %d", rank));

   /* Special case for size == 1: no communication needed, just copy
      sbuf into rbuf (unless the caller used MPI_IN_PLACE). */
   if (1 == size) {
      if (MPI_IN_PLACE != sbuf) {
         ret = ompi_ddt_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
         if (ret < 0) { line = __LINE__; goto error_hndl; }
      }
      return MPI_SUCCESS;
   }

   /* Allocate and initialize temporary send buffer sized to hold `count`
      elements of the datatype (true_extent + (count - 1) * extent). */
   ret = ompi_ddt_get_extent(dtype, &lb, &extent);
   if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
   ret = ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent);
   if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
   inplacebuf = (char*) malloc(true_extent + (count - 1) * extent);
   if (NULL == inplacebuf) { ret = -1; line = __LINE__; goto error_hndl; }

   /* Seed the temporary buffer with this rank's contribution: rbuf when the
      caller used MPI_IN_PLACE, sbuf otherwise. */
   if (MPI_IN_PLACE == sbuf) {
      ret = ompi_ddt_copy_content_same_ddt(dtype, count, inplacebuf, (char*)rbuf);
      if (ret < 0) { line = __LINE__; goto error_hndl; }
   } else {
      ret = ompi_ddt_copy_content_same_ddt(dtype, count, inplacebuf, (char*)sbuf);
      if (ret < 0) { line = __LINE__; goto error_hndl; }
   }

   tmpsend = (char*) inplacebuf;
   tmprecv = (char*) rbuf;

   /* Determine nearest power of two less than or equal to size */
   for (adjsize = 0x1; adjsize <= size; adjsize <<= 1); adjsize = adjsize >> 1;

   /* Handle non-power-of-two case:
      - Even ranks less than 2 * extra_ranks send their data to (rank + 1),
        and set new rank to -1 (they sit out the exchange loop below).
      - Odd ranks less than 2 * extra_ranks receive data from (rank - 1),
        apply appropriate operation, and set new rank to rank/2
      - Everyone else sets rank to rank - extra_ranks
    */
   extra_ranks = size - adjsize;
   if (rank <  (2 * extra_ranks)) {
      if (0 == (rank % 2)) {
         ret = MCA_PML_CALL(send(tmpsend, count, dtype, (rank + 1),
                                 MCA_COLL_BASE_TAG_ALLREDUCE,
                                 MCA_PML_BASE_SEND_STANDARD, comm));
         if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
         newrank = -1;
      } else {
         ret = MCA_PML_CALL(recv(tmprecv, count, dtype, (rank - 1),
                                 MCA_COLL_BASE_TAG_ALLREDUCE, comm,
                                 MPI_STATUS_IGNORE));
         if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
         /* tmpsend = tmprecv (op) tmpsend */
         ompi_op_reduce(op, tmprecv, tmpsend, count, dtype);
         newrank = rank >> 1;
      }
   } else {
      newrank = rank - extra_ranks;
   }

   /* Communication/Computation loop
      - Exchange message with remote node.
      - Perform appropriate operation taking in account order of operations:
        result = value (op) result
      tmpsend/tmprecv are swapped (rather than copied) when the lower rank
      accumulates, so the running result always lives in tmpsend.
    */
   for (distance = 0x1; distance < adjsize; distance <<=1) {
      if (newrank < 0) break;

      /* Determine remote node: translate the partner's "new" rank back to
         its real rank (ranks below extra_ranks map to the odd survivors). */
      newremote = newrank ^ distance;
      remote = (newremote < extra_ranks)?
         (newremote * 2 + 1):(newremote + extra_ranks);

      /* Exchange the data */
      ret = MCA_PML_CALL(irecv(tmprecv, count, dtype, remote,
                               MCA_COLL_BASE_TAG_ALLREDUCE, comm, &reqs[0]));
      if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
      ret = MCA_PML_CALL(isend(tmpsend, count, dtype, remote,
                               MCA_COLL_BASE_TAG_ALLREDUCE,
                               MCA_PML_BASE_SEND_STANDARD, comm, &reqs[1]));
      if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
      ret = ompi_request_wait_all(2, reqs, MPI_STATUSES_IGNORE);
      if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

      /* Apply operation — the order depends on relative rank so that
         non-commutative operations are combined in canonical rank order. */
      if (rank < remote) {
         /* tmprecv = tmpsend (op) tmprecv */
         ompi_op_reduce(op, tmpsend, tmprecv, count, dtype);
         tmpswap = tmprecv;
         tmprecv = tmpsend;
         tmpsend = tmpswap;
      } else {
         /* tmpsend = tmprecv (op) tmpsend */
         ompi_op_reduce(op, tmprecv, tmpsend, count, dtype);
      }
   }

   /* Handle non-power-of-two case:
      - Odd ranks less than 2 * extra_ranks send result from tmpsend to
        (rank - 1)
      - Even ranks less than 2 * extra_ranks receive result from (rank + 1)
   */
   if (rank < (2 * extra_ranks)) {
      if (0 == (rank % 2)) {
         ret = MCA_PML_CALL(recv(rbuf, count, dtype, (rank + 1),
                                 MCA_COLL_BASE_TAG_ALLREDUCE, comm,
                                 MPI_STATUS_IGNORE));
         if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
         tmpsend = (char*)rbuf;
      } else {
         ret = MCA_PML_CALL(send(tmpsend, count, dtype, (rank - 1),
                                 MCA_COLL_BASE_TAG_ALLREDUCE,
                                 MCA_PML_BASE_SEND_STANDARD, comm));
         if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
      }
   }

   /* Ensure that the final result is in rbuf (the pointer swaps above may
      have left it in the temporary buffer). */
   if (tmpsend != rbuf) {
      ret = ompi_ddt_copy_content_same_ddt(dtype, count, (char*)rbuf, tmpsend);
      if (ret < 0) { line = __LINE__; goto error_hndl; }
   }

   if (NULL != inplacebuf) free(inplacebuf);
   return MPI_SUCCESS;

 error_hndl:
   OPAL_OUTPUT((ompi_coll_tuned_stream, "%s:%4d\tRank %d Error occurred %d\n",
                __FILE__, line, rank, ret));
   if (NULL != inplacebuf) free(inplacebuf);
   return ret;
}

/*
 *   ompi_coll_tuned_allreduce_intra_ring
 *
 *   Function:       Ring algorithm for allreduce operation
 *   Accepts:        Same as MPI_Allreduce()
 *   Returns:        MPI_SUCCESS or error code
 *
 *   Description:    Implements ring algorithm for allreduce: the message is
 *                   automatically segmented to segment of size M/N.
 *                   Algorithm requires 2*N - 1 steps.
 *
 *   Limitations:    The algorithm DOES NOT preserve order of operations so it
 *                   can be used only for commutative operations.
 *                   In addition, algorithm cannot work if the total count is
 *                   less than size.
 *         Example on 5 nodes:
 *         Initial state
 *   #      0              1             2              3             4
 *        [00]           [10]          [20]           [30]           [40]
 *        [01]           [11]          [21]           [31]           [41]
 *        [02]           [12]          [22]           [32]           [42]
 *        [03]           [13]          [23]           [33]           [43]
 *        [04]           [14]          [24]           [34]           [44]
 *
 *        COMPUTATION PHASE
 *         Step 0: rank r sends block r to rank (r+1) and receives block (r-1)
 *                 from rank (r-1) [with wraparound].
*    #     0              1             2              3             4 *        [00]          [00+10]        [20]           [30]           [40] *        [01]           [11]         [11+21]         [31]           [41] *        [02]           [12]          [22]          [22+32]         [42] *        [03]           [13]          [23]           [33]         [33+43] *      [44+04]          [14]          [24]           [34]           [44] * *         Step 1: rank r sends block (r-1) to rank (r+1) and receives bloc  *                 (r-2) from rank (r-1) [with wraparound]. *    #      0              1             2              3             4 *         [00]          [00+10]     [01+10+20]        [30]           [40] *         [01]           [11]         [11+21]      [11+21+31]        [41] *         [02]           [12]          [22]          [22+32]      [22+32+42] *      [33+43+03]        [13]          [23]           [33]         [33+43] *        [44+04]       [44+04+14]       [24]           [34]           [44] * *         Step 2: rank r sends block (r-2) to rank (r+1) and receives bloc  *                 (r-2) from rank (r-1) [with wraparound]. *    #      0              1             2              3             4 *         [00]          [00+10]     [01+10+20]    [01+10+20+30]      [40] *         [01]           [11]         [11+21]      [11+21+31]    [11+21+31+41] *     [22+32+42+02]      [12]          [22]          [22+32]      [22+32+42] *      [33+43+03]    [33+43+03+13]     [23]           [33]         [33+43] *        [44+04]       [44+04+14]  [44+04+14+24]      [34]           [44] * *         Step 3: rank r sends block (r-3) to rank (r+1) and receives bloc  *                 (r-3) from rank (r-1) [with wraparound]. 
*    #      0              1             2              3             4 *         [00]          [00+10]     [01+10+20]    [01+10+20+30]     [FULL] *        [FULL]           [11]        [11+21]      [11+21+31]    [11+21+31+41] *     [22+32+42+02]     [FULL]          [22]         [22+32]      [22+32+42] *      [33+43+03]    [33+43+03+13]     [FULL]          [33]         [33+43] *        [44+04]       [44+04+14]  [44+04+14+24]      [FULL]         [44] *          *        DISTRIBUTION PHASE: ring ALLGATHER with ranks shifted by 1. * */int ompi_coll_tuned_allreduce_intra_ring(void *sbuf, void *rbuf, int count,                                     struct ompi_datatype_t *dtype,                                     struct ompi_op_t *op,                                     struct ompi_communicator_t *comm) {   int ret, line;

/* NOTE(review): source truncated here — this was page 1 of 2 of the original
   680-line file; the body of ompi_coll_tuned_allreduce_intra_ring and the
   remainder of the file are not shown. */