⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 send.c

📁 Parallel programming/Lou Baker, Bradley J.Smith .—New York:McGraw-Hill Book Co.
💻 C
字号:
/* PSPH - Parallel SPH program
 * Bradley Smith, and Lou Baker, Dagonet Software
 */
static char sccs_id[] = "@(#) /home2/bsmith/src/psph/SCCS/s.send.c 1.27 94/03/24";

/*
   Buffered send and receive functions
 */

#include "all.h"

/* Global variables */
/* This process's rank and the number of processes in Comm; InitMPISystem
 * fills both in with MPI_Comm_rank / MPI_Comm_size. */
int Rank, NProc;
/* Communicator used by this module; InitMPISystem sets it to MPI_COMM_WORLD. */
MPI_Comm Comm;
/* AllDest[i] == i for every rank 0..NProc-1 (built in InitMPISystem). */
int *AllDest;
/* The same rank list with this process's own rank left out
 * (only the first NProc-1 entries are filled in). */
int *AllDestButMe;
/* When non-zero, OnePrintString skips its printf output. */
int Quiet=FALSE;

/* Path of the log file; an empty string means logging is disabled. */
char LogFile[256];
/* File descriptor of the log file; PrintToLog treats values <= 0 as "not open". */
static int OutFile;

#ifdef LAST_COMM
/* Record status of last communication and report it on break */
/* SendBuf stores its first destination and tag here; RecvBuf stores its
 * source and tag. */
int LastSendDest, LastSendTag;
int LastRecvSource, LastRecvTag;
#endif

/* Global message buffers */
/* Defaults used by the Pack/Send and Recv/UnPack routines in this file
 * when they are handed a NULL buffer pointer. */
PMsgBuf GSendBuf, GRecvBuf;

/*
 * ExitMPISystem - shut down the message-passing layer.
 *
 * When NB_SEND is defined, FlushAllPending() is called first so the list
 * of outstanding non-blocking sends is visited before MPI_Finalize().
 */
void
ExitMPISystem()
{
#ifdef NB_SEND
 FlushAllPending();
#endif
 MPI_Finalize();
}

/*
 * InitMPISystem - start MPI and set up this module's global state.
 *
 *   argc, argv - program arguments, handed to MPI_Init
 *   logfile    - log file name, taken relative to rank 0's working
 *                directory, or NULL to disable logging
 *   quiet      - stored in Quiet; non-zero suppresses the console output
 *                of OnePrintString
 *
 * On return Comm, NProc, Rank, AllDest, AllDestButMe, GSendBuf and
 * GRecvBuf are initialized and LogFile/OutFile describe the log file.
 * When logfile is non-NULL every rank must make this call, because
 * GetCurDirectory exchanges a message between rank 0 and the others.
 */
void
InitMPISystem(int argc, char **argv, char *logfile, int quiet)
{
 int i, j;
 char curdir[256];

 Quiet = quiet;

 if(MPI_Init(&argc,&argv) != MPI_SUCCESS)
	ReportError("Can't initialize MPI!\n", TRUE);
 Comm = MPI_COMM_WORLD;
 MPI_Comm_size(Comm, &NProc);
 MPI_Comm_rank(Comm, &Rank);

 /* Set up an array that is all destinations, and one without ourselves */
 AllDest = (int *) New(sizeof(int)*NProc);
 AllDestButMe = (int *) New(sizeof(int)*NProc);
 j = 0;
 for(i=0; i<NProc; i++)
	{
	AllDest[i] = i;
	if(i != Rank)
		AllDestButMe[j++] = i;
	}

 /* Allocate our own send and receive buffers - global */
 GSendBuf = NewMsgBuf(SEND_BUF, BUF_ALLOC);
 GRecvBuf = NewMsgBuf(RECV_BUF, BUF_ALLOC);

 if(logfile)
	{
	/* All ranks agree on rank 0's working directory */
	GetCurDirectory(curdir);
	OutFile = -1;
#ifndef SPLIT_LOG
	/* One shared log.  The format is bounded so that a long directory
	 * or file name cannot overrun LogFile. */
	snprintf(LogFile, sizeof(LogFile), "%s/%s", curdir, logfile);
	if(Rank == 0 )
		{
		/* Start every run with a fresh file */
		if(access(logfile, 0) == 0)
			unlink(logfile);
		/* Owner-writable: a read-only file would make the later
		 * open(O_WRONLY) in PrintToLog fail on the other ranks. */
		OutFile = open(logfile, O_APPEND | O_CREAT | O_WRONLY, 0640);
		}
#else
	/* One log per rank, named <logfile>.<rank> */
	snprintf(LogFile, sizeof(LogFile), "%s/%s.%d", curdir, logfile, Rank);
	/* Owner-writable so that a second run can reopen the same file */
	OutFile = open(LogFile, O_CREAT | O_WRONLY, 0640);
#endif
	}
 else
	LogFile[0] = NULLC;
}

/*
 * NewMsgBuf - allocate and initialize an empty message buffer.
 *
 *   is_send - stored in the buffer's is_send flag, which the pack/send
 *             and unpack/receive routines check before operating
 *   nalloc  - initial data capacity in bytes (not used when PVM_BUF is
 *             defined, where only bufid is reset to -1)
 *
 * Returns the new buffer.  Its element type starts out as MPI_INT.
 */
PMsgBuf 
NewMsgBuf(BOOL is_send, int nalloc)
{
 PMsgBuf mb = (PMsgBuf) New(sizeof(MsgBuf));

 /* Direction and default element type */
 mb->is_send = is_send;
 mb->datatype = MPI_INT;
 mb->datasize = sizeof(int);

 /* Empty: nothing packed, nothing consumed, no storage attached yet */
 mb->count = 0;
 mb->nused = 0;
 mb->nalloc = 0;
 mb->buf = NULL;

#ifdef PVM_BUF
 mb->bufid = -1;
#else
 /* nalloc is still 0 here, so this takes SizeMsgBuf's fresh-allocation path */
 SizeMsgBuf(mb, nalloc);
#endif
 return mb;
}

/*
 * SizeMsgBuf - give a message buffer a data area of nalloc bytes.
 *
 * A buffer that has no storage yet (nalloc <= 0) gets a fresh block from
 * New(); otherwise the existing block is resized with ReAlloc().
 */
void
SizeMsgBuf(PMsgBuf mbuf, int nalloc)
{
 if(mbuf->nalloc > 0)
	mbuf->buf = (char *) ReAlloc(mbuf->buf, nalloc);
 else
	mbuf->buf = (char *) New(nalloc);

 mbuf->nalloc = nalloc;
}

/*
 * DeleteMsgBuf - release a message buffer and the storage it owns.
 *
 * With PVM_BUF the PVM buffer identified by bufid is freed (if one was
 * created); otherwise the data area is freed when one was allocated.
 * The MsgBuf record itself is always freed.
 */
void
DeleteMsgBuf(PMsgBuf mbuf)
{
#ifdef PVM_BUF
 if(mbuf->bufid >= 0)
	pvm_freebuf(mbuf->bufid);
#else
 if(mbuf->nalloc > 0)
	Delete(mbuf->buf);
#endif

 Delete(mbuf);
}
	

/*
 * PackBuf - append count elements of datatype to a send buffer.
 *
 *   mbuf     - buffer to pack into; NULL selects the global GSendBuf
 *   buf      - source data
 *   count    - number of elements (not bytes) to append
 *   datatype - element type; all data packed between two sends must use
 *              the same type
 *
 * Returns the number of elements packed, or 0 when the call is ignored
 * (receive-type buffer, negative count, or a datatype that differs from
 * what is already packed).
 */
int
PackBuf(PMsgBuf mbuf, void *buf, int count, MPI_Datatype datatype)
{

 if(!mbuf)
	mbuf = GSendBuf;
 if(!mbuf->is_send)
	{
	ReportError("Packbuf called on Receive type buffer!- ignored\n", FALSE);
	return(0);
	}
 /* A negative count would turn into a huge size in memcpy below and
  * would also corrupt mbuf->count. */
 if(count < 0)
	{
	ReportError("PackBuf called with negative count - ignoring Pack call!\n", FALSE);
	return(0);
	}
 if(mbuf->count > 0 && mbuf->datatype != datatype)
	{
	ReportError("PackBuf called with bad datatype - ignoring Pack call!\n", FALSE);
	Debug("mbuf->datatype = %d, datatype = %d\n",
                            mbuf->datatype, datatype );
	return(0);
	}

 /* First pack since the last send: adopt the caller's datatype */
 if(mbuf->count <= 0)
	{
	mbuf->datatype = datatype;
	mbuf->datasize = _MPI_SizeDataType(datatype);
#ifdef PVM_BUF
	/* Free and create a new buffer */
	if(mbuf->bufid >= 0)
		pvm_freebuf(mbuf->bufid);
	mbuf->bufid = pvm_mkbuf(PvmDataDefault);
	pvm_setsbuf(mbuf->bufid);
#endif
	}

#ifdef PVM_BUF /* Set our pvm-buffer! */
 if(pvm_getsbuf() != mbuf->bufid)
	{
	pvm_setsbuf(mbuf->bufid);
	}
#endif

#ifdef PVM_BUF
 PVM_Pack(buf, count, datatype); 
#else
 {
 /* Bytes required once the new elements are appended */
 int needed = (mbuf->count + count) * mbuf->datasize;

 /* Enlarge the send buffer by at least BUF_ALLOC when it is too small */
 if(mbuf->nalloc < needed)
	SizeMsgBuf(mbuf, MAX(mbuf->nalloc+BUF_ALLOC, needed));

 /* Copy the data over, right behind what is already packed */
 memcpy(&mbuf->buf[mbuf->count * mbuf->datasize], buf, count*mbuf->datasize);
 }
#endif
 mbuf->count += count;
 return(count);
}

#ifdef NB_SEND
/* Recycled NBSend records that GetNBBuf can reuse instead of allocating. */
PNBSend NBFree=NULL;
/* NBSend records handed out by GetNBBuf for non-blocking sends; GetNBBuf
 * moves completed ones to NBFree and FlushAllPending walks this list. */
PNBSend NBPending = NULL;
#endif

/*
 * SendBuf - send the packed contents of a send buffer to a list of ranks.
 *
 *   mbuf      - buffer to send; NULL selects the global GSendBuf
 *   destarray - destination ranks
 *   destcount - number of entries in destarray
 *   tag, comm - message tag and communicator
 *
 * The transport is chosen at compile time: non-blocking MPI sends
 * (NB_SEND), a PGON or PVM multi-destination send, or one MPI_Send per
 * destination.  The buffer is marked empty afterwards.
 *
 * Returns the status of the last send call (MPI_SUCCESS when the
 * destination list is empty), or 0 when called on a receive-type buffer.
 */
int
SendBuf(PMsgBuf mbuf, int *destarray, int destcount, int tag, MPI_Comm comm)
{
 int i;
 int ret = MPI_SUCCESS;	/* defined result even when no send is issued */
 int firstdest;
 char errmsg[120];
#ifdef NB_SEND
 PNBSend nbs;
#endif
 if(!mbuf)
	mbuf = GSendBuf;
 if(!mbuf->is_send)
	{
	ReportError("SendBuf called on Receive type buffer!- ignored\n", FALSE);
	return(0);
	}

 /* Only look at destarray[0] when there is such an element */
 firstdest = (destarray != NULL && destcount > 0) ? destarray[0] : -1;

 if(mbuf->count <= 0)
	{
	sprintf(errmsg,"Send with count=0?: count=%d type=%d dest=%d tag=%d\n",
		mbuf->count, mbuf->datatype, firstdest, tag); 
	ReportError(errmsg, TRUE);
	}

/* Code to manage non-blocking sends */
#ifdef NB_SEND
 for(i=0; i<destcount; i++)
	{
	/* Each destination gets its own copy, so mbuf can be reused at once */
	nbs=GetNBBuf(mbuf->buf, mbuf->count, mbuf->datatype);
	ret=MPI_Isend(nbs->buf, mbuf->count, mbuf->datatype, destarray[i], tag,
		comm,&nbs->request);
	if(ret != MPI_SUCCESS)
		{
		sprintf(errmsg,"Send failed: count=%d type=%d dest=%d tag=%d\n",
			mbuf->count, mbuf->datatype, destarray[i], tag); 
		ReportError(errmsg, TRUE);
		}
	}
#else /* NOT NB_SEND */
#ifdef PGON_MSG
 /* Optimization - take advantage of the PGON multicast operation 
  * by using the mult-send features of the PGON interface */
 ret= PGON_Send(mbuf->buf, mbuf->count, mbuf->datatype, destarray, destcount, tag);
#else
#ifndef PVM
 /* Under normal MPI - do multiple sends */
 for(i=0; i<destcount; i++)
	{
	ret=MPI_Send(mbuf->buf,mbuf->count,mbuf->datatype, destarray[i], tag, comm);
	if(ret != MPI_SUCCESS)
		{
		sprintf(errmsg,"Send failed: count=%d type=%d dest=%d tag=%d\n",
			mbuf->count, mbuf->datatype, destarray[i], tag); 
		ReportError(errmsg, TRUE);
		}
	}
#else
 /* Optimization - take advantage of the PVM multicast operation 
  * by using the mult-send features of the PVM interface */
#ifdef TRACE
 StartSendTrace(tag, mbuf->count, destcount);
#endif

#ifdef PVM_BUF
 ret = PVM_Send(NULL, mbuf->bufid, mbuf->datatype, destarray, destcount, tag);
#else
 ret= PVM_Send(mbuf->buf, mbuf->count, mbuf->datatype, destarray, destcount, tag);
#endif

#ifdef TRACE
 StopSendTrace(tag);
#endif
#endif
#endif /* PGON_MSG */
#endif /* NOT NB_SEND */
#ifdef LAST_COMM
 /* Remember the most recent send (-1 when the destination list was empty) */
 LastSendDest = firstdest;
 LastSendTag = tag;
#endif

 /* The buffer is empty again; the next PackBuf may choose a new datatype */
 mbuf->count = 0;
 return(ret);
}

#ifdef NB_SEND
/*
 * FlushAllPending - visit every record on the NBPending list.
 *
 * When PGON is defined each request is waited on with MPI_Wait; in all
 * builds the records are counted and the total is reported via Debug().
 * The list itself is left unchanged.
 */
void
FlushAllPending()
{
 PNBSend cur = NBPending;
 MPI_Status status;
 int nleft = 0;

 while(cur != NULL)
	{
#ifdef PGON
	MPI_Wait(&cur->request, &status);
#endif
	nleft++;
	cur = cur->next;
	}
 Debug("Count of leftover sends is %d\n", nleft);
}

/*
 * GetNBBuf - obtain a send record holding a private copy of the data.
 *
 *   buf      - data to copy
 *   count    - number of elements in buf
 *   datatype - element type, used to compute the size in bytes
 *
 * Completed requests on the NBPending list are first moved to the NBFree
 * list (releasing any separately allocated data area).  A record is then
 * taken from NBFree, or allocated when none is free, filled with a copy
 * of the data and pushed onto NBPending.  Payloads of up to NB_BUF_SIZE
 * bytes use the record's own smallbuf; larger ones get a block from New().
 *
 * Returns the record; the caller starts the send on its request field.
 */
PNBSend 
GetNBBuf(void *buf, int count, MPI_Datatype datatype)
{
 int n;
 PNBSend p, next, last;
 MPI_Status status;

 n = count * _MPI_SizeDataType(datatype);

 /* Reclaim the pending requests that have completed.  The successor is
  * saved before a record is relinked: moving a record to the free list
  * overwrites its next pointer, and following that pointer would carry
  * the scan onto the free list instead of the rest of the pending list. */
 last = NULL;
 for(p = NBPending; p != NULL; p = next)
	{
	next = p->next;
	/* NOTE(review): MPI_Test is used here with two arguments and its
	 * return value as the completion flag, which differs from the
	 * standard MPI signature - confirm against the MPI layer in use. */
	if(MPI_Test(&p->request, &status))
		{
		/* Remove it from our list */
		if(last)
			last->next = next;
		else
			NBPending = next;
		/* Free the buffer if required */
		if(p->buf != p->smallbuf)
			Delete(p->buf);
		/* Add it to the free list */
		p->next = NBFree;
		NBFree = p;
		}
	else
		last = p;
	}

 /* Now find a free request */
 if(NBFree)
	{
	p = NBFree;
	NBFree = p->next;
	}
 else
	p = (PNBSend) New(sizeof(NBSend));

 /* Create our buffer */
 if(n > NB_BUF_SIZE)
	{
	p->buf = New(n);
	}
 else
	p->buf = p->smallbuf;

 /* Copy it */
 memcpy(p->buf, buf, n);

 /* Put in the pending list */
 p->next = NBPending;
 NBPending = p;

 /* Return it */
 return(p);
}
#endif


/*
 * PackString - convenience wrapper that packs a C string, including its
 * terminating NUL, into mbuf as MPI_CHAR data.
 * Returns whatever PackBuf returns.
 */
int
PackString(PMsgBuf mbuf, char *s)
{
 int nchars = strlen(s) + 1;	/* the +1 carries the terminator along */

 return PackBuf(mbuf, s, nchars, MPI_CHAR);
}

/*
 * UnPackBuf - copy up to count elements out of a receive buffer.
 *
 *   mbuf  - buffer to read from; NULL selects the global GRecvBuf
 *   buf   - destination for the elements
 *   count - number of elements wanted
 *
 * Elements are taken from the current read position (nused), which is
 * advanced by the number delivered.  Fewer than count elements are
 * delivered when the buffer holds fewer.
 *
 * Returns the number of elements copied, or 0 after reporting an error
 * for a send-type or empty buffer.
 */
int
UnPackBuf(PMsgBuf mbuf, void *buf, int count)
{
 MPI_Status status;
 int avail;

 if(!mbuf)
	mbuf = GRecvBuf;
 if(mbuf->is_send)
	{
	ReportError("FATAL: UnpackBuf called on Send type buffer!\n", TRUE);
	return(0);
	}

#ifdef PVM_BUF
 /* Make this buffer PVM's active receive buffer */
 if(pvm_getrbuf() != mbuf->bufid)
	{
	pvm_setrbuf(mbuf->bufid);
	}
#endif

 /* Elements received but not yet handed out */
 avail = mbuf->count - mbuf->nused;
 if(avail <= 0)
	{
	ReportError("FATAL: UnPackBuf called on empty buffer!\n", TRUE);
	return(0);
	}

 /* Never deliver more than is left */
 if(count > avail)
	count = avail;

#ifdef PVM_BUF
 PVM_Unpack(mbuf->bufid, buf, count, mbuf->datatype, Rank, &status);
#else
 memcpy(buf, &mbuf->buf[mbuf->nused * mbuf->datasize], count*mbuf->datasize);
#endif

 mbuf->nused += count;
 return(count);
}

/*
 * RecvBuf - receive a message into a receive buffer.
 *
 *   mbuf     - buffer to fill; NULL selects the global GRecvBuf
 *   count    - maximum number of elements to accept
 *   datatype - element type of the message
 *   source   - sending rank
 *   tag,comm - message tag and communicator
 *   status   - receives the status of the operation
 *
 * The buffer's element type is set to datatype, its data area is grown
 * when it is too small for count elements, and its read position is
 * reset to the start.
 *
 * Returns the number of elements received as reported by MPI_Get_count,
 * or 0 when called on a send-type buffer.
 */
int
RecvBuf(PMsgBuf mbuf, int count, MPI_Datatype datatype,
	int source, int tag, MPI_Comm comm, MPI_Status *status)
{
 char errmsg[120];

 if(!mbuf)
	mbuf = GRecvBuf;
 if(mbuf->is_send)
	{
	ReportError("RecvBuf called on Send type buffer!- ignored\n", FALSE);
	return(0);
	}
 if(count <= 0)
	{
	sprintf(errmsg,"Recv with count=0?: count=%d type=%d dest=%d tag=%d\n",
		count, datatype, source, tag); 
	ReportError(errmsg, TRUE);
	}

 /* The buffer takes on the element type of the incoming message */
 mbuf->datatype = datatype;
 mbuf->datasize = _MPI_SizeDataType(datatype);
#ifdef LAST_COMM
 LastRecvSource = source;
 LastRecvTag = tag;
#endif

#ifdef PVM_BUF
 /* Do the receive from here */
 PVM_Receive(NULL, count, datatype, source, tag, status, &mbuf->bufid);
#else
 {
 /* Bytes needed to hold a full-size message */
 int wanted = mbuf->datasize * count;

 /* Grow by at least BUF_ALLOC when the data area is too small */
 if(wanted > mbuf->nalloc)
	SizeMsgBuf(mbuf, MAX(mbuf->nalloc + BUF_ALLOC, wanted));
 }

 /* Actually do a receive */
 if(MPI_Recv(mbuf->buf, count, datatype, source, tag, comm, status) != MPI_SUCCESS)
	{
	sprintf(errmsg,"Recv Failed: count=%d type=%d dest=%d tag=%d\n",
		count, datatype, source, tag); 
	ReportError(errmsg, TRUE);
	}
#endif
 /* Record how much actually arrived and rewind the read position */
 MPI_Get_count(status, datatype,&mbuf->count);
 mbuf->nused = 0;

 return(mbuf->count);
}

/*
 * UnPackString - extract one NUL-terminated string from a receive buffer.
 *
 *   mbuf    - buffer to read from; NULL selects the global GRecvBuf
 *   string  - destination; must have room for max_len characters plus
 *             the terminating NUL
 *   max_len - maximum number of characters to copy
 *
 * Copying starts at the buffer's current read position and stops at a
 * NUL, after max_len characters, or at the end of the received data.
 * A NUL found at the stopping point is consumed as well.  The read
 * position is left just past what was consumed, so consecutive strings
 * can be unpacked one after another.
 *
 * The buffer is indexed byte by byte, so it is expected to hold
 * character data.
 *
 * Returns the number of buffer bytes consumed (the string length plus one
 * when the terminator was consumed), or 0 when called on a send-type
 * buffer.
 */
int
UnPackString(PMsgBuf mbuf, char *string, int max_len)
{
 int i, start;

 if(!mbuf)
	mbuf = GRecvBuf;
 if(mbuf->is_send)
	{
	ReportError("UnpackBuf called on Send type buffer!- ignored\n", FALSE);
	return(0);
	}

 start = mbuf->nused;
 for(i=start; i<start+max_len && i<mbuf->count && mbuf->buf[i] != NULLC; i++)
	*string++ = mbuf->buf[i];

 /* Increment by one if we have used NULLC character */
 if(i<mbuf->count && mbuf->buf[i] == NULLC)
	i++;
 *string = NULLC;

 /* i is an absolute position in the buffer, so it becomes the new read
  * position directly; adding it to nused would skip data whenever the
  * read position was not at the start. */
 mbuf->nused = i;

 return(i - start);
}


#if defined(MYMPI)
/*
 * _MPI_SizeDataType - size in bytes of one element of an MPI datatype.
 * Datatypes not listed here are given the size of an int.
 */
int
_MPI_SizeDataType(MPI_Datatype datatype)
{
 int nbytes;

 switch(datatype)
	{
	case MPI_UNSIGNED:
		nbytes = sizeof(unsigned);
		break;
	case MPI_SHORT:
		nbytes = sizeof(short);
		break;
	case MPI_UNSIGNED_SHORT:
		nbytes = sizeof(unsigned short);
		break;
	case MPI_LONG:
		nbytes = sizeof(long);
		break;
	case MPI_UNSIGNED_LONG:
		nbytes = sizeof(unsigned long);
		break;
	case MPI_BYTE:
	case MPI_CHAR:
		nbytes = sizeof(char);
		break;
	case MPI_DOUBLE:
		nbytes = sizeof(double);
		break;
	case MPI_FLOAT:
		nbytes = sizeof(float);
		break;
	case MPI_INT:
	default:
		/* Anything unrecognized is sized like an int */
		nbytes = sizeof(int);
		break;
	}
 return(nbytes);
}
#endif

/*
 * ParseSpawnArg - work out the number of tasks requested on the command
 * line.
 *
 *   argc, argv - program arguments; argv[0] is skipped
 *
 * Two forms are recognized:
 *   -<digits>            raises the result to that number if it is larger
 *   -s<number>, -s <number>   sets the result to that number, at least 1
 * A trailing "-s" without a number is reported and leaves the result as is.
 *
 * Returns the resulting task count, which is never less than 1.
 */
int
ParseSpawnArg(int argc, char **argv)
{
 int i, val, n=1;

 for(i=1; i<argc; i++)
	{
	/* The cast keeps isdigit() defined for characters with the high
	 * bit set on platforms where plain char is signed. */
	if(*argv[i] == '-' && isdigit((unsigned char) argv[i][1]))
		{
		val = atoi(&argv[i][1]);
		n = MAX(n, val);
		}
	if(strncmp(argv[i],"-s",2) == 0)
		{
		/* Found -s spawn argument */
		if(strlen(argv[i]) == 2)	/* Number is in next space */
			{
			if(++i >= argc)
				{
				ReportError("Error: -s argument should be followed by number to spawn.\n",FALSE);
				ReportError("Defaulting to one task.\n",FALSE);
				}
			else
				{
				val = atoi(argv[i]);
				n = MAX(1, val);
				}
			}
		else
			{
			/* Number is attached directly to the option */
			val = atoi(&argv[i][2]);
			n = MAX(1, val);
			}
		}
	}
 return(n);
}


/*
 * OnePrintString - print a message prefixed with this process's rank.
 *
 *   str - message text
 *
 * The message is formatted as "[rank] text", handed to PrintToLog and,
 * unless Quiet is set, written to stdout.
 *
 * With PRINT_FROM_ROOT defined, ranks other than 0 send the formatted
 * message to rank 0 instead of printing it (they still log it themselves
 * when SPLIT_LOG is defined).  Rank 0 first drains and prints any such
 * messages that are already waiting, then prints its own.
 *
 * Messages longer than the internal 1024-byte buffer are truncated.
 */
void
OnePrintString(char *str)
{
 int rank,flag,nproc;
 int count;
 MPI_Status status;
 char s[1024];

 flag = TRUE;
 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
 MPI_Comm_size(MPI_COMM_WORLD, &nproc);

#ifdef PRINT_FROM_ROOT
 if(rank != 0)
	{
	/* Write to the message.  The limit is one byte below the buffer
	 * size so that text plus terminator never exceeds the number of
	 * characters rank 0 is prepared to receive. */
	snprintf(s, sizeof(s) - 1, "[%2d] %s", rank,str);
	MPI_Send(s, (int) strlen(s)+1, MPI_CHAR, 0, STRING_TAG, MPI_COMM_WORLD);
#ifdef SPLIT_LOG
	PrintToLog(s,rank);
#endif
	return;
	}
 else
	{
	/* Print stuff out: everything that other ranks have already sent */
	while(flag && nproc > 1)
		{
		MPI_Iprobe(MPI_ANY_SOURCE, STRING_TAG, MPI_COMM_WORLD, &flag,
			&status);
		if(!flag)
			break;
		/* One byte is kept back for the terminator written below */
		MPI_Recv(s, (int) sizeof(s) - 1, MPI_CHAR, MPI_ANY_SOURCE,STRING_TAG,MPI_COMM_WORLD, &status);
		MPI_Get_count(&status, MPI_CHAR, &count);
		if(count < 0)
			count = 0;
		s[count] = NULLC;
#ifndef SPLIT_LOG
		PrintToLog(s,rank);
#endif
		if(!Quiet)
			printf("%s",s);
		}
	}
#endif
 /* Our own message; bounded so a long str cannot overrun s */
 snprintf(s, sizeof(s), "[%2d] %s", rank,str);
 PrintToLog(s,rank);
 if(!Quiet)
	printf("%s", s);
}

/*
 * PrintToLog - append a string to the log file.
 *
 *   s    - text to write
 *   rank - rank of the caller; consulted only when PRINT_FROM_ROOT is
 *          defined without SPLIT_LOG, in which case only rank 0 writes
 *
 * Nothing is written when no log file is configured (empty LogFile) or
 * when no usable descriptor is available.  Unless PRINT_FROM_ROOT is
 * defined, the file is opened here on first use.
 */
void
PrintToLog(char *s, int rank)
{
#if defined(PRINT_FROM_ROOT) && !defined(SPLIT_LOG)
 if(rank != 0)
	return;
#endif

 /* Logging disabled */
 if(LogFile[0] == NULLC)
	return;

#ifndef PRINT_FROM_ROOT
 /* May need to open file each time to avoid concurrent writes */
 if(OutFile <= 0)
	OutFile = open(LogFile, O_APPEND | O_WRONLY, 0440);
#endif

 /* Still no descriptor: give up quietly */
 if(OutFile <= 0)
	return;

 write(OutFile, s, strlen(s));
}

void
GetCurDirectory(char *dir)
{
 MPI_Status status;

 /* Use the proc 0 directory as the master */
 if(Rank == 0)
        {
        getcwd(dir, 120);
        if(NProc > 1)
                {
                PackString(NULL,dir);
        SendBuf(NULL,&AllDest[1], NProc-1, CUR_DIR_TAG, Comm);
                }
        }
 else
        {

{
 char errmsg[120];
sprintf(errmsg,"CALL RECVBUF: count=%d type=%d dest=%d tag=%d\n",
        120, MPI_CHAR, 0, CUR_DIR_TAG);
              ReportError(errmsg, FALSE);
}


        RecvBuf(NULL,120, MPI_CHAR, 0, CUR_DIR_TAG, Comm, &status);
        UnPackString(NULL,dir, 120);
        }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -