⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 fmmpi.c

📁 基于linux环境的ns2多机并行仿真补丁
💻 C
字号:
/*---------------------------------------------------------------------------*//* FM-like interface for MPI communication.                                  *//* Author(s): Kalyan Perumalla <http://www.cc.gatech.edu/~kalyan> 04Nov2003  *//* $Revision: 1.5 $ $Name: v26apr05 $ $Date: 2005/04/26 14:46:39 $ *//*---------------------------------------------------------------------------*/#include <stdio.h>#include <stdlib.h>#include <string.h>#include "mycompat.h"#include "fmmpi.h"#if MPI_AVAILABLE    #include <mpi.h>#else    #define MPI_SUCCESS 0    #define MPI_ERROR 1    #define MPI_Init(a,b)      (void)0    #define MPI_Finalize()     (void)0    #define MPI_Comm_rank(a,b) (void)0    #define MPI_Comm_size(a,b) (void)0    #define MPI_Buffer_attach(a,b) (MPI_ERROR)    #define MPI_Pack(a,b,c,d,e,f,g) (MPI_ERROR)    #define MPI_Send(a,b,c,d,e,f) (MPI_ERROR)    #define MPI_Bsend(a,b,c,d,e,f) (MPI_ERROR)    #define MPI_Unpack(a,b,c,d,e,f,g) (MPI_ERROR)    #define MPI_Recv(a,b,c,d,e,f,g) (MPI_ERROR)    #define MPI_Iprobe(a,b,c,d,e) (MPI_ERROR)    #define MPI_Probe(a,b,c,d,e) (MPI_ERROR)    #define MPI_Barrier(a) (MPI_ERROR)    #define MPI_BYTE 0    #define MPI_PACKED 0    #define MPI_COMM_WORLD 0    #define MPI_ANY_SOURCE 0    #define MPI_BSEND_OVERHEAD 0    #ifndef SSIZE_MAX	#define SSIZE_MAX 32767    #endif    typedef struct { int MPI_SOURCE; int MPI_TAG; } MPI_Status;#endif/*---------------------------------------------------------------------------*/typedef struct { void *iov_base; int iov_len; } IOVEC;/*---------------------------------------------------------------------------*/static int mpifmdbg = 0;/*---------------------------------------------------------------------------*/typedef struct{    int src_id;  /*Original sender's caller-specific ID*/    int dest_id; /*Original receiver's caller-specific ID*/    int src_pe;    int dest_pe;    int handler;    int npieces; /*Including this header piece; hence always >=1*/    int piecelen[FMMPIMAXPIECES]; /*Byte length of each piece*/    int totbytes;/*#bytes in all pieces combined; hence always >= sizeof(hdr)*/} FMMPIMsgHeaderPiece;/*---------------------------------------------------------------------------*/typedef struct{    IOVEC pieces[FMMPIMAXPIECES];    FMMPIMsgHeaderPiece hdr;} FMMPISendMsg;/*---------------------------------------------------------------------------*/typedef struct{    FMMPIMsgHeaderPiece hdr;    int npieces_recd;    int nbytes_recd;    int position;} FMMPIRecvMsg;/*---------------------------------------------------------------------------*/typedef struct{    int inited;    int nodeid;    int numnodes;    int msg_tag;    FMMPISendMsg send_msg;    FMMPIRecvMsg recv_msg;    #define BLOAT_FACTOR 2    #define MAXBUFLEN BLOAT_FACTOR*FMMPIMAXDATALEN    #define MAXPENDINGMSGS 100000L /*CUSTOMIZE*/    char attachbuf[MAXPENDINGMSGS * (MAXBUFLEN + MPI_BSEND_OVERHEAD)];    char packbuf[MAXBUFLEN];    char unpackbuf[MAXBUFLEN];} FMMPIState;/*---------------------------------------------------------------------------*/int FMMPI_nodeid;int FMMPI_numnodes;static FMMPICallback *fmcb = 0;static FMMPIState *mpi = 0;/*---------------------------------------------------------------------------*//*                                                                           *//*---------------------------------------------------------------------------*/void FMMPI_pre_initialize(int *pac, char ***pav){    if( !mpi || !mpi->inited )    {        int rank = -1, size = 0;        char *estr = getenv("FMMPI_DEBUG");        mpifmdbg = estr ? atoi(estr) : 1;if(mpifmdbg>=1){printf("FMMPI_DEBUG=%d\n",mpifmdbg);fflush(stdout);}if(mpifmdbg>=0){printf("FMMPI_pre_init() started.\n");fflush(stdout);}        MPI_Init( pac, pav );        MPI_Comm_rank( MPI_COMM_WORLD, &rank );        MPI_Comm_size( MPI_COMM_WORLD, &size );        mpi = (FMMPIState*)calloc(1,sizeof(FMMPIState));		MYASSERT( mpi, ("Need %d bytes",sizeof(FMMPIState)) );		mpi->inited = 1;        mpi->nodeid = FMMPI_nodeid = rank;        mpi->numnodes = FMMPI_numnodes = size;	mpi->msg_tag = 123;        MPI_Buffer_attach( mpi->attachbuf, sizeof(mpi->attachbuf) );if(mpifmdbg>=0){printf( "FMMPI_nodeid=%d, FMMPI_numnodes=%d\n", FMMPI_nodeid, FMMPI_numnodes);fflush(stdout);}    }}/*---------------------------------------------------------------------------*/void FMMPI_initialize( int nodeid, int numnodes, FMMPICallback *cb ){if(mpifmdbg>=0){printf("FMMPI_initialize() started.\n");fflush(stdout);}    MYASSERT( cb, ("Message callback required") );    fmcb = cb;    {        int argc = 0; char *targv[2] = {"argv0",0}, **argv = targv;	    FMMPI_pre_initialize( &argc, &argv );        MYASSERT( nodeid == FMMPI_nodeid,		          ("nodeid=%d rank=%d",nodeid,FMMPI_nodeid) );        MYASSERT( numnodes == FMMPI_numnodes,		          ("numnodes=%d size=%d",numnodes,FMMPI_numnodes) );    }    MYASSERT( 0 < mpi->numnodes && mpi->numnodes <= FMMPIMAXPE,            ("#nodes %d must be in [1..%d]", mpi->numnodes, FMMPIMAXPE) );    MYASSERT( 0 <= mpi->nodeid && mpi->nodeid < mpi->numnodes,            ("Node %d must be in [0..%d]", mpi->nodeid, mpi->numnodes-1) );if(mpifmdbg>=0){printf("%d:FMMPI joining barrier\n",FMMPI_nodeid);fflush(stdout);}		MPI_Barrier( MPI_COMM_WORLD );if(mpifmdbg>=0){printf("%d:FMMPI done barrier\n",FMMPI_nodeid);fflush(stdout);}if(mpifmdbg>=0){printf("FMMPI_initialize() done.\n");fflush(stdout);}}/*---------------------------------------------------------------------------*/void FMMPI_finalize( void ){if(mpifmdbg>=1){printf("%d:FMMPI_finalize()\n",FMMPI_nodeid);fflush(stdout);}    MPI_Finalize(); /*Does an implicit MPI_Buffer_detach()*/}/*---------------------------------------------------------------------------*/FMMPI_stream *FMMPI_begin_message( int recipient, int length, int handler,    int src_id, int dest_id ){    FMMPISendMsg *msg = &mpi->send_msg;    FMMPIMsgHeaderPiece *hdr = &msg->hdr;    IOVEC *iov = &msg->pieces[0];if(mpifmdbg>=2){printf("%d:FMMPI_begin_message(to=%d, len=%d)\n",FMMPI_nodeid,recipient,length);fflush(stdout);}    MYASSERT( 0 <= recipient && recipient < mpi->numnodes,            ("Bad FMMPI recipient ID %d must be in [0..%d]",             recipient, mpi->numnodes-1) );    MYASSERT( 0 <= length && length <= FMMPIMAXPIECELEN,            ("Msg size %d must be <= %d", length, FMMPIMAXPIECELEN));    hdr->src_id = src_id;    hdr->dest_id = dest_id;    hdr->src_pe = mpi->nodeid;    hdr->dest_pe = recipient;    hdr->handler = handler;    hdr->npieces = 1;    hdr->piecelen[0] = sizeof(*hdr);    hdr->totbytes = sizeof(*hdr);    iov->iov_base = (char *)hdr;    iov->iov_len = hdr->totbytes;    return (FMMPI_stream *)msg;}/*---------------------------------------------------------------------------*/void FMMPI_send_piece( FMMPI_stream *sendstream, void *buffer, int length ){    FMMPISendMsg *msg = &mpi->send_msg;    FMMPIMsgHeaderPiece *hdr = &msg->hdr;if(mpifmdbg>=2){printf("%d:FMMPI_send_piece(buf=%x,len=%d)\n",FMMPI_nodeid,buffer,length);fflush(stdout);}    MYASSERT( sendstream == (FMMPI_stream *)msg, ("!") );    MYASSERT( buffer && 0 <= length && length <= (SSIZE_MAX-hdr->totbytes),            ("buffer=%p, length = %d SSIZE_MAX=%d", buffer, length, SSIZE_MAX));    MYASSERT( hdr->npieces < FMMPIMAXPIECES,              ("No. of pieces can't exceed compiled %d pieces",FMMPIMAXPIECES));    MYASSERT( length <= FMMPIMAXPIECELEN,              ("Piecelen %d can't exceed compiled %d",length,FMMPIMAXPIECELEN));    MYASSERT( hdr->totbytes+length <= sizeof(mpi->packbuf),              ("%d + %d <= %d", hdr->totbytes, length, sizeof(mpi->packbuf)) );    {    int pn = hdr->npieces++;    IOVEC *iov = &msg->pieces[pn];    iov->iov_base = buffer;    iov->iov_len = length;    hdr->piecelen[pn] = length;    hdr->totbytes += length;    }}/*---------------------------------------------------------------------------*/void FMMPI_end_message( FMMPI_stream *sendstream ){    int i = 0, position = 0, retcode = 0;    FMMPISendMsg *msg = &mpi->send_msg;    FMMPIMsgHeaderPiece *hdr = &msg->hdr;    int to = hdr->dest_pe, nwritten = 0;if(mpifmdbg>=2){printf("%d:FMMPI_end_message()\n",FMMPI_nodeid);fflush(stdout);}    MYASSERT( sendstream == (FMMPI_stream *)msg, ("!") );    MYASSERT( hdr->totbytes <= sizeof(mpi->packbuf),              ("%d %d", hdr->totbytes, sizeof(mpi->packbuf)) );    for( i = 0; i < hdr->npieces; i++ )    {        IOVEC *piece = &msg->pieces[i];        retcode = MPI_Pack( piece->iov_base, piece->iov_len, MPI_BYTE,                           (void*)mpi->packbuf, sizeof(mpi->packbuf), &position,                           MPI_COMM_WORLD );        MYASSERT( retcode == MPI_SUCCESS, ("MPI_Pack must succeed!") );    }    retcode = MPI_Bsend( mpi->packbuf, position, MPI_PACKED,                         hdr->dest_pe, mpi->msg_tag, MPI_COMM_WORLD );    MYASSERT( retcode == MPI_SUCCESS, ("MPI_Send must succeed!") );}/*---------------------------------------------------------------------------*/int FMMPI_receive( void *buffer, FMMPI_stream *receivestream,                   unsigned int length ){    FMMPIRecvMsg *msg = &mpi->recv_msg;    FMMPIMsgHeaderPiece *hdr = &msg->hdr;    int pn = 0, retcode = 0;    MYASSERT( receivestream == msg, ("cyclic check") );    pn = msg->npieces_recd++;    if( pn > 0 )    {        MYASSERT( pn < hdr->npieces,                ("Only #%d pieces", hdr->npieces) );        MYASSERT( length == hdr->piecelen[pn],                ("request len %d != sent len %d", length, hdr->piecelen[pn]));        MYASSERT( length <= hdr->totbytes - msg->nbytes_recd,                ("%d, %d, %d", length, hdr->totbytes, msg->nbytes_recd) );    }    retcode = MPI_Unpack( mpi->unpackbuf, sizeof(mpi->unpackbuf),                     &msg->position, buffer, length, MPI_BYTE, MPI_COMM_WORLD );    MYASSERT( retcode == MPI_SUCCESS, ("MPI_Unpack must succeed!") );    msg->nbytes_recd += length;    return length;}/*---------------------------------------------------------------------------*/static int recv_one_msg( void ){    MPI_Status status;    int i = 0, nbytes = 0, nrecd = 0, retcode = 0, from_pe = -1;    FMMPIRecvMsg *msg = &mpi->recv_msg;    FMMPIMsgHeaderPiece temphdr, *hdr = &msg->hdr;    msg->nbytes_recd = 0; msg->npieces_recd = 0; msg->position = 0;if(mpifmdbg>=2){printf("%d:MPI_Recv() started\n",FMMPI_nodeid);fflush(stdout);}    retcode = MPI_Recv( mpi->unpackbuf, sizeof(mpi->unpackbuf), MPI_PACKED,	                    MPI_ANY_SOURCE, mpi->msg_tag, MPI_COMM_WORLD, &status );    MYASSERT( retcode == MPI_SUCCESS, ("MPI_Recv must succeed!") );	from_pe = status.MPI_SOURCE;if(mpifmdbg>=2){printf("%d:MPI_Recv() got msg from %d tag %d\n",FMMPI_nodeid,from_pe,mpi->msg_tag);fflush(stdout);}    hdr->src_id = -1; hdr->dest_id = -1;    hdr->src_pe = from_pe; hdr->dest_pe = mpi->nodeid;    hdr->handler = 0; hdr->npieces = 0; hdr->totbytes = 0;    nrecd = FMMPI_receive( &temphdr, msg, sizeof(temphdr) );    if( nrecd <= 0 )    {        /*Do nothing*/if(mpifmdbg>=2){printf("%d:MPI_Recv() nrecd=0!\n",FMMPI_nodeid);fflush(stdout);}        MYASSERT( 0, ("NRECD=0") );    }    else    {        MYASSERT( temphdr.src_pe == hdr->src_pe,            ("src pes must agree: %d != %d", temphdr.src_pe, hdr->src_pe) );        MYASSERT( temphdr.dest_pe == hdr->dest_pe,            ("dest pes must agree: %d != %d", temphdr.dest_pe, hdr->dest_pe) );        MYASSERT( temphdr.totbytes >= sizeof(*hdr),            ("msg size: %d >= %d", temphdr.totbytes, sizeof(*hdr)) );        MYASSERT( temphdr.piecelen[0] == sizeof(*hdr),            ("piecelen[0] %d != hdr size %d",temphdr.piecelen[0],sizeof(*hdr)));        *hdr = temphdr;        fmcb( hdr->handler, msg, hdr->src_pe, hdr->src_id, hdr->dest_id );        nrecd = 0;        for( i = msg->npieces_recd; i < hdr->npieces; i++ )        {            char temp_piece[FMMPIMAXPIECELEN];            int len = hdr->piecelen[i];            MYASSERT( len <= sizeof(temp_piece),                      ("%d %d",len,sizeof(temp_piece)) );            nrecd = FMMPI_receive( temp_piece, msg, len );            if( nrecd <= 0 ) break;        }        MYASSERT( nrecd <= 0 || msg->nbytes_recd == hdr->totbytes,            ("%d, %d", msg->nbytes_recd, hdr->totbytes) );        nbytes = msg->nbytes_recd;    }    return nbytes;}/*---------------------------------------------------------------------------*/static int poll_all_once( unsigned int maxbytes ){    int nbytes = 0;    while( nbytes < maxbytes )    {        int ready = 0, retcode = 0;        MPI_Status status;        retcode = MPI_Iprobe( MPI_ANY_SOURCE, mpi->msg_tag, MPI_COMM_WORLD,                              &ready, &status );        MYASSERT( retcode == MPI_SUCCESS, ("MPI_Iprobe must succeed!") );if(mpifmdbg>=2){if(ready){printf("%d:MPI_Iprobe() ready=%d\n",FMMPI_nodeid,ready);fflush(stdout);}}        if( !ready ) break;        nbytes += recv_one_msg();    }    return nbytes;}/*---------------------------------------------------------------------------*/int FMMPI_extract( unsigned int maxbytes ){    int nbytes = 0;if(mpifmdbg>=10){printf("%d:FMMPI_extract()\n",FMMPI_nodeid);fflush(stdout);}if( mpi->numnodes <= 1 ) return 0;    while( nbytes < maxbytes )    {        int m = poll_all_once( maxbytes-nbytes );        if( m <= 0 ) break;        nbytes += m;    }    return nbytes;}/*---------------------------------------------------------------------------*/int FMMPI_numpieces( FMMPI_stream *mpi_stream ){    FMMPIRecvMsg *msg = &mpi->recv_msg;    FMMPIMsgHeaderPiece *hdr = &msg->hdr;    MYASSERT( mpi_stream == msg, ("!") );    return hdr->npieces-1;}/*---------------------------------------------------------------------------*/int FMMPI_piecelen( FMMPI_stream *mpi_stream, int i ){    FMMPIRecvMsg *msg = &mpi->recv_msg;    FMMPIMsgHeaderPiece *hdr = &msg->hdr;    MYASSERT( mpi_stream == msg, ("!") );    MYASSERT( 0<=i && i < hdr->npieces-1, ("Only #%d pieces", hdr->npieces-1) );    return hdr->piecelen[i+1];}/*---------------------------------------------------------------------------*/int FMMPI_debug_level( int level ){    int old = mpifmdbg;    mpifmdbg = level;    return old;}/*---------------------------------------------------------------------------*/

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -