coll_tuned_allreduce.c

来自「MPI stands for the Message Passing Inter」· C语言 代码 · 共 680 行 · 第 1/2 页

C
680
字号
   int rank, size, k, recvfrom, sendto;   int segcount, lastsegcount, maxsegcount;   int blockcount, inbi;   size_t typelng;   char *tmpsend = NULL, *tmprecv = NULL;   char *inbuf[2] = {NULL, NULL};   ptrdiff_t true_lb, true_extent, lb, extent, realsegsize, maxrealsegsize;   ompi_request_t *reqs[2] = {NULL, NULL};   size = ompi_comm_size(comm);   rank = ompi_comm_rank(comm);   OPAL_OUTPUT((ompi_coll_tuned_stream,                "coll:tuned:allreduce_intra_ring rank %d, count %d", rank, count));         /* Special case for size == 1 */   if (1 == size) {      if (MPI_IN_PLACE != sbuf) {         ret = ompi_ddt_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);         if (ret < 0) { line = __LINE__; goto error_hndl; }      }      return MPI_SUCCESS;   }   /* Special case for count less than size - 1 - use recursive doubling */   if (count < size - 1) {      OPAL_OUTPUT((ompi_coll_tuned_stream, "coll:tuned:allreduce_ring rank %d/%d, count %d, switching to recursive doubling", rank, size, count));      return (ompi_coll_tuned_allreduce_intra_recursivedoubling(sbuf, rbuf,                                                                 count,                                                                dtype, op,                                                                 comm));   }   /* Allocate and initialize temporary buffers */   ret = ompi_ddt_get_extent(dtype, &lb, &extent);   if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }   ret = ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent);   if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }   ret = ompi_ddt_type_size( dtype, &typelng);   if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }   /* Determine number of elements per block.        This is not the same computation as the one for number of elements       per segment - and we may end up having last block larger any other block.    */   segcount = count / size;   if (0 != count % size) segcount++;   lastsegcount = count - (size - 1) * segcount;   if (lastsegcount <= 0) {      segcount--;      lastsegcount = count - (size - 1) * segcount;   }   realsegsize = segcount * extent;   maxsegcount = (segcount > lastsegcount)? segcount : lastsegcount;   maxrealsegsize = true_extent + (maxsegcount - 1) * extent;   inbuf[0] = (char*)malloc(maxrealsegsize);   if (NULL == inbuf[0]) { ret = -1; line = __LINE__; goto error_hndl; }   if (size > 2) {      inbuf[1] = (char*)malloc(maxrealsegsize);      if (NULL == inbuf[1]) { ret = -1; line = __LINE__; goto error_hndl; }   }   /* Handle MPI_IN_PLACE */   if (MPI_IN_PLACE != sbuf) {      ret = ompi_ddt_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);      if (ret < 0) { line = __LINE__; goto error_hndl; }   }   /* Computation loop */   /*       For each of the remote nodes:      - post irecv for block (r-1)      - send block (r)      - in loop for every step k = 2 .. n         - post irecv for block (r + n - k) % n         - wait on block (r + n - k + 1) % n to arrive         - compute on block (r + n - k + 1) % n         - send block (r + n - k + 1) % n     - wait on block (r + 1)     - compute on block (r + 1)     - send block (r + 1) to rank (r + 1)     Note that for send operations and computation we must compute the exact      block size.    */   sendto = (rank + 1) % size;   recvfrom = (rank + size - 1) % size;   inbi = 0;   tmpsend = ((char*)rbuf) + rank * realsegsize;   /* Initialize first receive from the neighbor on the left */   ret = MCA_PML_CALL(irecv(inbuf[inbi], maxsegcount, dtype, recvfrom,                            MCA_COLL_BASE_TAG_ALLREDUCE, comm, &reqs[inbi]));   if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }   /* Send first block to the neighbor on the right */   blockcount = segcount;   if ((size - 1) == rank) { blockcount = lastsegcount; }   ret = MCA_PML_CALL(send(tmpsend, blockcount, dtype, sendto,                           MCA_COLL_BASE_TAG_ALLREDUCE,                           MCA_PML_BASE_SEND_STANDARD, comm));   if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }      for (k = 2; k < size; k++) {      const int prevblock = (rank + size - k + 1) % size;            inbi = inbi ^ 0x1;            /* Post irecv for the current block */      ret = MCA_PML_CALL(irecv(inbuf[inbi], maxsegcount, dtype, recvfrom,                               MCA_COLL_BASE_TAG_ALLREDUCE, comm, &reqs[inbi]));      if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }            /* Wait on previous block to arrive */      ret = ompi_request_wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);      if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }            /* Apply operation on previous block: result goes to rbuf         rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock]      */      blockcount = segcount;      if ((size - 1) == prevblock) { blockcount = lastsegcount; }      tmprecv = ((char*)rbuf) + prevblock * realsegsize;      ompi_op_reduce(op, inbuf[inbi ^ 0x1], tmprecv, blockcount, dtype);            /* send previous block to sendto */      ret = MCA_PML_CALL(send(tmprecv, blockcount, dtype, sendto,                              MCA_COLL_BASE_TAG_ALLREDUCE,                              MCA_PML_BASE_SEND_STANDARD, comm));      if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }   }   /* Wait on the last block to arrive */   ret = ompi_request_wait(&reqs[inbi], MPI_STATUS_IGNORE);   if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }   /* Apply operation on the last block (from neighbor (rank + 1)       rbuf[rank+1] = inbuf[inbi] (op) rbuf[rank + 1] */   blockcount = segcount;   if ((size - 1) == (rank + 1) % size) { blockcount = lastsegcount; }   tmprecv = ((char*)rbuf) + ((rank + 1) % size) * realsegsize;   ompi_op_reduce(op, inbuf[inbi], tmprecv, blockcount, dtype);      /* Distribution loop - variation of ring allgather */   for (k = 0; k < size - 1; k++) {      const int recvdatafrom = (rank + size - k) % size;      const int senddatafrom = (rank + 1 + size - k) % size;      blockcount = segcount;      if ((size - 1) == senddatafrom) blockcount = lastsegcount;      tmprecv = (char*)rbuf + recvdatafrom * realsegsize;      tmpsend = (char*)rbuf + senddatafrom * realsegsize;      ret = ompi_coll_tuned_sendrecv(tmpsend, blockcount, dtype, sendto,                                     MCA_COLL_BASE_TAG_ALLREDUCE,                                     tmprecv, maxsegcount, dtype, recvfrom,                                     MCA_COLL_BASE_TAG_ALLREDUCE,                                     comm, MPI_STATUS_IGNORE, rank);      if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl;}   }   if (NULL != inbuf[0]) free(inbuf[0]);   if (NULL != inbuf[1]) free(inbuf[1]);   return MPI_SUCCESS; error_hndl:   OPAL_OUTPUT((ompi_coll_tuned_stream, "%s:%4d\tRank %d Error occurred %d\n",                __FILE__, line, rank, ret));   if (NULL != inbuf[0]) free(inbuf[0]);   if (NULL != inbuf[1]) free(inbuf[1]);   return ret;}/* * Linear functions are copied from the BASIC coll module * they do not segment the message and are simple implementations * but for some small number of nodes and/or small data sizes they  * are just as fast as tuned/tree based segmenting operations  * and as such may be selected by the decision functions * These are copied into this module due to the way we select modules * in V1. i.e. in V2 we will handle this differently and so will not * have to duplicate code. * GEF Oct05 after asking Jeff. *//* copied function (with appropriate renaming) starts here *//* *	allreduce_intra * *	Function:	- allreduce using other MPI collectives *	Accepts:	- same as MPI_Allreduce() *	Returns:	- MPI_SUCCESS or error code */intompi_coll_tuned_allreduce_intra_basic_linear(void *sbuf, void *rbuf, int count,                                             struct ompi_datatype_t *dtype,                                             struct ompi_op_t *op,                                             struct ompi_communicator_t *comm){    int err;    int rank;    rank = ompi_comm_rank(comm);    OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:allreduce_intra_basic_linear rank %d", rank));    /* Reduce to 0 and broadcast. */    if (MPI_IN_PLACE == sbuf) {        if (0 == ompi_comm_rank(comm)) {            err = ompi_coll_tuned_reduce_intra_basic_linear (MPI_IN_PLACE, rbuf, count, dtype, op, 0, comm);        } else {            err = ompi_coll_tuned_reduce_intra_basic_linear(rbuf, NULL, count, dtype, op, 0, comm);        }    } else {        err = ompi_coll_tuned_reduce_intra_basic_linear(sbuf, rbuf, count, dtype, op, 0, comm);    }    if (MPI_SUCCESS != err) {        return err;    }    return ompi_coll_tuned_bcast_intra_basic_linear(rbuf, count, dtype, 0, comm);}/* copied function (with appropriate renaming) ends here *//* The following are used by dynamic and forced rules *//* publish details of each algorithm and if its forced/fixed/locked in *//* as you add methods/algorithms you must update this and the query/map routines *//* this routine is called by the component only *//* this makes sure that the mca parameters are set to their initial values and perms *//* module does not call this they call the forced_getvalues routine instead */int ompi_coll_tuned_allreduce_intra_check_forced_init (coll_tuned_force_algorithm_mca_param_indices_t *mca_param_indices){    int rc, max_alg = 4, requested_alg;    ompi_coll_tuned_forced_max_algorithms[ALLREDUCE] = max_alg;    rc = mca_base_param_reg_int (&mca_coll_tuned_component.super.collm_version,                                 "allreduce_algorithm_count",                                 "Number of allreduce algorithms available",                                 false, true, max_alg, NULL);    mca_param_indices->algorithm_param_index        = mca_base_param_reg_int( &mca_coll_tuned_component.super.collm_version,                                  "allreduce_algorithm",                                  "Which allreduce algorithm is used. Can be locked down to any of: 0 ignore, 1 basic linear, 2 nonoverlapping (tuned reduce + tuned bcast), 3 recursive doubling, 4 ring",                                  false, false, 0, NULL);    mca_base_param_lookup_int( mca_param_indices->algorithm_param_index, &(requested_alg));    if( requested_alg > max_alg ) {        if( 0 == ompi_comm_rank( MPI_COMM_WORLD ) ) {            opal_output( 0, "Allreduce algorithm #%d is not available (range [0..%d]). Switching back to ignore(0)\n",                         requested_alg, max_alg );        }        mca_base_param_set_int( mca_param_indices->algorithm_param_index, 0);    }    mca_param_indices->segsize_param_index        = mca_base_param_reg_int( &mca_coll_tuned_component.super.collm_version,                                  "allreduce_algorithm_segmentsize",                                  "Segment size in bytes used by default for allreduce algorithms. Only has meaning if algorithm is forced and supports segmenting. 0 bytes means no segmentation.",                                  false, false, 0, NULL);      mca_param_indices->tree_fanout_param_index        = mca_base_param_reg_int( &mca_coll_tuned_component.super.collm_version,                                  "allreduce_algorithm_tree_fanout",                                  "Fanout for n-tree used for allreduce algorithms. Only has meaning if algorithm is forced and supports n-tree topo based operation.",                                  false, false, ompi_coll_tuned_init_tree_fanout, /* get system wide default */                                  NULL);    mca_param_indices->chain_fanout_param_index        = mca_base_param_reg_int( &mca_coll_tuned_component.super.collm_version,                                  "allreduce_algorithm_chain_fanout",                                  "Fanout for chains used for allreduce algorithms. Only has meaning if algorithm is forced and supports chain topo based operation.",                                  false, false,                                  ompi_coll_tuned_init_chain_fanout, /* get system wide default */                                  NULL);    return (MPI_SUCCESS);}int ompi_coll_tuned_allreduce_intra_do_forced(void *sbuf, void *rbuf, int count,                                              struct ompi_datatype_t *dtype,                                              struct ompi_op_t *op,                                              struct ompi_communicator_t *comm){    OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:allreduce_intra_do_forced selected algorithm %d, segment size %d",                  comm->c_coll_selected_data->user_forced[ALLREDUCE].algorithm,                 comm->c_coll_selected_data->user_forced[ALLREDUCE].segsize));    switch (comm->c_coll_selected_data->user_forced[ALLREDUCE].algorithm) {    case (0):  return ompi_coll_tuned_allreduce_intra_dec_fixed (sbuf, rbuf, count, dtype, op, comm);    case (1):  return ompi_coll_tuned_allreduce_intra_basic_linear (sbuf, rbuf, count, dtype, op, comm);    case (2):  return ompi_coll_tuned_allreduce_intra_nonoverlapping (sbuf, rbuf, count, dtype, op, comm);    case (3):  return ompi_coll_tuned_allreduce_intra_recursivedoubling (sbuf, rbuf, count, dtype, op, comm);    case (4):  return ompi_coll_tuned_allreduce_intra_ring (sbuf, rbuf, count, dtype, op, comm);    default:        OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:allreduce_intra_do_forced attempt to select algorithm %d when only 0-%d is valid?",                     comm->c_coll_selected_data->user_forced[ALLREDUCE].algorithm,                      ompi_coll_tuned_forced_max_algorithms[ALLREDUCE]));        return (MPI_ERR_ARG);    } /* switch */}int ompi_coll_tuned_allreduce_intra_do_this(void *sbuf, void *rbuf, int count,                                            struct ompi_datatype_t *dtype,                                            struct ompi_op_t *op,                                            struct ompi_communicator_t *comm,                                            int algorithm, int faninout, int segsize){    OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:allreduce_intra_do_this algorithm %d topo fan in/out %d segsize %d",                  algorithm, faninout, segsize));    switch (algorithm) {    case (0):   return ompi_coll_tuned_allreduce_intra_dec_fixed (sbuf, rbuf, count, dtype, op, comm);    case (1):   return ompi_coll_tuned_allreduce_intra_basic_linear (sbuf, rbuf, count, dtype, op, comm);    case (2):   return ompi_coll_tuned_allreduce_intra_nonoverlapping (sbuf, rbuf, count, dtype, op, comm);    case (3):   return ompi_coll_tuned_allreduce_intra_recursivedoubling (sbuf, rbuf, count, dtype, op, comm);    case (4):   return ompi_coll_tuned_allreduce_intra_ring (sbuf, rbuf, count, dtype, op, comm);    default:        OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:allreduce_intra_do_this attempt to select algorithm %d when only 0-%d is valid?",                     algorithm, ompi_coll_tuned_forced_max_algorithms[ALLREDUCE]));        return (MPI_ERR_ARG);    } /* switch */}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?