
comm_cid.c

MPI stands for the Message Passing Interface. Written by the MPI Forum (a large committee comprising

Language: C
       freeing the new communicator anyway, so we might as well leave
       the coll selection as NULL (the coll base comm unselect code
       handles that case properly). */
    if (MPI_UNDEFINED != newcomm->c_local_group->grp_my_rank) {
        /* Step 2: call all functions, which might use the new
           communicator already. */

        /* Initialize the coll components */
        /* Let the collectives components fight over who will do
           collective on this new comm. */
        if (OMPI_SUCCESS !=
            (ok = mca_coll_base_comm_select(newcomm, collcomponent))) {
            return ok;
        }
    }

    return OMPI_SUCCESS;
}

/**************************************************************************/
/**************************************************************************/
/**************************************************************************/
/* Arguments not used in this implementation:
 *  - bridgecomm
 *  - local_leader
 *  - remote_leader
 *  - send_first
 */
static int ompi_comm_allreduce_intra ( int *inbuf, int *outbuf,
                                       int count, struct ompi_op_t *op,
                                       ompi_communicator_t *comm,
                                       ompi_communicator_t *bridgecomm,
                                       void* local_leader,
                                       void* remote_leader,
                                       int send_first )
{
    return comm->c_coll.coll_allreduce ( inbuf, outbuf, count, MPI_INT,
                                         op, comm );
}

/* Arguments not used in this implementation:
 *  - bridgecomm
 *  - local_leader
 *  - remote_leader
 *  - send_first
 */
static int ompi_comm_allreduce_inter ( int *inbuf, int *outbuf,
                                       int count, struct ompi_op_t *op,
                                       ompi_communicator_t *intercomm,
                                       ompi_communicator_t *bridgecomm,
                                       void* local_leader,
                                       void* remote_leader,
                                       int send_first )
{
    int local_rank, rsize;
    int i, rc;
    int *sbuf;
    int *tmpbuf=NULL;
    int *rcounts=NULL, scount=0;
    int *rdisps=NULL;

    if ( &ompi_mpi_op_sum != op && &ompi_mpi_op_prod != op &&
         &ompi_mpi_op_max != op && &ompi_mpi_op_min  != op ) {
        return MPI_ERR_OP;
    }

    if ( !OMPI_COMM_IS_INTER (intercomm)) {
        return MPI_ERR_COMM;
    }

    /* Allocate temporary arrays */
    rsize      = ompi_comm_remote_size (intercomm);
    local_rank = ompi_comm_rank ( intercomm );

    tmpbuf  = (int *) malloc ( count * sizeof(int));
    rdisps  = (int *) calloc ( rsize, sizeof(int));
    rcounts = (int *) calloc ( rsize, sizeof(int) );
    if ( NULL == tmpbuf || NULL == rdisps || NULL == rcounts ) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    /* Execute the inter-allreduce: the result of our group will
       be in the buffer of the remote group */
    rc = intercomm->c_coll.coll_allreduce ( inbuf, tmpbuf, count, MPI_INT,
                                            op, intercomm );
    if ( OMPI_SUCCESS != rc ) {
        goto exit;
    }

    if ( 0 == local_rank ) {
        MPI_Request req;

        /* for the allgatherv later */
        scount = count;

        /* local leaders exchange their data and determine the overall result
           for both groups */
        rc = MCA_PML_CALL(irecv (outbuf, count, MPI_INT, 0,
                                 OMPI_COMM_ALLREDUCE_TAG,
                                 intercomm, &req));
        if ( OMPI_SUCCESS != rc ) {
            goto exit;
        }

        rc = MCA_PML_CALL(send (tmpbuf, count, MPI_INT, 0,
                                OMPI_COMM_ALLREDUCE_TAG,
                                MCA_PML_BASE_SEND_STANDARD, intercomm));
        if ( OMPI_SUCCESS != rc ) {
            goto exit;
        }

        rc = ompi_request_wait_all ( 1, &req, MPI_STATUS_IGNORE );
        if ( OMPI_SUCCESS != rc ) {
            goto exit;
        }

        if ( &ompi_mpi_op_max == op ) {
            for ( i = 0 ; i < count; i++ ) {
                if (tmpbuf[i] > outbuf[i]) outbuf[i] = tmpbuf[i];
            }
        }
        else if ( &ompi_mpi_op_min == op ) {
            for ( i = 0 ; i < count; i++ ) {
                if (tmpbuf[i] < outbuf[i]) outbuf[i] = tmpbuf[i];
            }
        }
        else if ( &ompi_mpi_op_sum == op ) {
            for ( i = 0 ; i < count; i++ ) {
                outbuf[i] += tmpbuf[i];
            }
        }
        else if ( &ompi_mpi_op_prod == op ) {
            for ( i = 0 ; i < count; i++ ) {
                outbuf[i] *= tmpbuf[i];
            }
        }
    }

    /* distribute the overall result to all processes in the other group.
       Instead of using bcast, we are using here allgatherv, to avoid the
       possible deadlock. Else, we need an algorithm to determine
       which group sends first in the inter-bcast and which receives
       the result first.
    */
    rcounts[0] = count;
    sbuf       = outbuf;
    rc = intercomm->c_coll.coll_allgatherv (sbuf, scount, MPI_INT, outbuf,
                                            rcounts, rdisps, MPI_INT,
                                            intercomm);

 exit:
    if ( NULL != tmpbuf ) {
        free ( tmpbuf );
    }
    if ( NULL != rcounts ) {
        free ( rcounts );
    }
    if ( NULL != rdisps ) {
        free ( rdisps );
    }

    return (rc);
}

/* Arguments not used in this implementation:
 *  - send_first
 */
static int ompi_comm_allreduce_intra_bridge (int *inbuf, int *outbuf,
                                             int count, struct ompi_op_t *op,
                                             ompi_communicator_t *comm,
                                             ompi_communicator_t *bcomm,
                                             void* lleader, void* rleader,
                                             int send_first )
{
    int *tmpbuf=NULL;
    int local_rank;
    int i;
    int rc;
    int local_leader, remote_leader;

    local_leader  = (*((int*)lleader));
    remote_leader = (*((int*)rleader));

    if ( &ompi_mpi_op_sum != op && &ompi_mpi_op_prod != op &&
         &ompi_mpi_op_max != op && &ompi_mpi_op_min  != op ) {
        return MPI_ERR_OP;
    }

    local_rank = ompi_comm_rank ( comm );
    tmpbuf     = (int *) malloc ( count * sizeof(int));
    if ( NULL == tmpbuf ) {
        return MPI_ERR_INTERN;
    }

    /* Intercomm_create */
    rc = comm->c_coll.coll_allreduce ( inbuf, tmpbuf, count, MPI_INT,
                                       op, comm );
    if ( OMPI_SUCCESS != rc ) {
        goto exit;
    }

    if (local_rank == local_leader ) {
        MPI_Request req;

        rc = MCA_PML_CALL(irecv ( outbuf, count, MPI_INT, remote_leader,
                                  OMPI_COMM_ALLREDUCE_TAG,
                                  bcomm, &req));
        if ( OMPI_SUCCESS != rc ) {
            goto exit;
        }
        rc = MCA_PML_CALL(send (tmpbuf, count, MPI_INT, remote_leader,
                                OMPI_COMM_ALLREDUCE_TAG,
                                MCA_PML_BASE_SEND_STANDARD, bcomm));
        if ( OMPI_SUCCESS != rc ) {
            goto exit;
        }
        rc = ompi_request_wait_all ( 1, &req, MPI_STATUS_IGNORE);
        if ( OMPI_SUCCESS != rc ) {
            goto exit;
        }

        if ( &ompi_mpi_op_max == op ) {
            for ( i = 0 ; i < count; i++ ) {
                if (tmpbuf[i] > outbuf[i]) outbuf[i] = tmpbuf[i];
            }
        }
        else if ( &ompi_mpi_op_min == op ) {
            for ( i = 0 ; i < count; i++ ) {
                if (tmpbuf[i] < outbuf[i]) outbuf[i] = tmpbuf[i];
            }
        }
        else if ( &ompi_mpi_op_sum == op ) {
            for ( i = 0 ; i < count; i++ ) {
                outbuf[i] += tmpbuf[i];
            }
        }
        else if ( &ompi_mpi_op_prod == op ) {
            for ( i = 0 ; i < count; i++ ) {
                outbuf[i] *= tmpbuf[i];
            }
        }
    }

    rc = comm->c_coll.coll_bcast ( outbuf, count, MPI_INT, local_leader,
                                   comm);

 exit:
    if (NULL != tmpbuf ) {
        free (tmpbuf);
    }

    return (rc);
}

/* Arguments not used in this implementation:
 *  - bridgecomm
 *
 * lleader is the local rank of root in comm
 * rleader is the OOB contact information of the
 * root processes in the other world.
 */
static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf,
                                          int count, struct ompi_op_t *op,
                                          ompi_communicator_t *comm,
                                          ompi_communicator_t *bridgecomm,
                                          void* lleader, void* rleader,
                                          int send_first )
{
    int *tmpbuf=NULL;
    int i;
    int rc;
    int local_leader, local_rank;
    orte_process_name_t *remote_leader=NULL;
    orte_std_cntr_t size_count;

    local_leader  = (*((int*)lleader));
    remote_leader = (orte_process_name_t*)rleader;
    size_count = count;

    if ( &ompi_mpi_op_sum != op && &ompi_mpi_op_prod != op &&
         &ompi_mpi_op_max != op && &ompi_mpi_op_min  != op ) {
        return MPI_ERR_OP;
    }

    local_rank = ompi_comm_rank ( comm );
    tmpbuf     = (int *) malloc ( count * sizeof(int));
    if ( NULL == tmpbuf ) {
        return MPI_ERR_INTERN;
    }

    /* comm is an intra-communicator */
    rc = comm->c_coll.coll_allreduce (inbuf, tmpbuf, count, MPI_INT, op, comm );
    if ( OMPI_SUCCESS != rc ) {
        goto exit;
    }

    if (local_rank == local_leader ) {
        orte_buffer_t *sbuf;
        orte_buffer_t *rbuf;

        sbuf = OBJ_NEW(orte_buffer_t);
        rbuf = OBJ_NEW(orte_buffer_t);

        if (ORTE_SUCCESS != (rc = orte_dss.pack(sbuf, tmpbuf, (orte_std_cntr_t)count, ORTE_INT))) {
            goto exit;
        }

        if ( send_first ) {
            rc = orte_rml.send_buffer(remote_leader, sbuf, 0, 0);
            rc = orte_rml.recv_buffer(remote_leader, rbuf, 0);
        }
        else {
            rc = orte_rml.recv_buffer(remote_leader, rbuf, 0);
            rc = orte_rml.send_buffer(remote_leader, sbuf, 0, 0);
        }

        if (ORTE_SUCCESS != (rc = orte_dss.unpack(rbuf, outbuf, &size_count, ORTE_INT))) {
            goto exit;
        }
        OBJ_RELEASE(sbuf);
        OBJ_RELEASE(rbuf);
        count = (int)size_count;

        if ( &ompi_mpi_op_max == op ) {
            for ( i = 0 ; i < count; i++ ) {
                if (tmpbuf[i] > outbuf[i]) outbuf[i] = tmpbuf[i];
            }
        }
        else if ( &ompi_mpi_op_min == op ) {
            for ( i = 0 ; i < count; i++ ) {
                if (tmpbuf[i] < outbuf[i]) outbuf[i] = tmpbuf[i];
            }
        }
        else if ( &ompi_mpi_op_sum == op ) {
            for ( i = 0 ; i < count; i++ ) {
                outbuf[i] += tmpbuf[i];
            }
        }
        else if ( &ompi_mpi_op_prod == op ) {
            for ( i = 0 ; i < count; i++ ) {
                outbuf[i] *= tmpbuf[i];
            }
        }
    }

    rc = comm->c_coll.coll_bcast (outbuf, count, MPI_INT,
                                  local_leader, comm);

 exit:
    if (NULL != tmpbuf ) {
        free (tmpbuf);
    }

    return (rc);
}

#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
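The ompi_comm_allreduce_* variants in this listing are interchangeable back ends for the same task: letting all processes involved in creating a new communicator agree on a common value (the context ID), even when the two groups are only connected through a bridge communicator or an out-of-band channel. As a minimal sketch of the user-level call path that drives the bridge variant (this program is illustrative only, not part of comm_cid.c; it assumes a standard MPI installation and at least two processes), creating an inter-communicator between two halves of MPI_COMM_WORLD looks like this:

/* illustrative example: MPI_Intercomm_create between two halves of
 * MPI_COMM_WORLD; inside Open MPI this is the kind of call that makes
 * the two groups agree on a new context ID via a bridged allreduce. */
#include <stdio.h>
#include <mpi.h>

int main(int argc, char **argv)
{
    int rank, size, color, remote_leader;
    MPI_Comm local_comm, inter_comm;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (size < 2) {
        fprintf(stderr, "needs at least 2 processes\n");
        MPI_Abort(MPI_COMM_WORLD, 1);
    }

    /* Split MPI_COMM_WORLD into two halves; each half gets its own
       intra-communicator. */
    color = (rank < size / 2) ? 0 : 1;
    MPI_Comm_split(MPI_COMM_WORLD, color, rank, &local_comm);

    /* Rank 0 of each half acts as local leader; the remote leader is
       addressed by its rank in the bridge communicator
       (MPI_COMM_WORLD here). */
    remote_leader = (0 == color) ? size / 2 : 0;
    MPI_Intercomm_create(local_comm, 0, MPI_COMM_WORLD, remote_leader,
                         42 /* tag */, &inter_comm);

    MPI_Comm_free(&inter_comm);
    MPI_Comm_free(&local_comm);
    MPI_Finalize();
    return 0;
}

Compiled with mpicc and run with two or more processes, the MPI_Intercomm_create call mirrors the lleader/rleader arguments of ompi_comm_allreduce_intra_bridge above: each group first reduces locally, the two leaders exchange and combine the partial results over the bridge communicator, and the agreed result is then broadcast back within each group.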
