📄 comm_pvm.c
字号:
#include <stdlib.h>#include <stdio.h>#include <string.h>#include <sys/types.h>#include <sys/times.h>#include <time.h>#include <pvm3.h>#include "fastDNAml_types.h"#include "fastDNAml_funcs.h"#include "fastDNAml_globals.h"extern int monitor_id;extern int foreman_id;extern int master_id;extern int first_worker_id;#define MAXPROCS 1024int pvm_err;int source;int tag;char buf[1];int tid[MAXPROCS];extern proc_data proc[];/******************************************************************************* *//* Initialize process and join the PVM program. */void process_init(int argc, char **argv, proc_data *pp){ /* process_init */ int i,n; int nrequest; char *xargv[12]; pp->stats.tstart = pp->stats.t0 = dwalltime00(); gethostname(pp->hostname,HOST_NAME_LEN); pp->progtype = myprogtype; pp->tid = pvm_mytid(); if(!get_args(argc,argv,(boolean)1)) bail(argv[0],99);/* Set command line arguments that all processes need. * These arguments will be used in calls to pvm_spawn() * to start up the master, foreman and workers. */ n=0; xargv[n]=(char*)malloc(16*sizeof(char)); sprintf(xargv[n++],"-d%d",infol); xargv[n]=(char*)malloc(128*sizeof(char)); sprintf(xargv[n++],"-n%s",run_id); xargv[n]=(char*)malloc(256*sizeof(char)); sprintf(xargv[n++],"-w%s",workdir); switch(myprogtype) { case DNAML_MONITOR: if(ship_seq_data) { xargv[n]=(char*)malloc(16*sizeof(char)); strcpy(xargv[n++],"-s"); } xargv[n]=(char*)malloc(16*sizeof(char)); sprintf(xargv[n++],"-p%d",nworkers); if(seq_file[0]!=0) { xargv[n]=(char*)malloc(128*sizeof(char)); strcpy(xargv[n++],seq_file); } xargv[n]=NULL; if( pvm_spawn("pvm_fastDNAml",xargv,0,NULL,1,&tid[1]) == 0 ) { bail(argv[0],ERR_NO_MASTER); } master_id = tid[1]; break; case DNAML_MASTER: if(ship_seq_data) { xargv[n]=(char*)malloc(16*sizeof(char)); strcpy(xargv[n++],"-s"); } xargv[n]=(char*)malloc(16*sizeof(char)); sprintf(xargv[n++],"-p%d",nworkers); if( (monitor_id = pvm_parent()) != PvmNoParent) { xargv[n]=(char*)malloc(16*sizeof(char)); sprintf(xargv[n++],"-m%x",monitor_id); } else { monitor_id = INVALID_ID; } if(seq_file[0]!=0) { xargv[n]=(char*)malloc(128*sizeof(char)); strcpy(xargv[n++],seq_file); } xargv[n]=NULL; if( pvm_spawn("pvm_foreman",xargv,0,NULL,1,&tid[2]) == 0 ) { bail(argv[0],ERR_NO_FOREMAN); } foreman_id = tid[2]; break; case DNAML_FOREMAN: if( (master_id = pvm_parent()) == PvmNoParent) bail(argv[0],ERR_NO_MASTER); if( monitor_id != INVALID_ID ) { xargv[n]=(char*)malloc(16*sizeof(char)); sprintf(xargv[n++],"-m%x",monitor_id); } xargv[n]=NULL; nrequest = nworkers; if( (nworkers=pvm_spawn("pvm_worker",xargv,0,NULL,nrequest,&tid[0])) == 0 ) { send_msg(NULL,0,master_id,ERR_NO_WORKERS); } first_worker_id = tid[0]; for(i=0;i<nworkers;i++) { proc[i].tid = tid[i]; proc[i].progtype = DNAML_WORKER; proc[i].state = DNAML_SPAWNED; } break; case DNAML_WORKER: if(foreman_id == INVALID_ID) { if( (foreman_id = pvm_parent()) == PvmNoParent) bail(argv[0],ERR_NO_FOREMAN); } break; default: break; } for(i=0;i<n;i++) free(xargv[i]); return;} /* process_init *//* Spawn ntask instances of program task on host where. * Currently, meant only for starting worker tasks. */int spawn(char *task, int ntask, char *where){ /* spawn */ char *xargv[12]; int xtid; int i,n=0; int numt; xargv[n]=(char*)malloc(16*sizeof(char)); sprintf(xargv[n++],"-d%d",infol); xargv[n]=(char*)malloc(128*sizeof(char)); sprintf(xargv[n++],"-n%s",run_id); xargv[n]=(char*)malloc(256*sizeof(char)); sprintf(xargv[n++],"-w%s",workdir); xargv[n]=NULL; if( monitor_id != INVALID_ID ) { xargv[n]=(char*)malloc(16*sizeof(char)); sprintf(xargv[n++],"-m%x",monitor_id); xargv[n]=NULL; } numt = pvm_spawn("pvm_worker",xargv,ntask,where,1,&tid[0]); for(i=nworkers;i<nworkers+numt;i++) { proc[i].tid = tid[i-nworkers]; proc[i].progtype = DNAML_WORKER; proc[i].state = DNAML_SPAWNED; } nworkers += numt; for(i=0;i<n;i++) free(xargv[i]); return numt;} /* spawn *//* Finialize PVM and exit. */void bail(char *source, int err_code){ /* bail */ switch(err_code) { case 0: break; case ERR_TIMEOUT: fprintf(stderr,"%s: timed out\n",source); break; case ERR_NO_MASTER: fprintf(stderr,"no master program running\n"); break; case ERR_NO_FOREMAN: fprintf(stderr,"no foreman program running\n"); break; case ERR_NO_WORKERS: fprintf(stderr,"no worker program running\n"); break; case ERR_SEQDATA: fprintf(stderr,"%s: cannot get sequence data\n",source); break; case ERR_OUTFILE: fprintf(stderr, "%s: cannot open output file\n",source); break; case ERR_LOGFILE: fprintf(stderr,"%s: cannot open log file\n",source); break; case ERR_DEBUGFILE: fprintf(stderr,"%s: cannot open debug file\n",source); break; case ERR_BAD_MSG_TAG: fprintf(stderr,"%s: unexpected message\n",source); break; default: fprintf(stderr,"%s: error code %d\n",source,err_code); } pvm_exit(); exit(err_code);} /* bail */void probe_msg(int *from, int *type) { /* probe_msg */ int bufid,tag,xtid,bytes,i; if(*from == ANY_SOURCE) *from = -1; if(*type == ANY_TAG ) *type = -1; do { bufid = pvm_probe(*from,*type); } while(bufid==0); pvm_bufinfo(bufid,&bytes,&tag,&xtid); *from = xtid; *type = tag; return;} /* probe_msg */void iprobe_msg(int *from, int *type) { /* iprobe_msg */ int bufid,tag,xtid,bytes,i; if(*from == ANY_SOURCE) *from = -1; if(*type == ANY_TAG ) *type = -1; if( (bufid = pvm_probe(*from,*type)) != 0) { pvm_bufinfo(bufid,&bytes,&tag,&xtid); *from = xtid; *type = tag; } else { *from = -1; *type = DNAML_NOMSG; }} /* iprobe_msg */void pvm_pkstatdata(stat_data *s, int nitem, int stride){/*DKB-TODO- currently ignore nitem and stride */ pvm_pkdouble(&s->tstart,1,1); pvm_pkdouble(&s->tinput,1,1); pvm_pkdouble(&s->t0,1,1); pvm_pkdouble(&s->t1,1,1); pvm_pkdouble(&s->utime,1,1); pvm_pkdouble(&s->stime,1,1); pvm_pkint(&s->ntrees,1,1);}void pvm_pkproc(proc_data *p, int nitem, int stride){/*DKB-TODO- currently ignore nitem and stride */ pvm_pkbyte(p->hostname,HOST_NAME_LEN,1); pvm_pkint(&p->progtype,1,1); pvm_pkint(&p->tid,1,1); pvm_pkint(&p->state,1,1); pvm_pkdouble(&p->t0,1,1); pvm_pkstatdata(&p->stats,1,1);}void send_msg(void *buf, int size, int dest, int type){ /* send_msg */ char c; stat_data *sd; switch(type) { case DNAML_WORK: case DNAML_RESULT: case DNAML_SEQ_FILE: case DNAML_SEQ_DATA: pvm_initsend(PvmDataDefault); pvm_pkstr(buf); pvm_send(dest,type); break; case DNAML_SEND_TREE: case DNAML_RECV_TREE: case DNAML_ADD_SPECS: case DNAML_SEQ_DATA_REQUEST: case ERR_SEQDATA: case ERR_MALLOC: case ERR_GENERIC: case ERR_BADTREE: case ERR_BADEVAL: case DNAML_QUIT: case DNAML_STATS_REQUEST: pvm_initsend(PvmDataDefault); pvm_send(dest,type); break; case DNAML_DONE: case DNAML_STATS: pvm_initsend(PvmDataDefault); pvm_pkstatdata((stat_data*)buf,1,1); pvm_send(dest,type); break; case DNAML_ADD_TASK: pvm_initsend(PvmDataDefault); pvm_pkstr(buf); pvm_send(dest,type); break; case DNAML_TASK_ADDED: pvm_initsend(PvmDataDefault); pvm_pkproc((proc_data*)buf,1,1); pvm_send(dest,type); break; case DNAML_INPUT_TIME: sd = buf; sd->t1 = sd->tinput = dwalltime00(); pvm_initsend(PvmDataDefault); pvm_pkstatdata(sd,1,1); pvm_send(dest,type); sd->t0 = sd->t1; break; case DNAML_STEP_TIME: sd = buf; sd->t1 = dwalltime00(); pvm_initsend(PvmDataDefault); pvm_pkstatdata(sd,1,1); pvm_send(dest,type); sd->t0 = sd->t1; break; case DNAML_NUM_TREE: case DNAML_WORKER_READY: case DNAML_SEQ_DATA_SIZE: case DNAML_AWOL: case DNAML_KILL_TASK: pvm_initsend(PvmDataDefault); pvm_pkint(buf,1,1); pvm_send(dest,type); break; case DNAML_TID_LIST: default: break; }} /* send_msg */void pvm_upkstatdata(stat_data *s, int nitem, int stride){/*DKB-TODO- currently ignore nitem and stride */ pvm_upkdouble(&s->tstart,1,1); pvm_upkdouble(&s->tinput,1,1); pvm_upkdouble(&s->t0,1,1); pvm_upkdouble(&s->t1,1,1); pvm_upkdouble(&s->utime,1,1); pvm_upkdouble(&s->stime,1,1); pvm_upkint(&s->ntrees,1,1);}void pvm_upkproc(proc_data *p, int nitem, int stride){/*DKB-TODO- currently ignore nitem and stride */ pvm_upkbyte(p->hostname,HOST_NAME_LEN,1); pvm_upkint(&p->progtype,1,1); pvm_upkint(&p->tid,1,1); pvm_upkint(&p->state,1,1); pvm_upkdouble(&p->t0,1,1); pvm_upkstatdata(&p->stats,1,1);}void recv_msg(void *buf, int size, int from, int type){ /* recv_msg */ int source,tag; source = from==ANY_SOURCE ? -1 : from; tag = type==ANY_TAG ? -1 : type; switch(tag) { case DNAML_WORK: case DNAML_RESULT: case DNAML_SEQ_FILE: case DNAML_SEQ_DATA: pvm_recv(source,tag); pvm_upkstr(buf); break; case DNAML_SEND_TREE: case DNAML_RECV_TREE: case DNAML_ADD_SPECS: case DNAML_SEQ_DATA_REQUEST: case ERR_SEQDATA: case ERR_MALLOC: case ERR_GENERIC: case ERR_BADTREE: case ERR_BADEVAL: case DNAML_QUIT: case DNAML_STATS_REQUEST: pvm_recv(source,tag); break; case DNAML_DONE: case DNAML_STATS: pvm_recv(source,tag); pvm_upkstatdata((stat_data*)buf,1,1); break; case DNAML_ADD_TASK: pvm_recv(source,tag); pvm_upkstr(buf); break; case DNAML_TASK_ADDED: pvm_recv(source,tag); pvm_upkproc((proc_data*)buf,1,1); break; case DNAML_INPUT_TIME: case DNAML_STEP_TIME: pvm_recv(source,tag); pvm_upkstatdata((stat_data*)buf,1,1); break; case DNAML_NUM_TREE: case DNAML_WORKER_READY: case DNAML_SEQ_DATA_SIZE: case DNAML_AWOL: case DNAML_KILL_TASK: pvm_recv(source,tag); pvm_upkint(buf,size,1); break; default: break; }} /* recv_msg */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -