📄 pipe.c
字号:
/*_____________________________________________________________________________
 * pipe.c: routines to circulate data around in a ring.
 *
 * This is all reasonably standard MPI.  In fact, it is standard MPE, but
 * repeated here for consistency and independence.
 *
 * Note we use the ANSI C MPI bindings here.  The main reason is that the
 * C++ bindings are ill documented.  However, full conversion to C++ should
 * not be too hard.
 *_____________________________________________________________________________
 *
 * Original version taken from:
 *     http://wotug.ukc.ac.uk/parallel/standards/mpi
 *
 * Adapted in Oct 2003 by Simon Portegies Zwart (Amsterdam)
 *_____________________________________________________________________________
 */
#include "pipe.h"

#ifndef TOOLBOX

/*
 * MPE_Pipe_create: set up a ring "pipe" over the processes in comm.
 *
 * Creates a 1-D periodic Cartesian communicator (so MPI may reorder ranks
 * for locality), allocates a pair of receive/forward buffers large enough
 * for the largest message any rank will inject (global max of maxsize
 * elements of `type`), and returns an opaque handle through *pipe.
 *
 * On any failure (non-contiguous datatype, out of memory) *pipe is set to
 * NULL and a diagnostic is printed to stderr.
 *
 * NOTE(review): the datatype must be (nearly) contiguous because the
 * buffers are addressed with flat byte arithmetic below.
 */
void MPE_Pipe_create(MPI_Comm comm, MPI_Datatype type, int maxsize, void **pipe)
{
    MPE_Pipe *new_pipe;
    int size, dsize, maxbuf;
    MPI_Aint lb, dextent;
    int periodic = 1;           /* the 1-D topology wraps around: a ring */

    *pipe = 0;                  /* pessimistic default; set on success */

    new_pipe = malloc(sizeof *new_pipe);
    if (!new_pipe) {
        fprintf(stderr, "MPE_Pipe_create: out of memory\n");
        return;
    }

    /* Create a ring of processes, allowing MPI to pick a good ordering */
    MPI_Comm_size(comm, &size);
    MPI_Cart_create(comm, 1, &size, &periodic, 1, &new_pipe->mycomm);
    MPI_Cart_shift(new_pipe->mycomm, 0, 1, &new_pipe->left, &new_pipe->right);

    /* Create the double buffers.  Currently, require that the datatype be
       (nearly) contiguous.  MPI_Type_extent is deprecated (removed in
       MPI-3); use MPI_Type_get_extent instead. */
    MPI_Type_size(type, &dsize);
    MPI_Type_get_extent(type, &lb, &dextent);
    if (2 * dsize <= dextent) {
        fprintf(stderr,
                "Datatype needs to be (nearly) contiguous; size = %d and extent = %ld\n",
                dsize, (long)dextent);
        MPI_Comm_free(&new_pipe->mycomm);   /* don't leak the ring comm */
        free(new_pipe);
        return;
    }
    new_pipe->typesize = (int)dextent;

    /* Every rank must use the same buffer length: take the global maximum
       of the requested sizes. */
    MPI_Allreduce(&maxsize, &maxbuf, 1, MPI_INT, MPI_MAX, comm);
    new_pipe->maxlen = maxbuf;
    maxbuf *= (int)dextent;     /* now in bytes */

    /* One allocation, split into two halves: buf1 receives, buf2 forwards */
    new_pipe->buf = malloc(2 * (size_t)maxbuf);
    if (!new_pipe->buf) {
        fprintf(stderr, "MPE_Pipe_create: out of memory for buffers\n");
        MPI_Comm_free(&new_pipe->mycomm);
        free(new_pipe);
        return;
    }
    new_pipe->buf1 = new_pipe->buf;
    new_pipe->buf2 = (void *)((char *)new_pipe->buf1 + maxbuf);

    new_pipe->type = type;
    new_pipe->stage = 0;
    new_pipe->last_stage = size - 1;

    *pipe = (void *)new_pipe;
}

// This is one algorithm for sending data in the pipe.
Others could // use MPI_Sendrecv, persistent send/recv operations, or alltoall. void MPE_Pipe_start(void *pipe, void *mybuf, int len, int qcopy){ MPE_Pipe *p = (MPE_Pipe *)pipe; if (p->stage != 0) { fprintf(stderr, "Can only start pipe when pipe is empty\n"); return; } if (p->last_stage == 0) { return; } // Start the pipe MPI_Irecv(p->buf1, p->maxlen, p->type, p->left, 0, p->mycomm, &p->requests[0]); // If qcopy is 1, we might choose to use MPI_Send instead if (qcopy) { memcpy(p->buf2, mybuf, len * p->typesize); mybuf = p->buf2; } MPI_Isend(mybuf, len, p->type, p->right, 0, p->mycomm, &p->requests[1]);}void MPE_Pipe_push(void *pipe, void **recvbuf, int *recvlen){ MPE_Pipe *p = (MPE_Pipe *)pipe; MPI_Status statuses[2]; void *tmp; if (p->last_stage == 0) return; MPI_Waitall(2, p->requests, statuses); MPI_Get_count(&statuses[0], p->type, recvlen); *recvbuf = p->buf1; if (++p->stage == p->last_stage) { p->stage = 0; return; } // Start next cycle tmp = p->buf1; p->buf1 = p->buf2; p->buf2 = tmp; MPI_Irecv(p->buf1, p->maxlen, p->type, p->left, 0, p->mycomm, &p->requests[0]); MPI_Isend(p->buf2, *recvlen, p->type, p->right, 0, p->mycomm, &p->requests[1]);}void MPE_Pipe_free(void **pipe){ MPE_Pipe *p = (MPE_Pipe *)*pipe; if (p->stage != 0) { fprintf(stderr, "Can not free PIPE until %d more stages complete\n", p->last_stage - p->stage); return; } MPI_Comm_free(&p->mycomm); free(p->buf); *pipe = 0;}#elseint main(int argc, char **argv){ void *pipe; int rank, size, *rbuf, rlen, i; MPI_Init(&argc, &argv); MPI_Comm_size(MPI_COMM_WORLD, &size); MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPE_Pipe_create(MPI_COMM_WORLD, MPI_INT, 1, &pipe); MPE_Pipe_start(pipe, &rank, 1, 1); // Get size - 1 pieces of additional data for (i=1; i<size; i++) { MPE_Pipe_push((void*)pipe, (void**)&rbuf, &rlen); if (rlen != 1) { printf("[%d] received wrong count (%d) at step %d\n", rank, rlen, i); } printf("[%d] received %d\n", rank, *rbuf); } MPE_Pipe_free(&pipe); MPI_Finalize();}#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -