coll_tuned_bcast.c

From "MPI stands for the Message Passing Inter" · C code · 789 lines total · page 1 of 3

C
789
Font size
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2006 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"
#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/datatype/datatype.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/coll/coll.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/pml/pml.h"
#include "coll_tuned.h"
#include "coll_tuned_topo.h"
#include "coll_tuned_util.h"

/**
 * Generic segmented, tree-based broadcast.
 *
 * The message is split into num_segments chunks of count_by_segment
 * elements each (the last chunk may be shorter).  The root streams the
 * segments to its children; interior nodes pipeline by keeping two
 * receives in flight (recv_reqs[0]/recv_reqs[1]) and forwarding each
 * completed segment to their children; leaf nodes only receive.
 *
 * @param buffer            data to broadcast (root) / receive into (others)
 * @param original_count    total number of elements of `datatype` in buffer
 * @param datatype          element datatype
 * @param root              rank that owns the original data
 * @param comm              communicator over which to broadcast
 * @param count_by_segment  elements per pipeline segment (must be > 0)
 * @param tree              pre-built broadcast tree (parent + children)
 *
 * @return MPI_SUCCESS on success, otherwise the first error encountered
 *         (communication error or OMPI_ERR_OUT_OF_RESOURCE).
 */
int
ompi_coll_tuned_bcast_intra_generic( void* buffer,
                                     int original_count,
                                     struct ompi_datatype_t* datatype,
                                     int root,
                                     struct ompi_communicator_t* comm,
                                     uint32_t count_by_segment,
                                     ompi_coll_tree_t* tree )
{
    int err = 0, line, i;
    int rank, size;
    int segindex;
    int num_segments;   /* Number of segments */
    int sendcount;      /* the same as segcount, except for the last segment */
    size_t realsegsize;
    char *tmpbuf;
    size_t type_size;
    ptrdiff_t extent, lb;
    ompi_request_t *recv_reqs[2], **send_reqs = NULL;
    int req_index = 0, old_req_index;

    size = ompi_comm_size(comm);
    rank = ompi_comm_rank(comm);
    assert( size > 1 );

    ompi_ddt_get_extent( datatype, &lb, &extent );
    ompi_ddt_type_size( datatype, &type_size );
    num_segments = (original_count + count_by_segment - 1) / count_by_segment;
    realsegsize = count_by_segment * extent;

    /* set the buffer pointer */
    tmpbuf = (char *) buffer;

    if( tree->tree_nextsize != 0 ) {
        send_reqs = (ompi_request_t**)malloc( tree->tree_nextsize *
                                              sizeof(ompi_request_t*) );
        /* BUGFIX: the allocation result was never checked before use */
        if( NULL == send_reqs ) {
            err = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto error_hndl;
        }
    }

    /* Root code: just send a segment to each child in turn,
     * as fast as you can. */
    if( rank == root ) {
        sendcount = count_by_segment;
        for( segindex = 0; segindex < num_segments; segindex++ ) {
            /* the last segment may carry fewer elements */
            if( segindex == (num_segments - 1) )
                sendcount = original_count - segindex * count_by_segment;
            for( i = 0; i < tree->tree_nextsize; i++ ) {  /* send data to children */
#if defined(COLL_TUNED_BCAST_USE_BLOCKING)
                err = MCA_PML_CALL(send(tmpbuf, sendcount, datatype,
                                        tree->tree_next[i],
                                        MCA_COLL_BASE_TAG_BCAST,
                                        MCA_PML_BASE_SEND_STANDARD, comm));
#else
                err = MCA_PML_CALL(isend(tmpbuf, sendcount, datatype,
                                         tree->tree_next[i],
                                         MCA_COLL_BASE_TAG_BCAST,
                                         MCA_PML_BASE_SEND_STANDARD, comm,
                                         &send_reqs[i]));
#endif  /* COLL_TUNED_BCAST_USE_BLOCKING */
                if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
            }
#if !defined(COLL_TUNED_BCAST_USE_BLOCKING)
            /* BUGFIX: this wait_all was unconditional; on the blocking-send
             * path send_reqs holds uninitialized garbage, so only wait when
             * isends were actually posted (mirrors the intermediate branch). */
            err = ompi_request_wait_all( tree->tree_nextsize, send_reqs,
                                         MPI_STATUSES_IGNORE );
            if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
#endif  /* COLL_TUNED_BCAST_USE_BLOCKING */
            /* update tmp buffer */
            tmpbuf += realsegsize;
        } /* root for each segment */
    } /* root */

    /* Intermediate nodes:
     * Create the pipeline.  We first post the first receive, then in the
     * loop we post the next receive and wait for the previous one to
     * complete before disseminating the data to all our children. */
    else if( tree->tree_nextsize > 0 ) {
        /* BUGFIX: when there is only one segment the root sends
         * original_count elements; match that instead of blindly using
         * count_by_segment (aligns with the root branch's last-segment
         * handling). */
        sendcount = (num_segments == 1) ?
            original_count : (int)count_by_segment;
        /* BUGFIX: err was never assigned from the two irecv calls in this
         * branch, so the error checks tested a stale value. */
        err = MCA_PML_CALL(irecv(tmpbuf, sendcount, datatype,
                                 tree->tree_prev, MCA_COLL_BASE_TAG_BCAST,
                                 comm, &recv_reqs[req_index]));
        if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
        old_req_index = req_index;
        req_index = (req_index + 1) & 0x1;

        for( segindex = 1; segindex < num_segments; segindex++ ) {
            /* the last segment may carry fewer elements */
            if( segindex == (num_segments - 1) )
                sendcount = original_count - segindex * count_by_segment;
            /* post the next irecv (into the buffer right after the segment
             * we are about to forward) */
            err = MCA_PML_CALL(irecv(tmpbuf + realsegsize, sendcount,
                                     datatype, tree->tree_prev,
                                     MCA_COLL_BASE_TAG_BCAST,
                                     comm, &recv_reqs[req_index]));
            if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }

            /* must wait here or we would forward data before it is
             * received!  (BUGFIX: single-request wait takes
             * MPI_STATUS_IGNORE, not the array form MPI_STATUSES_IGNORE,
             * as the leaf branch already did.) */
            err = ompi_request_wait( &recv_reqs[old_req_index],
                                     MPI_STATUS_IGNORE );
            old_req_index = req_index;
            req_index = (req_index + 1) & 0x1;
            if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }

            for( i = 0; i < tree->tree_nextsize; i++ ) {  /* forward to children */
#if defined(COLL_TUNED_BCAST_USE_BLOCKING)
                err = MCA_PML_CALL(send(tmpbuf, count_by_segment, datatype,
                                        tree->tree_next[i],
                                        MCA_COLL_BASE_TAG_BCAST,
                                        MCA_PML_BASE_SEND_STANDARD, comm));
#else
                err = MCA_PML_CALL(isend(tmpbuf, count_by_segment, datatype,
                                         tree->tree_next[i],
                                         MCA_COLL_BASE_TAG_BCAST,
                                         MCA_PML_BASE_SEND_STANDARD, comm,
                                         &send_reqs[i]));
#endif  /* COLL_TUNED_BCAST_USE_BLOCKING */
                if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
            }
#if !defined(COLL_TUNED_BCAST_USE_BLOCKING)
            /* complete the sends before starting the next pair */
            err = ompi_request_wait_all( tree->tree_nextsize, send_reqs,
                                         MPI_STATUSES_IGNORE );
            if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
#endif  /* COLL_TUNED_BCAST_USE_BLOCKING */
            /* go to the next buffer (i.e. the one matching the next recv) */
            tmpbuf += realsegsize;
        } /* end of for segindex */

        /* wait for the last segment and forward it */
        err = ompi_request_wait( &recv_reqs[old_req_index],
                                 MPI_STATUS_IGNORE );
        if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
        for( i = 0; i < tree->tree_nextsize; i++ ) {  /* send data to children */
#if defined(COLL_TUNED_BCAST_USE_BLOCKING)
            err = MCA_PML_CALL(send(tmpbuf, sendcount, datatype,
                                    tree->tree_next[i],
                                    MCA_COLL_BASE_TAG_BCAST,
                                    MCA_PML_BASE_SEND_STANDARD, comm));
#else
            err = MCA_PML_CALL(isend(tmpbuf, sendcount, datatype,
                                     tree->tree_next[i],
                                     MCA_COLL_BASE_TAG_BCAST,
                                     MCA_PML_BASE_SEND_STANDARD, comm,
                                     &send_reqs[i]));
#endif  /* COLL_TUNED_BCAST_USE_BLOCKING */
            if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
        }
#if !defined(COLL_TUNED_BCAST_USE_BLOCKING)
        err = ompi_request_wait_all( tree->tree_nextsize, send_reqs,
                                     MPI_STATUSES_IGNORE );
        if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
#endif  /* COLL_TUNED_BCAST_USE_BLOCKING */
    }

    /* Leaf nodes: just loop receiving, keeping two receives in flight. */
    else {
        /* BUGFIX: single-segment case — see the intermediate branch */
        sendcount = (num_segments == 1) ?
            original_count : (int)count_by_segment;
        /* Prologue */
        err = MCA_PML_CALL(irecv(tmpbuf, sendcount, datatype,
                                 tree->tree_prev, MCA_COLL_BASE_TAG_BCAST,
                                 comm, &recv_reqs[req_index]));
        if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
        tmpbuf += realsegsize;
        old_req_index = req_index;
        req_index = (req_index + 1) & 0x1;

        /* Loop over the remaining receives */
        for( segindex = 1; segindex < num_segments; segindex++ ) {
            /* the last segment may carry fewer elements */
            if( segindex == (num_segments - 1) )
                sendcount = original_count - segindex * count_by_segment;
            /* receive segments */
            err = MCA_PML_CALL(irecv(tmpbuf, sendcount, datatype,
                                     tree->tree_prev, MCA_COLL_BASE_TAG_BCAST,
                                     comm, &recv_reqs[req_index]));
            if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
            /* update the pointer to the buffer */
            tmpbuf += realsegsize;
            err = ompi_request_wait( &recv_reqs[old_req_index],
                                     MPI_STATUS_IGNORE );
            if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
            old_req_index = req_index;
            req_index = (req_index + 1) & 0x1;
        }
        /* epilogue */
        err = ompi_request_wait( &recv_reqs[old_req_index],
                                 MPI_STATUS_IGNORE );
        if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
    }

    if( NULL != send_reqs ) free(send_reqs);
    return (MPI_SUCCESS);

 error_hndl:
    OPAL_OUTPUT( (ompi_coll_tuned_stream,"%s:%4d\tError occurred %d, rank %2d",
                  __FILE__, line, err, rank) );
    if( NULL != send_reqs ) free(send_reqs);
    return (err);
}

/**
 * Binary-tree broadcast entry point.
 *
 * Computes the per-segment element count from the requested segment size
 * (in bytes) and delegates to the generic tree broadcast using the
 * communicator's cached binary tree.
 */
int
ompi_coll_tuned_bcast_intra_bintree ( void* buffer,
                                      int count,
                                      struct ompi_datatype_t* datatype,
                                      int root,
                                      struct ompi_communicator_t* comm,
                                      uint32_t segsize )
{
    int segcount = count;
    size_t typelng;

    COLL_TUNED_UPDATE_BINTREE( comm, root );

    /* Determine number of elements sent per operation. */
    ompi_ddt_type_size( datatype, &typelng );
    COLL_TUNED_COMPUTED_SEGCOUNT( segsize, typelng, segcount );

    OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:bcast_intra_binary rank %d ss %5d typelng %ld segcount %d",
                 ompi_comm_rank(comm), segsize, typelng, segcount));

    return ompi_coll_tuned_bcast_intra_generic( buffer, count, datatype, root, comm,
                                                segcount,
                                                comm->c_coll_selected_data->cached_bintree );
}

int
ompi_coll_tuned_bcast_intra_pipeline( void* buffer,

⌨️ Keyboard shortcuts

Copy code: Ctrl + C
Search code: Ctrl + F
Full-screen mode: F11
Increase font size: Ctrl + =
Decrease font size: Ctrl + -
Show shortcuts: ?