/* File: fmshm.c */
/* (code-viewer residue, not part of the source: "font size" control) */
if( SHM_nodeid == 0 ) { allocate_buffers( totbufs ); }if(shmfmdbg>=0){printf("Waiting for all SHM processors to join...\n");fflush(stdout);} for(;;) { struct shmid_ds shmstat; int x = shmctl( shmid, IPC_STAT, &shmstat ); MYASSERT( x == 0, ("shmctl failure"); perror("") ); njoined = shmstat.shm_nattch;if(shmfmdbg>=1){printf("#joined = %lu\n", njoined);fflush(stdout);}if(0)MYASSERT( 1 <= njoined && njoined <= SHM_numnodes, ("%lu",njoined) ); if( njoined >= SHM_numnodes ) break;if(shmfmdbg>=1){printf( "%lu joined, %lu more to join\n",njoined,SHM_numnodes-njoined);fflush(stdout);} sleep(2); }if(shmfmdbg>=0){printf("All %lu processors joined.\n",njoined);fflush(stdout);} if( SHM_nodeid == 0 ) { int nbar = 0; fmshm->bar[0] = SHM_numnodes; while(1) { int i = 0; for( i = 1, nbar = 1; i < SHM_numnodes; i++ ) { int x = fmshm->bar[i]; if( x == 1 ) nbar++; } if( nbar == SHM_numnodes ) { break; } else {if(shmfmdbg>=1){printf("SHM node 0: Waiting for %d shm nodes to reach barrier...\n",SHM_numnodes-nbar);fflush(stdout);} sleep(1); } } fmshm->bar[0] = 0; /*Prevent advancing of next session's feds*/ } else { volatile int x = 0; while( (x = fmshm->bar[0]) == 0 ) {if(shmfmdbg>=1){printf("SHM node %d: Waiting for shm node 0 to reach barrier...\n",SHM_nodeid);fflush(stdout);} sleep(1); } fmshm->bar[SHM_nodeid] = 1; }if(shmfmdbg>=1){printf("Reached barrier.\n");fflush(stdout);} for( i = 0; i < SHM_numnodes; i++ ) { int j = 0; for( j = 0; j < SHM_numnodes; j++ ) { SHMMsgID mid = -1; SHMMsg *m = 0; if( i != j ) {if(shmfmdbg>=2){printf("from[%d]to[%d]",i,j);mid=fmshm->from[i].to[j].msgq; m=&fmshm->msgs[mid];MYASSERT(mid==m->msg_id,("%d,%d",mid,m->msg_id)); printf("\tmsgq=%d",mid);mid=fmshm->from[i].to[j].sendp; m=&fmshm->msgs[mid];MYASSERT(mid==m->msg_id,("%d,%d",mid,m->msg_id)); printf("\tsendp=%d",mid);mid=fmshm->from[i].to[j].recvp; m=&fmshm->msgs[mid];MYASSERT(mid==m->msg_id,("%d,%d",mid,m->msg_id)); printf("\trecvp=%d",mid);printf("\n"); fflush(stdout);} } } } 
}if(shmfmdbg>=0){printf("SHM_initialize() done.\n");fflush(stdout);}}/*---------------------------------------------------------------------------*/void SHM_finalize( void ){ if( !removed_shm && fmshmid >= 0 ) { shmctl( fmshmid, IPC_RMID, 0 ); removed_shm = 1; } fmshmid = -1; if( fmshm ) shmdt( (void *)fmshm ); fmshm = 0;}/*---------------------------------------------------------------------------*/SHM_stream *SHM_begin_message( int recipient, int length, int handler, int src_id, int dest_id ){ SHM_stream *stream = 0; SHMMsgID mid = -1; SHMMsg *msg = 0; unsigned long ntries = 0, maxtries = 4000000000LU; int from = SHM_nodeid, to = recipient; MYASSERT( 0 <= to && to < SHM_numnodes && to != from, ("%d", to) ); MYASSERT( length <= SHMMAXDATALEN, ("SHM msg size %d can't exceed compiled %d",length,SHMMAXDATALEN)); mid = fmshm->from[from].to[to].sendp; msg = &fmshm->msgs[mid]; MYASSERT( msg && msg->msg_id==mid, ("from %d to %d", from, to) ); for( ntries = 0; !WRITER_READY(msg) && ntries < maxtries; ntries++ ) {} if( !WRITER_READY(msg) ) {if(shmfmdbg>=1){printf("SHM node %d sending to %d with buffer ID %d - out of buffers or deadlocked\n",from,to,msg->msg_id);fflush(stdout);} } else { WRITER_LOCK(msg); { msg->src_id = src_id; msg->dest_id = dest_id; msg->src_pe = from; msg->dest_pe = to; msg->handler = handler; msg->pieces.npieces = 0; stream = (SHM_stream *)msg; }if(shmfmdbg>=3){printf("SHM_begin_message() msg ID %d\n",msg->msg_id);fflush(stdout);} } return stream;}/*---------------------------------------------------------------------------*/void SHM_send_piece( SHM_stream *sendstream, void *buffer, int length ){ SHMMsg *msg = (SHMMsg *)sendstream; SHMMsgPieces *pcs = 0; int offset = 0; MYASSERT( msg && msg->src_pe == SHM_nodeid, ("!" 
) ); MYASSERT( msg->msg_id==fmshm->from[msg->src_pe].to[msg->dest_pe].sendp,("!")); pcs = &msg->pieces; if( pcs->npieces > 0 ) { SHMMsgPieceSpec *lastpc = &pcs->pspec[pcs->npieces-1]; offset = lastpc->offset + lastpc->length; offset = ((offset-1)/16 + 1) * 16; } MYASSERT( offset+length <= SHMMAXDATALEN, ("SHM msg size %d can't exceed compiled %d", offset+length, SHMMAXDATALEN) ); MYASSERT( pcs->npieces < SHMMAXPIECES, ("SHM msg pieces can't exceed compiled %d pieces", SHMMAXPIECES) ); { SHMMsgPieceSpec *newspec = &pcs->pspec[pcs->npieces++]; newspec->offset = offset; newspec->length = length; memcpy( &pcs->pdata[offset], (char *)buffer, length ); }}/*---------------------------------------------------------------------------*/void SHM_end_message( SHM_stream *sendstream ){ SHMMsg *msg = (SHMMsg *)sendstream; MYASSERT( msg && msg->src_pe == SHM_nodeid, ("!" ) ); MYASSERT( msg->msg_id==fmshm->from[msg->src_pe].to[msg->dest_pe].sendp,("!"));if(shmfmdbg>=3){printf("SHM_end_message() msg ID %d\n",msg->msg_id);fflush(stdout);} { fmshm->from[msg->src_pe].to[msg->dest_pe].sendp = msg->next; } WRITER_UNLOCK( msg );}/*---------------------------------------------------------------------------*/static int npieces_recd; /*#pieces received by user from current message*//*---------------------------------------------------------------------------*/void SHM_receive( void *buffer, SHM_stream *receivestream, unsigned int length ){ SHMMsg *msg = (SHMMsg *)receivestream; int next_piece = npieces_recd++; SHMMsgPieces *pcs = 0; MYASSERT( msg && msg->dest_pe == SHM_nodeid, ("!" 
) ); MYASSERT( msg->msg_id==fmshm->from[msg->src_pe].to[msg->dest_pe].recvp,("!")); pcs = &msg->pieces; MYASSERT( next_piece < pcs->npieces, ("Only %d pieces exist", pcs->npieces) ); { SHMMsgPieceSpec *next_spec = &pcs->pspec[next_piece]; int offset = next_spec->offset; MYASSERT( length <= next_spec->length, ("Only %d < %u bytes in piece", next_spec->length, length) ); memcpy( (char *)buffer, &pcs->pdata[offset], length ); }}/*---------------------------------------------------------------------------*/int SHM_extract( unsigned int maxbytes ){ int nbytes = 0; static int pe = 0; int i = 0; if( SHM_nodeid == 0 && !removed_shm && fmshmid >= 0 ) { shmctl( fmshmid, IPC_RMID, 0 ); removed_shm = 1; } if( SHM_numnodes <= 1 ) return 0; while( nbytes < maxbytes ) { int nready = 0; for( i = 0; i < SHM_numnodes-1; i++ ) { if( pe == SHM_nodeid ) pe++; pe %= SHM_numnodes; if( pe != SHM_nodeid ) { SHMMsg *msg = 0; SHMMsgID mid = fmshm->from[pe].to[SHM_nodeid].recvp; MYASSERT( 0 <= mid, ("%d",mid) ); msg = &fmshm->msgs[mid]; if( READER_READY( msg ) ) { SHM_stream *shm_stream = (SHM_stream *)msg; READER_LOCK( msg );if(shmfmdbg>=3){printf("Detected incoming SHM msg src_pe=%d,src_id=%d,dest_id=%d!\n",msg->src_pe,msg->src_id,msg->dest_id);fflush(stdout);} nready++; npieces_recd = 0; fmcb( msg->handler, shm_stream, msg->src_pe, msg->src_id, msg->dest_id ); fmshm->from[pe].to[SHM_nodeid].recvp = msg->next; nbytes += SHMMAXDATALEN; READER_UNLOCK( msg ); } } pe++; pe %= SHM_numnodes; } if( nready <= 0 ) break; } return nbytes;}/*---------------------------------------------------------------------------*/int SHM_numpieces( SHM_stream *shm_stream ){ SHMMsg *msg = (SHMMsg *)shm_stream; SHMMsgPieces *pcs = &msg->pieces; MYASSERT( msg->msg_id==fmshm->from[msg->src_pe].to[msg->dest_pe].recvp,("!")); return pcs->npieces;}/*---------------------------------------------------------------------------*/int SHM_piecelen( SHM_stream *shm_stream, int i ){ SHMMsg *msg = (SHMMsg *)shm_stream; 
SHMMsgPieces *pcs = &msg->pieces; SHMMsgPieceSpec *spec = 0; MYASSERT(msg->msg_id==fmshm->from[msg->src_pe].to[msg->dest_pe].recvp,("!")); MYASSERT( 0 <= i && i < pcs->npieces, ("Only %d pieces", pcs->npieces) ); spec = &pcs->pspec[i]; return spec->length;}/*---------------------------------------------------------------------------*/int SHM_debug_level( int level ){ int old = shmfmdbg; shmfmdbg = level; return old;}/*---------------------------------------------------------------------------*/
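/*
 * Code-viewer residue (not part of fmshm.c) -- keyboard shortcut help:
 *   Copy code:            Ctrl + C
 *   Search code:          Ctrl + F
 *   Full-screen mode:     F11
 *   Switch theme:         Ctrl + Shift + D
 *   Show shortcuts:       ?
 *   Increase font size:   Ctrl + =
 *   Decrease font size:   Ctrl + -
 */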