coll_tuned_bcast.c
From "MPI stands for the Message Passing Inter" · C source code · 789 lines · page 1 of 3
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2006 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"
#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/datatype/datatype.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/coll/coll.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/pml/pml.h"
#include "coll_tuned.h"
#include "coll_tuned_topo.h"
#include "coll_tuned_util.h"

int
ompi_coll_tuned_bcast_intra_generic( void* buffer,
                                     int original_count,
                                     struct ompi_datatype_t* datatype,
                                     int root,
                                     struct ompi_communicator_t* comm,
                                     uint32_t count_by_segment,
                                     ompi_coll_tree_t* tree )
{
    int err = 0, line, i;
    int rank, size;
    int segindex;
    int num_segments;   /* Number of segments */
    int sendcount;      /* the same as segcount, except for the last segment */
    size_t realsegsize;
    char *tmpbuf;
    size_t type_size;
    ptrdiff_t extent, lb;
    ompi_request_t *recv_reqs[2], **send_reqs = NULL;
    int req_index = 0, old_req_index;

    size = ompi_comm_size(comm);
    rank = ompi_comm_rank(comm);
    assert( size > 1 );

    ompi_ddt_get_extent( datatype, &lb, &extent );
    ompi_ddt_type_size( datatype, &type_size );
    num_segments = (original_count + count_by_segment - 1) / count_by_segment;
    realsegsize = count_by_segment * extent;

    /* set the buffer pointers */
    tmpbuf = (char *) buffer;

    if( tree->tree_nextsize != 0 ) {
        send_reqs = (ompi_request_t**)malloc( tree->tree_nextsize *
                                              sizeof(ompi_request_t*) );
    }

    /* root code */
    /* just send a segment to each child in turn as fast as you can */
    if( rank == root ) {
        /* determine segment count */
        sendcount = count_by_segment;
        /* for each segment */
        for( segindex = 0; segindex < num_segments; segindex++ ) {
            /* if last segment determine how many elements are being sent */
            if( segindex == (num_segments - 1) )
                sendcount = original_count - segindex * count_by_segment;
            for( i = 0; i < tree->tree_nextsize; i++ ) {  /* send data to children */
                /* send data */
#if defined(COLL_TUNED_BCAST_USE_BLOCKING)
                err = MCA_PML_CALL(send(tmpbuf, sendcount, datatype,
                                        tree->tree_next[i],
                                        MCA_COLL_BASE_TAG_BCAST,
                                        MCA_PML_BASE_SEND_STANDARD, comm));
#else
                err = MCA_PML_CALL(isend(tmpbuf, sendcount, datatype,
                                         tree->tree_next[i],
                                         MCA_COLL_BASE_TAG_BCAST,
                                         MCA_PML_BASE_SEND_STANDARD, comm,
                                         &send_reqs[i]));
#endif  /* COLL_TUNED_BCAST_USE_BLOCKING */
                if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
            }

#if !defined(COLL_TUNED_BCAST_USE_BLOCKING)
            /* complete the sends before starting the next sends */
            err = ompi_request_wait_all( tree->tree_nextsize, send_reqs,
                                         MPI_STATUSES_IGNORE );
            if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
#endif  /* COLL_TUNED_BCAST_USE_BLOCKING */

            /* update tmp buffer */
            tmpbuf += realsegsize;
        }  /* root for each segment */
    }  /* root */

    /* intermediate nodes code */
    else if( tree->tree_nextsize > 0 ) {
        /* Intermediate nodes:
         * Create the pipeline.  We first post the first receive, then in the
         * loop we post the next receive and after that wait for the previous
         * receive to complete, and then disseminate the data to all our children.
         */
        sendcount = count_by_segment;
        err = MCA_PML_CALL(irecv(tmpbuf, sendcount, datatype,
                                 tree->tree_prev, MCA_COLL_BASE_TAG_BCAST,
                                 comm, &recv_reqs[req_index]));
        if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
        old_req_index = req_index;
        req_index = (req_index + 1) & 0x1;

        for( segindex = 1; segindex < num_segments; segindex++ ) {
            /* if last segment determine how many elements to expect in this round */
            if( segindex == (num_segments - 1) )
                sendcount = original_count - segindex * count_by_segment;

            /* post new irecv */
            err = MCA_PML_CALL(irecv( tmpbuf + realsegsize, sendcount,
                                      datatype, tree->tree_prev,
                                      MCA_COLL_BASE_TAG_BCAST, comm,
                                      &recv_reqs[req_index]));
            if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }

            /* wait for and forward the current segment */
            err = ompi_request_wait( &recv_reqs[old_req_index],
                                     MPI_STATUSES_IGNORE );
            old_req_index = req_index;
            req_index = (req_index + 1) & 0x1;
            if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
            /* we must wait here or we would forward data before it is received! */
            for( i = 0; i < tree->tree_nextsize; i++ ) {  /* send data to children */
                /* send data */
#if defined(COLL_TUNED_BCAST_USE_BLOCKING)
                err = MCA_PML_CALL(send(tmpbuf, count_by_segment, datatype,
                                        tree->tree_next[i],
                                        MCA_COLL_BASE_TAG_BCAST,
                                        MCA_PML_BASE_SEND_STANDARD, comm));
#else
                err = MCA_PML_CALL(isend(tmpbuf, count_by_segment, datatype,
                                         tree->tree_next[i],
                                         MCA_COLL_BASE_TAG_BCAST,
                                         MCA_PML_BASE_SEND_STANDARD, comm,
                                         &send_reqs[i]));
#endif  /* COLL_TUNED_BCAST_USE_BLOCKING */
                if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
            }

#if !defined(COLL_TUNED_BCAST_USE_BLOCKING)
            /* complete the sends before starting the next pair */
            err = ompi_request_wait_all( tree->tree_nextsize, send_reqs,
                                         MPI_STATUSES_IGNORE );
            if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
#endif  /* COLL_TUNED_BCAST_USE_BLOCKING */

            /* go to the next buffer (i.e. the one corresponding to the next recv) */
            tmpbuf += realsegsize;
        }  /* end of for segindex */

        /* wait for the last segment and forward the current segment */
        err = ompi_request_wait( &recv_reqs[old_req_index],
                                 MPI_STATUSES_IGNORE );
        if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
        for( i = 0; i < tree->tree_nextsize; i++ ) {  /* send data to children */
#if defined(COLL_TUNED_BCAST_USE_BLOCKING)
            err = MCA_PML_CALL(send(tmpbuf, sendcount, datatype,
                                    tree->tree_next[i],
                                    MCA_COLL_BASE_TAG_BCAST,
                                    MCA_PML_BASE_SEND_STANDARD, comm));
#else
            err = MCA_PML_CALL(isend(tmpbuf, sendcount, datatype,
                                     tree->tree_next[i],
                                     MCA_COLL_BASE_TAG_BCAST,
                                     MCA_PML_BASE_SEND_STANDARD, comm,
                                     &send_reqs[i]));
#endif  /* COLL_TUNED_BCAST_USE_BLOCKING */
            if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
        }

#if !defined(COLL_TUNED_BCAST_USE_BLOCKING)
        err = ompi_request_wait_all( tree->tree_nextsize, send_reqs,
                                     MPI_STATUSES_IGNORE );
        if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
#endif  /* COLL_TUNED_BCAST_USE_BLOCKING */
    }

    /* leaf nodes */
    else {
        /* We just loop receiving.
         */
        sendcount = count_by_segment;
        /* Prologue */
        err = MCA_PML_CALL(irecv(tmpbuf, sendcount, datatype,
                                 tree->tree_prev, MCA_COLL_BASE_TAG_BCAST,
                                 comm, &recv_reqs[req_index]));
        if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
        tmpbuf += realsegsize;
        old_req_index = req_index;
        req_index = (req_index + 1) & 0x1;

        /* Loop over the remaining receives */
        for( segindex = 1; segindex < num_segments; segindex++ ) {
            /* determine how many elements to expect in this round */
            if( segindex == (num_segments - 1) )
                sendcount = original_count - segindex * count_by_segment;
            /* receive segments */
            err = MCA_PML_CALL(irecv(tmpbuf, sendcount, datatype,
                                     tree->tree_prev, MCA_COLL_BASE_TAG_BCAST,
                                     comm, &recv_reqs[req_index]));
            if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
            /* update the initial pointer to the buffer */
            tmpbuf += realsegsize;
            err = ompi_request_wait( &recv_reqs[old_req_index],
                                     MPI_STATUS_IGNORE );
            if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
            old_req_index = req_index;
            req_index = (req_index + 1) & 0x1;
        }
        /* epilogue */
        err = ompi_request_wait( &recv_reqs[old_req_index], MPI_STATUS_IGNORE );
        if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
    }

    if( NULL != send_reqs ) free(send_reqs);

    return (MPI_SUCCESS);

 error_hndl:
    OPAL_OUTPUT( (ompi_coll_tuned_stream, "%s:%4d\tError occurred %d, rank %2d",
                  __FILE__, line, err, rank) );
    if( NULL != send_reqs ) free(send_reqs);

    return (err);
}

int
ompi_coll_tuned_bcast_intra_bintree ( void* buffer,
                                      int count,
                                      struct ompi_datatype_t* datatype,
                                      int root,
                                      struct ompi_communicator_t* comm,
                                      uint32_t segsize )
{
    int segcount = count;
    size_t typelng;

    COLL_TUNED_UPDATE_BINTREE( comm, root );

    /**
     * Determine number of elements sent per operation.
     */
    ompi_ddt_type_size( datatype, &typelng );
    COLL_TUNED_COMPUTED_SEGCOUNT( segsize, typelng, segcount );

    OPAL_OUTPUT((ompi_coll_tuned_stream,
                 "coll:tuned:bcast_intra_binary rank %d ss %5d typelng %ld segcount %d",
                 ompi_comm_rank(comm), segsize, typelng, segcount));

    return ompi_coll_tuned_bcast_intra_generic( buffer, count, datatype, root, comm,
                                                segcount,
                                                comm->c_coll_selected_data->cached_bintree );
}

int
ompi_coll_tuned_bcast_intra_pipeline( void* buffer,
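The generic routine above drives a segmented, double-buffered pipeline over an arbitrary tree: the root streams segments to its children, intermediate ranks keep two receives outstanding and forward each segment as soon as it has arrived, and leaves only receive. As a rough illustration of the same idea outside the Open MPI internals, the sketch below pipelines a broadcast along a simple chain (rank r receives from r-1 and forwards to r+1) using plain MPI point-to-point calls. It is a minimal sketch only: the chain topology, segment size, tag, and the helper name chain_bcast_pipelined are illustrative assumptions, not part of this file or of the tuned component.

/* Illustrative sketch (not part of coll_tuned_bcast.c): segmented, pipelined
 * broadcast of an int buffer along a chain 0 -> 1 -> ... -> size-1. */
#include <mpi.h>

static void chain_bcast_pipelined(int *buf, int count, MPI_Comm comm)
{
    const int seg = 1024;                 /* elements per segment (assumed) */
    const int tag = 4242;                 /* illustrative tag */
    int rank, size, nseg, parent, child, cur, s;
    MPI_Request req[2];

    if (count <= 0) return;
    MPI_Comm_rank(comm, &rank);
    MPI_Comm_size(comm, &size);
    nseg   = (count + seg - 1) / seg;
    parent = (rank > 0) ? rank - 1 : MPI_PROC_NULL;
    child  = (rank + 1 < size) ? rank + 1 : MPI_PROC_NULL;

    if (MPI_PROC_NULL == parent) {
        /* Head of the chain: just push every segment to the single child. */
        for (s = 0; s < nseg; s++) {
            int off = s * seg;
            int len = (s == nseg - 1) ? count - off : seg;
            MPI_Send(buf + off, len, MPI_INT, child, tag, comm);
        }
        return;
    }

    /* Non-root ranks: keep one receive posted ahead of the segment being
     * forwarded (double buffering of the request slot). */
    cur = 0;
    MPI_Irecv(buf, (nseg == 1) ? count : seg, MPI_INT, parent, tag, comm, &req[cur]);
    for (s = 0; s < nseg; s++) {
        int off = s * seg;
        int len = (s == nseg - 1) ? count - off : seg;
        if (s + 1 < nseg) {               /* prepost the receive for segment s+1 */
            int noff = (s + 1) * seg;
            int nlen = (s + 1 == nseg - 1) ? count - noff : seg;
            MPI_Irecv(buf + noff, nlen, MPI_INT, parent, tag, comm, &req[1 - cur]);
        }
        MPI_Wait(&req[cur], MPI_STATUS_IGNORE);               /* segment s has arrived */
        MPI_Send(buf + off, len, MPI_INT, child, tag, comm);  /* no-op at the tail (MPI_PROC_NULL) */
        cur = 1 - cur;
    }
}

The alternating request slot plays the same role as the recv_reqs[2]/req_index rotation in the intermediate and leaf branches above: one receive is always posted ahead of the segment currently being waited on and forwarded.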