📄 comm_mpi.c
字号:
#include <stdlib.h>#include <stdio.h>#include <string.h>#include <sys/types.h>#include <sys/times.h>#include <time.h>#include <mpi.h>#include "fastDNAml_types.h"#include "fastDNAml_funcs.h"#include "fastDNAml_globals.h"extern int monitor_id;extern int foreman_id;extern int master_id;extern int first_worker_id;#define MAXPROCS 1024int mpi_err;int source;int tag;extern proc_data proc[];MPI_Status status;/* A bug in MPICH: The datatype, blocklength, and displacement arrays * must be larger than needed. I just set them to 16 here. */MPI_Datatype Stattype = MPI_DATATYPE_NULL;MPI_Datatype Proctype = MPI_DATATYPE_NULL;MPI_Datatype sdt[16], pdt[16]; /* data types */int sdbl[16], pdbl[16]; /* block lengths */MPI_Aint sdd[16], pdd[16]; /* displacements */MPI_Aint base;/******************************************************************************* *//* Initialize MPI. */void process_init(int argc, char **argv, proc_data *pp){ /* process_init */ int i,n,nprocs; int *prog; int size,extent; stat_data zstat; proc_data zproc; pp->stats.tstart = pp->stats.t0 = dwalltime00(); gethostname(pp->hostname,HOST_NAME_LEN); pp->progtype = myprogtype; MPI_Init(&argc,&argv); MPI_Comm_rank(MPI_COMM_WORLD,&pp->tid); pp->state = DNAML_SPAWNED; MPI_Comm_size(MPI_COMM_WORLD,&nprocs); nworkers = 0; if(!get_args(argc,argv,(boolean)1)) bail(argv[0],99); prog = (int*)malloc(nprocs*sizeof(int)); MPI_Allgather(&myprogtype,1,MPI_INT,prog,1,MPI_INT,MPI_COMM_WORLD); for(n=0;n<nprocs;n++) { switch(prog[n]) { case DNAML_MONITOR: monitor_id = n; break; case DNAML_FOREMAN: foreman_id = n; break; case DNAML_MASTER: master_id = n; break; case DNAML_WORKER: if( myprogtype == DNAML_MONITOR || myprogtype == DNAML_FOREMAN ) { proc[nworkers].tid = n; proc[nworkers].progtype = DNAML_WORKER; proc[nworkers].state = DNAML_SPAWNED; } nworkers++; if(first_worker_id = INVALID_ID || n<first_worker_id) first_worker_id = n; break; default: break; } } free(prog); if( master_id ==INVALID_ID) bail(NULL,ERR_NO_MASTER); if( foreman_id ==INVALID_ID) bail(NULL,ERR_NO_FOREMAN); if(first_worker_id==INVALID_ID) bail(NULL,ERR_NO_WORKERS);/* Construct MPI data type for the stat_data struct. */ sdt[0]=MPI_DOUBLE; sdbl[0]=1; MPI_Address(&zstat.tstart, sdd ); sdt[1]=MPI_DOUBLE; sdbl[1]=1; MPI_Address(&zstat.tinput, sdd+1); sdt[2]=MPI_DOUBLE; sdbl[2]=1; MPI_Address(&zstat.t0, sdd+2); sdt[3]=MPI_DOUBLE; sdbl[3]=1; MPI_Address(&zstat.t1, sdd+3); sdt[4]=MPI_DOUBLE; sdbl[4]=1; MPI_Address(&zstat.utime, sdd+4); sdt[5]=MPI_DOUBLE; sdbl[5]=1; MPI_Address(&zstat.stime, sdd+5); sdt[6]=MPI_INT; sdbl[6]=1; MPI_Address(&zstat.ntrees, sdd+6); base=sdd[0]; for(i=0;i<7;i++) sdd[i] -= base; MPI_Type_struct(7,sdbl,sdd,sdt,&Stattype); MPI_Type_commit(&Stattype);/* MPI_Type_size(Stattype,&size); MPI_Type_extent(Stattype,&extent); fprintf(stderr,"%2.2d-%s: Stattype - size=%d extent=%d\n", pp->tid,argv[0],size,extent);*//* Construct MPI data type for the proc_data struct. */ pdt[0]=MPI_CHAR; pdbl[0]=HOST_NAME_LEN; MPI_Address(zproc.hostname,pdd); pdt[1]=MPI_INT; pdbl[1]=1; MPI_Address(&zproc.progtype, pdd+1); pdt[2]=MPI_INT; pdbl[2]=1; MPI_Address(&zproc.tid, pdd+2); pdt[3]=MPI_INT; pdbl[3]=1; MPI_Address(&zproc.state, pdd+3); pdt[4]=MPI_DOUBLE; pdbl[4]=1; MPI_Address(&zproc.t0, pdd+4); pdt[5]=Stattype; pdbl[5]=1; MPI_Address(&zproc.stats, pdd+5); base=pdd[0]; for(i=0;i<6;i++) pdd[i] -= base; MPI_Type_struct(6,pdbl,pdd,pdt,&Proctype); MPI_Type_commit(&Proctype);/* MPI_Type_size(Proctype,&size); MPI_Type_extent(Proctype,&extent); fprintf(stderr,"%2.2d-%s: Proctype - size=%d extent=%d\n", pp->tid,argv[0],size,extent);*/ return;} /* process_init *//* Spawn ntask instances of program task on host where. * Currently using MPI-1, which has no dynamic process management. */int spawn(char *task, int ntask, char *where){ /* spawn */ int numt; numt = 0; return numt;} /* spawn *//* Finialize MPI and exit. */void bail(char *source, int err_code){ /* bail */ if(Proctype != MPI_DATATYPE_NULL) MPI_Type_free(&Proctype); if(Stattype != MPI_DATATYPE_NULL) MPI_Type_free(&Stattype); switch(err_code) { case 0: break; case ERR_TIMEOUT: fprintf(stderr,"%s: timed out\n",source); break; case ERR_NO_MASTER: fprintf(stderr,"no master program running\n"); MPI_Abort(MPI_COMM_WORLD,1); break; case ERR_NO_FOREMAN: fprintf(stderr,"no foreman program running\n"); MPI_Abort(MPI_COMM_WORLD,1); break; case ERR_NO_WORKERS: fprintf(stderr,"no worker program running\n"); MPI_Abort(MPI_COMM_WORLD,1); break; case ERR_SEQDATA: fprintf(stderr,"%s: cannot get sequence data\n",source); break; case ERR_OUTFILE: fprintf(stderr, "%s: cannot open output file\n",source); break; case ERR_LOGFILE: fprintf(stderr,"%s: cannot open log file\n",source); break; case ERR_DEBUGFILE: fprintf(stderr,"%s: cannot open debug file\n",source); break; case ERR_BAD_MSG_TAG: fprintf(stderr,"%s: unexpected message\n",source); break; case 99: default: fprintf(stderr,"%s: error code %d\n",source,err_code); MPI_Abort(MPI_COMM_WORLD,1); break; } MPI_Finalize(); exit(err_code);} /* bail */void probe_msg(int *from, int *type) { /* probe_msg */ if(*from == ANY_SOURCE) *from = MPI_ANY_SOURCE; if(*type == ANY_TAG ) *type = MPI_ANY_TAG; MPI_Probe(*from,*type,MPI_COMM_WORLD,&status); *from = status.MPI_SOURCE; *type = status.MPI_TAG;} /* probe_msg */void iprobe_msg(int *from, int *type) { /* iprobe_msg */ int got_msg; if(*from == ANY_SOURCE) *from = MPI_ANY_SOURCE; if(*type == ANY_TAG ) *type = MPI_ANY_TAG; MPI_Iprobe(*from,*type,MPI_COMM_WORLD,&got_msg,&status); if(got_msg) { *from = status.MPI_SOURCE; *type = status.MPI_TAG; } else { *from = -1; *type = DNAML_NOMSG; }} /* iprobe_msg */void send_msg(void *buf, int size, int dest, int type){ /* send_msg */ char c; stat_data *sd; switch(type) { case DNAML_WORK: case DNAML_RESULT: case DNAML_SEQ_FILE: case DNAML_SEQ_DATA: case DNAML_SEND_TREE: case DNAML_RECV_TREE: case DNAML_ADD_SPECS: case DNAML_SEQ_DATA_REQUEST: case ERR_SEQDATA: case ERR_MALLOC: case ERR_GENERIC: case ERR_BADTREE: case ERR_BADEVAL: case DNAML_QUIT: case DNAML_STATS_REQUEST: MPI_Send(buf,size,MPI_CHAR,dest,type,MPI_COMM_WORLD); break; case DNAML_DONE: case DNAML_STATS: MPI_Send(buf,1,Stattype,dest,type,MPI_COMM_WORLD); break; case DNAML_ADD_TASK: break; case DNAML_TASK_ADDED: MPI_Send(buf,1,Proctype,dest,type,MPI_COMM_WORLD); break; case DNAML_INPUT_TIME: sd = buf; sd->t1 = sd->tinput = dwalltime00(); MPI_Send(sd,1,Stattype,dest,type,MPI_COMM_WORLD); sd->t0 = sd->t1; break; case DNAML_STEP_TIME: sd = buf; sd->t1 = dwalltime00(); MPI_Send(sd,1,Stattype,dest,type,MPI_COMM_WORLD); sd->t0 = sd->t1; break; case DNAML_NUM_TREE: case DNAML_WORKER_READY: case DNAML_SEQ_DATA_SIZE: case DNAML_AWOL: MPI_Send(buf,size,MPI_INT,dest,type,MPI_COMM_WORLD); break; case DNAML_KILL_TASK: case DNAML_TID_LIST: default: break; }} /* send_msg */void recv_msg(void *buf, int size, int from, int type){ /* recv_msg */ int source,tag; int i; stat_data *sd; char string[MPI_MAX_ERROR_STRING]; source = from==ANY_SOURCE ? MPI_ANY_SOURCE : from; tag = type==ANY_TAG ? MPI_ANY_TAG : type; switch(tag) { case DNAML_WORK: case DNAML_RESULT: case DNAML_SEQ_FILE: case DNAML_SEQ_DATA: case DNAML_SEND_TREE: case DNAML_RECV_TREE: case DNAML_ADD_SPECS: case DNAML_SEQ_DATA_REQUEST: case ERR_SEQDATA: case ERR_MALLOC: case ERR_GENERIC: case ERR_BADTREE: case ERR_BADEVAL: case DNAML_QUIT: case DNAML_STATS_REQUEST: MPI_Recv(buf,size,MPI_CHAR,source,tag,MPI_COMM_WORLD,&status); break; case DNAML_DONE: case DNAML_STATS: mpi_err = MPI_Recv(buf,1,Stattype,source,tag,MPI_COMM_WORLD,&status); break; case DNAML_ADD_TASK: break; case DNAML_TASK_ADDED: mpi_err = MPI_Recv(buf,1,Proctype,source,tag,MPI_COMM_WORLD,&status); break; case DNAML_INPUT_TIME: case DNAML_STEP_TIME: sd=buf; mpi_err = MPI_Recv(sd,1,Stattype,source,tag,MPI_COMM_WORLD,&status); break; case DNAML_NUM_TREE: case DNAML_WORKER_READY: case DNAML_SEQ_DATA_SIZE: case DNAML_AWOL: MPI_Recv(buf,size,MPI_INT,source,tag,MPI_COMM_WORLD,&status); break; case DNAML_KILL_TASK: case DNAML_TID_LIST: default: break; }} /* recv_msg */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -