📄 send.c
字号:
/* PSPH - Parallel SPH program
* Bradley Smith, and Lou Baker, Dagonet Software
*/
static char sccs_id[] = "@(#) /home2/bsmith/src/psph/SCCS/s.send.c 1.27 94/03/24";
/*
Buffered send and receive functions
*/
#include "all.h"
/* Global variables */
int Rank, NProc;
MPI_Comm Comm;
int *AllDest;
int *AllDestButMe;
int Quiet=FALSE;
char LogFile[256];
static int OutFile;
#ifdef LAST_COMM
/* Record status of last communication and report it on break */
int LastSendDest, LastSendTag;
int LastRecvSource, LastRecvTag;
#endif
/* Global message buffers */
PMsgBuf GSendBuf, GRecvBuf;
void
ExitMPISystem()
{
#ifdef NB_SEND
FlushAllPending();
#endif
MPI_Finalize();
}
void
InitMPISystem(int argc, char **argv, char *logfile, int quiet)
{
int i,j=0;
char curdir[256];
Quiet = quiet;
fprintf(stderr, "Hello World\n");
{
int i;
for(i=0; i<argc; i++) {
fprintf(stderr, "i=%d argv=%s\n", i, argv[i]);
}
}
fprintf(stderr, "Hello World\n");
if(MPI_Init(&argc,&argv) != MPI_SUCCESS)
ReportError("Can't initialize MPI!\n", TRUE);
Comm = MPI_COMM_WORLD;
MPI_Comm_size(Comm, &NProc);
MPI_Comm_rank(Comm, &Rank);
/* Set up an array that is all destinations */
AllDest = (int *) New(sizeof(int)*NProc);
AllDestButMe = (int *) New(sizeof(int)*NProc);
j=0;
for(i=0; i<NProc; i++)
{
AllDest[i] = i;
if(i != Rank)
AllDestButMe[j++] = i;
}
/* Allocate our own send and receive buffers - global */
GSendBuf = NewMsgBuf(SEND_BUF, BUF_ALLOC);
GRecvBuf = NewMsgBuf(RECV_BUF, BUF_ALLOC);
if(logfile)
{
GetCurDirectory(curdir);
sprintf(LogFile, "%s/%s", curdir, logfile);
OutFile = -1;
#ifndef SPLIT_LOG
if(Rank == 0 )
{
if(access(logfile, 0) == 0)
unlink(logfile);
OutFile = open(logfile, O_APPEND | O_CREAT | O_WRONLY, 0440);
}
#endif
#ifdef SPLIT_LOG
sprintf(LogFile, "%s/%s.%d", curdir, logfile, Rank);
OutFile = open(LogFile, O_CREAT | O_WRONLY, 0440);
#endif
}
else
LogFile[0] = NULLC;
}
PMsgBuf
NewMsgBuf(BOOL is_send, int nalloc)
{
PMsgBuf mbuf;
mbuf = (PMsgBuf) New(sizeof(MsgBuf));
mbuf->count = mbuf->nused = 0;
mbuf->nalloc = 0;
mbuf->is_send = is_send;
mbuf->datatype = MPI_INT;
mbuf->datasize = sizeof(int);
mbuf->buf = NULL;
#ifndef PVM_BUF
SizeMsgBuf(mbuf, nalloc);
#else
mbuf->bufid = -1;
#endif
return(mbuf);
}
void
SizeMsgBuf(PMsgBuf mbuf, int nalloc)
{
if(mbuf->nalloc <= 0)
{
mbuf->nalloc = nalloc;
mbuf->buf = (char *) New(nalloc);
return;
}
mbuf->buf = (char *) ReAlloc(mbuf->buf, nalloc);
mbuf->nalloc = nalloc;
}
void
DeleteMsgBuf(PMsgBuf mbuf)
{
#ifndef PVM_BUF
if(mbuf->nalloc > 0)
Delete(mbuf->buf);
#else
if(mbuf->bufid >= 0)
pvm_freebuf(mbuf->bufid);
#endif
Delete(mbuf);
}
int
PackBuf(PMsgBuf mbuf, void *buf, int count, MPI_Datatype datatype)
{
if(!mbuf)
mbuf = GSendBuf;
if(!mbuf->is_send)
{
ReportError("Packbuf called on Receive type buffer!- ignored\n", FALSE);
return(0);
}
if(mbuf->count > 0 && mbuf->datatype != datatype)
{
ReportError("PackBuf called with bad datatype - ignoring Pack call!\n", FALSE);
Debug("mbuf->datatype = %d, datatype = %d\n",
mbuf->datatype, datatype );
return(0);
}
if(mbuf->count <= 0)
{
mbuf->datatype = datatype;
mbuf->datasize = _MPI_SizeDataType(datatype);
#ifdef PVM_BUF
/* Free and create a new buffer */
if(mbuf->bufid >= 0)
pvm_freebuf(mbuf->bufid);
mbuf->bufid = pvm_mkbuf(PvmDataDefault);
pvm_setsbuf(mbuf->bufid);
#endif
}
#ifdef PVM_BUF /* Set our pvm-buffer! */
if(pvm_getsbuf() != mbuf->bufid)
{
pvm_setsbuf(mbuf->bufid);
}
#endif
#ifdef PVM_BUF
PVM_Pack(buf, count, datatype);
#else
/* Check for enough room in buffer */
if(mbuf->nalloc < (mbuf->count + count) * mbuf->datasize)
{
/* Enlarge send buffer */
SizeMsgBuf(mbuf, MAX(mbuf->nalloc+BUF_ALLOC, (mbuf->count + count) * mbuf->datasize) );
}
/* Copy the data over */
memcpy(&mbuf->buf[mbuf->count * mbuf->datasize], buf, count*mbuf->datasize);
#endif
mbuf->count += count;
return(count);
}
#ifdef NB_SEND
PNBSend NBFree=NULL;
PNBSend NBPending = NULL;
#endif
/* Actually send the buffer */
int
SendBuf(PMsgBuf mbuf, int *destarray, int destcount, int tag, MPI_Comm comm)
{
int i,ret;
char errmsg[120];
#ifdef NB_SEND
PNBSend nbs;
#endif
if(!mbuf)
mbuf = GSendBuf;
if(!mbuf->is_send)
{
ReportError("SendBuf called on Receive type buffer!- ignored\n", FALSE);
return(0);
}
if(mbuf->count <= 0)
{
sprintf(errmsg,"Send with count=0?: count=%d type=%d dest=%d tag=%d\n",
mbuf->count, mbuf->datatype, destarray[0], tag);
ReportError(errmsg, TRUE);
}
/* Code to manage non-blocking sends */
#ifdef NB_SEND
for(i=0; i<destcount; i++)
{
nbs=GetNBBuf(mbuf->buf, mbuf->count, mbuf->datatype);
ret=MPI_Isend(nbs->buf, mbuf->count, mbuf->datatype, destarray[i], tag,
comm,&nbs->request);
if(ret != MPI_SUCCESS)
{
sprintf(errmsg,"Send failed: count=%d type=%d dest=%d tag=%d\n",
mbuf->count, mbuf->datatype, destarray[i], tag);
ReportError(errmsg, TRUE);
}
}
#else /* NOT NB_SEND */
#ifdef PGON_MSG
/* Optimization - take advantage of the PGON multicast operation
* by using the mult-send features of the PGON interface */
ret= PGON_Send(mbuf->buf, mbuf->count, mbuf->datatype, destarray, destcount, tag);
#else
#ifndef PVM
/* Under normal MPI - do multiple sends */
for(i=0; i<destcount; i++)
{
ret=MPI_Send(mbuf->buf,mbuf->count,mbuf->datatype, destarray[i], tag, comm);
if(ret != MPI_SUCCESS)
{
sprintf(errmsg,"Send failed: count=%d type=%d dest=%d tag=%d\n",
mbuf->count, mbuf->datatype, destarray[i], tag);
ReportError(errmsg, TRUE);
}
}
#else
/* Optimization - take advantage of the PVM multicast operation
* by using the mult-send features of the PVM interface */
#ifdef TRACE
StartSendTrace(tag, mbuf->count, destcount);
#endif
#ifdef PVM_BUF
ret = PVM_Send(NULL, mbuf->bufid, mbuf->datatype, destarray, destcount, tag);
#else
ret= PVM_Send(mbuf->buf, mbuf->count, mbuf->datatype, destarray, destcount, tag);
#endif
#ifdef TRACE
StopSendTrace(tag);
#endif
#endif
#endif /* PGON_MSG */
#endif /* NOT NB_SEND */
#ifdef LAST_COMM
LastSendDest = destarray[0];
LastSendTag = tag;
#endif
mbuf->count = 0;
return(ret);
}
#ifdef NB_SEND
void
FlushAllPending()
{
PNBSend p;
MPI_Status status;
int count=0;
for(p = NBPending; p!= NULL; p=p->next)
{
#ifdef PGON
MPI_Wait(&p->request, &status);
#endif
++count;
}
Debug("Count of leftover sends is %d\n", count);
}
PNBSend
GetNBBuf(void *buf, int count, MPI_Datatype datatype)
{
int n;
PNBSend p,last;
MPI_Status status;
n = count * _MPI_SizeDataType(datatype);
if(NBPending)
{
/* Free the pending requests */
last = NULL;
for(p=NBPending; p!= NULL; p = p->next)
{
if(MPI_Test(&p->request, &status))
{
/* Remove it from our list */
if(last)
last->next = p->next;
else
NBPending = p->next;
/* Add it to the free list */
p->next = NBFree;
NBFree = p;
/* Free the buffer if required */
if(p->buf != p->smallbuf)
Delete(p->buf);
}
else
last = p;
}
}
/* Now find a free request */
if(NBFree)
{
p = NBFree;
NBFree = p->next;
}
else
p = (PNBSend) New(sizeof(NBSend));
/* Create our buffer */
if(n > NB_BUF_SIZE)
{
p->buf = New(n);
}
else
p->buf = p->smallbuf;
/* Copy it */
memcpy(p->buf, buf, n);
/* Put in the pending list */
p->next = NBPending;
NBPending = p;
/* Return it */
return(p);
}
#endif
/* Convinience function */
int
PackString(PMsgBuf mbuf, char *s)
{
return(PackBuf(mbuf, s, strlen(s)+1, MPI_CHAR));
}
int
UnPackBuf(PMsgBuf mbuf, void *buf, int count)
{
MPI_Status status;
if(!mbuf)
mbuf = GRecvBuf;
if(mbuf->is_send)
{
ReportError("FATAL: UnpackBuf called on Send type buffer!\n", TRUE);
return(0);
}
#ifdef PVM_BUF
/* Set the buffer under PVM */
if(pvm_getrbuf() != mbuf->bufid)
{
pvm_setrbuf(mbuf->bufid);
}
#endif
if(mbuf->count-mbuf->nused <= 0)
{
ReportError("FATAL: UnPackBuf called on empty buffer!\n", TRUE);
return(0);
}
count = MIN(mbuf->count-mbuf->nused, count);
#ifdef PVM_BUF
PVM_Unpack(mbuf->bufid, buf, count, mbuf->datatype, Rank, &status);
#else
memcpy(buf, &mbuf->buf[mbuf->nused * mbuf->datasize], count*mbuf->datasize);
#endif
mbuf->nused += count;
return(count);
}
int
RecvBuf(PMsgBuf mbuf, int count, MPI_Datatype datatype,
int source, int tag, MPI_Comm comm, MPI_Status *status)
{
char errmsg[120];
if(!mbuf)
mbuf = GRecvBuf;
if(mbuf->is_send)
{
ReportError("RecvBuf called on Send type buffer!- ignored\n", FALSE);
return(0);
}
if(count <= 0)
{
sprintf(errmsg,"Recv with count=0?: count=%d type=%d dest=%d tag=%d\n",
count, datatype, source, tag);
ReportError(errmsg, TRUE);
}
mbuf->datasize = _MPI_SizeDataType(datatype);
mbuf->datatype = datatype;
#ifdef LAST_COMM
LastRecvSource = source;
LastRecvTag = tag;
#endif
#ifdef PVM_BUF
/* Do the receive from here */
PVM_Receive(NULL, count, datatype, source, tag, status, &mbuf->bufid);
#else
if(mbuf->datasize * count > mbuf->nalloc)
{
/* Need bigger buffer */
SizeMsgBuf(mbuf, MAX(mbuf->nalloc + BUF_ALLOC, count * mbuf->datasize));
}
/* Actually do a receive */
if(MPI_Recv(mbuf->buf, count, datatype, source, tag, comm, status) != MPI_SUCCESS)
{
sprintf(errmsg,"Recv Failed: count=%d type=%d dest=%d tag=%d\n",
count, datatype, source, tag);
ReportError(errmsg, TRUE);
}
#endif
MPI_Get_count(status, datatype,&mbuf->count);
mbuf->nused = 0;
return(mbuf->count);
}
int
UnPackString(PMsgBuf mbuf, char *string, int max_len)
{
int i;
if(!mbuf)
mbuf = GRecvBuf;
if(mbuf->is_send)
{
ReportError("UnpackBuf called on Send type buffer!- ignored\n", FALSE);
return(0);
}
for(i=mbuf->nused; i<mbuf->nused+max_len && i<mbuf->count && mbuf->buf[i] != NULLC; i++)
*string++ = mbuf->buf[i];
/* Increment by one if we have used NULLC character */
if(i<mbuf->count && mbuf->buf[i] == NULLC)
i++;
*string = NULLC;
mbuf->nused+= i;
return(i); /* Return the string length */
}
#if defined(MYMPI)
int
_MPI_SizeDataType(MPI_Datatype datatype)
{
switch(datatype)
{
default:
case MPI_INT:
return(sizeof(int));
case MPI_UNSIGNED:
return(sizeof(unsigned));
case MPI_SHORT:
return(sizeof(short));
case MPI_UNSIGNED_SHORT:
return(sizeof(unsigned short));
case MPI_LONG:
return(sizeof(long));
case MPI_UNSIGNED_LONG:
return(sizeof(unsigned long));
case MPI_BYTE:
case MPI_CHAR:
return(sizeof(char));
case MPI_DOUBLE:
return(sizeof(double));
case MPI_FLOAT:
return(sizeof(float));
}
}
#endif
int
ParseSpawnArg(int argc, char **argv)
{
int i,n=1;
for(i=1; i<argc; i++)
{
if(*argv[i] == '-' && isdigit(argv[i][1]))
n = MAX(n, atoi(&argv[i][1]));
if(strncmp(argv[i],"-s",2) == 0)
{
/* Found -s spawn argument */
if(strlen(argv[i]) == 2) /* Number is in next space */
{
if(++i >= argc)
{
ReportError("Error: -s argument should be followed by number to spawn.\n",FALSE);
ReportError("Defaulting to one task.\n",FALSE);
}
else
n=MAX(1,atoi(argv[i]));
}
else
n=MAX(1,atoi(&argv[i][2]));
}
}
return(n);
}
/* Extension - to print a single string at a location */
void
OnePrintString(char *str)
{
int rank,flag,nproc;
int count;
MPI_Status status;
char s[1024];
static int out;
flag = TRUE;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &nproc);
#ifdef PRINT_FROM_ROOT
if(rank != 0)
{
/* Write to the message */
sprintf(s,"[%2d] %s", rank,str);
MPI_Send(s, strlen(s)+1, MPI_CHAR, 0, STRING_TAG, MPI_COMM_WORLD);
#ifdef SPLIT_LOG
PrintToLog(s,rank);
#endif
return;
}
else
{
/* Print stuff out */
while(flag && nproc > 1)
{
MPI_Iprobe(MPI_ANY_SOURCE, STRING_TAG, MPI_COMM_WORLD, &flag,
&status);
if(!flag)
break;
MPI_Recv(s, 1023,MPI_CHAR, MPI_ANY_SOURCE,STRING_TAG,MPI_COMM_WORLD, &status);
Debug("HI BOB");
MPI_Get_count(&status, MPI_CHAR, &count);
s[count] = NULLC;
#ifndef SPLIT_LOG
PrintToLog(s,rank);
#endif
if(!Quiet)
printf("%s",s);
}
}
#endif
sprintf(s,"[%2d] %s", rank,str);
PrintToLog(s,rank);
if(!Quiet)
printf("%s", s);
}
void
PrintToLog(char *s, int rank)
{
#if defined(PRINT_FROM_ROOT) && !defined(SPLIT_LOG)
if(rank != 0)
return;
#endif
if(LogFile[0] == NULLC)
return;
#ifndef PRINT_FROM_ROOT
/* May need to open file each time to avoid concurrent writes */
if(OutFile <= 0)
OutFile = open(LogFile, O_APPEND | O_WRONLY, 0440);
#endif
if(OutFile > 0)
{
write(OutFile, s, strlen(s));
}
}
void
GetCurDirectory(char *dir)
{
MPI_Status status;
/* Use the proc 0 directory as the master */
if(Rank == 0)
{
getcwd(dir, 120);
if(NProc > 1)
{
PackString(NULL,dir);
SendBuf(NULL,&AllDest[1], NProc-1, CUR_DIR_TAG, Comm);
}
}
else
{
{
char errmsg[120];
sprintf(errmsg,"CALL RECVBUF: count=%d type=%d dest=%d tag=%d\n",
120, MPI_CHAR, 0, CUR_DIR_TAG);
ReportError(errmsg, FALSE);
}
RecvBuf(NULL,120, MPI_CHAR, 0, CUR_DIR_TAG, Comm, &status);
UnPackString(NULL,dir, 120);
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -