coll_tuned_bcast.c
来自「MPI stands for the Message Passing Inter」· C语言 代码 · 共 789 行 · 第 1/3 页
C
789 行
int count, struct ompi_datatype_t* datatype, int root, struct ompi_communicator_t* comm, uint32_t segsize ){ int segcount = count; size_t typelng; COLL_TUNED_UPDATE_PIPELINE( comm, root ); /** * Determine number of elements sent per operation. */ ompi_ddt_type_size( datatype, &typelng ); COLL_TUNED_COMPUTED_SEGCOUNT( segsize, typelng, segcount ); OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:bcast_intra_pipeline rank %d ss %5d typelng %ld segcount %d", ompi_comm_rank(comm), segsize, typelng, segcount)); return ompi_coll_tuned_bcast_intra_generic( buffer, count, datatype, root, comm, segcount, comm->c_coll_selected_data->cached_pipeline );}intompi_coll_tuned_bcast_intra_chain( void* buffer, int count, struct ompi_datatype_t* datatype, int root, struct ompi_communicator_t* comm, uint32_t segsize, int32_t chains ){ int segcount = count; size_t typelng; COLL_TUNED_UPDATE_CHAIN( comm, root, chains ); /** * Determine number of elements sent per operation. */ ompi_ddt_type_size( datatype, &typelng ); COLL_TUNED_COMPUTED_SEGCOUNT( segsize, typelng, segcount ); OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:bcast_intra_chain rank %d fo %d ss %5d typelng %ld segcount %d", ompi_comm_rank(comm), chains, segsize, typelng, segcount)); return ompi_coll_tuned_bcast_intra_generic( buffer, count, datatype, root, comm, segcount, comm->c_coll_selected_data->cached_chain );}intompi_coll_tuned_bcast_intra_binomial( void* buffer, int count, struct ompi_datatype_t* datatype, int root, struct ompi_communicator_t* comm, uint32_t segsize ){ int segcount = count; size_t typelng; COLL_TUNED_UPDATE_BMTREE( comm, root ); /** * Determine number of elements sent per operation. */ ompi_ddt_type_size( datatype, &typelng ); COLL_TUNED_COMPUTED_SEGCOUNT( segsize, typelng, segcount ); OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:bcast_intra_binomial rank %d ss %5d typelng %ld segcount %d", ompi_comm_rank(comm), segsize, typelng, segcount)); return ompi_coll_tuned_bcast_intra_generic( buffer, count, datatype, root, comm, segcount, comm->c_coll_selected_data->cached_bmtree );}intompi_coll_tuned_bcast_intra_split_bintree ( void* buffer, int count, struct ompi_datatype_t* datatype, int root, struct ompi_communicator_t* comm, uint32_t segsize ){ int err=0, line; int rank, size; int segindex, i, lr, pair; int segcount[2]; /* Number of elements sent with each segment */ uint32_t counts[2]; int num_segments[2]; /* Number of segmenets */ int sendcount[2]; /* the same like segcount, except for the last segment */ size_t realsegsize[2]; char *tmpbuf[2]; size_t type_size; ptrdiff_t type_extent, lb; ompi_request_t *base_req, *new_req; ompi_coll_tree_t *tree; size = ompi_comm_size(comm); rank = ompi_comm_rank(comm); OPAL_OUTPUT((ompi_coll_tuned_stream,"ompi_coll_tuned_bcast_intra_split_bintree rank %d root %d ss %5d", rank, root, segsize)); if (size == 1) { return MPI_SUCCESS; } /* setup the binary tree topology. */ COLL_TUNED_UPDATE_BINTREE( comm, root ); tree = comm->c_coll_selected_data->cached_bintree; err = ompi_ddt_type_size( datatype, &type_size ); /* Determine number of segments and number of elements per segment */ counts[0] = count/2; if (count % 2 != 0) counts[0]++; counts[1] = count - counts[0]; if ( segsize > 0 ) { /* Note that ompi_ddt_type_size() will never return a negative value in typelng; it returns an int [vs. an unsigned type] because of the MPI spec. */ if (segsize < ((uint32_t) type_size)) { segsize = type_size; /* push segsize up to hold one type */ } segcount[0] = segcount[1] = segsize / type_size; num_segments[0] = counts[0]/segcount[0]; if ((counts[0] % segcount[0]) != 0) num_segments[0]++; num_segments[1] = counts[1]/segcount[1]; if ((counts[1] % segcount[1]) != 0) num_segments[1]++; } else { segcount[0] = counts[0]; segcount[1] = counts[1]; num_segments[0] = num_segments[1] = 1; } /* if the message is too small to be split into segments */ if( (counts[0] == 0 || counts[1] == 0) || (segsize > counts[0] * type_size) || (segsize > counts[1] * type_size) ) { /* call linear version here ! */ return (ompi_coll_tuned_bcast_intra_chain ( buffer, count, datatype, root, comm, segsize, 1 )); } err = ompi_ddt_get_extent (datatype, &lb, &type_extent); /* Determine real segment size */ realsegsize[0] = segcount[0] * type_extent; realsegsize[1] = segcount[1] * type_extent; /* set the buffer pointers */ tmpbuf[0] = (char *) buffer; tmpbuf[1] = (char *) buffer+counts[0] * type_extent; /* Step 1: Root splits the buffer in 2 and sends segmented message down the branches. Left subtree of the tree receives first half of the buffer, while right subtree receives the remaining message. */ /* determine if I am left (0) or right (1), (root is right) */ lr = ((rank + size - root)%size + 1)%2; /* root code */ if( rank == root ) { /* determine segment count */ sendcount[0] = segcount[0]; sendcount[1] = segcount[1]; /* for each segment */ for (segindex = 0; segindex < num_segments[0]; segindex++) { /* for each child */ for( i = 0; i < tree->tree_nextsize && i < 2; i++ ) { if (segindex >= num_segments[i]) { /* no more segments */ continue; } /* determine how many elements are being sent in this round */ if(segindex == (num_segments[i] - 1)) sendcount[i] = counts[i] - segindex*segcount[i]; /* send data */ MCA_PML_CALL(send(tmpbuf[i], sendcount[i], datatype, tree->tree_next[i], MCA_COLL_BASE_TAG_BCAST, MCA_PML_BASE_SEND_STANDARD, comm)); if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } /* update tmp buffer */ tmpbuf[i] += realsegsize[i]; } } } /* intermediate nodes code */ else if( tree->tree_nextsize > 0 ) { /* Intermediate nodes: * It will receive segments only from one half of the data. * Which one is determined by whether the node belongs to the "left" or "right" * subtree. Topoloby building function builds binary tree such that * odd "shifted ranks" ((rank + size - root)%size) are on the left subtree, * and even on the right subtree. * * Create the pipeline. We first post the first receive, then in the loop we * post the next receive and after that wait for the previous receive to complete * and we disseminating the data to all children. */ sendcount[lr] = segcount[lr]; MCA_PML_CALL(irecv(tmpbuf[lr], sendcount[lr], datatype, tree->tree_prev, MCA_COLL_BASE_TAG_BCAST, comm, &base_req)); if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } for( segindex = 1; segindex < num_segments[lr]; segindex++ ) { /* determine how many elements to expect in this round */ if( segindex == (num_segments[lr] - 1)) sendcount[lr] = counts[lr] - segindex*segcount[lr]; /* post new irecv */ MCA_PML_CALL(irecv( tmpbuf[lr] + realsegsize[lr], sendcount[lr], datatype, tree->tree_prev, MCA_COLL_BASE_TAG_BCAST, comm, &new_req)); if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } /* wait for and forward current segment */ err = ompi_request_wait_all( 1, &base_req, MPI_STATUSES_IGNORE ); for( i = 0; i < tree->tree_nextsize; i++ ) { /* send data to children (segcount[lr]) */ MCA_PML_CALL(send( tmpbuf[lr], segcount[lr], datatype, tree->tree_next[i], MCA_COLL_BASE_TAG_BCAST, MCA_PML_BASE_SEND_STANDARD, comm)); if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } } /* end of for each child */ /* upate the base request */ base_req = new_req; /* go to the next buffer (ie. the one corresponding to the next recv) */ tmpbuf[lr] += realsegsize[lr]; } /* end of for segindex */ /* wait for the last segment and forward current segment */ err = ompi_request_wait_all( 1, &base_req, MPI_STATUSES_IGNORE ); for( i = 0; i < tree->tree_nextsize; i++ ) { /* send data to children */ MCA_PML_CALL(send(tmpbuf[lr], sendcount[lr], datatype, tree->tree_next[i], MCA_COLL_BASE_TAG_BCAST, MCA_PML_BASE_SEND_STANDARD, comm)); if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } } /* end of for each child */ } /* leaf nodes */ else { /* Just consume segments as fast as possible */ sendcount[lr] = segcount[lr]; for (segindex = 0; segindex < num_segments[lr]; segindex++) { /* determine how many elements to expect in this round */ if (segindex == (num_segments[lr] - 1)) sendcount[lr] = counts[lr] - segindex*segcount[lr]; /* receive segments */ MCA_PML_CALL(recv(tmpbuf[lr], sendcount[lr], datatype, tree->tree_prev, MCA_COLL_BASE_TAG_BCAST, comm, MPI_STATUS_IGNORE)); if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } /* update the initial pointer to the buffer */ tmpbuf[lr] += realsegsize[lr]; } } /* reset the buffer pointers */ tmpbuf[0] = (char *) buffer; tmpbuf[1] = (char *) buffer+counts[0] * type_extent;
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?