coll_tuned_alltoall.c

来自「MPI stands for the Message Passing Inter」· C语言 代码 · 共 536 行 · 第 1/2 页

C
536
字号
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2006 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"

#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/datatype/datatype.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/coll/coll.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/op/op.h"
#include "coll_tuned.h"
#include "coll_tuned_topo.h"
#include "coll_tuned_util.h"

/**
 * All-to-all implemented as a sequence of pairwise send/receive exchanges.
 *
 * In step s (1 <= s <= size) this rank sends block (rank+s)%size of sbuf to
 * that rank and receives block (rank+size-s)%size of rbuf from that rank.
 * The final step (s == size) has sendto == recvfrom == rank, i.e. the
 * exchange with ourselves is performed last.
 *
 * @param sbuf   send buffer; block i starts at byte offset i*sext*scount
 * @param scount number of sdtype elements sent to each rank
 * @param sdtype send datatype
 * @param rbuf   receive buffer; block i starts at byte offset i*rext*rcount
 * @param rcount number of rdtype elements received from each rank
 * @param rdtype receive datatype
 * @param comm   communicator supplying rank and size
 *
 * @return MPI_SUCCESS, or the first error code returned by
 *         ompi_ddt_get_extent() / ompi_coll_tuned_sendrecv().
 */
int ompi_coll_tuned_alltoall_intra_pairwise(void *sbuf, int scount,
                                            struct ompi_datatype_t *sdtype,
                                            void* rbuf, int rcount,
                                            struct ompi_datatype_t *rdtype,
                                            struct ompi_communicator_t *comm)
{
    int line = -1, err = 0;
    int rank, size, step;
    int sendto, recvfrom;
    void *tmpsend, *tmprecv;
    ptrdiff_t lb, sext, rext;

    size = ompi_comm_size(comm);
    rank = ompi_comm_rank(comm);

    OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:alltoall_intra_pairwise rank %d", rank));

    err = ompi_ddt_get_extent (sdtype, &lb, &sext);
    if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }

    err = ompi_ddt_get_extent (rdtype, &lb, &rext);
    if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }

    /* Perform pairwise exchange - starting from 1 so the local copy is last */
    for (step = 1; step < size+1; step++) {

        /* who do we talk to in this step? */
        sendto  = (rank+step)%size;
        recvfrom = (rank+size-step)%size;

        /* where are we sending from, and where are we receiving into? */
        tmpsend = (char*)sbuf+sendto*sext*scount;
        tmprecv = (char*)rbuf+recvfrom*rext*rcount;

        /* send and receive */
        err = ompi_coll_tuned_sendrecv( tmpsend, scount, sdtype, sendto, MCA_COLL_BASE_TAG_ALLTOALL,
                                        tmprecv, rcount, rdtype, recvfrom, MCA_COLL_BASE_TAG_ALLTOALL,
                                        comm, MPI_STATUS_IGNORE, rank);
        if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl;  }
    }

    return MPI_SUCCESS;

 err_hndl:
    OPAL_OUTPUT((ompi_coll_tuned_stream,"%s:%4d\tError occurred %d, rank %2d", __FILE__,line,err,rank));
    return err;
}


/**
 * All-to-all that exchanges packed groups of blocks in rounds whose
 * distance doubles each time (1, 2, 4, ... < size).
 *
 * Outline, as implemented below:
 *   1. Copy sbuf into a temporary buffer rotated by `rank` blocks, so that
 *      block `rank` of sbuf becomes block 0 of tmpbuf.
 *   2. For each distance d: build an indexed datatype selecting every block
 *      index i (1 <= i < size) with bit d set, pack those blocks from tmpbuf,
 *      send them to (rank+d)%size, receive the same amount of packed data
 *      from (rank-d+size)%size into rbuf (used as scratch space), and unpack
 *      it back into the same block positions of tmpbuf.
 *   3. Copy block i of tmpbuf to block (rank-i+size)%size of rbuf.
 *   4. Release all temporaries.
 *
 * NOTE(review): tmpbuf is sized and filled using scount/sdtype/sext, but
 * step 3 reads it using rcount/rdtype/rext. This is only consistent when the
 * send and receive sides describe the same per-block layout - confirm that
 * callers guarantee this.
 *
 * @param sbuf   send buffer holding `size` blocks of scount sdtype elements
 * @param scount number of sdtype elements per block sent
 * @param sdtype send datatype
 * @param rbuf   receive buffer; also used as scratch for packed data
 * @param rcount number of rdtype elements per block received
 * @param rdtype receive datatype
 * @param comm   communicator supplying rank and size
 *
 * @return OMPI_SUCCESS on success; -1 on allocation or local-copy failure;
 *         otherwise the error code of the failing MPI / ompi call.
 */
int ompi_coll_tuned_alltoall_intra_bruck(void *sbuf, int scount,
                                         struct ompi_datatype_t *sdtype,
                                         void* rbuf, int rcount,
                                         struct ompi_datatype_t *rdtype,
                                         struct ompi_communicator_t *comm)
{
    int i, k, line = -1;
    int rank, size;
    int sendto, recvfrom, distance, *displs = NULL, *blen = NULL;
    int maxpacksize, packsize, position;
    char *tmpbuf = NULL, *packbuf = NULL;
    ptrdiff_t lb, sext, rext;
    int err = 0;
    int weallocated = 0;
    /* Start as the null handle so the error path can tell whether a
     * per-round datatype is still alive and must be released. */
    MPI_Datatype iddt = MPI_DATATYPE_NULL;

    size = ompi_comm_size(comm);
    rank = ompi_comm_rank(comm);

    OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:alltoall_intra_bruck rank %d", rank));

    err = ompi_ddt_get_extent (sdtype, &lb, &sext);
    if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }

    err = ompi_ddt_get_extent (rdtype, &lb, &rext);
    if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }

#ifdef blahblah
    /* try and SAVE memory by using the data segment hung off the communicator if possible */
    if (comm->c_coll_selected_data->mcct_num_reqs >= size) {
        /* we have enough preallocated for displacements and lengths */
        displs = (int*) comm->c_coll_basic_data->mcct_reqs;
        blen = (int *) (displs + size);
        weallocated = 0;
    }
    else { /* allocate the buffers ourself */
#endif
        /* Mark ownership before allocating: if the second malloc fails the
         * first array must still be released by the cleanup code. */
        weallocated = 1;
        displs = malloc(size * sizeof *displs);
        if (displs == NULL) { line = __LINE__; err = -1; goto err_hndl; }
        blen = malloc(size * sizeof *blen);
        if (blen == NULL) { line = __LINE__; err = -1; goto err_hndl; }
#ifdef blahblah
    }
#endif

    /* Prepare for packing data */
    err = MPI_Pack_size( scount*size, sdtype, comm, &maxpacksize );
    if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl;  }

    /* pack buffer allocation */
    packbuf = malloc((size_t) maxpacksize);
    if (packbuf == NULL) { line = __LINE__; err = -1; goto err_hndl; }

    /* tmp buffer allocation for message data; the size is computed in
     * ptrdiff_t so scount*size cannot overflow int first */
    tmpbuf = malloc((size_t) ((ptrdiff_t) scount * size * sext));
    if (tmpbuf == NULL) { line = __LINE__; err = -1; goto err_hndl; }

    /* Step 1 - local rotation - shift up by rank */
    err = ompi_ddt_copy_content_same_ddt (sdtype, (int32_t) ((size-rank)*scount),
                                          tmpbuf,
                                          ((char*)sbuf) + (ptrdiff_t) rank * scount * sext);
    if (err<0) {
        line = __LINE__; err = -1; goto err_hndl;
    }

    if (rank != 0) {
        err = ompi_ddt_copy_content_same_ddt (sdtype, (int32_t) (rank*scount),
                                              tmpbuf + (ptrdiff_t) (size-rank) * scount * sext,
                                              (char*)sbuf);
        if (err<0) {
            line = __LINE__; err = -1; goto err_hndl;
        }
    }

    /* Step 2 - perform communication step */
    for (distance = 1; distance < size; distance<<=1) {

        /* send data to "sendto" */
        sendto = (rank+distance)%size;
        recvfrom = (rank-distance+size)%size;
        packsize = 0;
        k = 0;

        /* create indexed datatype: every block whose index has this bit set */
        for (i = 1; i < size; i++) {
            if ((i&distance) == distance) {
                displs[k] = i*scount;
                blen[k] = scount;
                k++;
            }
        }

        /* Set indexes and displacements */
        err = MPI_Type_indexed(k, blen, displs, sdtype, &iddt);
        if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl;  }

        /* Commit the new datatype */
        err = MPI_Type_commit(&iddt);
        if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl;  }

        /* have the new distribution ddt, pack and exchange data */
        err = MPI_Pack(tmpbuf, 1, iddt, packbuf, maxpacksize, &packsize, comm);
        if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl;  }

        /* Sendreceive */
        err = ompi_coll_tuned_sendrecv ( packbuf, packsize, MPI_PACKED, sendto,
                                         MCA_COLL_BASE_TAG_ALLTOALL,
                                         rbuf, packsize, MPI_PACKED, recvfrom,
                                         MCA_COLL_BASE_TAG_ALLTOALL,
                                         comm, MPI_STATUS_IGNORE, rank);
        if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }

        /* Unpack data from rbuf to tmpbuf */
        position = 0;
        err = MPI_Unpack(rbuf, packsize, &position,
                         tmpbuf, 1, iddt, comm);
        if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }

        /* free ddt */
        err = MPI_Type_free(&iddt);
        if (err != MPI_SUCCESS) {
            /* the free itself failed: do not attempt it again on the error path */
            iddt = MPI_DATATYPE_NULL;
            line = __LINE__; goto err_hndl;
        }
    } /* end of for (distance = 1... */

    /* Step 3 - local rotation - */
    for (i = 0; i < size; i++) {
        err = ompi_ddt_copy_content_same_ddt (rdtype, (int32_t) rcount,
                                              ((char*)rbuf) + (ptrdiff_t) ((rank-i+size)%size) * rcount * rext,
                                              tmpbuf + (ptrdiff_t) i * rcount * rext);
        if (err<0) {
            line = __LINE__; err = -1; goto err_hndl;
        }
    }

    /* Step 4 - clean up */
    err = OMPI_SUCCESS;
    goto cleanup;

 err_hndl:
    OPAL_OUTPUT((ompi_coll_tuned_stream,"%s:%4d\tError occurred %d, rank %2d", __FILE__,line,err,rank));
    /* release the per-round datatype if we bailed out in the middle of a round */
    if (iddt != MPI_DATATYPE_NULL) {
        (void) MPI_Type_free(&iddt);
    }

 cleanup:
    /* free(NULL) is a no-op, so no NULL guards are needed */
    free(tmpbuf);
    free(packbuf);
    if (weallocated) {
        free(displs);
        free(blen);
    }
    return err;
}


int ompi_coll_tuned_alltoall_intra_two_procs(void *sbuf, int scount,
                                             struct ompi_datatype_t *sdtype,
                                             void* rbuf, int rcount,
                                             struct ompi_datatype_t *rdtype,
                                             struct ompi_communicator_t *comm)
{
    int line = -1, err = 0;
    int rank;
    int sendto, recvfrom;
    void * tmpsend, *tmprecv;
    ptrdiff_t sext, rext, lb;

    rank = ompi_comm_rank(comm);

    OPAL_OUTPUT((ompi_coll_tuned_stream,"ompi_coll_tuned_alltoall_intra_two_procs rank %d", rank));

    err = ompi_ddt_get_extent (sdtype, &lb, &sext);
    if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }

    err = ompi_ddt_get_extent (rdtype, &lb, &rext);
    if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }

    /* exchange data */
    sendto  = (rank+1)%2;
    recvfrom = sendto;

    /* where from are we sending and where to are we receiving ? */
    tmpsend = (char*)sbuf+sendto*sext*scount;
    tmprecv = (char*)rbuf+recvfrom*rext*rcount;

    /* send and receive */
    err = ompi_coll_tuned_sendrecv ( tmpsend, scount, sdtype, sendto, MCA_COLL_BASE_TAG_ALLTOALL,

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?