📄 fm.c
字号:
/*---------------------------------------------------------------------------*/typedef struct{ int ngroups; /*Number of groups*/ NodeGroup group[FMMAXPE];/*Information for each group*/} NodeGroupMap;/*---------------------------------------------------------------------------*/typedef struct{ /*If xf_cltag!=NUL, then i->j has direct link, using xf_cltag interface*/ /*If xf_cltag==NUL, then i->j is reached via i->next_hop*/ XFaceClassTag xf_cltag;/*Type of subnet to which this link xface belongs*/ int subnet_id; /*ID of subnet to which this link xface belongs*/ int next_hop; /*FM node ID of next processor to route via*/ double cost; /*Link cost (say, latency); smaller is better*/} AdjEntry;/*---------------------------------------------------------------------------*/typedef struct{ AdjEntry matrix[FMMAXPE][FMMAXPE]; /*Info on reaching from FM nodes i to j*/} AdjMatrix;/*---------------------------------------------------------------------------*/typedef struct{ AdjMatrix adj; /*How to reach from one FM node to another*/ int tot_subnets; /*Total number of ALL subnets in the network*/ NodeGroupMap grps[FM_SUBNET_NUM]; /*Grouping info for ALL subnets*/ NodeGroup *subnets[FMMAXPE]; /*Pointers to group info indexed by subnet_id*/ XFaceNextHopTable nhop_table; /*Next hop info from this PE to others*/} Network;/*---------------------------------------------------------------------------*/typedef struct{ int dbg; #define MAX_XFACES 10 int nxfaces; /*Actual number of interfaces instantiated at this processor*/ XFace xfaces[MAX_XFACES]; /*Info for each instantiated xface at this proc.*/ Network net; /*Info about entire network*/ FMNodeName FM_nodenames[FMMAXPE]; /*Hostnames of all nodes*/ int next_handler; /*Which is the next free slot in FM handler table*/ struct { struct { FM_stream stream; /*Currently active stream*/ XFace *xf; /*Currently active interface*/ } in, out; /*Incoming, Outgoing*/ } curr;} MixFMData;/*---------------------------------------------------------------------------*/ULONG FM_nodeid;ULONG FM_numnodes;FM_handler **FM_handler_table;/*---------------------------------------------------------------------------*/static MixFMData *mixfm = 0;/*---------------------------------------------------------------------------*/#define TCP_AVAILABLE 1static int use_shm = #if SHM_AVAILABLE 1 #else 0 #endif ;static int use_gm = #if GM_AVAILABLE 1 #else 0 #endif ;static int use_tcp = #if TCP_AVAILABLE 1 #else 0 #endif ;static int use_mpi = #if MPI_AVAILABLE 1 #else 0 #endif ;/*---------------------------------------------------------------------------*//* *//*---------------------------------------------------------------------------*/int absorb_or_route( int handler, void *in_stream, int src_pe, int src_id, int dest_id){ MYASSERT( src_id != FM_nodeid, ("Loop detected") ); MYASSERT( 0 <= src_id && src_id < FM_numnodes, ("%d",src_id) ); MYASSERT( 0 <= dest_id && dest_id < FM_numnodes, ("%d",dest_id) ); mixfm->curr.in.stream = in_stream; if( dest_id == FM_nodeid ) { /*Destined to self; call the local handler*/ FM_handler *hf = FM_handler_table[handler]; MYASSERT( hf, ("Handler function %d must exist",handler) ); hf( in_stream, src_id ); } else { /*Destined to some other node; route towards it*/ XFaceNextHopEntry *nhe =&mixfm->net.nhop_table.nhop_entry[dest_id]; int nh = nhe->next_hop; mixfm->curr.out.xf = nhe->xf;if(mixfm->dbg>=3){printf("Forwarding msg from %d via %d to %d\n",src_id,nh,dest_id);} /*Copy pieces from incoming interface to outgoing interface*/ { XFaceStream out_stream = 0; int p = 0, np = 0, dest_pe = 0, maxlen = 0; XFace *in_xf = mixfm->curr.in.xf, *out_xf = mixfm->curr.out.xf; MYASSERT( in_xf && out_xf, ("%p %p",in_xf,out_xf) ); maxlen = out_xf->xf_class->xf_maxpiecelen; dest_pe = out_xf->xf_map[nh]; np = in_xf->xf_class->xf_num_pieces( in_xf, in_stream ); MYASSERT( 0 <= np && np <= FMMAXPIECES, ("%d %d",np,FMMAXPIECES) ); out_xf->xf_class->xf_begin_message( out_xf, &out_stream, dest_pe, maxlen, handler, src_id, dest_id ); mixfm->curr.out.stream = out_stream; for( p = 0; p < np; p++ ) { typedef char ReceiveBuf[FMMAXPIECELEN]; ReceiveBuf buf[FMMAXPIECES]; int piecelen = in_xf->xf_class->xf_piece_len(in_xf, in_stream, p); MYASSERT( 0 < piecelen && piecelen <= FMMAXPIECELEN, ("%d %d",piecelen,FMMAXPIECELEN) ); in_xf->xf_class->xf_recv_piece(in_xf, in_stream, buf[p], piecelen); out_xf->xf_class->xf_send_piece(out_xf, out_stream, buf[p], piecelen); } out_xf->xf_class->xf_end_message( out_xf, out_stream ); } mixfm->curr.out.xf = 0; mixfm->curr.out.stream = 0; } return FM_CONTINUE;}/*---------------------------------------------------------------------------*/static void print_xface( XFace *xf ){ int i = 0; printf("Xface name=\"%s\" subnet_id=%d sub_id=%d sub_n=%d\n", xf->xf_name, xf->xf_subnet_id, xf->xf_sub_id, xf->xf_sub_n); printf("\tinv_map={"); for( i = 0; i < xf->xf_sub_n; i++ ) { printf("%s%d",(i>0?", ":""),xf->xf_inv_map[i]); } printf("}\n"); printf("\tcanname={"); for( i = 0; i < xf->xf_sub_n; i++ ) { printf("%s\"%s\"",(i>0?", ":""),xf->xf_canname[i]); } printf("}\n");}/*---------------------------------------------------------------------------*/static void config( void ){ int i = 0, val = 0; char *estr = 0; estr = getenv("FM_NUMNODES"); MYASSERT( estr, ("FM: Expected env var FM_NUMNODES") ); FM_numnodes = atoi( estr ); MYASSERT( 0 < FM_numnodes && FM_numnodes <= FMMAXPE, ("FM: Bad FM_NPROCS=%lu; must be in [1..%d]", FM_numnodes,FMMAXPE)); estr = getenv("FM_NODEID"); MYASSERT( estr, ("FM: Expected env var FM_NODEID") ); val = atoi( estr ); MYASSERT( 0 <= val && val < FM_numnodes, ("FM: Bad FM_NODEID=%d; must be in [0..%lu]", val, FM_numnodes-1) ); FM_nodeid = val; for( i = 0; i < FM_numnodes; i++ ) { char temp[1000], *fmn = mixfm->FM_nodenames[i]; sprintf( temp, "FM_NODENAME_%d", i ); estr = getenv(temp); MYASSERT( estr, ("FM: Expected env var %s",temp) ); strncpy( fmn, estr, FMMAXHOSTNAMELEN ); fmn[FMMAXHOSTNAMELEN-1] = 0;if(mixfm->dbg>=2){printf("MIXFM: Node[%d]=\"%s\"\n", i, fmn);fflush(stdout);} }if(mixfm->dbg>=1){printf("FM_nodeid=%lu, FM_numnodes=%lu\n",FM_nodeid,FM_numnodes);fflush(stdout);}}/*---------------------------------------------------------------------------*/static void group_mpi_nodes( void ){ int i = 0; NodeGroupMap *mpigrps = &mixfm->net.grps[FM_SUBNET_MPI]; NodeGroup *newgrp = &mpigrps->group[0]; mpigrps->ngroups = 1; newgrp->numnodes = 0; for( i = 0; i < FM_numnodes; i++ ) { int memberid = newgrp->numnodes++; MYASSERT( 0 <= memberid && memberid <= FMMPIMAXPE-1, ("%d",memberid) ); newgrp->fmnodeid[memberid] = i; strcpy( newgrp->canname[memberid], mixfm->FM_nodenames[i] ); }}/*---------------------------------------------------------------------------*/static void group_shm_nodes( void ){ int i = 0, g = 0; NodeGroupMap *shmgrps = &mixfm->net.grps[FM_SUBNET_SHM]; shmgrps->ngroups = 0; for( i = 0; i < FM_numnodes; i++ ) { int groupid = -1, memberid = -1; const char *fmnm = mixfm->FM_nodenames[i]; NodeGroup *addtogrp = 0; for( g = 0; g < shmgrps->ngroups; g++ ) { NodeGroup *grp = &shmgrps->group[g]; if( !strcmp( grp->canname[0], fmnm ) ) { if( use_shm ) { addtogrp = grp; groupid = g; break; } } } if( !addtogrp ) { NodeGroup *newgrp = &shmgrps->group[shmgrps->ngroups]; char *nm = newgrp->canname[0]; strcpy( nm, fmnm ); newgrp->numnodes = 0; addtogrp = newgrp; groupid = shmgrps->ngroups++;if(mixfm->dbg>=2){printf("Detected new group[%d] \"%s\"\n",groupid,nm);} } MYASSERT( addtogrp, ("!") ); MYASSERT( 0 <= addtogrp->numnodes && addtogrp->numnodes <= SHMMAXPE-1, ("%d",addtogrp->numnodes) ); memberid = addtogrp->numnodes++; addtogrp->fmnodeid[memberid] = i; if(memberid!=0) strcpy( addtogrp->canname[memberid], addtogrp->canname[0] );if(mixfm->dbg>=2){printf("Node %d added to group %d\n",i,groupid);} } for( g = 0; g < shmgrps->ngroups; ) { NodeGroup *grp = &shmgrps->group[g];if(mixfm->dbg>=3){printf("Group %d has %d nodes\n",g,grp->numnodes);} if( grp->numnodes > 1 ) { g++; } else { int g2 = 0;if(mixfm->dbg>=3){printf("Removing group %d\n",g);} for( g2 = g+1; g2 < shmgrps->ngroups; g2++ ) { NodeGroup *grp_other = &shmgrps->group[g2]; *grp = *grp_other; } --shmgrps->ngroups; } }if(mixfm->dbg>=2){printf("Number of non-singleton shm groups %d\n",shmgrps->ngroups);}}/*---------------------------------------------------------------------------*/static void read_network_file( void ){ FILE *fp = 0; char *net_filename = getenv("FM_NETFILE"); if( !net_filename ) net_filename = "net.txt"; fp = fopen( net_filename, "r" ); if( !fp ) { int i = 0; SubnetType nettype = SUBNET_STR_TO_TYPE("LAN"); NodeGroupMap *grps = &mixfm->net.grps[nettype]; NodeGroup *grp = &grps->group[grps->ngroups++];if(mixfm->dbg>=0){printf("Note: No network description file \"%s\".\nAssuming shared memory & LAN (TCP) connectivity among all nodes...\n",net_filename);} /*Add LAN (TCP) connectivity among leader nodes of shm groups*/ grp->numnodes = 0; for( i = 0; i < FM_numnodes; i++ ) { int j = 0; /*See if we already added the group leader of node i's shm group*/ for( j = 0; j < i; j++ ) { if( use_shm ) { if( !strcmp( mixfm->FM_nodenames[j], mixfm->FM_nodenames[i] ) ) break; } } if( j < i ) { /*Already added; skip adding this node to the LAN group*/ } else { /*Add this first/leader node of an shm group to the LAN group*/ int k = grp->numnodes++; MYASSERT( k < FMMAXPE, ("Too many nodes %d",k) ); strcpy( grp->canname[k], mixfm->FM_nodenames[i] ); grp->fmnodeid[k] = i; } } } else { char line[10000]; int linenum = 0; int offset = 0;if(mixfm->dbg>=0){printf("Using network description file \"%s\"...\n",net_filename);} while( fgets( line+offset, sizeof(line)-offset, fp ) ) { char nettype_str[1000]; int nbytes = 0, nscanned = 0; SubnetType nettype; ++linenum; if( line[0] == 0 ) { continue; } /*Ignore empty line*/ if( line[strlen(line)-1] == '\n' ) /*Chop off trailing newline*/ { line[strlen(line)-1] = 0; } if( line[0] == 0 ) { continue; } /*Ignore empty line*/ if( line[strlen(line)-1] == '\\' ) /*Line break & continuation*/ { offset = strlen(line)-1; continue; } offset = 0; if( line[0] == 0 ) { continue; } /*Ignore empty line*/ { char *p; if( (p=strchr(line,'#')) ) *p = 0; } /*Strip comments*/ { char *p=line+strlen(line); while(*p && isspace(*p))*p--=0; } /*Trim any trailing whitespace*/ if( line[0] == 0 ) { continue; } /*Ignore empty line*/ nscanned = sscanf(line, "%s%n", nettype_str, &nbytes); MYASSERT( nscanned >= 1, ("In file \"%s\" line %d: " "valid subnet type required", net_filename, linenum) ); nettype = SUBNET_STR_TO_TYPE(nettype_str);if(mixfm->dbg>=1){printf("Read subnet type \"%s\"\n",nettype_str);} MYASSERT( nettype != FM_SUBNET_BAD, ("Unknown subnet type \"%s\" in file \"%s\" line %d", nettype_str, net_filename, linenum) ); MYASSERT( nettype != FM_SUBNET_SHM, ("In file \"%s\" line %d: SHM subnet is implicit and " "hence must not be specified", net_filename, linenum) ); /*Now parse off hostnames: "hostname[=othername] ..."*/ /*The "othername" is useful for specifying alternative, */ /*subnet-specific name for each node*/ { int i = 0; NodeGroupMap *grps = &mixfm->net.grps[nettype]; NodeGroup *grp = &grps->group[grps->ngroups++]; char *hnamestr = line + nbytes; grp->numnodes = 0; while( hnamestr[0] ) { char *p = 0; FMNodeName hname, othername; hname[0] = 0; sscanf(hnamestr, "%s%n", hname, &nbytes); hnamestr += nbytes; if( !hname ) break; p = strchr( hname, '=' ); if( p ) *p = 0; strcpy( othername, p ? p+1 : hname );if(mixfm->dbg>=1){printf("Read subnet member \"%s\"=\"%s\"\n",hname,othername);} /*Add this node to the subnet group, if it is included*/ { for( i = 0; i < FM_numnodes; i++ ) { if( !strcmp( mixfm->FM_nodenames[i], hname ) ) break; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -