coll_tuned_allreduce.c
From "MPI stands for the Message Passing Inter…" · C source code · 680 lines total · page 1 of 2
C
680 lines
int rank, size, k, recvfrom, sendto; int segcount, lastsegcount, maxsegcount; int blockcount, inbi; size_t typelng; char *tmpsend = NULL, *tmprecv = NULL; char *inbuf[2] = {NULL, NULL}; ptrdiff_t true_lb, true_extent, lb, extent, realsegsize, maxrealsegsize; ompi_request_t *reqs[2] = {NULL, NULL}; size = ompi_comm_size(comm); rank = ompi_comm_rank(comm); OPAL_OUTPUT((ompi_coll_tuned_stream, "coll:tuned:allreduce_intra_ring rank %d, count %d", rank, count)); /* Special case for size == 1 */ if (1 == size) { if (MPI_IN_PLACE != sbuf) { ret = ompi_ddt_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf); if (ret < 0) { line = __LINE__; goto error_hndl; } } return MPI_SUCCESS; } /* Special case for count less than size - 1 - use recursive doubling */ if (count < size - 1) { OPAL_OUTPUT((ompi_coll_tuned_stream, "coll:tuned:allreduce_ring rank %d/%d, count %d, switching to recursive doubling", rank, size, count)); return (ompi_coll_tuned_allreduce_intra_recursivedoubling(sbuf, rbuf, count, dtype, op, comm)); } /* Allocate and initialize temporary buffers */ ret = ompi_ddt_get_extent(dtype, &lb, &extent); if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; } ret = ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent); if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; } ret = ompi_ddt_type_size( dtype, &typelng); if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; } /* Determine number of elements per block. This is not the same computation as the one for number of elements per segment - and we may end up having last block larger any other block. */ segcount = count / size; if (0 != count % size) segcount++; lastsegcount = count - (size - 1) * segcount; if (lastsegcount <= 0) { segcount--; lastsegcount = count - (size - 1) * segcount; } realsegsize = segcount * extent; maxsegcount = (segcount > lastsegcount)? 
segcount : lastsegcount; maxrealsegsize = true_extent + (maxsegcount - 1) * extent; inbuf[0] = (char*)malloc(maxrealsegsize); if (NULL == inbuf[0]) { ret = -1; line = __LINE__; goto error_hndl; } if (size > 2) { inbuf[1] = (char*)malloc(maxrealsegsize); if (NULL == inbuf[1]) { ret = -1; line = __LINE__; goto error_hndl; } } /* Handle MPI_IN_PLACE */ if (MPI_IN_PLACE != sbuf) { ret = ompi_ddt_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf); if (ret < 0) { line = __LINE__; goto error_hndl; } } /* Computation loop */ /* For each of the remote nodes: - post irecv for block (r-1) - send block (r) - in loop for every step k = 2 .. n - post irecv for block (r + n - k) % n - wait on block (r + n - k + 1) % n to arrive - compute on block (r + n - k + 1) % n - send block (r + n - k + 1) % n - wait on block (r + 1) - compute on block (r + 1) - send block (r + 1) to rank (r + 1) Note that for send operations and computation we must compute the exact block size. */ sendto = (rank + 1) % size; recvfrom = (rank + size - 1) % size; inbi = 0; tmpsend = ((char*)rbuf) + rank * realsegsize; /* Initialize first receive from the neighbor on the left */ ret = MCA_PML_CALL(irecv(inbuf[inbi], maxsegcount, dtype, recvfrom, MCA_COLL_BASE_TAG_ALLREDUCE, comm, &reqs[inbi])); if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; } /* Send first block to the neighbor on the right */ blockcount = segcount; if ((size - 1) == rank) { blockcount = lastsegcount; } ret = MCA_PML_CALL(send(tmpsend, blockcount, dtype, sendto, MCA_COLL_BASE_TAG_ALLREDUCE, MCA_PML_BASE_SEND_STANDARD, comm)); if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; } for (k = 2; k < size; k++) { const int prevblock = (rank + size - k + 1) % size; inbi = inbi ^ 0x1; /* Post irecv for the current block */ ret = MCA_PML_CALL(irecv(inbuf[inbi], maxsegcount, dtype, recvfrom, MCA_COLL_BASE_TAG_ALLREDUCE, comm, &reqs[inbi])); if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; } /* Wait on previous 
block to arrive */ ret = ompi_request_wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE); if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; } /* Apply operation on previous block: result goes to rbuf rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock] */ blockcount = segcount; if ((size - 1) == prevblock) { blockcount = lastsegcount; } tmprecv = ((char*)rbuf) + prevblock * realsegsize; ompi_op_reduce(op, inbuf[inbi ^ 0x1], tmprecv, blockcount, dtype); /* send previous block to sendto */ ret = MCA_PML_CALL(send(tmprecv, blockcount, dtype, sendto, MCA_COLL_BASE_TAG_ALLREDUCE, MCA_PML_BASE_SEND_STANDARD, comm)); if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; } } /* Wait on the last block to arrive */ ret = ompi_request_wait(&reqs[inbi], MPI_STATUS_IGNORE); if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; } /* Apply operation on the last block (from neighbor (rank + 1) rbuf[rank+1] = inbuf[inbi] (op) rbuf[rank + 1] */ blockcount = segcount; if ((size - 1) == (rank + 1) % size) { blockcount = lastsegcount; } tmprecv = ((char*)rbuf) + ((rank + 1) % size) * realsegsize; ompi_op_reduce(op, inbuf[inbi], tmprecv, blockcount, dtype); /* Distribution loop - variation of ring allgather */ for (k = 0; k < size - 1; k++) { const int recvdatafrom = (rank + size - k) % size; const int senddatafrom = (rank + 1 + size - k) % size; blockcount = segcount; if ((size - 1) == senddatafrom) blockcount = lastsegcount; tmprecv = (char*)rbuf + recvdatafrom * realsegsize; tmpsend = (char*)rbuf + senddatafrom * realsegsize; ret = ompi_coll_tuned_sendrecv(tmpsend, blockcount, dtype, sendto, MCA_COLL_BASE_TAG_ALLREDUCE, tmprecv, maxsegcount, dtype, recvfrom, MCA_COLL_BASE_TAG_ALLREDUCE, comm, MPI_STATUS_IGNORE, rank); if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl;} } if (NULL != inbuf[0]) free(inbuf[0]); if (NULL != inbuf[1]) free(inbuf[1]); return MPI_SUCCESS; error_hndl: OPAL_OUTPUT((ompi_coll_tuned_stream, "%s:%4d\tRank %d Error occurred %d\n", 
__FILE__, line, rank, ret)); if (NULL != inbuf[0]) free(inbuf[0]); if (NULL != inbuf[1]) free(inbuf[1]); return ret;}/* * Linear functions are copied from the BASIC coll module * they do not segment the message and are simple implementations * but for some small number of nodes and/or small data sizes they * are just as fast as tuned/tree based segmenting operations * and as such may be selected by the decision functions * These are copied into this module due to the way we select modules * in V1. i.e. in V2 we will handle this differently and so will not * have to duplicate code. * GEF Oct05 after asking Jeff. *//* copied function (with appropriate renaming) starts here *//* * allreduce_intra * * Function: - allreduce using other MPI collectives * Accepts: - same as MPI_Allreduce() * Returns: - MPI_SUCCESS or error code */intompi_coll_tuned_allreduce_intra_basic_linear(void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype, struct ompi_op_t *op, struct ompi_communicator_t *comm){ int err; int rank; rank = ompi_comm_rank(comm); OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:allreduce_intra_basic_linear rank %d", rank)); /* Reduce to 0 and broadcast. 
*/ if (MPI_IN_PLACE == sbuf) { if (0 == ompi_comm_rank(comm)) { err = ompi_coll_tuned_reduce_intra_basic_linear (MPI_IN_PLACE, rbuf, count, dtype, op, 0, comm); } else { err = ompi_coll_tuned_reduce_intra_basic_linear(rbuf, NULL, count, dtype, op, 0, comm); } } else { err = ompi_coll_tuned_reduce_intra_basic_linear(sbuf, rbuf, count, dtype, op, 0, comm); } if (MPI_SUCCESS != err) { return err; } return ompi_coll_tuned_bcast_intra_basic_linear(rbuf, count, dtype, 0, comm);}/* copied function (with appropriate renaming) ends here *//* The following are used by dynamic and forced rules *//* publish details of each algorithm and if its forced/fixed/locked in *//* as you add methods/algorithms you must update this and the query/map routines *//* this routine is called by the component only *//* this makes sure that the mca parameters are set to their initial values and perms *//* module does not call this they call the forced_getvalues routine instead */int ompi_coll_tuned_allreduce_intra_check_forced_init (coll_tuned_force_algorithm_mca_param_indices_t *mca_param_indices){ int rc, max_alg = 4, requested_alg; ompi_coll_tuned_forced_max_algorithms[ALLREDUCE] = max_alg; rc = mca_base_param_reg_int (&mca_coll_tuned_component.super.collm_version, "allreduce_algorithm_count", "Number of allreduce algorithms available", false, true, max_alg, NULL); mca_param_indices->algorithm_param_index = mca_base_param_reg_int( &mca_coll_tuned_component.super.collm_version, "allreduce_algorithm", "Which allreduce algorithm is used. Can be locked down to any of: 0 ignore, 1 basic linear, 2 nonoverlapping (tuned reduce + tuned bcast), 3 recursive doubling, 4 ring", false, false, 0, NULL); mca_base_param_lookup_int( mca_param_indices->algorithm_param_index, &(requested_alg)); if( requested_alg > max_alg ) { if( 0 == ompi_comm_rank( MPI_COMM_WORLD ) ) { opal_output( 0, "Allreduce algorithm #%d is not available (range [0..%d]). 
Switching back to ignore(0)\n", requested_alg, max_alg ); } mca_base_param_set_int( mca_param_indices->algorithm_param_index, 0); } mca_param_indices->segsize_param_index = mca_base_param_reg_int( &mca_coll_tuned_component.super.collm_version, "allreduce_algorithm_segmentsize", "Segment size in bytes used by default for allreduce algorithms. Only has meaning if algorithm is forced and supports segmenting. 0 bytes means no segmentation.", false, false, 0, NULL); mca_param_indices->tree_fanout_param_index = mca_base_param_reg_int( &mca_coll_tuned_component.super.collm_version, "allreduce_algorithm_tree_fanout", "Fanout for n-tree used for allreduce algorithms. Only has meaning if algorithm is forced and supports n-tree topo based operation.", false, false, ompi_coll_tuned_init_tree_fanout, /* get system wide default */ NULL); mca_param_indices->chain_fanout_param_index = mca_base_param_reg_int( &mca_coll_tuned_component.super.collm_version, "allreduce_algorithm_chain_fanout", "Fanout for chains used for allreduce algorithms. 
Only has meaning if algorithm is forced and supports chain topo based operation.", false, false, ompi_coll_tuned_init_chain_fanout, /* get system wide default */ NULL); return (MPI_SUCCESS);}int ompi_coll_tuned_allreduce_intra_do_forced(void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype, struct ompi_op_t *op, struct ompi_communicator_t *comm){ OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:allreduce_intra_do_forced selected algorithm %d, segment size %d", comm->c_coll_selected_data->user_forced[ALLREDUCE].algorithm, comm->c_coll_selected_data->user_forced[ALLREDUCE].segsize)); switch (comm->c_coll_selected_data->user_forced[ALLREDUCE].algorithm) { case (0): return ompi_coll_tuned_allreduce_intra_dec_fixed (sbuf, rbuf, count, dtype, op, comm); case (1): return ompi_coll_tuned_allreduce_intra_basic_linear (sbuf, rbuf, count, dtype, op, comm); case (2): return ompi_coll_tuned_allreduce_intra_nonoverlapping (sbuf, rbuf, count, dtype, op, comm); case (3): return ompi_coll_tuned_allreduce_intra_recursivedoubling (sbuf, rbuf, count, dtype, op, comm); case (4): return ompi_coll_tuned_allreduce_intra_ring (sbuf, rbuf, count, dtype, op, comm); default: OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:allreduce_intra_do_forced attempt to select algorithm %d when only 0-%d is valid?", comm->c_coll_selected_data->user_forced[ALLREDUCE].algorithm, ompi_coll_tuned_forced_max_algorithms[ALLREDUCE])); return (MPI_ERR_ARG); } /* switch */}int ompi_coll_tuned_allreduce_intra_do_this(void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype, struct ompi_op_t *op, struct ompi_communicator_t *comm, int algorithm, int faninout, int segsize){ OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:allreduce_intra_do_this algorithm %d topo fan in/out %d segsize %d", algorithm, faninout, segsize)); switch (algorithm) { case (0): return ompi_coll_tuned_allreduce_intra_dec_fixed (sbuf, rbuf, count, dtype, op, comm); case (1): return ompi_coll_tuned_allreduce_intra_basic_linear 
(sbuf, rbuf, count, dtype, op, comm); case (2): return ompi_coll_tuned_allreduce_intra_nonoverlapping (sbuf, rbuf, count, dtype, op, comm); case (3): return ompi_coll_tuned_allreduce_intra_recursivedoubling (sbuf, rbuf, count, dtype, op, comm); case (4): return ompi_coll_tuned_allreduce_intra_ring (sbuf, rbuf, count, dtype, op, comm); default: OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:allreduce_intra_do_this attempt to select algorithm %d when only 0-%d is valid?", algorithm, ompi_coll_tuned_forced_max_algorithms[ALLREDUCE])); return (MPI_ERR_ARG); } /* switch */}
⌨️ Keyboard shortcuts
Copy code: Ctrl + C
Search code: Ctrl + F
Fullscreen mode: F11
Increase font size: Ctrl + =
Decrease font size: Ctrl + -
Show shortcuts: ?