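/* msgtr.c -- listing header left over from the code-viewer page
 * (the second line was the viewer's "Font size:" control label). */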
/* PSPH - Parallel SPH program
* Bradley Smith, and Lou Baker, Dagonet Software
*/
/* SCCS identification string: the "@(#)" marker followed by the SCCS file
 * path and what appear to be the revision (1.8) and date (94/01/19).
 * Nothing in this part of the file reads it. */
static char sccs_id[] = "@(#) /home/bsmith/src/psph/SCCS/s.msg.c 1.8 94/01/19";
/* Message passing routines -
* Operating System Dependent!
*/
#include "all.h"
#ifdef MPI /* Generic message passing interface */
#include "mpi.h"
#endif
#if defined(MYMPI)
/*
 * MPI_Init - bring up the message passing layer.
 *
 * argc, argv: addresses of the program's argument count and argument vector.
 *
 * With PVM defined, ParseSpawnArg() yields the number of processes to spawn
 * and PVM_Init() is called with it; the value PVM_Init() returns is passed
 * back to the caller.  Without PVM nothing is started and MPI_SUCCESS is
 * returned.
 */
int
MPI_Init(int *argc, char ***argv)
{
#if defined(PVM)
    /* Number of processes to spawn, taken from the command line */
    int spawn_count = ParseSpawnArg(*argc, *argv);

#ifdef CM5
    /* CM5 has a host program that spawns others */
    return PVM_Init("psph_slave", spawn_count);
#else
    /* First entry of the caller's argument vector */
    return PVM_Init((*argv)[0], spawn_count);
#endif
#else
    return MPI_SUCCESS;
#endif
}
/*
 * MPI_Comm_size - report the number of processes.
 *
 * comm: not consulted by this implementation.
 * size: receives PVM_Nproc() when PVM is defined, otherwise 1.
 *
 * Always returns MPI_SUCCESS.
 */
int
MPI_Comm_size(MPI_Comm comm, int *size)
{
#ifdef PVM
    int process_count = PVM_Nproc();
#else
    int process_count = 1;      /* no message layer: a single process */
#endif

    *size = process_count;
    return MPI_SUCCESS;
}
/*
 * MPI_Comm_rank - report the calling process's rank.
 *
 * comm: not consulted by this implementation.
 * rank: receives PVM_Rank() when PVM is defined, otherwise 0.
 *
 * Always returns MPI_SUCCESS.
 */
int
MPI_Comm_rank(MPI_Comm comm, int *rank)
{
#ifdef PVM
    int my_rank = PVM_Rank();
#else
    int my_rank = 0;            /* no message layer: the only process is rank 0 */
#endif

    *rank = my_rank;
    return MPI_SUCCESS;
}
/*
 * MPI_Abort - forward an abort request to the PVM layer.
 *
 * comm:    not consulted by this implementation.
 * errcode: handed to PVM_Abort() when PVM is defined.
 *
 * Returns MPI_SUCCESS whenever control comes back to the caller.  Note that
 * in a build without PVM this function performs no action at all: the
 * process is not terminated here.
 */
int
MPI_Abort(MPI_Comm comm, int errcode)
{
#ifdef PVM
    PVM_Abort(errcode);
#endif

    return MPI_SUCCESS;
}
/*
 * MPI_Finalize - shut down the message passing layer.
 *
 * Calls pvm_exit() when PVM is defined; otherwise there is nothing to do.
 * Always returns MPI_SUCCESS.
 */
int
MPI_Finalize()
{
#ifdef PVM
    pvm_exit();
#endif

    return MPI_SUCCESS;
}
/*
 * MPI_Send - send a message, bracketed by trace marks when TRACE is defined.
 *
 * buf, count, datatype, dest, tag: forwarded unchanged to PVM_Send().
 * comm: not consulted by this implementation.
 *
 * Returns the value of PVM_Send() when PVM is defined; without PVM nothing
 * is sent and MPI_SUCCESS is returned.
 */
int
MPI_Send(void *buf, int count, MPI_Datatype datatype,
         int dest, int tag, MPI_Comm comm)
{
    int rc = MPI_SUCCESS;       /* result when there is no message layer */

#ifdef TRACE
    tracemark(TraceSTART_SEND);
#endif

#ifdef PVM
    rc = PVM_Send(buf, count, datatype, dest, tag);
#endif

#ifdef TRACE
    tracemark(TraceFINISH_SEND);
#endif

    return rc;
}
/*
 * MPI_Recv - receive a message, bracketed by trace marks when TRACE is
 * defined.
 *
 * buf, count, datatype, source, tag, status: forwarded unchanged to
 *       PVM_Receive().
 * comm: not consulted by this implementation.
 *
 * Returns the value of PVM_Receive() when PVM is defined.  Without PVM
 * neither buf nor status is written and MPI_SUCCESS is returned.
 */
int
MPI_Recv(void *buf, int count, MPI_Datatype datatype,
         int source, int tag, MPI_Comm comm, MPI_Status *status)
{
    int rc = MPI_SUCCESS;       /* result when there is no message layer */

#ifdef TRACE
    tracemark(TraceWAIT_RECV);
#endif

#ifdef PVM
    rc = PVM_Receive(buf, count, datatype, source, tag, status);
#endif

#ifdef TRACE
    tracemark(TraceFINISH_RECV);
#endif

    return rc;
}
/*
 * MPI_Isend - start a send that is tracked through a request object.
 *
 * buf, count, datatype, dest, tag, request: forwarded unchanged to
 *       PVM_Isend().
 * comm: not consulted by this implementation.
 *
 * Returns the value of PVM_Isend() when PVM is defined; without PVM nothing
 * is done (request is left untouched) and MPI_SUCCESS is returned.
 */
int
MPI_Isend(void *buf, int count, MPI_Datatype datatype, int dest,
          int tag, MPI_Comm comm, MPI_Comm_request *request)
{
    int rc = MPI_SUCCESS;

#ifdef PVM
    rc = PVM_Isend(buf, count, datatype, dest, tag, request);
#endif

    return rc;
}
/*
 * MPI_Irecv - start a receive that is tracked through a request object.
 *
 * buf, count, datatype, source, tag, request: forwarded unchanged to
 *       PVM_Irecv().
 * comm: not consulted by this implementation.
 *
 * Returns the value of PVM_Irecv() when PVM is defined; without PVM nothing
 * is done (request is left untouched) and MPI_SUCCESS is returned.
 */
int
MPI_Irecv(void *buf, int count, MPI_Datatype datatype, int source,
          int tag, MPI_Comm comm, MPI_Comm_request *request)
{
    int rc = MPI_SUCCESS;

#ifdef PVM
    rc = PVM_Irecv(buf, count, datatype, source, tag, request);
#endif

    return rc;
}
/*
 * MPI_Wait - wait on a request, bracketed by trace marks when TRACE is
 * defined.
 *
 * request, status: forwarded unchanged to PVM_Wait().
 *
 * Returns the value of PVM_Wait() when PVM is defined; without PVM the call
 * returns MPI_SUCCESS immediately and status is not written.
 */
int
MPI_Wait(MPI_Comm_request *request, MPI_Status *status)
{
    int rc = MPI_SUCCESS;       /* result when there is no message layer */

#ifdef TRACE
    tracemark(TraceWAIT_START);
#endif

#ifdef PVM
    rc = PVM_Wait(request, status);
#endif

#ifdef TRACE
    tracemark(TraceWAIT_DONE);
#endif

    return rc;
}
/*
 * MPI_Test - poll a request.
 *
 * request, status: forwarded unchanged to PVM_Test().
 *
 * Returns the value of PVM_Test() when PVM is defined.  Without PVM the
 * function returns TRUE (note: not MPI_SUCCESS) and status is not written;
 * presumably TRUE means "request complete" -- confirm against PVM_Test().
 */
int
MPI_Test(MPI_Comm_request *request, MPI_Status *status)
{
    int result = TRUE;

#ifdef PVM
    result = PVM_Test(request, status);
#endif

    return result;
}
/*
 * MPI_Iprobe - check for a pending message without receiving it.
 *
 * source, tag, flag, status: forwarded unchanged to PVM_Probe().
 * comm: not consulted by this implementation.
 *
 * Returns the value of PVM_Probe() when PVM is defined.  Without PVM, *flag
 * is set to FALSE, status is not written, and MPI_SUCCESS is returned.
 */
int
MPI_Iprobe(int source, int tag, MPI_Comm comm, int *flag,
           MPI_Status *status)
{
#ifndef PVM
    /* No message layer, so there can never be a message waiting */
    *flag = FALSE;
    return MPI_SUCCESS;
#else
    return PVM_Probe(source, tag, flag, status);
#endif
}
/*
 * MPI_Get_source - copy the source field out of a status record.
 *
 * status: status record, passed by value.
 * source: receives status.source.
 *
 * The return value is the stored source itself, not MPI_SUCCESS.
 */
int
MPI_Get_source(MPI_Status status, int *source)
{
    *source = status.source;
    return *source;
}
/*
 * MPI_Get_tag - copy the tag field out of a status record.
 *
 * status: status record, passed by value.
 * tag:    receives status.tag.
 *
 * The return value is the stored tag itself, not MPI_SUCCESS.
 */
int
MPI_Get_tag(MPI_Status status, int *tag)
{
    *tag = status.tag;
    return *tag;
}
/*
 * MPI_Get_count - copy the count field out of a status record.
 *
 * status:   status record, passed by value.
 * datatype: not consulted; status.count is reported as stored.
 * count:    receives status.count.
 *
 * The return value is the stored count itself, not MPI_SUCCESS.
 */
int
MPI_Get_count(MPI_Status status, MPI_Datatype datatype,
              int *count)
{
    *count = status.count;
    return *count;
}
/*
 * MPI_Barrier - hold every process until all of them have arrived.
 *
 * comm: passed through to the size/rank queries and the sends/receives.
 *
 * Implemented with plain sends and receives tagged BARRIER_MSG_TAG: every
 * rank above zero reports to rank 0 and then blocks on a reply; rank 0
 * collects one message per other rank and then releases each of them.
 * With a single process there is nothing to exchange.
 *
 * When TRACE is defined, the TraceWAIT_BARRIER mark is followed by a
 * TraceWAIT_OVER mark on every path, including the single-process case, so
 * the start and finish marks always stay paired.
 *
 * Always returns MPI_SUCCESS; results of the individual sends and receives
 * are not examined.
 */
int
MPI_Barrier(MPI_Comm comm)
{
    int rank, nproc, dummy, i;
    MPI_Status status;

#ifdef TRACE
    tracemark(TraceWAIT_BARRIER);
#endif

    /* For right now - do this with generic sends and receives */
    MPI_Comm_size(comm, &nproc);
    MPI_Comm_rank(comm, &rank);

    if (nproc > 1)
    {
        if (rank > 0)
        {
            /* Report arrival to rank zero, then wait to be released */
            MPI_Send(&rank, 1, MPI_INT, 0, BARRIER_MSG_TAG, comm);
            MPI_Recv(&dummy, 1, MPI_INT, 0, BARRIER_MSG_TAG, comm, &status);
        }
        else
        {
            /* Zero'th processor collects one message per other rank */
            for (i = 1; i < nproc; i++)
                MPI_Recv(&dummy, 1, MPI_INT, MPI_ANY_SOURCE, BARRIER_MSG_TAG,
                         comm, &status);

            /* Now let everyone proceed */
            for (i = 1; i < nproc; i++)
                MPI_Send(&rank, 1, MPI_INT, i, BARRIER_MSG_TAG, comm);
        }
    }

#ifdef TRACE
    tracemark(TraceWAIT_OVER);
#endif

    return MPI_SUCCESS;
}
/*
 * MPI_Reduce - combine one buffer from every process into recvbuf on root.
 * A very limited implementation ! - only a few ops (see _MPI_Op()).
 *
 * sendbuf:  this process's contribution, count elements of datatype.
 * recvbuf:  written on the root only; receives the combined result.
 * count:    number of elements in each buffer.
 * datatype: element type, sized with _MPI_SizeDataType().
 * op:       reduction operation, applied by _MPI_Op().
 * root:     rank that collects the result.
 * comm:     passed through to the size/rank queries and sends/receives.
 *
 * Non-root ranks send their buffer to root with tag REDUCE_MSG_TAG.  Root
 * seeds recvbuf with its own sendbuf and folds in one message from each
 * other rank, in arrival order.  All ranks then meet in MPI_Barrier().
 *
 * With a single process the result is simply the process's own
 * contribution, so sendbuf is copied to recvbuf before returning.
 * The copies are skipped when sendbuf and recvbuf are the same buffer,
 * because memcpy() must not be given overlapping regions.
 *
 * Always returns MPI_SUCCESS; results of the individual sends and receives
 * are not examined.
 */
int
MPI_Reduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype,
           MPI_Op op, int root, MPI_Comm comm)
{
    int rank, nproc, i;
    void *buf;
    MPI_Status status;

    /* For right now - do this with generic sends and receives */
    MPI_Comm_size(comm, &nproc);
    MPI_Comm_rank(comm, &rank);

    if (nproc <= 1)
    {
        /* Lone process: its own data is the reduction result */
        if (count > 0 && recvbuf != sendbuf)
            memcpy(recvbuf, sendbuf, count * _MPI_SizeDataType(datatype));
        return MPI_SUCCESS;
    }

    if (rank != root)
    {
        MPI_Send(sendbuf, count, datatype, root, REDUCE_MSG_TAG, comm);
    }
    else
    {
        /* Master: scratch buffer for one incoming contribution at a time.
         * NOTE(review): the result of New() is used unchecked and released
         * with free() -- confirm New() cannot return a null pointer and is
         * malloc-compatible. */
        buf = New(count * _MPI_SizeDataType(datatype));

        /* Start the accumulation from the master's own contribution */
        if (recvbuf != sendbuf)
            memcpy(recvbuf, sendbuf, count * _MPI_SizeDataType(datatype));

        for (i = 1; i < nproc; i++)
        {
            MPI_Recv(buf, count, datatype, MPI_ANY_SOURCE, REDUCE_MSG_TAG,
                     comm, &status);
            _MPI_Op(recvbuf, buf, count, datatype, op);
        }
        free(buf);
    }

    /* Need to wait for master before continuing */
    MPI_Barrier(comm);
    return MPI_SUCCESS;
}
/*
 * _MPI_Op - element-wise reduction step: b1[i] = b1[i] <op> b2[i].
 *
 * b1:       accumulator buffer, updated in place.
 * b2:       buffer folded into b1; only read.
 * count:    number of elements in each buffer.
 * datatype: MPI_FLOAT, MPI_DOUBLE, MPI_INT or MPI_UNSIGNED.
 * op:       MPI_MAX, MPI_MIN, MPI_SUM or MPI_PROD.
 *
 * MPI_UNSIGNED data is processed through unsigned pointers so that MAX and
 * MIN compare values as unsigned quantities and SUM/PROD use unsigned
 * arithmetic; MPI_INT data is processed as signed int.
 *
 * An unsupported datatype or operation leaves the element unchanged and is
 * reported through ReportError() with FALSE as its second argument; the
 * report is made once per element, since the check sits inside the loop.
 */
void
_MPI_Op(void *b1, void *b2, int count, MPI_Datatype datatype, MPI_Op op)
{
    int i;
    /* The same two buffers viewed as each supported element type */
    float *f1 = (float *) b1, *f2 = (float *) b2;
    double *d1 = (double *) b1, *d2 = (double *) b2;
    int *i1 = (int *) b1, *i2 = (int *) b2;
    unsigned *u1 = (unsigned *) b1, *u2 = (unsigned *) b2;

    for (i = 0; i < count; i++)
    {
        switch (datatype)
        {
        case MPI_FLOAT:
            switch (op)
            {
            case MPI_MAX:
                f1[i] = MAX(f1[i], f2[i]);
                break;
            case MPI_MIN:
                f1[i] = MIN(f1[i], f2[i]);
                break;
            case MPI_SUM:
                f1[i] += f2[i];
                break;
            case MPI_PROD:
                f1[i] *= f2[i];
                break;
            default:
                ReportError("MPI:Unsupported MPI_FLOAT reduction operation\n", FALSE);
                break;
            }
            break;

        case MPI_DOUBLE:
            switch (op)
            {
            case MPI_MAX:
                d1[i] = MAX(d1[i], d2[i]);
                break;
            case MPI_MIN:
                d1[i] = MIN(d1[i], d2[i]);
                break;
            case MPI_SUM:
                d1[i] += d2[i];
                break;
            case MPI_PROD:
                d1[i] *= d2[i];
                break;
            default:
                ReportError("MPI:Unsupported MPI_DOUBLE reduction operation\n", FALSE);
                break;
            }
            break;

        case MPI_INT:
            switch (op)
            {
            case MPI_MAX:
                i1[i] = MAX(i1[i], i2[i]);
                break;
            case MPI_MIN:
                i1[i] = MIN(i1[i], i2[i]);
                break;
            case MPI_SUM:
                i1[i] += i2[i];
                break;
            case MPI_PROD:
                i1[i] *= i2[i];
                break;
            default:
                ReportError("MPI:Unsupported MPI_INT reduction operation\n", FALSE);
                break;
            }
            break;

        case MPI_UNSIGNED:
            /* Kept separate from MPI_INT: a signed comparison would order
             * values above INT_MAX incorrectly. */
            switch (op)
            {
            case MPI_MAX:
                u1[i] = MAX(u1[i], u2[i]);
                break;
            case MPI_MIN:
                u1[i] = MIN(u1[i], u2[i]);
                break;
            case MPI_SUM:
                u1[i] += u2[i];
                break;
            case MPI_PROD:
                u1[i] *= u2[i];
                break;
            default:
                ReportError("MPI:Unsupported MPI_UNSIGNED reduction operation\n", FALSE);
                break;
            }
            break;

        default:
            ReportError("MPI: Unsupported type in MPI_Reduce()\n", FALSE);
            break;
        } /* End switch */
    }
}
#endif /* MYMPI */
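/*
 * Code-viewer page residue (not part of the program) -- keyboard shortcuts:
 *   Copy code           Ctrl + C
 *   Search code         Ctrl + F
 *   Full-screen mode    F11
 *   Switch theme        Ctrl + Shift + D
 *   Show shortcuts      ?
 *   Increase font size  Ctrl + =
 *   Decrease font size  Ctrl + -
 */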