📄 red_scat.c
字号:
old_i = (i < rem) ? i*2 + 1 : i + rem; if (old_i < 2*rem) { /* This process has to also do its left neighbor's work */ newcnts[i] = recvcnts[old_i] + recvcnts[old_i-1]; } else newcnts[i] = recvcnts[old_i]; } newdisps[0] = 0; for (i=1; i<pof2; i++) newdisps[i] = newdisps[i-1] + newcnts[i-1]; mask = pof2 >> 1; send_idx = recv_idx = 0; last_idx = pof2; while (mask > 0) { newdst = newrank ^ mask; /* find real rank of dest */ dst = (newdst < rem) ? newdst*2 + 1 : newdst + rem; send_cnt = recv_cnt = 0; if (newrank < newdst) { send_idx = recv_idx + mask; for (i=send_idx; i<last_idx; i++) send_cnt += newcnts[i]; for (i=recv_idx; i<send_idx; i++) recv_cnt += newcnts[i]; } else { recv_idx = send_idx + mask; for (i=send_idx; i<recv_idx; i++) send_cnt += newcnts[i]; for (i=recv_idx; i<last_idx; i++) recv_cnt += newcnts[i]; } /* printf("Rank %d, send_idx %d, recv_idx %d, send_cnt %d, recv_cnt %d, last_idx %d\n", newrank, send_idx, recv_idx, send_cnt, recv_cnt, last_idx);*/ /* Send data from tmp_results. Recv into tmp_recvbuf */ if ((send_cnt != 0) && (recv_cnt != 0)) mpi_errno = MPIC_Sendrecv((char *) tmp_results + newdisps[send_idx]*extent, send_cnt, datatype, dst, MPIR_REDUCE_SCATTER_TAG, (char *) tmp_recvbuf + newdisps[recv_idx]*extent, recv_cnt, datatype, dst, MPIR_REDUCE_SCATTER_TAG, comm, MPI_STATUS_IGNORE); else if ((send_cnt == 0) && (recv_cnt != 0)) mpi_errno = MPIC_Recv((char *) tmp_recvbuf + newdisps[recv_idx]*extent, recv_cnt, datatype, dst, MPIR_REDUCE_SCATTER_TAG, comm, MPI_STATUS_IGNORE); else if ((recv_cnt == 0) && (send_cnt != 0)) mpi_errno = MPIC_Send((char *) tmp_results + newdisps[send_idx]*extent, send_cnt, datatype, dst, MPIR_REDUCE_SCATTER_TAG, comm); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); return mpi_errno; } /* --END ERROR HANDLING-- */ /* tmp_recvbuf contains data received in this step. tmp_results contains data accumulated so far */ if (recv_cnt) {#ifdef HAVE_CXX_BINDING if (is_cxx_uop) { (*MPIR_Process.cxx_call_op_fn)((char *) tmp_recvbuf + newdisps[recv_idx]*extent, (char *) tmp_results + newdisps[recv_idx]*extent, recv_cnt, datatype, uop); } else #endif (*uop)((char *) tmp_recvbuf + newdisps[recv_idx]*extent, (char *) tmp_results + newdisps[recv_idx]*extent, &recv_cnt, &datatype); } /* update send_idx for next iteration */ send_idx = recv_idx; last_idx = recv_idx + mask; mask >>= 1; } /* copy this process's result from tmp_results to recvbuf */ if (recvcnts[rank]) { mpi_errno = MPIR_Localcopy((char *)tmp_results + disps[rank]*extent, recvcnts[rank], datatype, recvbuf, recvcnts[rank], datatype); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); return mpi_errno; } /* --END ERROR HANDLING-- */ } MPIU_Free(newcnts); MPIU_Free(newdisps); } /* In the non-power-of-two case, all odd-numbered processes of rank < 2*rem send to (rank-1) the result they calculated for that process */ if (rank < 2*rem) { if (rank % 2) { /* odd */ if (recvcnts[rank-1]) mpi_errno = MPIC_Send((char *) tmp_results + disps[rank-1]*extent, recvcnts[rank-1], datatype, rank-1, MPIR_REDUCE_SCATTER_TAG, comm); } else { /* even */ if (recvcnts[rank]) mpi_errno = MPIC_Recv(recvbuf, recvcnts[rank], datatype, rank+1, MPIR_REDUCE_SCATTER_TAG, comm, MPI_STATUS_IGNORE); } /* --BEGIN ERROR HANDLING-- */ if (mpi_errno) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); return mpi_errno; } /* --END ERROR HANDLING-- */ } MPIU_Free((char*)tmp_results + true_lb); MPIU_Free((char*)tmp_recvbuf + true_lb); } if ((is_commutative && (nbytes >= MPIR_REDSCAT_COMMUTATIVE_LONG_MSG)) || (!is_commutative && (nbytes >= MPIR_REDSCAT_NONCOMMUTATIVE_SHORT_MSG))) { /* commutative and long message, or noncommutative and long message. use (p-1) pairwise exchanges */ if (sendbuf != MPI_IN_PLACE) { /* copy local data into recvbuf */ mpi_errno = MPIR_Localcopy(((char *)sendbuf+disps[rank]*extent), recvcnts[rank], datatype, recvbuf, recvcnts[rank], datatype); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); return mpi_errno; } /* --END ERROR HANDLING-- */ } /* allocate temporary buffer to store incoming data */ tmp_recvbuf = MPIU_Malloc(recvcnts[rank]*(MPIR_MAX(true_extent,extent))+1); /* --BEGIN ERROR HANDLING-- */ if (!tmp_recvbuf) { mpi_errno = MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0); return mpi_errno; } /* --END ERROR HANDLING-- */ /* adjust for potential negative lower bound in datatype */ tmp_recvbuf = (void *)((char*)tmp_recvbuf - true_lb); for (i=1; i<comm_size; i++) { src = (rank - i + comm_size) % comm_size; dst = (rank + i) % comm_size; /* send the data that dst needs. recv data that this process needs from src into tmp_recvbuf */ if (sendbuf != MPI_IN_PLACE) mpi_errno = MPIC_Sendrecv(((char *)sendbuf+disps[dst]*extent), recvcnts[dst], datatype, dst, MPIR_REDUCE_SCATTER_TAG, tmp_recvbuf, recvcnts[rank], datatype, src, MPIR_REDUCE_SCATTER_TAG, comm, MPI_STATUS_IGNORE); else mpi_errno = MPIC_Sendrecv(((char *)recvbuf+disps[dst]*extent), recvcnts[dst], datatype, dst, MPIR_REDUCE_SCATTER_TAG, tmp_recvbuf, recvcnts[rank], datatype, src, MPIR_REDUCE_SCATTER_TAG, comm, MPI_STATUS_IGNORE); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); return mpi_errno; } /* --END ERROR HANDLING-- */ if (is_commutative || (src < rank)) { if (sendbuf != MPI_IN_PLACE) {#ifdef HAVE_CXX_BINDING if (is_cxx_uop) { (*MPIR_Process.cxx_call_op_fn)(tmp_recvbuf, recvbuf, recvcnts[rank], datatype, uop ); } else #endif (*uop)(tmp_recvbuf, recvbuf, &recvcnts[rank], &datatype); } else {#ifdef HAVE_CXX_BINDING if (is_cxx_uop) { (*MPIR_Process.cxx_call_op_fn)( tmp_recvbuf, ((char *)recvbuf+disps[rank]*extent), recvcnts[rank], datatype, uop ); } else #endif (*uop)(tmp_recvbuf, ((char *)recvbuf+disps[rank]*extent), &recvcnts[rank], &datatype); /* we can't store the result at the beginning of recvbuf right here because there is useful data there that other process/processes need. at the end, we will copy back the result to the beginning of recvbuf. */ } } else { if (sendbuf != MPI_IN_PLACE) {#ifdef HAVE_CXX_BINDING if (is_cxx_uop) { (*MPIR_Process.cxx_call_op_fn)( recvbuf, tmp_recvbuf, recvcnts[rank], datatype, uop ); } else #endif (*uop)(recvbuf, tmp_recvbuf, &recvcnts[rank], &datatype); /* copy result back into recvbuf */ mpi_errno = MPIR_Localcopy(tmp_recvbuf, recvcnts[rank], datatype, recvbuf, recvcnts[rank], datatype); } else {#ifdef HAVE_CXX_BINDING if (is_cxx_uop) { (*MPIR_Process.cxx_call_op_fn)( ((char *)recvbuf+disps[rank]*extent), tmp_recvbuf, recvcnts[rank], datatype, uop ); } else #endif (*uop)(((char *)recvbuf+disps[rank]*extent), tmp_recvbuf, &recvcnts[rank], &datatype); /* copy result back into recvbuf */ mpi_errno = MPIR_Localcopy(tmp_recvbuf, recvcnts[rank], datatype, ((char *)recvbuf + disps[rank]*extent), recvcnts[rank], datatype); } /* --BEGIN ERROR HANDLING-- */ if (mpi_errno) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); return mpi_errno; } /* --END ERROR HANDLING-- */ } } MPIU_Free((char *)tmp_recvbuf+true_lb); /* if MPI_IN_PLACE, move output data to the beginning of recvbuf. already done for rank 0. */ if ((sendbuf == MPI_IN_PLACE) && (rank != 0)) { mpi_errno = MPIR_Localcopy(((char *)recvbuf + disps[rank]*extent), recvcnts[rank], datatype, recvbuf, recvcnts[rank], datatype); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -