📄 reduce.c
字号:
dst, MPIR_REDUCE_TAG, (char *) tmp_buf + disps[recv_idx]*extent, recv_cnt, datatype, dst, MPIR_REDUCE_TAG, comm, MPI_STATUS_IGNORE); MPIU_ERR_CHKANDJUMP((mpi_errno), mpi_errno, MPI_ERR_OTHER, "**fail"); /* tmp_buf contains data received in this step. recvbuf contains data accumulated so far */ /* This algorithm is used only for predefined ops and predefined ops are always commutative. */#ifdef HAVE_CXX_BINDING if (is_cxx_uop) { (*MPIR_Process.cxx_call_op_fn)((char *) tmp_buf + disps[recv_idx]*extent, (char *) recvbuf + disps[recv_idx]*extent, recv_cnt, datatype, uop); } else #endif (*uop)((char *) tmp_buf + disps[recv_idx]*extent, (char *) recvbuf + disps[recv_idx]*extent, &recv_cnt, &datatype); /* update send_idx for next iteration */ send_idx = recv_idx; mask <<= 1; /* update last_idx, but not in last iteration because the value is needed in the gather step below. */ if (mask < pof2) last_idx = recv_idx + pof2/mask; } } /* now do the gather to root */ /* Is root one of the processes that was excluded from the computation above? If so, send data from newrank=0 to the root and have root take on the role of newrank = 0 */ if (root < 2*rem) { if (root % 2 != 0) { if (rank == root) { /* recv */ /* initialize the arrays that weren't initialized */ for (i=0; i<(pof2-1); i++) cnts[i] = count/pof2; cnts[pof2-1] = count - (count/pof2)*(pof2-1); disps[0] = 0; for (i=1; i<pof2; i++) disps[i] = disps[i-1] + cnts[i-1]; mpi_errno = MPIC_Recv(recvbuf, cnts[0], datatype, 0, MPIR_REDUCE_TAG, comm, MPI_STATUS_IGNORE); newrank = 0; send_idx = 0; last_idx = 2; } else if (newrank == 0) { /* send */ mpi_errno = MPIC_Send(recvbuf, cnts[0], datatype, root, MPIR_REDUCE_TAG, comm); newrank = -1; } newroot = 0; } else newroot = root / 2; } else newroot = root - rem; if (newrank != -1) { j = 0; mask = 0x1; while (mask < pof2) { mask <<= 1; j++; } mask >>= 1; j--; while (mask > 0) { newdst = newrank ^ mask; /* find real rank of dest */ dst = (newdst < rem) ? 
newdst*2 : newdst + rem; /* if root is playing the role of newdst=0, adjust for it */ if ((newdst == 0) && (root < 2*rem) && (root % 2 != 0)) dst = root; /* if the root of newdst's half of the tree is the same as the root of newroot's half of the tree, send to newdst and exit, else receive from newdst. */ newdst_tree_root = newdst >> j; newdst_tree_root <<= j; newroot_tree_root = newroot >> j; newroot_tree_root <<= j; send_cnt = recv_cnt = 0; if (newrank < newdst) { /* update last_idx except on first iteration */ if (mask != pof2/2) last_idx = last_idx + pof2/(mask*2); recv_idx = send_idx + pof2/(mask*2); for (i=send_idx; i<recv_idx; i++) send_cnt += cnts[i]; for (i=recv_idx; i<last_idx; i++) recv_cnt += cnts[i]; } else { recv_idx = send_idx - pof2/(mask*2); for (i=send_idx; i<last_idx; i++) send_cnt += cnts[i]; for (i=recv_idx; i<send_idx; i++) recv_cnt += cnts[i]; } if (newdst_tree_root == newroot_tree_root) { /* send and exit */ /* printf("Rank %d, send_idx %d, send_cnt %d, last_idx %d\n", newrank, send_idx, send_cnt, last_idx); fflush(stdout); */ /* Send data from recvbuf. Recv into tmp_buf */ mpi_errno = MPIC_Send((char *) recvbuf + disps[send_idx]*extent, send_cnt, datatype, dst, MPIR_REDUCE_TAG, comm); MPIU_ERR_CHKANDJUMP((mpi_errno), mpi_errno, MPI_ERR_OTHER, "**fail"); break; } else { /* recv and continue */ /* printf("Rank %d, recv_idx %d, recv_cnt %d, last_idx %d\n", newrank, recv_idx, recv_cnt, last_idx); fflush(stdout); */ mpi_errno = MPIC_Recv((char *) recvbuf + disps[recv_idx]*extent, recv_cnt, datatype, dst, MPIR_REDUCE_TAG, comm, MPI_STATUS_IGNORE); MPIU_ERR_CHKANDJUMP((mpi_errno), mpi_errno, MPI_ERR_OTHER, "**fail"); } if (newrank > newdst) send_idx = recv_idx; mask >>= 1; j--; } } } else { /* use a binomial tree algorithm */ /* This code is from MPICH-1. */ /* Here's the algorithm. Relative to the root, look at the bit pattern in my rank. 
Starting from the right (lsb), if the bit is 1, send to the node with that bit zero and exit; if the bit is 0, receive from the node with that bit set and combine (as long as that node is within the group) Note that by receiving with source selection, we guarantee that we get the same bits with the same input. If we allowed the parent to receive the children in any order, then timing differences could cause different results (roundoff error, over/underflows in some cases, etc). Because of the way these are ordered, if root is 0, then this is correct for both commutative and non-commutative operations. If root is not 0, then for non-commutative, we use a root of zero and then send the result to the root. To see this, note that the ordering is mask = 1: (ab)(cd)(ef)(gh) (odds send to evens) mask = 2: ((ab)(cd))((ef)(gh)) (2,6 send to 0,4) mask = 4: (((ab)(cd))((ef)(gh))) (4 sends to 0) Comments on buffering. If the datatype is not contiguous, we still need to pass contiguous data to the user routine. In this case, we should make a copy of the data in some format, and send/operate on that. In general, we can't use MPI_PACK, because the alignment of that is rather vague, and the data may not be re-usable. What we actually need is a "squeeze" operation that removes the skips. */ mask = 0x1; if (is_commutative) lroot = root; else lroot = 0; relrank = (rank - lroot + comm_size) % comm_size; while (/*(mask & relrank) == 0 && */mask < comm_size) { /* Receive */ if ((mask & relrank) == 0) { source = (relrank | mask); if (source < comm_size) { source = (source + lroot) % comm_size; mpi_errno = MPIC_Recv (tmp_buf, count, datatype, source, MPIR_REDUCE_TAG, comm, &status); MPIU_ERR_CHKANDJUMP((mpi_errno), mpi_errno, MPI_ERR_OTHER, "**fail"); /* The sender is above us, so the received buffer must be the second argument (in the noncommutative case). 
*/ if (is_commutative) {#ifdef HAVE_CXX_BINDING if (is_cxx_uop) { (*MPIR_Process.cxx_call_op_fn)( tmp_buf, recvbuf, count, datatype, uop ); } else #endif (*uop)(tmp_buf, recvbuf, &count, &datatype); } else {#ifdef HAVE_CXX_BINDING if (is_cxx_uop) { (*MPIR_Process.cxx_call_op_fn)( recvbuf, tmp_buf, count, datatype, uop ); } else #endif (*uop)(recvbuf, tmp_buf, &count, &datatype); mpi_errno = MPIR_Localcopy(tmp_buf, count, datatype, recvbuf, count, datatype); MPIU_ERR_CHKANDJUMP((mpi_errno), mpi_errno, MPI_ERR_OTHER, "**fail"); } } } else { /* I've received all that I'm going to. Send my result to my parent */ source = ((relrank & (~ mask)) + lroot) % comm_size; mpi_errno = MPIC_Send( recvbuf, count, datatype, source, MPIR_REDUCE_TAG, comm ); MPIU_ERR_CHKANDJUMP((mpi_errno), mpi_errno, MPI_ERR_OTHER, "**fail"); break; } mask <<= 1; } if (!is_commutative && (root != 0)) { if (rank == 0) { mpi_errno = MPIC_Send( recvbuf, count, datatype, root, MPIR_REDUCE_TAG, comm ); } else if (rank == root) { mpi_errno = MPIC_Recv ( recvbuf, count, datatype, 0, MPIR_REDUCE_TAG, comm, &status); } MPIU_ERR_CHKANDJUMP((mpi_errno), mpi_errno, MPI_ERR_OTHER, "**fail"); } } /* check if multiple threads are calling this collective function */ MPIDU_ERR_CHECK_MULTIPLE_THREADS_EXIT( comm_ptr ); /* --BEGIN ERROR HANDLING-- */ if (p->op_errno) { mpi_errno = p->op_errno; goto fn_fail; } /* --END ERROR HANDLING-- */ fn_exit: MPIU_CHKLMEM_FREEALL(); MPIR_Nest_decr(); return (mpi_errno); fn_fail: goto fn_exit;}/* end:nested *//* begin:nested */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -