⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 fmshm.c

📁 基于linux环境的ns2多机并行仿真补丁
💻 C
📖 第 1 页 / 共 2 页
字号:
	if( SHM_nodeid == 0 )	{	    allocate_buffers( totbufs );	}if(shmfmdbg>=0){printf("Waiting for all SHM processors to join...\n");fflush(stdout);}	for(;;)	{	    struct shmid_ds shmstat;	    int x = shmctl( shmid, IPC_STAT, &shmstat );	    MYASSERT( x == 0, ("shmctl failure"); perror("") );	    njoined = shmstat.shm_nattch;if(shmfmdbg>=1){printf("#joined = %lu\n", njoined);fflush(stdout);}if(0)MYASSERT( 1 <= njoined && njoined <= SHM_numnodes, ("%lu",njoined) );	    if( njoined >= SHM_numnodes ) break;if(shmfmdbg>=1){printf( "%lu joined, %lu more to join\n",njoined,SHM_numnodes-njoined);fflush(stdout);}	    sleep(2);	}if(shmfmdbg>=0){printf("All %lu processors joined.\n",njoined);fflush(stdout);}	if( SHM_nodeid == 0 )	{	    int nbar = 0;	    fmshm->bar[0] = SHM_numnodes;	    while(1)	    {		int i = 0;		for( i = 1, nbar = 1; i < SHM_numnodes; i++ )		{		    int x = fmshm->bar[i];		    if( x == 1 ) nbar++;		}		if( nbar == SHM_numnodes )		{		    break;		}		else		{if(shmfmdbg>=1){printf("SHM node 0: Waiting for %d shm nodes to reach barrier...\n",SHM_numnodes-nbar);fflush(stdout);}		    sleep(1);		}	    }	    fmshm->bar[0] = 0; /*Prevent advancing of next session's feds*/	}	else	{	    volatile int x = 0;	    while( (x = fmshm->bar[0]) == 0 )	    {if(shmfmdbg>=1){printf("SHM node %d: Waiting for shm node 0 to reach barrier...\n",SHM_nodeid);fflush(stdout);}		sleep(1);	    }	    fmshm->bar[SHM_nodeid] = 1;	}if(shmfmdbg>=1){printf("Reached barrier.\n");fflush(stdout);}	for( i = 0; i < SHM_numnodes; i++ )	{	    int j = 0;	    for( j = 0; j < SHM_numnodes; j++ )	    {	       SHMMsgID mid = -1;	       SHMMsg *m = 0;               if( i != j )	       {if(shmfmdbg>=2){printf("from[%d]to[%d]",i,j);mid=fmshm->from[i].to[j].msgq; m=&fmshm->msgs[mid];MYASSERT(mid==m->msg_id,("%d,%d",mid,m->msg_id)); printf("\tmsgq=%d",mid);mid=fmshm->from[i].to[j].sendp; m=&fmshm->msgs[mid];MYASSERT(mid==m->msg_id,("%d,%d",mid,m->msg_id)); printf("\tsendp=%d",mid);mid=fmshm->from[i].to[j].recvp; m=&fmshm->msgs[mid];MYASSERT(mid==m->msg_id,("%d,%d",mid,m->msg_id)); printf("\trecvp=%d",mid);printf("\n"); fflush(stdout);}	       }	    }	}    }if(shmfmdbg>=0){printf("SHM_initialize() done.\n");fflush(stdout);}}/*---------------------------------------------------------------------------*/void SHM_finalize( void ){    if( !removed_shm && fmshmid >= 0 )    {        shmctl( fmshmid, IPC_RMID, 0 ); removed_shm = 1;    }    fmshmid = -1;    if( fmshm ) shmdt( (void *)fmshm );    fmshm = 0;}/*---------------------------------------------------------------------------*/SHM_stream *SHM_begin_message( int recipient, int length, int handler,    int src_id, int dest_id ){    SHM_stream *stream = 0;    SHMMsgID mid = -1;    SHMMsg *msg = 0;    unsigned long ntries = 0, maxtries = 4000000000LU;    int from = SHM_nodeid, to = recipient;    MYASSERT( 0 <= to && to < SHM_numnodes && to != from, ("%d", to) );    MYASSERT( length <= SHMMAXDATALEN,            ("SHM msg size %d can't exceed compiled %d",length,SHMMAXDATALEN));    mid = fmshm->from[from].to[to].sendp;    msg = &fmshm->msgs[mid];    MYASSERT( msg && msg->msg_id==mid, ("from %d to %d", from, to) );    for( ntries = 0; !WRITER_READY(msg) && ntries < maxtries; ntries++ ) {}    if( !WRITER_READY(msg) )    {if(shmfmdbg>=1){printf("SHM node %d sending to %d with buffer ID %d - out of buffers or deadlocked\n",from,to,msg->msg_id);fflush(stdout);}    }    else    {        WRITER_LOCK(msg);        {        msg->src_id = src_id;        msg->dest_id = dest_id;        msg->src_pe = from;        msg->dest_pe = to;        msg->handler = handler;        msg->pieces.npieces = 0;        stream = (SHM_stream *)msg;        }if(shmfmdbg>=3){printf("SHM_begin_message() msg ID %d\n",msg->msg_id);fflush(stdout);}    }    return stream;}/*---------------------------------------------------------------------------*/void SHM_send_piece( SHM_stream *sendstream, void *buffer, int length ){    SHMMsg *msg = (SHMMsg *)sendstream;    SHMMsgPieces *pcs = 0;    int offset = 0;    MYASSERT( msg && msg->src_pe == SHM_nodeid, ("!" ) );    MYASSERT( msg->msg_id==fmshm->from[msg->src_pe].to[msg->dest_pe].sendp,("!"));    pcs = &msg->pieces;    if( pcs->npieces > 0 )    {	SHMMsgPieceSpec *lastpc = &pcs->pspec[pcs->npieces-1];	offset = lastpc->offset + lastpc->length;	offset = ((offset-1)/16 + 1) * 16;    }    MYASSERT( offset+length <= SHMMAXDATALEN,            ("SHM msg size %d can't exceed compiled %d",	    offset+length, SHMMAXDATALEN) );    MYASSERT( pcs->npieces < SHMMAXPIECES,            ("SHM msg pieces can't exceed compiled %d pieces", SHMMAXPIECES) );    {    SHMMsgPieceSpec *newspec = &pcs->pspec[pcs->npieces++];    newspec->offset = offset;    newspec->length = length;    memcpy( &pcs->pdata[offset], (char *)buffer, length );    }}/*---------------------------------------------------------------------------*/void SHM_end_message( SHM_stream *sendstream ){    SHMMsg *msg = (SHMMsg *)sendstream;    MYASSERT( msg && msg->src_pe == SHM_nodeid, ("!" ) );    MYASSERT( msg->msg_id==fmshm->from[msg->src_pe].to[msg->dest_pe].sendp,("!"));if(shmfmdbg>=3){printf("SHM_end_message() msg ID %d\n",msg->msg_id);fflush(stdout);}    {	fmshm->from[msg->src_pe].to[msg->dest_pe].sendp = msg->next;    }    WRITER_UNLOCK( msg );}/*---------------------------------------------------------------------------*/static int npieces_recd; /*#pieces received by user from current message*//*---------------------------------------------------------------------------*/void SHM_receive( void *buffer, SHM_stream *receivestream, unsigned int length ){    SHMMsg *msg = (SHMMsg *)receivestream;    int next_piece = npieces_recd++;    SHMMsgPieces *pcs = 0;    MYASSERT( msg && msg->dest_pe == SHM_nodeid, ("!" ) );    MYASSERT( msg->msg_id==fmshm->from[msg->src_pe].to[msg->dest_pe].recvp,("!"));    pcs = &msg->pieces;    MYASSERT( next_piece < pcs->npieces, ("Only %d pieces exist", pcs->npieces) );    {    SHMMsgPieceSpec *next_spec = &pcs->pspec[next_piece];    int offset = next_spec->offset;    MYASSERT( length <= next_spec->length,            ("Only %d < %u bytes in piece", next_spec->length, length) );    memcpy( (char *)buffer, &pcs->pdata[offset], length );    }}/*---------------------------------------------------------------------------*/int SHM_extract( unsigned int maxbytes ){    int nbytes = 0;    static int pe = 0;    int i = 0;    if( SHM_nodeid == 0 && !removed_shm && fmshmid >= 0 )    {        shmctl( fmshmid, IPC_RMID, 0 ); removed_shm = 1;    }    if( SHM_numnodes <= 1 ) return 0;    while( nbytes < maxbytes )    {	int nready = 0;        for( i = 0; i < SHM_numnodes-1; i++ )        {            if( pe == SHM_nodeid ) pe++;	    pe %= SHM_numnodes;	    if( pe != SHM_nodeid )	    {                SHMMsg *msg = 0;		SHMMsgID mid = fmshm->from[pe].to[SHM_nodeid].recvp;		MYASSERT( 0 <= mid, ("%d",mid) );                msg = &fmshm->msgs[mid];	        if( READER_READY( msg ) )	        {		    SHM_stream *shm_stream = (SHM_stream *)msg;		    READER_LOCK( msg );if(shmfmdbg>=3){printf("Detected incoming SHM msg src_pe=%d,src_id=%d,dest_id=%d!\n",msg->src_pe,msg->src_id,msg->dest_id);fflush(stdout);}		    nready++;		    npieces_recd = 0;	            fmcb( msg->handler, shm_stream, msg->src_pe,		          msg->src_id, msg->dest_id );                    fmshm->from[pe].to[SHM_nodeid].recvp = msg->next;		    nbytes += SHMMAXDATALEN;		    READER_UNLOCK( msg );	        }	    }	    pe++; pe %= SHM_numnodes;        }	if( nready <= 0 ) break;    }    return nbytes;}/*---------------------------------------------------------------------------*/int SHM_numpieces( SHM_stream *shm_stream ){    SHMMsg *msg = (SHMMsg *)shm_stream;    SHMMsgPieces *pcs = &msg->pieces;    MYASSERT( msg->msg_id==fmshm->from[msg->src_pe].to[msg->dest_pe].recvp,("!"));    return pcs->npieces;}/*---------------------------------------------------------------------------*/int SHM_piecelen( SHM_stream *shm_stream, int i ){    SHMMsg *msg = (SHMMsg *)shm_stream;    SHMMsgPieces *pcs = &msg->pieces;    SHMMsgPieceSpec *spec = 0;    MYASSERT(msg->msg_id==fmshm->from[msg->src_pe].to[msg->dest_pe].recvp,("!"));    MYASSERT( 0 <= i && i < pcs->npieces, ("Only %d pieces", pcs->npieces) );    spec = &pcs->pspec[i];    return spec->length;}/*---------------------------------------------------------------------------*/int SHM_debug_level( int level ){    int old = shmfmdbg;    shmfmdbg = level;    return old;}/*---------------------------------------------------------------------------*/

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -