📄 fm.c
字号:
xf->xf_class->xf_begin_message( xf, &mixfm->curr.out.stream, m, length, handler, FM_nodeid, to_fm_id ); } } return mixfm->curr.out.stream;}/*---------------------------------------------------------------------------*/void FM_send_piece( FM_stream *sendstream, void *buffer, ULONG length ){ XFace *xf = mixfm->curr.out.xf; MYASSERT( sendstream == mixfm->curr.out.stream, ("%p %p",sendstream,mixfm->curr.out.stream) ); MYASSERT( xf, ("An interface must be currently active") ); xf->xf_class->xf_send_piece( xf, sendstream, buffer, length );}/*---------------------------------------------------------------------------*/void FM_end_message( FM_stream *sendstream ){ XFace *xf = mixfm->curr.out.xf; MYASSERT( sendstream == mixfm->curr.out.stream, ("%p %p",sendstream,mixfm->curr.out.stream) ); MYASSERT( xf, ("An interface must be currently active") ); xf->xf_class->xf_end_message( xf, sendstream ); mixfm->curr.out.xf = 0; mixfm->curr.out.stream = 0;}/*---------------------------------------------------------------------------*/void FM_receive( void *buffer, FM_stream *receivestream, unsigned int length ){ XFace *xf = mixfm->curr.in.xf; MYASSERT( receivestream == mixfm->curr.in.stream, ("%p %p",receivestream,mixfm->curr.in.stream) ); MYASSERT( xf, ("An interface must be currently active") ); xf->xf_class->xf_recv_piece( xf, receivestream, buffer, length );}/*---------------------------------------------------------------------------*/int FM_extract( unsigned int maxbytes ){ int x = 0, nextracted = 0; for( x = 0; nextracted < maxbytes && x < mixfm->nxfaces; x++ ) { XFace *xf = &mixfm->xfaces[x]; mixfm->curr.in.xf = xf; mixfm->curr.in.stream = 0; mixfm->curr.out.xf = xf; mixfm->curr.out.stream = 0; nextracted += xf->xf_class->xf_extract( xf, maxbytes-nextracted ); mixfm->curr.in.xf = 0; mixfm->curr.in.stream = 0; mixfm->curr.out.xf = xf; mixfm->curr.out.stream = 0; } return nextracted;}/*---------------------------------------------------------------------------*//* *//*---------------------------------------------------------------------------*/ULONG FM_register_handler( ULONG id, FM_handler *handler ){ if( id == FM_ANY_ID ) id = mixfm->next_handler++; MYASSERT( id < FM_MAX_HANDLERS && !FM_handler_table[id], ("%lu",id)); FM_handler_table[id] = handler;if(mixfm->dbg>=1){printf("Registered handler %lu = %p\n",id, handler);fflush(stdout);} return id;}/*---------------------------------------------------------------------------*/int FM_debug_level( int level ){ int old = mixfm->dbg; mixfm->dbg = level; return old;}/*---------------------------------------------------------------------------*/void FM_set_parameter( int parameter, int arg ){ switch( parameter ) { case 0: FM_debug_level( arg ); break; case 1: SHM_debug_level( arg ); break; case 2: GM_debug_level( arg ); break; case 3: TCP_debug_level( arg ); break; default: break; }}/*---------------------------------------------------------------------------*/void FM_RemovePeer(unsigned senderID){ MYASSERT( 0, ("Not implemented") );}/*---------------------------------------------------------------------------*/int FM_IsTransportSupported( FM_Transport t ) {return t==FM_TRANSPORT_RELIABLE;}FM_Transport FM_GetTransport( void ) {return FM_TRANSPORT_RELIABLE;}int FM_SetTransport( FM_Transport t ) {return FM_IsTransportSupported(t);}/*---------------------------------------------------------------------------*//* Based on FML Code from Dr. Richard Fujimoto. *//*---------------------------------------------------------------------------*/int FML_NextHandle = 0; /* number of next available handler ID */int FML_InitFM = 0; /* flag indicating FM has been initialized */int FML_MaxHandles = 1024; /* max number of handles; FM default */unsigned int fmh_Barrier; /* handler ID for FML barrier */long FML_NReceived=0; /* how many processors replied to barrier req*/int PFML_Barrier(FM_stream *, unsigned);#include "rmbar.h"static int use_rmb = 1;/*---------------------------------------------------------------------------*/void FML_FMInit(void){ FM_initialize(); FML_InitFM = 1; if( use_rmb ) { rmb_init(); } else { /* Set up handler for barrier */ int error = FML_RegisterHandler(&fmh_Barrier, PFML_Barrier); MYASSERT( !error, ("%d: FML_FMInit Failed\n", (int) FM_nodeid) ); }}/*---------------------------------------------------------------------------*/long FML_RegisterHandler(unsigned int *HandleID, FM_handler hfunction){ if (FML_InitFM == 0) /* check that FM has already been initialized */ { fprintf(stderr, "FML_RegisterHandler: FM not initialized\n"); fflush(stderr); return (1); } if (FML_NextHandle >= FML_MaxHandles) /* check if too many handlers */ { fprintf(stderr, "FML_RegisterHandler: too many handles\n"); fflush(stderr); return (2); } /* set up handler linkage and return ID */ *HandleID = FML_NextHandle; FM_register_handler( FML_NextHandle, hfunction ); FML_NextHandle++; return (0);}/*---------------------------------------------------------------------------*/void FML_PrintHandlerTable (void){ int i; printf ("*** FM Handler Table for Node %d ***\n", (int) FM_nodeid); for (i=0; i<FML_NextHandle; i++) printf ("%d: %x\n", i, (unsigned int) FM_handler_table[i]); fflush(stdout);}/*---------------------------------------------------------------------------*//* Simple barrier; does not guarantee all messages flushed out of system *//* Broadcast a msg, wait until a broadcast is recd from each other processor.*//* Use v=1 for final barrier, 0 otherwise *//*---------------------------------------------------------------------------*/static void FML_BarrierInternal(int v) { int i; if( use_rmb ) { rmb_barrier(); return; } FML_NReceived++; /* broadcast message saying this processor has reached barrier */ for(i=0; i<FM_numnodes; i++) { FM_stream *strm; if(i == FM_nodeid) continue; do { strm = FM_begin_message(i, sizeof(v), fmh_Barrier); if( !strm ) FM_extract(~0); /*break deadlock due to full queues*/ } while( !strm ); MYASSERT( strm, ("FML_Barrier failed") ); FM_send_piece(strm, &v, sizeof(v)); FM_end_message (strm); } /* wait until message received from all other processors */ while(FML_NReceived < FM_numnodes) { FM_extract(~0); } /* reset counter; do subtract because new barrier may have already started */ FML_NReceived -= FM_numnodes;}/*---------------------------------------------------------------------------*/void FML_Barrier( void ) { FML_BarrierInternal(0); }/*---------------------------------------------------------------------------*/void FML_FinalBarrier(void) { FML_BarrierInternal(1); }/*---------------------------------------------------------------------------*//* Handler for simple barrier *//*---------------------------------------------------------------------------*/int PFML_Barrier(FM_stream *strm, unsigned senderID){ int v=0; FM_receive(&v, strm, sizeof(v)); if(v) { FM_RemovePeer(senderID); } FML_NReceived++; return(FM_CONTINUE);}/*---------------------------------------------------------------------------*//* NODEINFO utilities. *//*---------------------------------------------------------------------------*/#define MAX_HOSTNAMELEN 1000void FM_getenv_nodeinfo( NodeInfo *s ){ int i = 0; s->nproc = 1; s->my_index = 0; for( i = 0; i < MAX_PE; i++ ) { s->node_names[i] = malloc( sizeof(char) * (MAX_HOSTNAMELEN+100) ); sprintf( s->node_names[i], "%s", "" ); } /* Get node information */ { char *env_var_name = "NODEINFO"; char *env_var_format = "npe:nodename,nodename,...,nodename:myindex"; char *nstr = getenv( env_var_name ); char *p = 0; if( !nstr ) { nstr = "1:localhost:0"; fprintf(stderr, "\n\n*** Warning: NODEINFO environment variable not defined.\n" "*** Assuming single process (i.e., NODEINFO=\"%s\").\n\n\n", nstr); fflush(stdout); } MYASSERT( nstr, ("Environment variable %s must be specified.\n" "It should of the form \"%s\".\n", env_var_name, env_var_format) ); nstr = strdup( nstr ); /* Don't destroy the environment */ MYASSERT( nstr, ("Insufficient memory duping NODEINFO") );if(1){fprintf(stdout,"%s=\"%s\"\n",env_var_name,nstr);fflush(stdout);} p = strchr( nstr, ':' ); MYASSERT( p, ("Ill-formed npe value for environment variable %s.\n" "It should of the form \"%s\".\n", env_var_name, env_var_format) ); *p = 0; s->nproc = atoi( nstr ); nstr = p+1; MYASSERT( 0 < s->nproc, ("npe value %d must be positive in env variable %s.\n" "It should of the form \"%s\".\n", s->nproc, env_var_name, env_var_format ) ); MYASSERT( s->nproc <= MAX_PE, ("npe value %d too large (max=%d) in " "env variable %s.\nIt should of the form \"%s\".\n", s->nproc, MAX_PE, env_var_name, env_var_format) ); for( i = 0; i < s->nproc; ) { char delimit_char = ',', *comma = 0; int ninstances = 1; if( !( comma = strchr( nstr, delimit_char ) ) ) delimit_char = ':'; comma = strchr( nstr, delimit_char ); MYASSERT( comma, ("Ill-formed node %d value for env variable %s.\n" "It should of the form \"%s\".\n", i, env_var_name, env_var_format) ); *comma = 0; if( isdigit(nstr[0]) ) /*Encoding as n@host, for n instances of host*/ { char *at = strchr( nstr, '@' ); MYASSERT( at && strlen(at+1) > 0, ("Ill-formed node name %d value for env variable %s.\n" "It should of the form \"%s\".\n", i, env_var_name, env_var_format) ); *at = 0; ninstances = atoi(nstr); nstr = at+1; MYASSERT( ninstances+i <= s->nproc, ("Ill-formed node name %d value for env variable %s.\n" "It should of the form \"%s\".\n", i, env_var_name, env_var_format ) ); } if( nstr[0] == '#' ) nstr++;if(0){printf("Detected %d instance(s) of \"%s\" in %s\n",ninstances,nstr,env_var_name);fflush(stdout);} if( nstr[0] ) { #define ADDNODENAME(_nn) do{ \ int inst = 0; \ for( inst = 0; inst < ninstances; inst++ ) { \if(0){printf("Adding nodename[%d] = \"%s\"\n",i,_nn);fflush(stdout);}\ MYASSERT(i<MAX_PE,("%s",_nn)); \ strcpy(s->node_names[i++],_nn); \ } \ }while(0) char oparen_char = '(', cparen_char = ')'; if( !strchr( nstr, oparen_char ) ) { /*Normal hostname*/ ADDNODENAME( nstr ); } else /*Encoding as hn(i-j,...) representing hni..hnj*/ { char *prefix = 0, *oparen=0, *cparen=0, *ids=0; prefix = nstr; oparen = strchr( prefix, oparen_char ); cparen = strchr( oparen, cparen_char ); MYASSERT( cparen, ("Mismatched parens") ); MYASSERT( cparen+1==comma, ("Trailing chars after parens") ); *oparen = 0; *cparen = 0; ids = oparen+1; while(*ids && ids<cparen) { int rangei=0, rangej=0, nbytes=0, k=0; sscanf(ids,"%d%n",&rangei,&nbytes); MYASSERT( nbytes > 0, ("\"%s\"",ids) ); MYASSERT( 0 <= rangei, ("%d",rangei) ); rangej = rangei; ids += nbytes; if(*ids && *ids == '-') { ids++; sscanf(ids,"%d%n",&rangej,&nbytes); MYASSERT( 0 <= rangej, ("%d",rangej) ); ids += nbytes; } if(*ids == ';') ids++; MYASSERT( i+(rangej-rangei+1) <= s->nproc, ("%d %d %d %d",i,rangei,rangej,s->nproc) ); for( k = rangei; k <= rangej; k++ ) { char nodename[MAX_HOSTNAMELEN]; sprintf( nodename, "%s%d", prefix, k ); ADDNODENAME( nodename ); } } MYASSERT( !(*ids) && ids==cparen, ("%s",ids) ); } } nstr = comma+1; } if( (p = strchr( nstr, ':' )) ) *p = 0; s->my_index = atoi( nstr ); nstr = p ? p+1 : nstr+strlen(nstr);if(1){fprintf( stdout, "\tMyIndex=%d\n", s->my_index ); fflush(stdout);} MYASSERT( 0 <= s->my_index && s->my_index < s->nproc, ("Ill-formed my_index value %d for environment variable %s.\n" "It should of the form \"%s\".\n", s->my_index, env_var_name, env_var_format) ); } /* Node information */ { char *nprocs_str = malloc( sizeof(char) * 100 ); char *id_str = malloc( sizeof(char) * 100 ); sprintf( nprocs_str, "FM_NUMNODES=%d", s->nproc ); putenv( nprocs_str ); sprintf( id_str, "FM_NODEID=%d", s->my_index ); putenv( id_str ); for( i = 0; i < s->nproc; i++ ) { const char *nm = s->node_names[i]; char *node_str = malloc( sizeof(char) * (strlen(nm)+100) ); sprintf( node_str, "FM_NODENAME_%d=%s", i, nm ); putenv( node_str ); } }}/*---------------------------------------------------------------------------*/void FM_setenv_nodenames( const NodeInfo *s ){ int i = 0; char *nprocs_str = malloc( sizeof(char) * 20 ); char *id_str = malloc( sizeof(char) * 20 ); sprintf( nprocs_str, "FM_NUMNODES=%d", s->nproc ); putenv( nprocs_str ); sprintf( id_str, "FM_NODEID=%d", s->my_index ); putenv( id_str ); for( i = 0; i < s->nproc; i++ ) { const char *nm = s->node_names[i]; char *node_str = malloc( sizeof(char) * (strlen(nm)+20) ); sprintf( node_str, "FM_NODENAME_%d=%s", i, nm ); putenv( node_str ); }}/*---------------------------------------------------------------------------*/
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -