📄 comm_cid.c
字号:
freeing the new communicator anyway, so we might as well leave the coll selection as NULL (the coll base comm unselect code handles that case properly). */ if (MPI_UNDEFINED != newcomm->c_local_group->grp_my_rank) { /* Step 2: call all functions, which might use the new communicator already. */ /* Initialize the coll components */ /* Let the collectives components fight over who will do collective on this new comm. */ if (OMPI_SUCCESS != (ok = mca_coll_base_comm_select(newcomm, collcomponent))) { return ok; } } return OMPI_SUCCESS;} /**************************************************************************//**************************************************************************//**************************************************************************//* Arguments not used in this implementation: * - bridgecomm * - local_leader * - remote_leader * - send_first */static int ompi_comm_allreduce_intra ( int *inbuf, int *outbuf, int count, struct ompi_op_t *op, ompi_communicator_t *comm, ompi_communicator_t *bridgecomm, void* local_leader, void* remote_leader, int send_first ){ return comm->c_coll.coll_allreduce ( inbuf, outbuf, count, MPI_INT, op,comm );}/* Arguments not used in this implementation: * - bridgecomm * - local_leader * - remote_leader * - send_first */static int ompi_comm_allreduce_inter ( int *inbuf, int *outbuf, int count, struct ompi_op_t *op, ompi_communicator_t *intercomm, ompi_communicator_t *bridgecomm, void* local_leader, void* remote_leader, int send_first ){ int local_rank, rsize; int i, rc; int *sbuf; int *tmpbuf=NULL; int *rcounts=NULL, scount=0; int *rdisps=NULL; if ( &ompi_mpi_op_sum != op && &ompi_mpi_op_prod != op && &ompi_mpi_op_max != op && &ompi_mpi_op_min != op ) { return MPI_ERR_OP; } if ( !OMPI_COMM_IS_INTER (intercomm)) { return MPI_ERR_COMM; } /* Allocate temporary arrays */ rsize = ompi_comm_remote_size (intercomm); local_rank = ompi_comm_rank ( intercomm ); tmpbuf = (int *) malloc ( count * sizeof(int)); rdisps = (int *) calloc ( rsize, sizeof(int)); rcounts = (int *) calloc ( rsize, sizeof(int) ); if ( NULL == tmpbuf || NULL == rdisps || NULL == rcounts ) { return OMPI_ERR_OUT_OF_RESOURCE; } /* Execute the inter-allreduce: the result of our group will be in the buffer of the remote group */ rc = intercomm->c_coll.coll_allreduce ( inbuf, tmpbuf, count, MPI_INT, op, intercomm ); if ( OMPI_SUCCESS != rc ) { goto exit; } if ( 0 == local_rank ) { MPI_Request req; /* for the allgatherv later */ scount = count; /* local leader exchange their data and determine the overall result for both groups */ rc = MCA_PML_CALL(irecv (outbuf, count, MPI_INT, 0, OMPI_COMM_ALLREDUCE_TAG , intercomm, &req)); if ( OMPI_SUCCESS != rc ) { goto exit; } rc = MCA_PML_CALL(send (tmpbuf, count, MPI_INT, 0, OMPI_COMM_ALLREDUCE_TAG, MCA_PML_BASE_SEND_STANDARD, intercomm)); if ( OMPI_SUCCESS != rc ) { goto exit; } rc = ompi_request_wait_all ( 1, &req, MPI_STATUS_IGNORE ); if ( OMPI_SUCCESS != rc ) { goto exit; } if ( &ompi_mpi_op_max == op ) { for ( i = 0 ; i < count; i++ ) { if (tmpbuf[i] > outbuf[i]) outbuf[i] = tmpbuf[i]; } } else if ( &ompi_mpi_op_min == op ) { for ( i = 0 ; i < count; i++ ) { if (tmpbuf[i] < outbuf[i]) outbuf[i] = tmpbuf[i]; } } else if ( &ompi_mpi_op_sum == op ) { for ( i = 0 ; i < count; i++ ) { outbuf[i] += tmpbuf[i]; } } else if ( &ompi_mpi_op_prod == op ) { for ( i = 0 ; i < count; i++ ) { outbuf[i] *= tmpbuf[i]; } } } /* distribute the overall result to all processes in the other group. Instead of using bcast, we are using here allgatherv, to avoid the possible deadlock. Else, we need an algorithm to determine, which group sends first in the inter-bcast and which receives the result first. */ rcounts[0] = count; sbuf = outbuf; rc = intercomm->c_coll.coll_allgatherv (sbuf, scount, MPI_INT, outbuf, rcounts, rdisps, MPI_INT, intercomm); exit: if ( NULL != tmpbuf ) { free ( tmpbuf ); } if ( NULL != rcounts ) { free ( rcounts ); } if ( NULL != rdisps ) { free ( rdisps ); } return (rc);}/* Arguments not used in this implementation: * - send_first */static int ompi_comm_allreduce_intra_bridge (int *inbuf, int *outbuf, int count, struct ompi_op_t *op, ompi_communicator_t *comm, ompi_communicator_t *bcomm, void* lleader, void* rleader, int send_first ){ int *tmpbuf=NULL; int local_rank; int i; int rc; int local_leader, remote_leader; local_leader = (*((int*)lleader)); remote_leader = (*((int*)rleader)); if ( &ompi_mpi_op_sum != op && &ompi_mpi_op_prod != op && &ompi_mpi_op_max != op && &ompi_mpi_op_min != op ) { return MPI_ERR_OP; } local_rank = ompi_comm_rank ( comm ); tmpbuf = (int *) malloc ( count * sizeof(int)); if ( NULL == tmpbuf ) { return MPI_ERR_INTERN; } /* Intercomm_create */ rc = comm->c_coll.coll_allreduce ( inbuf, tmpbuf, count, MPI_INT, op, comm ); if ( OMPI_SUCCESS != rc ) { goto exit; } if (local_rank == local_leader ) { MPI_Request req; rc = MCA_PML_CALL(irecv ( outbuf, count, MPI_INT, remote_leader, OMPI_COMM_ALLREDUCE_TAG, bcomm, &req)); if ( OMPI_SUCCESS != rc ) { goto exit; } rc = MCA_PML_CALL(send (tmpbuf, count, MPI_INT, remote_leader, OMPI_COMM_ALLREDUCE_TAG, MCA_PML_BASE_SEND_STANDARD, bcomm)); if ( OMPI_SUCCESS != rc ) { goto exit; } rc = ompi_request_wait_all ( 1, &req, MPI_STATUS_IGNORE); if ( OMPI_SUCCESS != rc ) { goto exit; } if ( &ompi_mpi_op_max == op ) { for ( i = 0 ; i < count; i++ ) { if (tmpbuf[i] > outbuf[i]) outbuf[i] = tmpbuf[i]; } } else if ( &ompi_mpi_op_min == op ) { for ( i = 0 ; i < count; i++ ) { if (tmpbuf[i] < outbuf[i]) outbuf[i] = tmpbuf[i]; } } else if ( &ompi_mpi_op_sum == op ) { for ( i = 0 ; i < count; i++ ) { outbuf[i] += tmpbuf[i]; } } else if ( &ompi_mpi_op_prod == op ) { for ( i = 0 ; i < count; i++ ) { outbuf[i] *= tmpbuf[i]; } } } rc = comm->c_coll.coll_bcast ( outbuf, count, MPI_INT, local_leader, comm); exit: if (NULL != tmpbuf ) { free (tmpbuf); } return (rc);}/* Arguments not used in this implementation: * - bridgecomm * * lleader is the local rank of root in comm * rleader is the OOB contact information of the * root processes in the other world. */static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf, int count, struct ompi_op_t *op, ompi_communicator_t *comm, ompi_communicator_t *bridgecomm, void* lleader, void* rleader, int send_first ){ int *tmpbuf=NULL; int i; int rc; int local_leader, local_rank; orte_process_name_t *remote_leader=NULL; orte_std_cntr_t size_count; local_leader = (*((int*)lleader)); remote_leader = (orte_process_name_t*)rleader; size_count = count; if ( &ompi_mpi_op_sum != op && &ompi_mpi_op_prod != op && &ompi_mpi_op_max != op && &ompi_mpi_op_min != op ) { return MPI_ERR_OP; } local_rank = ompi_comm_rank ( comm ); tmpbuf = (int *) malloc ( count * sizeof(int)); if ( NULL == tmpbuf ) { return MPI_ERR_INTERN; } /* comm is an intra-communicator */ rc = comm->c_coll.coll_allreduce(inbuf,tmpbuf,count,MPI_INT,op, comm ); if ( OMPI_SUCCESS != rc ) { goto exit; } if (local_rank == local_leader ) { orte_buffer_t *sbuf; orte_buffer_t *rbuf; sbuf = OBJ_NEW(orte_buffer_t); rbuf = OBJ_NEW(orte_buffer_t); if (ORTE_SUCCESS != (rc = orte_dss.pack(sbuf, tmpbuf, (orte_std_cntr_t)count, ORTE_INT))) { goto exit; } if ( send_first ) { rc = orte_rml.send_buffer(remote_leader, sbuf, 0, 0); rc = orte_rml.recv_buffer(remote_leader, rbuf, 0); } else { rc = orte_rml.recv_buffer(remote_leader, rbuf, 0); rc = orte_rml.send_buffer(remote_leader, sbuf, 0, 0); } if (ORTE_SUCCESS != (rc = orte_dss.unpack(rbuf, outbuf, &size_count, ORTE_INT))) { goto exit; } OBJ_RELEASE(sbuf); OBJ_RELEASE(rbuf); count = (int)size_count; if ( &ompi_mpi_op_max == op ) { for ( i = 0 ; i < count; i++ ) { if (tmpbuf[i] > outbuf[i]) outbuf[i] = tmpbuf[i]; } } else if ( &ompi_mpi_op_min == op ) { for ( i = 0 ; i < count; i++ ) { if (tmpbuf[i] < outbuf[i]) outbuf[i] = tmpbuf[i]; } } else if ( &ompi_mpi_op_sum == op ) { for ( i = 0 ; i < count; i++ ) { outbuf[i] += tmpbuf[i]; } } else if ( &ompi_mpi_op_prod == op ) { for ( i = 0 ; i < count; i++ ) { outbuf[i] *= tmpbuf[i]; } } } rc = comm->c_coll.coll_bcast (outbuf, count, MPI_INT, local_leader, comm); exit: if (NULL != tmpbuf ) { free (tmpbuf); } return (rc);}#if defined(c_plusplus) || defined(__cplusplus)}#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -