📄 fmshm.c
字号:
/*---------------------------------------------------------------------------*//* Portable shared-memory FM-like implementation using SysV shm calls. *//* Author(s): Kalyan Perumalla <http://www.cc.gatech.edu/~kalyan> 20July2001 *//* $Revision: 1.2 $ $Name: v26apr05 $ $Date: 2003/10/03 21:50:26 $ *//*---------------------------------------------------------------------------*/#include <stdio.h>#include <stdlib.h>#include <string.h>#include "mycompat.h"#include "fmshm.h"#if PLATFORM_WIN #define IPC_RMID 0 #define IPC_CREAT 0 #define IPC_EXCL 0 struct shmid_ds { unsigned long shm_nattch; }; #define shmctl(x,y,z) 0 #define shmget(x,y,z) -1 #define shmat(x,y,z) ((void *)-1) #define shmdt(x) (void)0 #define getuid() -1#else #include <sys/types.h> #include <sys/ipc.h> #include <sys/shm.h> #include <sys/sem.h> #include <unistd.h> #include <sched.h>#endif/*---------------------------------------------------------------------------*/static int shmfmdbg = 0;/*---------------------------------------------------------------------------*/typedef int SHMMsgID;/*---------------------------------------------------------------------------*/typedef struct{ int length; /*Actual #bytes in this piece*/ int offset; /*Into data byte array; pointer alignment accounted for*/} SHMMsgPieceSpec;/*---------------------------------------------------------------------------*/typedef struct{ int npieces; /*Actual #pieces in this message*/ SHMMsgPieceSpec pspec[SHMMAXPIECES];/*Pieces pointing into data array*/ char pdata[SHMMAXDATALEN]; /*Compacted/linearized data of pieces*/} SHMMsgPieces;/*---------------------------------------------------------------------------*/typedef struct _SHMMsg{ int synch; int src_id; int dest_id; int handler; int src_pe; int dest_pe; SHMMsgPieces pieces; SHMMsgID msg_id; SHMMsgID next;} SHMMsg;/*---------------------------------------------------------------------------*/typedef struct{ struct { struct { SHMMsgID msgq; /*Circular*/ SHMMsgID sendp; /*Sender's current buffer in msgq*/ SHMMsgID recvp; /*Receiver's current buffer in msgq*/ } to[SHMMAXPE]; } from[SHMMAXPE]; int bar[SHMMAXPE]; SHMMsg msgs[1]; /*Actual array size determined at runtime*/} SHMSharedMemory;/*---------------------------------------------------------------------------*/int SHM_nodeid;int SHM_numnodes;/*---------------------------------------------------------------------------*/static int shmkey = -1;static int fmshmid = -1;static SHMSharedMemory *fmshm = 0;static int removed_shm = 0;static SHMCallback *fmcb = 0;static int nbufs[SHMMAXPE][SHMMAXPE];/*---------------------------------------------------------------------------*//* *//*---------------------------------------------------------------------------*/static void config(void){ char *estr = 0; estr = getenv("FMSHM_DEBUG"); shmfmdbg = estr ? atoi(estr) : 0;if(shmfmdbg>=1){printf("FMSHM_DEBUG=%d\n",shmfmdbg);fflush(stdout);} if( (estr = getenv("FMSHM_KEY")) ) shmkey = atoi( estr ); else shmkey = getuid();if(shmfmdbg>=0){printf( "SHM_KEY=%d\n",shmkey);fflush(stdout);}}/*---------------------------------------------------------------------------*/static void allocate_buffers( int totbufs ){ int i, j, k; int bufs_sofar = 0; int n2 = SHM_numnodes*(SHM_numnodes-1); for( i = 0; i < SHM_numnodes; i++ ) { for( j = 0; j < SHM_numnodes; j++ ) { nbufs[i][j] = ((i==j) ? 0 : (totbufs / n2)); bufs_sofar += nbufs[i][j]; } } if(shmfmdbg>=1) { printf("Buffer allocation:\n"); printf("-----------------\n"); for( i = 0; i < SHM_numnodes; i++ ) { printf("From %3d: ", i); for( j = 0; j < SHM_numnodes; j++ ) { printf(" %3d", nbufs[i][j]); } printf("\n"); } printf("Total buffers=%d, leftover=%d\n",bufs_sofar,totbufs-bufs_sofar); printf("-----------------\n"); } for( i = 0, bufs_sofar = 0; i < SHM_numnodes; i++ ) { for( j = 0; j < SHM_numnodes; j++ ) { fmshm->from[i].to[j].msgq = -1; fmshm->from[i].to[j].sendp = -1; fmshm->from[i].to[j].recvp = -1; if( i != j ) { SHMMsgID first = -1, last = -1; for( k = nbufs[i][j]-1; k >= 0; --k ) { SHMMsg *msg = 0; MYASSERT( bufs_sofar < totbufs, ("Out of buffers") ); msg = &fmshm->msgs[bufs_sofar]; msg->msg_id = bufs_sofar++; msg->next = first; first = msg->msg_id; if( last < 0 ) last = msg->msg_id; } MYASSERT( first >= 0 && last >= 0, ("!") ); fmshm->msgs[last].next = first; fmshm->from[i].to[j].msgq = first; fmshm->from[i].to[j].sendp = first; fmshm->from[i].to[j].recvp = first; } } }if(shmfmdbg>=1){printf("Buffers leftover %d\n", totbufs-bufs_sofar);fflush(stdout);}}/*---------------------------------------------------------------------------*/#define SYNCHVAL 1234#define READER_READY(msg) (((msg) && ((msg)->synch != 0)) ? 1 : 0)#define READER_LOCK(msg) do{ \ MYASSERT((msg),("!")); \if(shmfmdbg>=3){printf("READER_LOCK(%d:%d)\n",msg->msg_id,msg->synch);} \ while(!READER_READY(msg)) {;} \ MYASSERT( (msg)->synch==SYNCHVAL, ("READER_LOCK %d",SHM_nodeid) ); \ }while(0)#define READER_UNLOCK(msg) do{ \ MYASSERT((msg),("!")); \ MYASSERT( (msg)->synch==SYNCHVAL, ("READER_UNLOCK %d",SHM_nodeid) ); \if(shmfmdbg>=3){printf("READER_UNLOCK(%d:%d)\n",msg->msg_id,msg->synch);} \ (msg)->synch = 0; \ }while(0)#define WRITER_READY(msg) (((msg) && ((msg)->synch == 0)) ? 1 : 0)#define WRITER_LOCK(msg) do{ \ MYASSERT((msg),("!")); \if(shmfmdbg>=3){printf("WRITER_LOCK(%d:%d)\n",msg->msg_id,msg->synch);} \ while(!WRITER_READY(msg)) {;} \ MYASSERT( (msg)->synch==0, ("WRITER_LOCK %d",SHM_nodeid) ); \ }while(0)#define WRITER_UNLOCK(msg) do{ \ MYASSERT((msg),("!")); \ MYASSERT( (msg)->synch==0, ("WRITER_UNLOCK %d",SHM_nodeid) ); \if(shmfmdbg>=3){printf("WRITER_UNLOCK(%d:%d)\n",msg->msg_id,msg->synch);} \ (msg)->synch = SYNCHVAL; \ }while(0)/*---------------------------------------------------------------------------*//* *//*---------------------------------------------------------------------------*/void SHM_initialize( int nodeid, int numnodes, SHMCallback *cb ){ MYASSERT( 1 <= numnodes && numnodes <= SHMMAXPE, ("#SHM nodes %d must lie in [1..%d]", numnodes, SHMMAXPE) ); MYASSERT( 0 <= nodeid && nodeid < numnodes, ("SHM node ID must lie in [0..%d]", numnodes-1) ); SHM_nodeid = nodeid; SHM_numnodes = numnodes;if(shmfmdbg>=0){printf("SHM_nodeid=%d, SHM_numnodes=%d\n",SHM_nodeid,SHM_numnodes);fflush(stdout);} MYASSERT( cb, ("Message callback required") ); fmcb = cb; config(); if( SHM_numnodes > 1 ) { double onemeg = 1024.0*1024, maxmem = 0; unsigned long njoined = 0; char *qbufstr = getenv("FMSHM_BUFSPERPEQ"); char *maxmemstr = getenv("FMSHM_TOTMEMORY"); int i = 0, nbufs_per_q = 0, totbufs = 0, totqueues = 0; int shmid, shmsize, shmflags; void *shm;if(!qbufstr){qbufstr=getenv("FMSHM_BUFSPERPE");} /*For backward compatibility*/ nbufs_per_q = qbufstr ? atoi(qbufstr) : -1; maxmem = maxmemstr ? atof(maxmemstr) : -1; totqueues = (SHM_numnodes * (SHM_numnodes-1)); if( nbufs_per_q <= 0 && maxmem <= 0 ) { maxmem = onemeg; /*1MB shared segment*/if(shmfmdbg>=0){printf("Using default shared segment limit of %.4lf MB.\n",maxmem/(onemeg));fflush(stdout);} } else if( maxmem > 0 ) {if(shmfmdbg>=0){printf("Using specified shared segment limit of %.4lf MB.\n",maxmem/(onemeg));fflush(stdout);} if( nbufs_per_q > 0 ) {if(shmfmdbg>=0){printf("Overriding specified no. of bufs per PE Q of %d.\n",nbufs_per_q);fflush(stdout);} nbufs_per_q = -1; } } else { maxmem = nbufs_per_q * totqueues * sizeof(SHMMsg);if(shmfmdbg>=0){printf("Using specified no. of bufs per PE Q of %d, and computed shared segment limit of %.4lf MB.\n",nbufs_per_q,maxmem/(onemeg));fflush(stdout);} } MYASSERT( maxmem > 0, ("%lf %d", maxmem, nbufs_per_q) ); nbufs_per_q = maxmem / (totqueues * sizeof(SHMMsg)); totbufs = nbufs_per_q*totqueues;if(shmfmdbg>=0){printf("Using %d bufs per PE Q, total Qs %d, total bufs %d.\n",nbufs_per_q,totqueues,totbufs);fflush(stdout);} if( SHM_nodeid == 0 ) { int oldshmid = shmget( shmkey, 1, 0 ); if( oldshmid >= 0 ) {if(shmfmdbg>=1){printf("Deleting old zombie shm segment %d.\n",oldshmid);fflush(stdout);} shmctl( oldshmid, IPC_RMID, 0 ); } } shmsize = sizeof(SHMSharedMemory) + totbufs*sizeof(SHMMsg); shmflags = 0666 | (SHM_nodeid==0 ? IPC_CREAT|IPC_EXCL : 0); do {if(shmfmdbg>=1){printf("shmget(0x%x,%.4fMB,0x%x)\n",shmkey,shmsize/(onemeg),shmflags);fflush(stdout);} shmid = shmget( shmkey, shmsize, shmflags ); MYASSERT( !(SHM_nodeid==0 && shmid < 0), ("Node 0: shmkey %d bad or in use", shmkey); perror("") ); if( shmid < 0 ) {if(shmfmdbg>=1){printf("SHM node %d: Retrying shmget...\n",SHM_nodeid);fflush(stdout);} sleep(1); } }while(shmid < 0);if(shmfmdbg>=1){printf("SHM node %d: shmid=%d.\n",SHM_nodeid,shmid);fflush(stdout);} fmshmid = shmid;if(shmfmdbg>=1){printf("shmat(%x,%x,%x)\n",shmid,0,0);fflush(stdout);} shm = shmat( shmid, (char *)0, 0 ); MYASSERT( shm != (void *)-1, ("shmat(shmid=%d)", shmid) ); fmshm = (SHMSharedMemory *)shm;if(shmfmdbg>=1){printf("fmshm=%p\n",fmshm);fflush(stdout);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -