📄 fmmpi.c
字号:
/*---------------------------------------------------------------------------*//* FM-like interface for MPI communication. *//* Author(s): Kalyan Perumalla <http://www.cc.gatech.edu/~kalyan> 04Nov2003 *//* $Revision: 1.5 $ $Name: v26apr05 $ $Date: 2005/04/26 14:46:39 $ *//*---------------------------------------------------------------------------*/#include <stdio.h>#include <stdlib.h>#include <string.h>#include "mycompat.h"#include "fmmpi.h"#if MPI_AVAILABLE #include <mpi.h>#else #define MPI_SUCCESS 0 #define MPI_ERROR 1 #define MPI_Init(a,b) (void)0 #define MPI_Finalize() (void)0 #define MPI_Comm_rank(a,b) (void)0 #define MPI_Comm_size(a,b) (void)0 #define MPI_Buffer_attach(a,b) (MPI_ERROR) #define MPI_Pack(a,b,c,d,e,f,g) (MPI_ERROR) #define MPI_Send(a,b,c,d,e,f) (MPI_ERROR) #define MPI_Bsend(a,b,c,d,e,f) (MPI_ERROR) #define MPI_Unpack(a,b,c,d,e,f,g) (MPI_ERROR) #define MPI_Recv(a,b,c,d,e,f,g) (MPI_ERROR) #define MPI_Iprobe(a,b,c,d,e) (MPI_ERROR) #define MPI_Probe(a,b,c,d,e) (MPI_ERROR) #define MPI_Barrier(a) (MPI_ERROR) #define MPI_BYTE 0 #define MPI_PACKED 0 #define MPI_COMM_WORLD 0 #define MPI_ANY_SOURCE 0 #define MPI_BSEND_OVERHEAD 0 #ifndef SSIZE_MAX #define SSIZE_MAX 32767 #endif typedef struct { int MPI_SOURCE; int MPI_TAG; } MPI_Status;#endif/*---------------------------------------------------------------------------*/typedef struct { void *iov_base; int iov_len; } IOVEC;/*---------------------------------------------------------------------------*/static int mpifmdbg = 0;/*---------------------------------------------------------------------------*/typedef struct{ int src_id; /*Original sender's caller-specific ID*/ int dest_id; /*Original receiver's caller-specific ID*/ int src_pe; int dest_pe; int handler; int npieces; /*Including this header piece; hence always >=1*/ int piecelen[FMMPIMAXPIECES]; /*Byte length of each piece*/ int totbytes;/*#bytes in all pieces combined; hence always >= sizeof(hdr)*/} FMMPIMsgHeaderPiece;/*---------------------------------------------------------------------------*/typedef struct{ IOVEC pieces[FMMPIMAXPIECES]; FMMPIMsgHeaderPiece hdr;} FMMPISendMsg;/*---------------------------------------------------------------------------*/typedef struct{ FMMPIMsgHeaderPiece hdr; int npieces_recd; int nbytes_recd; int position;} FMMPIRecvMsg;/*---------------------------------------------------------------------------*/typedef struct{ int inited; int nodeid; int numnodes; int msg_tag; FMMPISendMsg send_msg; FMMPIRecvMsg recv_msg; #define BLOAT_FACTOR 2 #define MAXBUFLEN BLOAT_FACTOR*FMMPIMAXDATALEN #define MAXPENDINGMSGS 100000L /*CUSTOMIZE*/ char attachbuf[MAXPENDINGMSGS * (MAXBUFLEN + MPI_BSEND_OVERHEAD)]; char packbuf[MAXBUFLEN]; char unpackbuf[MAXBUFLEN];} FMMPIState;/*---------------------------------------------------------------------------*/int FMMPI_nodeid;int FMMPI_numnodes;static FMMPICallback *fmcb = 0;static FMMPIState *mpi = 0;/*---------------------------------------------------------------------------*//* *//*---------------------------------------------------------------------------*/void FMMPI_pre_initialize(int *pac, char ***pav){ if( !mpi || !mpi->inited ) { int rank = -1, size = 0; char *estr = getenv("FMMPI_DEBUG"); mpifmdbg = estr ? atoi(estr) : 1;if(mpifmdbg>=1){printf("FMMPI_DEBUG=%d\n",mpifmdbg);fflush(stdout);}if(mpifmdbg>=0){printf("FMMPI_pre_init() started.\n");fflush(stdout);} MPI_Init( pac, pav ); MPI_Comm_rank( MPI_COMM_WORLD, &rank ); MPI_Comm_size( MPI_COMM_WORLD, &size ); mpi = (FMMPIState*)calloc(1,sizeof(FMMPIState)); MYASSERT( mpi, ("Need %d bytes",sizeof(FMMPIState)) ); mpi->inited = 1; mpi->nodeid = FMMPI_nodeid = rank; mpi->numnodes = FMMPI_numnodes = size; mpi->msg_tag = 123; MPI_Buffer_attach( mpi->attachbuf, sizeof(mpi->attachbuf) );if(mpifmdbg>=0){printf( "FMMPI_nodeid=%d, FMMPI_numnodes=%d\n", FMMPI_nodeid, FMMPI_numnodes);fflush(stdout);} }}/*---------------------------------------------------------------------------*/void FMMPI_initialize( int nodeid, int numnodes, FMMPICallback *cb ){if(mpifmdbg>=0){printf("FMMPI_initialize() started.\n");fflush(stdout);} MYASSERT( cb, ("Message callback required") ); fmcb = cb; { int argc = 0; char *targv[2] = {"argv0",0}, **argv = targv; FMMPI_pre_initialize( &argc, &argv ); MYASSERT( nodeid == FMMPI_nodeid, ("nodeid=%d rank=%d",nodeid,FMMPI_nodeid) ); MYASSERT( numnodes == FMMPI_numnodes, ("numnodes=%d size=%d",numnodes,FMMPI_numnodes) ); } MYASSERT( 0 < mpi->numnodes && mpi->numnodes <= FMMPIMAXPE, ("#nodes %d must be in [1..%d]", mpi->numnodes, FMMPIMAXPE) ); MYASSERT( 0 <= mpi->nodeid && mpi->nodeid < mpi->numnodes, ("Node %d must be in [0..%d]", mpi->nodeid, mpi->numnodes-1) );if(mpifmdbg>=0){printf("%d:FMMPI joining barrier\n",FMMPI_nodeid);fflush(stdout);} MPI_Barrier( MPI_COMM_WORLD );if(mpifmdbg>=0){printf("%d:FMMPI done barrier\n",FMMPI_nodeid);fflush(stdout);}if(mpifmdbg>=0){printf("FMMPI_initialize() done.\n");fflush(stdout);}}/*---------------------------------------------------------------------------*/void FMMPI_finalize( void ){if(mpifmdbg>=1){printf("%d:FMMPI_finalize()\n",FMMPI_nodeid);fflush(stdout);} MPI_Finalize(); /*Does an implicit MPI_Buffer_detach()*/}/*---------------------------------------------------------------------------*/FMMPI_stream *FMMPI_begin_message( int recipient, int length, int handler, int src_id, int dest_id ){ FMMPISendMsg *msg = &mpi->send_msg; FMMPIMsgHeaderPiece *hdr = &msg->hdr; IOVEC *iov = &msg->pieces[0];if(mpifmdbg>=2){printf("%d:FMMPI_begin_message(to=%d, len=%d)\n",FMMPI_nodeid,recipient,length);fflush(stdout);} MYASSERT( 0 <= recipient && recipient < mpi->numnodes, ("Bad FMMPI recipient ID %d must be in [0..%d]", recipient, mpi->numnodes-1) ); MYASSERT( 0 <= length && length <= FMMPIMAXPIECELEN, ("Msg size %d must be <= %d", length, FMMPIMAXPIECELEN)); hdr->src_id = src_id; hdr->dest_id = dest_id; hdr->src_pe = mpi->nodeid; hdr->dest_pe = recipient; hdr->handler = handler; hdr->npieces = 1; hdr->piecelen[0] = sizeof(*hdr); hdr->totbytes = sizeof(*hdr); iov->iov_base = (char *)hdr; iov->iov_len = hdr->totbytes; return (FMMPI_stream *)msg;}/*---------------------------------------------------------------------------*/void FMMPI_send_piece( FMMPI_stream *sendstream, void *buffer, int length ){ FMMPISendMsg *msg = &mpi->send_msg; FMMPIMsgHeaderPiece *hdr = &msg->hdr;if(mpifmdbg>=2){printf("%d:FMMPI_send_piece(buf=%x,len=%d)\n",FMMPI_nodeid,buffer,length);fflush(stdout);} MYASSERT( sendstream == (FMMPI_stream *)msg, ("!") ); MYASSERT( buffer && 0 <= length && length <= (SSIZE_MAX-hdr->totbytes), ("buffer=%p, length = %d SSIZE_MAX=%d", buffer, length, SSIZE_MAX)); MYASSERT( hdr->npieces < FMMPIMAXPIECES, ("No. of pieces can't exceed compiled %d pieces",FMMPIMAXPIECES)); MYASSERT( length <= FMMPIMAXPIECELEN, ("Piecelen %d can't exceed compiled %d",length,FMMPIMAXPIECELEN)); MYASSERT( hdr->totbytes+length <= sizeof(mpi->packbuf), ("%d + %d <= %d", hdr->totbytes, length, sizeof(mpi->packbuf)) ); { int pn = hdr->npieces++; IOVEC *iov = &msg->pieces[pn]; iov->iov_base = buffer; iov->iov_len = length; hdr->piecelen[pn] = length; hdr->totbytes += length; }}/*---------------------------------------------------------------------------*/void FMMPI_end_message( FMMPI_stream *sendstream ){ int i = 0, position = 0, retcode = 0; FMMPISendMsg *msg = &mpi->send_msg; FMMPIMsgHeaderPiece *hdr = &msg->hdr; int to = hdr->dest_pe, nwritten = 0;if(mpifmdbg>=2){printf("%d:FMMPI_end_message()\n",FMMPI_nodeid);fflush(stdout);} MYASSERT( sendstream == (FMMPI_stream *)msg, ("!") ); MYASSERT( hdr->totbytes <= sizeof(mpi->packbuf), ("%d %d", hdr->totbytes, sizeof(mpi->packbuf)) ); for( i = 0; i < hdr->npieces; i++ ) { IOVEC *piece = &msg->pieces[i]; retcode = MPI_Pack( piece->iov_base, piece->iov_len, MPI_BYTE, (void*)mpi->packbuf, sizeof(mpi->packbuf), &position, MPI_COMM_WORLD ); MYASSERT( retcode == MPI_SUCCESS, ("MPI_Pack must succeed!") ); } retcode = MPI_Bsend( mpi->packbuf, position, MPI_PACKED, hdr->dest_pe, mpi->msg_tag, MPI_COMM_WORLD ); MYASSERT( retcode == MPI_SUCCESS, ("MPI_Send must succeed!") );}/*---------------------------------------------------------------------------*/int FMMPI_receive( void *buffer, FMMPI_stream *receivestream, unsigned int length ){ FMMPIRecvMsg *msg = &mpi->recv_msg; FMMPIMsgHeaderPiece *hdr = &msg->hdr; int pn = 0, retcode = 0; MYASSERT( receivestream == msg, ("cyclic check") ); pn = msg->npieces_recd++; if( pn > 0 ) { MYASSERT( pn < hdr->npieces, ("Only #%d pieces", hdr->npieces) ); MYASSERT( length == hdr->piecelen[pn], ("request len %d != sent len %d", length, hdr->piecelen[pn])); MYASSERT( length <= hdr->totbytes - msg->nbytes_recd, ("%d, %d, %d", length, hdr->totbytes, msg->nbytes_recd) ); } retcode = MPI_Unpack( mpi->unpackbuf, sizeof(mpi->unpackbuf), &msg->position, buffer, length, MPI_BYTE, MPI_COMM_WORLD ); MYASSERT( retcode == MPI_SUCCESS, ("MPI_Unpack must succeed!") ); msg->nbytes_recd += length; return length;}/*---------------------------------------------------------------------------*/static int recv_one_msg( void ){ MPI_Status status; int i = 0, nbytes = 0, nrecd = 0, retcode = 0, from_pe = -1; FMMPIRecvMsg *msg = &mpi->recv_msg; FMMPIMsgHeaderPiece temphdr, *hdr = &msg->hdr; msg->nbytes_recd = 0; msg->npieces_recd = 0; msg->position = 0;if(mpifmdbg>=2){printf("%d:MPI_Recv() started\n",FMMPI_nodeid);fflush(stdout);} retcode = MPI_Recv( mpi->unpackbuf, sizeof(mpi->unpackbuf), MPI_PACKED, MPI_ANY_SOURCE, mpi->msg_tag, MPI_COMM_WORLD, &status ); MYASSERT( retcode == MPI_SUCCESS, ("MPI_Recv must succeed!") ); from_pe = status.MPI_SOURCE;if(mpifmdbg>=2){printf("%d:MPI_Recv() got msg from %d tag %d\n",FMMPI_nodeid,from_pe,mpi->msg_tag);fflush(stdout);} hdr->src_id = -1; hdr->dest_id = -1; hdr->src_pe = from_pe; hdr->dest_pe = mpi->nodeid; hdr->handler = 0; hdr->npieces = 0; hdr->totbytes = 0; nrecd = FMMPI_receive( &temphdr, msg, sizeof(temphdr) ); if( nrecd <= 0 ) { /*Do nothing*/if(mpifmdbg>=2){printf("%d:MPI_Recv() nrecd=0!\n",FMMPI_nodeid);fflush(stdout);} MYASSERT( 0, ("NRECD=0") ); } else { MYASSERT( temphdr.src_pe == hdr->src_pe, ("src pes must agree: %d != %d", temphdr.src_pe, hdr->src_pe) ); MYASSERT( temphdr.dest_pe == hdr->dest_pe, ("dest pes must agree: %d != %d", temphdr.dest_pe, hdr->dest_pe) ); MYASSERT( temphdr.totbytes >= sizeof(*hdr), ("msg size: %d >= %d", temphdr.totbytes, sizeof(*hdr)) ); MYASSERT( temphdr.piecelen[0] == sizeof(*hdr), ("piecelen[0] %d != hdr size %d",temphdr.piecelen[0],sizeof(*hdr))); *hdr = temphdr; fmcb( hdr->handler, msg, hdr->src_pe, hdr->src_id, hdr->dest_id ); nrecd = 0; for( i = msg->npieces_recd; i < hdr->npieces; i++ ) { char temp_piece[FMMPIMAXPIECELEN]; int len = hdr->piecelen[i]; MYASSERT( len <= sizeof(temp_piece), ("%d %d",len,sizeof(temp_piece)) ); nrecd = FMMPI_receive( temp_piece, msg, len ); if( nrecd <= 0 ) break; } MYASSERT( nrecd <= 0 || msg->nbytes_recd == hdr->totbytes, ("%d, %d", msg->nbytes_recd, hdr->totbytes) ); nbytes = msg->nbytes_recd; } return nbytes;}/*---------------------------------------------------------------------------*/static int poll_all_once( unsigned int maxbytes ){ int nbytes = 0; while( nbytes < maxbytes ) { int ready = 0, retcode = 0; MPI_Status status; retcode = MPI_Iprobe( MPI_ANY_SOURCE, mpi->msg_tag, MPI_COMM_WORLD, &ready, &status ); MYASSERT( retcode == MPI_SUCCESS, ("MPI_Iprobe must succeed!") );if(mpifmdbg>=2){if(ready){printf("%d:MPI_Iprobe() ready=%d\n",FMMPI_nodeid,ready);fflush(stdout);}} if( !ready ) break; nbytes += recv_one_msg(); } return nbytes;}/*---------------------------------------------------------------------------*/int FMMPI_extract( unsigned int maxbytes ){ int nbytes = 0;if(mpifmdbg>=10){printf("%d:FMMPI_extract()\n",FMMPI_nodeid);fflush(stdout);}if( mpi->numnodes <= 1 ) return 0; while( nbytes < maxbytes ) { int m = poll_all_once( maxbytes-nbytes ); if( m <= 0 ) break; nbytes += m; } return nbytes;}/*---------------------------------------------------------------------------*/int FMMPI_numpieces( FMMPI_stream *mpi_stream ){ FMMPIRecvMsg *msg = &mpi->recv_msg; FMMPIMsgHeaderPiece *hdr = &msg->hdr; MYASSERT( mpi_stream == msg, ("!") ); return hdr->npieces-1;}/*---------------------------------------------------------------------------*/int FMMPI_piecelen( FMMPI_stream *mpi_stream, int i ){ FMMPIRecvMsg *msg = &mpi->recv_msg; FMMPIMsgHeaderPiece *hdr = &msg->hdr; MYASSERT( mpi_stream == msg, ("!") ); MYASSERT( 0<=i && i < hdr->npieces-1, ("Only #%d pieces", hdr->npieces-1) ); return hdr->piecelen[i+1];}/*---------------------------------------------------------------------------*/int FMMPI_debug_level( int level ){ int old = mpifmdbg; mpifmdbg = level; return old;}/*---------------------------------------------------------------------------*/
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -