📄 allgather.c
字号:
recvtype, tmp_buf, tmp_buf_size, &position, comm); } curr_cnt = nbytes; mask = 0x1; i = 0; while (mask < comm_size) { dst = rank ^ mask; /* find offset into send and recv buffers. zero out the least significant "i" bits of rank and dst to find root of src and dst subtrees. Use ranks of roots as index to send from and recv into buffer. */ dst_tree_root = dst >> i; dst_tree_root <<= i; my_tree_root = rank >> i; my_tree_root <<= i; send_offset = my_tree_root * nbytes; recv_offset = dst_tree_root * nbytes; if (dst < comm_size) { mpi_errno = MPIC_Sendrecv(((char *)tmp_buf + send_offset), curr_cnt, MPI_BYTE, dst, MPIR_ALLGATHER_TAG, ((char *)tmp_buf + recv_offset), nbytes*mask, MPI_BYTE, dst, MPIR_ALLGATHER_TAG, comm, &status); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); return mpi_errno; } /* --END ERROR HANDLING-- */ NMPI_Get_count(&status, MPI_BYTE, &last_recv_cnt); curr_cnt += last_recv_cnt; } /* if some processes in this process's subtree in this step did not have any destination process to communicate with because of non-power-of-two, we need to send them the data that they would normally have received from those processes. That is, the haves in this subtree must send to the havenots. We use a logarithmic recursive-halfing algorithm for this. */ if (dst_tree_root + mask > comm_size) { nprocs_completed = comm_size - my_tree_root - mask; /* nprocs_completed is the number of processes in this subtree that have all the data. Send data to others in a tree fashion. First find root of current tree that is being divided into two. k is the number of least-significant bits in this process's rank that must be zeroed out to find the rank of the root */ j = mask; k = 0; while (j) { j >>= 1; k++; } k--; offset = nbytes * (my_tree_root + mask); tmp_mask = mask >> 1; while (tmp_mask) { dst = rank ^ tmp_mask; tree_root = rank >> k; tree_root <<= k; /* send only if this proc has data and destination doesn't have data. at any step, multiple processes can send if they have the data */ if ((dst > rank) && (rank < tree_root + nprocs_completed) && (dst >= tree_root + nprocs_completed)) { mpi_errno = MPIC_Send(((char *)tmp_buf + offset), last_recv_cnt, MPI_BYTE, dst, MPIR_ALLGATHER_TAG, comm); /* last_recv_cnt was set in the previous receive. that's the amount of data to be sent now. */ /* --BEGIN ERROR HANDLING-- */ if (mpi_errno) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); return mpi_errno; } /* --END ERROR HANDLING-- */ } /* recv only if this proc. doesn't have data and sender has data */ else if ((dst < rank) && (dst < tree_root + nprocs_completed) && (rank >= tree_root + nprocs_completed)) { mpi_errno = MPIC_Recv(((char *)tmp_buf + offset), nbytes*nprocs_completed, MPI_BYTE, dst, MPIR_ALLGATHER_TAG, comm, &status); /* nprocs_completed is also equal to the no. of processes whose data we don't have */ /* --BEGIN ERROR HANDLING-- */ if (mpi_errno) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); return mpi_errno; } /* --END ERROR HANDLING-- */ NMPI_Get_count(&status, MPI_BYTE, &last_recv_cnt); curr_cnt += last_recv_cnt; } tmp_mask >>= 1; k--; } } mask <<= 1; i++; } position = 0; NMPI_Unpack(tmp_buf, tmp_buf_size, &position, recvbuf, recvcount*comm_size, recvtype, comm); MPIU_Free(tmp_buf); }#endif /* MPID_HAS_HETERO */ } else if (recvcount*comm_size*type_size < MPIR_ALLGATHER_SHORT_MSG) { /* Short message and non-power-of-two no. of processes. Use * Bruck algorithm (see description above). */ /* allocate a temporary buffer of the same size as recvbuf. */ /* get true extent of recvtype */ mpi_errno = NMPI_Type_get_true_extent(recvtype, &recvtype_true_lb, &recvtype_true_extent); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); return mpi_errno; } /* --END ERROR HANDLING-- */ recvbuf_extent = recvcount * comm_size * (MPIR_MAX(recvtype_true_extent, recvtype_extent)); tmp_buf = MPIU_Malloc(recvbuf_extent); /* --BEGIN ERROR HANDLING-- */ if (!tmp_buf) { mpi_errno = MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0 ); return mpi_errno; } /* --END ERROR HANDLING-- */ /* adjust for potential negative lower bound in datatype */ tmp_buf = (void *)((char*)tmp_buf - recvtype_true_lb); /* copy local data to the top of tmp_buf */ if (sendbuf != MPI_IN_PLACE) { mpi_errno = MPIR_Localcopy (sendbuf, sendcount, sendtype, tmp_buf, recvcount, recvtype); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); return mpi_errno; } /* --END ERROR HANDLING-- */ } else { mpi_errno = MPIR_Localcopy (((char *)recvbuf + rank * recvcount * recvtype_extent), recvcount, recvtype, tmp_buf, recvcount, recvtype); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); return mpi_errno; } /* --END ERROR HANDLING-- */ } /* do the first \floor(\lg p) steps */ curr_cnt = recvcount; pof2 = 1; while (pof2 <= comm_size/2) { src = (rank + pof2) % comm_size; dst = (rank - pof2 + comm_size) % comm_size; mpi_errno = MPIC_Sendrecv(tmp_buf, curr_cnt, recvtype, dst, MPIR_ALLGATHER_TAG, ((char *)tmp_buf + curr_cnt*recvtype_extent), curr_cnt, recvtype, src, MPIR_ALLGATHER_TAG, comm, MPI_STATUS_IGNORE); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); return mpi_errno; } /* --END ERROR HANDLING-- */ curr_cnt *= 2; pof2 *= 2; } /* if comm_size is not a power of two, one more step is needed */ rem = comm_size - pof2; if (rem) { src = (rank + pof2) % comm_size; dst = (rank - pof2 + comm_size) % comm_size; mpi_errno = MPIC_Sendrecv(tmp_buf, rem * recvcount, recvtype, dst, MPIR_ALLGATHER_TAG, ((char *)tmp_buf + curr_cnt*recvtype_extent), rem * recvcount, recvtype, src, MPIR_ALLGATHER_TAG, comm, MPI_STATUS_IGNORE); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); return mpi_errno; } /* --END ERROR HANDLING-- */ } /* Rotate blocks in tmp_buf down by (rank) blocks and store * result in recvbuf. */ mpi_errno = MPIR_Localcopy(tmp_buf, (comm_size-rank)*recvcount, recvtype, (char *) recvbuf + rank*recvcount*recvtype_extent, (comm_size-rank)*recvcount, recvtype); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); return mpi_errno; } /* --END ERROR HANDLING-- */ if (rank) { mpi_errno = MPIR_Localcopy((char *) tmp_buf + (comm_size-rank)*recvcount*recvtype_extent, rank*recvcount, recvtype, recvbuf, rank*recvcount, recvtype); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); return mpi_errno; } /* --END ERROR HANDLING-- */ } MPIU_Free((char*)tmp_buf + recvtype_true_lb); } else { /* long message or medium-size message and non-power-of-two * no. of processes. use ring algorithm. */ /* First, load the "local" version in the recvbuf. */ if (sendbuf != MPI_IN_PLACE) { mpi_errno = MPIR_Localcopy(sendbuf, sendcount, sendtype, ((char *)recvbuf + rank*recvcount*recvtype_extent), recvcount, recvtype); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); return mpi_errno; } /* --END ERROR HANDLING-- */ } /* Now, send left to right. This fills in the receive area in reverse order. */ left = (comm_size + rank - 1) % comm_size; right = (rank + 1) % comm_size; j = rank; jnext = left; for (i=1; i<comm_size; i++) { mpi_errno = MPIC_Sendrecv(((char *)recvbuf + j*recvcount*recvtype_extent), recvcount, recvtype, right, MPIR_ALLGATHER_TAG, ((char *)recvbuf + jnext*recvcount*recvtype_extent), recvcount, recvtype, left, MPIR_ALLGATHER_TAG, comm, MPI_STATUS_IGNORE); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); break; } /* --END ERROR HANDLING-- */ j = jnext;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -