coll_tuned_bcast.c

来自「MPI stands for the Message Passing Interface」· C语言 代码 · 共 789 行 · 第 1/3 页

C
789
字号
                                      int count,                                       struct ompi_datatype_t* datatype,                                       int root,                                      struct ompi_communicator_t* comm,                                       uint32_t segsize ){    int segcount = count;    size_t typelng;    COLL_TUNED_UPDATE_PIPELINE( comm, root );    /**     * Determine number of elements sent per operation.     */    ompi_ddt_type_size( datatype, &typelng );    COLL_TUNED_COMPUTED_SEGCOUNT( segsize, typelng, segcount );    OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:bcast_intra_pipeline rank %d ss %5d typelng %ld segcount %d",                 ompi_comm_rank(comm), segsize, typelng, segcount));    return ompi_coll_tuned_bcast_intra_generic( buffer, count, datatype, root, comm,                                                segcount, comm->c_coll_selected_data->cached_pipeline );}intompi_coll_tuned_bcast_intra_chain( void* buffer,                                   int count,                                    struct ompi_datatype_t* datatype,                                    int root,                                   struct ompi_communicator_t* comm,                                    uint32_t segsize, int32_t chains ){    int segcount = count;    size_t typelng;    COLL_TUNED_UPDATE_CHAIN( comm, root, chains );    /**     * Determine number of elements sent per operation.     
*/    ompi_ddt_type_size( datatype, &typelng );    COLL_TUNED_COMPUTED_SEGCOUNT( segsize, typelng, segcount );    OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:bcast_intra_chain rank %d fo %d ss %5d typelng %ld segcount %d",                 ompi_comm_rank(comm), chains, segsize, typelng, segcount));    return ompi_coll_tuned_bcast_intra_generic( buffer, count, datatype, root, comm,                                                segcount, comm->c_coll_selected_data->cached_chain );}intompi_coll_tuned_bcast_intra_binomial( void* buffer,                                      int count,                                       struct ompi_datatype_t* datatype,                                       int root,                                      struct ompi_communicator_t* comm,                                       uint32_t segsize ){    int segcount = count;    size_t typelng;    COLL_TUNED_UPDATE_BMTREE( comm, root );    /**     * Determine number of elements sent per operation.     */    ompi_ddt_type_size( datatype, &typelng );    COLL_TUNED_COMPUTED_SEGCOUNT( segsize, typelng, segcount );    OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:bcast_intra_binomial rank %d ss %5d typelng %ld segcount %d",                 ompi_comm_rank(comm), segsize, typelng, segcount));    return ompi_coll_tuned_bcast_intra_generic( buffer, count, datatype, root, comm,                                                segcount, comm->c_coll_selected_data->cached_bmtree );}intompi_coll_tuned_bcast_intra_split_bintree ( void* buffer,                                            int count,                                             struct ompi_datatype_t* datatype,                                             int root,                                            struct ompi_communicator_t* comm,                                             uint32_t segsize ){    int err=0, line;    int rank, size;    int segindex, i, lr, pair;    int segcount[2];       /* Number of elements sent with each segment 
*/    uint32_t counts[2];    int num_segments[2];   /* Number of segmenets */    int sendcount[2];      /* the same like segcount, except for the last segment */     size_t realsegsize[2];    char *tmpbuf[2];    size_t type_size;    ptrdiff_t type_extent, lb;    ompi_request_t *base_req, *new_req;    ompi_coll_tree_t *tree;    size = ompi_comm_size(comm);    rank = ompi_comm_rank(comm);    OPAL_OUTPUT((ompi_coll_tuned_stream,"ompi_coll_tuned_bcast_intra_split_bintree rank %d root %d ss %5d", rank, root, segsize));    if (size == 1) {        return MPI_SUCCESS;    }    /* setup the binary tree topology. */    COLL_TUNED_UPDATE_BINTREE( comm, root );    tree = comm->c_coll_selected_data->cached_bintree;    err = ompi_ddt_type_size( datatype, &type_size );    /* Determine number of segments and number of elements per segment */    counts[0] = count/2;    if (count % 2 != 0) counts[0]++;    counts[1] = count - counts[0];    if ( segsize > 0 ) {        /* Note that ompi_ddt_type_size() will never return a negative           value in typelng; it returns an int [vs. an unsigned type]           because of the MPI spec. */    	if (segsize < ((uint32_t) type_size)) {            segsize = type_size; /* push segsize up to hold one type */        }        segcount[0] = segcount[1] = segsize / type_size;         num_segments[0] = counts[0]/segcount[0];        if ((counts[0] % segcount[0]) != 0) num_segments[0]++;        num_segments[1] = counts[1]/segcount[1];        if ((counts[1] % segcount[1]) != 0) num_segments[1]++;    } else {        segcount[0]     = counts[0];        segcount[1]     = counts[1];        num_segments[0] = num_segments[1] = 1;    }    /* if the message is too small to be split into segments */    if( (counts[0] == 0 || counts[1] == 0) ||        (segsize > counts[0] * type_size) ||        (segsize > counts[1] * type_size) ) {        /* call linear version here ! 
*/        return (ompi_coll_tuned_bcast_intra_chain ( buffer, count, datatype,                                                     root, comm, segsize, 1 ));    }    err = ompi_ddt_get_extent (datatype, &lb, &type_extent);        /* Determine real segment size */    realsegsize[0] = segcount[0] * type_extent;    realsegsize[1] = segcount[1] * type_extent;      /* set the buffer pointers */    tmpbuf[0] = (char *) buffer;    tmpbuf[1] = (char *) buffer+counts[0] * type_extent;    /* Step 1:       Root splits the buffer in 2 and sends segmented message down the branches.       Left subtree of the tree receives first half of the buffer, while right       subtree receives the remaining message.    */    /* determine if I am left (0) or right (1), (root is right) */    lr = ((rank + size - root)%size + 1)%2;      /* root code */    if( rank == root ) {        /* determine segment count */        sendcount[0] = segcount[0];         sendcount[1] = segcount[1];        /* for each segment */        for (segindex = 0; segindex < num_segments[0]; segindex++) {            /* for each child */            for( i = 0; i < tree->tree_nextsize && i < 2; i++ ) {                if (segindex >= num_segments[i]) { /* no more segments */                    continue;                }                /* determine how many elements are being sent in this round */                if(segindex == (num_segments[i] - 1))                     sendcount[i] = counts[i] - segindex*segcount[i];                /* send data */                MCA_PML_CALL(send(tmpbuf[i], sendcount[i], datatype,                                  tree->tree_next[i], MCA_COLL_BASE_TAG_BCAST,                                  MCA_PML_BASE_SEND_STANDARD, comm));                if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }                /* update tmp buffer */                tmpbuf[i] += realsegsize[i];            }        }    }         /* intermediate nodes code */    else if( tree->tree_nextsize > 0 ) {        
 /* Intermediate nodes:         * It will receive segments only from one half of the data.         * Which one is determined by whether the node belongs to the "left" or "right"          * subtree. Topoloby building function builds binary tree such that         * odd "shifted ranks" ((rank + size - root)%size) are on the left subtree,         * and even on the right subtree.         *         * Create the pipeline. We first post the first receive, then in the loop we         * post the next receive and after that wait for the previous receive to complete          * and we disseminating the data to all children.         */        sendcount[lr] = segcount[lr];        MCA_PML_CALL(irecv(tmpbuf[lr], sendcount[lr], datatype,                           tree->tree_prev, MCA_COLL_BASE_TAG_BCAST,                           comm, &base_req));        if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }        for( segindex = 1; segindex < num_segments[lr]; segindex++ ) {            /* determine how many elements to expect in this round */            if( segindex == (num_segments[lr] - 1))                 sendcount[lr] = counts[lr] - segindex*segcount[lr];            /* post new irecv */            MCA_PML_CALL(irecv( tmpbuf[lr] + realsegsize[lr], sendcount[lr],                                datatype, tree->tree_prev, MCA_COLL_BASE_TAG_BCAST,                                 comm, &new_req));            if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }            /* wait for and forward current segment */            err = ompi_request_wait_all( 1, &base_req, MPI_STATUSES_IGNORE );            for( i = 0; i < tree->tree_nextsize; i++ ) {  /* send data to children (segcount[lr]) */                MCA_PML_CALL(send( tmpbuf[lr], segcount[lr], datatype,                                   tree->tree_next[i], MCA_COLL_BASE_TAG_BCAST,                                   MCA_PML_BASE_SEND_STANDARD, comm));                if (err != MPI_SUCCESS) { line = __LINE__; goto 
error_hndl; }            } /* end of for each child */            /* upate the base request */            base_req = new_req;                 /* go to the next buffer (ie. the one corresponding to the next recv) */            tmpbuf[lr] += realsegsize[lr];        } /* end of for segindex */        /* wait for the last segment and forward current segment */        err = ompi_request_wait_all( 1, &base_req, MPI_STATUSES_IGNORE );        for( i = 0; i < tree->tree_nextsize; i++ ) {  /* send data to children */            MCA_PML_CALL(send(tmpbuf[lr], sendcount[lr], datatype,                              tree->tree_next[i], MCA_COLL_BASE_TAG_BCAST,                              MCA_PML_BASE_SEND_STANDARD, comm));            if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }        } /* end of for each child */    }       /* leaf nodes */    else {         /* Just consume segments as fast as possible */        sendcount[lr] = segcount[lr];        for (segindex = 0; segindex < num_segments[lr]; segindex++) {            /* determine how many elements to expect in this round */            if (segindex == (num_segments[lr] - 1)) sendcount[lr] = counts[lr] - segindex*segcount[lr];            /* receive segments */            MCA_PML_CALL(recv(tmpbuf[lr], sendcount[lr], datatype,                              tree->tree_prev, MCA_COLL_BASE_TAG_BCAST,                              comm, MPI_STATUS_IGNORE));            if (err != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }            /* update the initial pointer to the buffer */            tmpbuf[lr] += realsegsize[lr];        }    }    /* reset the buffer pointers */    tmpbuf[0] = (char *) buffer;    tmpbuf[1] = (char *) buffer+counts[0] * type_extent;

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?