📄 fm.c
字号:
/*---------------------------------------------------------------------------*//* Portable mixed communication FM implementation. *//* Author(s): Kalyan Perumalla <www.cc.gatech.edu/~kalyan> 20July2001 *//* Radically Modified: Kalyan Perumalla<www.cc.gatech.edu/~kalyan> 15July2002*//* $Revision: 1.9 $ $Name: v26apr05 $ $Date: 2005/02/03 14:56:23 $ *//*---------------------------------------------------------------------------*/#include <stdio.h>#include <stdlib.h>#include <string.h>#include <ctype.h>#include "mycompat.h"#include "fmshm.h"#include "fmgm.h"#include "fmtcp.h"#include "fmmpi.h"#include "fm.h"/*---------------------------------------------------------------------------*/#define FMMAXPE MAX_PE/*---------------------------------------------------------------------------*/#define MAX(a,b) (((a)>(b)) ? (a) : (b))/*---------------------------------------------------------------------------*/#define FMMAXPIECES MAX( MAX(SHMMAXPIECES, GMMAXPIECES), TCPMAXPIECES )#define FMMAXPIECELEN MAX( MAX(SHMMAXPIECELEN, GMMAXPIECELEN), TCPMAXPIECELEN )/*---------------------------------------------------------------------------*/#define FMMAXHOSTNAMELEN 64typedef char FMNodeName[FMMAXHOSTNAMELEN];/*---------------------------------------------------------------------------*/struct XFaceStruct;typedef struct XFaceStruct XFace;typedef void *XFaceStream;/*---------------------------------------------------------------------------*/typedef enum{ FM_XFACE_CLASS_SHM, /*Shared memory*/ FM_XFACE_CLASS_MYR, /*Myrinet*/ FM_XFACE_CLASS_TCP, /*TCP LAN or WAN*/ FM_XFACE_CLASS_MPI, /*MPI*/ FM_XFACE_CLASS_NUM, /*Total number of interface classes*/ FM_XFACE_CLASS_NUL, /*Void; drop it on the floor*/ FM_XFACE_CLASS_BAD, /*Invalid class type*/} XFaceClassTag;/*---------------------------------------------------------------------------*/#define LINK_COST_SHM 10#define LINK_COST_MYR 40#define LINK_COST_LAN 100#define LINK_COST_WAN 1000#define LINK_COST_MPI 20#define LINK_COST_INF 1e10/*---------------------------------------------------------------------------*/typedef enum{ FM_SUBNET_SHM, /*Shared memory*/ FM_SUBNET_MYR, /*Myrinet*/ FM_SUBNET_LAN, /*TCP local area*/ FM_SUBNET_WAN, /*TCP wide area*/ FM_SUBNET_MPI, /*MPI virtual cluster*/ FM_SUBNET_NUM, /*Number of subnet types*/ FM_SUBNET_BAD, /*Invalid subnet type*/} SubnetType;/*---------------------------------------------------------------------------*/#define XFACE_CLASS_STR(ct) \ ((ct)==FM_XFACE_CLASS_SHM ? "SHM" : \ ((ct)==FM_XFACE_CLASS_MYR ? "MYR" : \ ((ct)==FM_XFACE_CLASS_TCP ? "TCP" : \ ((ct)==FM_XFACE_CLASS_MPI ? "MPI" : \ ((ct)==FM_XFACE_CLASS_NUL ? "NUL" : \ ((ct)==FM_XFACE_CLASS_BAD ? "BAD" : \ "???" ))))))/*---------------------------------------------------------------------------*/#define SUBNET_STR_TO_TYPE(sn) \ (!strcmp((sn),"SHM") ? FM_SUBNET_SHM :\ (!strcmp((sn),"MYR") ? FM_SUBNET_MYR :\ (!strcmp((sn),"LAN") ? FM_SUBNET_LAN :\ (!strcmp((sn),"WAN") ? FM_SUBNET_WAN :\ (!strcmp((sn),"MPI") ? FM_SUBNET_MPI :\ FM_SUBNET_BAD)))))/*---------------------------------------------------------------------------*/#define SUBNET_TYPE_TO_XFACE_CLASS(sn) \ ((sn)==FM_SUBNET_SHM ? FM_XFACE_CLASS_SHM :\ ((sn)==FM_SUBNET_MYR ? FM_XFACE_CLASS_MYR :\ ((sn)==FM_SUBNET_LAN ? FM_XFACE_CLASS_TCP :\ ((sn)==FM_SUBNET_WAN ? FM_XFACE_CLASS_TCP :\ ((sn)==FM_SUBNET_MPI ? FM_XFACE_CLASS_MPI :\ FM_XFACE_CLASS_NUL)))))/*---------------------------------------------------------------------------*/#define SUBNET_LINK_COST(sn) \ ((sn)==FM_SUBNET_SHM ? LINK_COST_SHM : \ ((sn)==FM_SUBNET_MYR ? LINK_COST_MYR : \ ((sn)==FM_SUBNET_LAN ? LINK_COST_LAN : \ ((sn)==FM_SUBNET_WAN ? LINK_COST_WAN : \ ((sn)==FM_SUBNET_MPI ? LINK_COST_MPI : \ LINK_COST_INF)))))/*---------------------------------------------------------------------------*/typedef struct{ XFaceClassTag xf_cltag; void (*xf_new)(XFace *xf); void (*xf_delete)(XFace *xf); void (*xf_constructor)(XFace *xf, int i, int N, FMNodeName nodenames[]); void (*xf_destructor)(XFace *xf); void (*xf_begin_message)(XFace *xf, XFaceStream *pstream, int, int, int, int, int); void (*xf_send_piece)(XFace *xf, XFaceStream stream, void *buf, int len); void (*xf_end_message)(XFace *xf, XFaceStream stream); int (*xf_extract)(XFace *xf, int max_bytes); int (*xf_num_pieces)(XFace *xf, XFaceStream stream); int (*xf_piece_len)(XFace *xf, XFaceStream stream, int piece_num); void (*xf_recv_piece)(XFace *xf, XFaceStream stream, void *buf, int maxlen); void *xf_callback; int xf_maxpiecelen;} XFaceClass;/*---------------------------------------------------------------------------*/typedef void *XFaceInstance;/*---------------------------------------------------------------------------*/struct XFaceStruct{ char *xf_name; int xf_subnet_id; /*Globally unique ID of the subnet this xface belongs to*/ int xf_sub_id; /*ID of this xface in its subnet; 0 <= x_sub_id < xf_sub_n*/ int xf_sub_n; /*No. of peer nodes in the subnet this xface belongs to*/ int xf_map[FMMAXPE];/*map[i]=xf_sub_id of i if FMID i is in subnet;else=-1*/ int xf_inv_map[FMMAXPE]; /*inv_map[i]=FMID of i'th member; 0<=i<xf_sub_n*/ FMNodeName xf_canname[FMMAXPE];/*canname[i]=canonical name of i'th member*/ XFaceClass *xf_class; /*Ref to this interface's class definition*/ XFaceInstance xf_instance;/*Opaque ref to instance of this interface class*/ int xf_constructed; /*Constructor already called on this interface?*/};/*---------------------------------------------------------------------------*/typedef struct{ int dest_id; /*FM ID of final destination processor*/ int next_hop; /*FM ID of next hop processor*/ XFace *xf; /*Interface via which the next hop processor is reached*/} XFaceNextHopEntry;/*---------------------------------------------------------------------------*/typedef struct{ XFaceNextHopEntry nhop_entry[FMMAXPE];} XFaceNextHopTable;/*---------------------------------------------------------------------------*/int absorb_or_route( int handler, void *in_stream, int src_pe, int src_id, int dest_id);/*---------------------------------------------------------------------------*/void xf_shm_new(XFace *xf){ xf->xf_instance = 0; /*Only one (static) instance supported by SHMFM*/}void xf_shm_delete(XFace *xf){ /*Do nothing*/}void xf_shm_constructor(XFace *xf, int i, int N, FMNodeName nodenames[]){ SHM_initialize( i, N, (SHMCallback*)xf->xf_class->xf_callback );}void xf_shm_destructor(XFace *xf){ SHM_finalize();}void xf_shm_begin_message(XFace *xf, XFaceStream *pstream, int recipient, int length, int handler, int src_id, int dest_id ){ *pstream = SHM_begin_message( recipient, length, handler, src_id, dest_id );}void xf_shm_send_piece(XFace *xf, XFaceStream stream, void *buf, int len){ SHM_send_piece( stream, buf, len );}void xf_shm_end_message(XFace *xf, XFaceStream stream){ SHM_end_message( stream );}int xf_shm_extract(XFace *xf, int max_bytes){ return SHM_extract( max_bytes );}int xf_shm_num_pieces(XFace *xf, XFaceStream stream){ return SHM_numpieces( stream );}int xf_shm_piece_len(XFace *xf, XFaceStream stream, int piece_num){ return SHM_piecelen( stream, piece_num );}void xf_shm_recv_piece(XFace *xf, XFaceStream stream, void *buf, int maxlen){ SHM_receive( buf, stream, maxlen );}int xf_shm_callback(int handler, SHM_stream *stream, int src_pe, int src_id, int dest_id){ return absorb_or_route( handler, stream, src_pe, src_id, dest_id );}/*---------------------------------------------------------------------------*/void xf_myr_new(XFace *xf){ xf->xf_instance = 0; /*Only one (static) instance supported by GMFM*/}void xf_myr_delete(XFace *xf){ /*Do nothing*/}void xf_myr_constructor(XFace *xf, int i, int N, FMNodeName nodenames[]){ int j = 0; char *nms[FMMAXPE]; for(j=0;j<N;j++){nms[j]=nodenames[j];} GM_initialize( i, N, nms, (GMCallback*)xf->xf_class->xf_callback );}void xf_myr_destructor(XFace *xf){ GM_finalize();}void xf_myr_begin_message(XFace *xf, XFaceStream *pstream, int recipient, int length, int handler, int src_id, int dest_id ){ *pstream = GM_begin_message( recipient, length, handler, src_id, dest_id );}void xf_myr_send_piece(XFace *xf, XFaceStream stream, void *buf, int len){ GM_send_piece( stream, buf, len );}void xf_myr_end_message(XFace *xf, XFaceStream stream){ GM_end_message( stream );}int xf_myr_extract(XFace *xf, int max_bytes){ return GM_extract( max_bytes );}int xf_myr_num_pieces(XFace *xf, XFaceStream stream){ return GM_numpieces( stream );}int xf_myr_piece_len(XFace *xf, XFaceStream stream, int piece_num){ return GM_piecelen( stream, piece_num );}void xf_myr_recv_piece(XFace *xf, XFaceStream stream, void *buf, int maxlen){ GM_receive( buf, stream, maxlen );}int xf_myr_callback(int handler, GM_stream *stream, int src_pe, int src_id, int dest_id){ return absorb_or_route( handler, stream, src_pe, src_id, dest_id );}/*---------------------------------------------------------------------------*/void xf_tcp_new(XFace *xf){ xf->xf_instance = 0;}void xf_tcp_delete(XFace *xf){ /*Do nothing*/}void xf_tcp_constructor(XFace *xf, int i, int N, FMNodeName nodenames[]){ int j = 0; char *nms[FMMAXPE]; for(j=0;j<N;j++){nms[j]=nodenames[j];} TCP_initialize( nms, i, N, (TCPCallback*)xf->xf_class->xf_callback );}void xf_tcp_destructor(XFace *xf){ TCP_finalize();}void xf_tcp_begin_message(XFace *xf, XFaceStream *pstream, int recipient, int length, int handler, int src_id, int dest_id ){ *pstream = TCP_begin_message( recipient, length, handler, src_id, dest_id );}void xf_tcp_send_piece(XFace *xf, XFaceStream stream, void *buf, int len){ TCP_send_piece( stream, buf, len );}void xf_tcp_end_message(XFace *xf, XFaceStream stream){ TCP_end_message( stream );}int xf_tcp_extract(XFace *xf, int max_bytes){ return TCP_extract( max_bytes );}int xf_tcp_num_pieces(XFace *xf, XFaceStream stream){ return TCP_numpieces( stream );}int xf_tcp_piece_len(XFace *xf, XFaceStream stream, int piece_num){ return TCP_piecelen( stream, piece_num );}void xf_tcp_recv_piece(XFace *xf, XFaceStream stream, void *buf, int maxlen){ TCP_receive( buf, stream, maxlen );}int xf_tcp_callback(int handler, TCP_stream *stream, int src_pe, int src_id, int dest_id){ return absorb_or_route( handler, stream, src_pe, src_id, dest_id );}/*---------------------------------------------------------------------------*/void xf_mpi_new(XFace *xf){ xf->xf_instance = 0; /*Only one (static) instance supported by FMMPIFM*/}void xf_mpi_delete(XFace *xf){ /*Do nothing*/}void xf_mpi_constructor(XFace *xf, int i, int N, FMNodeName nodenames[]){ FMMPI_initialize( i, N, (FMMPICallback*)xf->xf_class->xf_callback );}void xf_mpi_destructor(XFace *xf){ FMMPI_finalize();}void xf_mpi_begin_message(XFace *xf, XFaceStream *pstream, int recipient, int length, int handler, int src_id, int dest_id ){ *pstream = FMMPI_begin_message(recipient, length, handler, src_id, dest_id);}void xf_mpi_send_piece(XFace *xf, XFaceStream stream, void *buf, int len){ FMMPI_send_piece( stream, buf, len );}void xf_mpi_end_message(XFace *xf, XFaceStream stream){ FMMPI_end_message( stream );}int xf_mpi_extract(XFace *xf, int max_bytes){ return FMMPI_extract( max_bytes );}int xf_mpi_num_pieces(XFace *xf, XFaceStream stream){ return FMMPI_numpieces( stream );}int xf_mpi_piece_len(XFace *xf, XFaceStream stream, int piece_num){ return FMMPI_piecelen( stream, piece_num );}void xf_mpi_recv_piece(XFace *xf, XFaceStream stream, void *buf, int maxlen){ FMMPI_receive( buf, stream, maxlen );}int xf_mpi_callback(int handler, FMMPI_stream *stream, int src_pe, int src_id, int dest_id){ return absorb_or_route( handler, stream, src_pe, src_id, dest_id );}/*---------------------------------------------------------------------------*/static XFaceClass xf_classes[FM_XFACE_CLASS_NUM] ={ { FM_XFACE_CLASS_SHM, xf_shm_new, xf_shm_delete, xf_shm_constructor, xf_shm_destructor, xf_shm_begin_message, xf_shm_send_piece, xf_shm_end_message, xf_shm_extract, xf_shm_num_pieces, xf_shm_piece_len, xf_shm_recv_piece, xf_shm_callback, SHMMAXPIECELEN }, { FM_XFACE_CLASS_MYR, xf_myr_new, xf_myr_delete, xf_myr_constructor, xf_myr_destructor, xf_myr_begin_message, xf_myr_send_piece, xf_myr_end_message, xf_myr_extract, xf_myr_num_pieces, xf_myr_piece_len, xf_myr_recv_piece, xf_myr_callback, GMMAXPIECELEN }, { FM_XFACE_CLASS_TCP, xf_tcp_new, xf_tcp_delete, xf_tcp_constructor, xf_tcp_destructor, xf_tcp_begin_message, xf_tcp_send_piece, xf_tcp_end_message, xf_tcp_extract, xf_tcp_num_pieces, xf_tcp_piece_len, xf_tcp_recv_piece, xf_tcp_callback, TCPMAXPIECELEN }, { FM_XFACE_CLASS_MPI, xf_mpi_new, xf_mpi_delete, xf_mpi_constructor, xf_mpi_destructor, xf_mpi_begin_message, xf_mpi_send_piece, xf_mpi_end_message, xf_mpi_extract, xf_mpi_num_pieces, xf_mpi_piece_len, xf_mpi_recv_piece, xf_mpi_callback, FMMPIMAXPIECELEN }};/*---------------------------------------------------------------------------*/typedef struct{ int numnodes; /*Number of nodes in this group*/ FMNodeName canname[FMMAXPE]; /*Canonical hostnames of nodes in this group*/ int fmnodeid[FMMAXPE]; /*FM IDs of nodes in this group*/} NodeGroup;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -