⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 comm_pvm.c

📁 fastDNAml is an attempt to solve the same problem as DNAML, but to do so faster and using less memory
💻 C
字号:
#include  <stdlib.h>#include  <stdio.h>#include  <string.h>#include  <sys/types.h>#include  <sys/times.h>#include  <time.h>#include  <pvm3.h>#include  "fastDNAml_types.h"#include  "fastDNAml_funcs.h"#include  "fastDNAml_globals.h"extern int  monitor_id;extern int  foreman_id;extern int  master_id;extern int  first_worker_id;#define  MAXPROCS 1024int         pvm_err;int         source;int         tag;char        buf[1];int         tid[MAXPROCS];extern proc_data  proc[];/******************************************************************************* *//* Initialize process and join the PVM program. */void process_init(int argc, char **argv, proc_data *pp){ /* process_init */  int  i,n;  int  nrequest;  char *xargv[12];  pp->stats.tstart = pp->stats.t0 = dwalltime00();  gethostname(pp->hostname,HOST_NAME_LEN);  pp->progtype = myprogtype;  pp->tid = pvm_mytid();  if(!get_args(argc,argv,(boolean)1))   bail(argv[0],99);/* Set command line arguments that all processes need. * These arguments will be used in calls to pvm_spawn() * to start up the master, foreman and workers.  
*/  n=0;  xargv[n]=(char*)malloc(16*sizeof(char));  sprintf(xargv[n++],"-d%d",infol);  xargv[n]=(char*)malloc(128*sizeof(char));  sprintf(xargv[n++],"-n%s",run_id);  xargv[n]=(char*)malloc(256*sizeof(char));  sprintf(xargv[n++],"-w%s",workdir);  switch(myprogtype) {  case DNAML_MONITOR:    if(ship_seq_data) {      xargv[n]=(char*)malloc(16*sizeof(char));      strcpy(xargv[n++],"-s");    }    xargv[n]=(char*)malloc(16*sizeof(char));    sprintf(xargv[n++],"-p%d",nworkers);    if(seq_file[0]!=0) {      xargv[n]=(char*)malloc(128*sizeof(char));      strcpy(xargv[n++],seq_file);    }    xargv[n]=NULL;    if( pvm_spawn("pvm_fastDNAml",xargv,0,NULL,1,&tid[1]) == 0 ) {      bail(argv[0],ERR_NO_MASTER);    }    master_id = tid[1];    break;  case DNAML_MASTER:    if(ship_seq_data) {      xargv[n]=(char*)malloc(16*sizeof(char));      strcpy(xargv[n++],"-s");    }    xargv[n]=(char*)malloc(16*sizeof(char));    sprintf(xargv[n++],"-p%d",nworkers);    if( (monitor_id = pvm_parent()) != PvmNoParent) {      xargv[n]=(char*)malloc(16*sizeof(char));      sprintf(xargv[n++],"-m%x",monitor_id);    }    else {      monitor_id = INVALID_ID;    }    if(seq_file[0]!=0) {      xargv[n]=(char*)malloc(128*sizeof(char));      strcpy(xargv[n++],seq_file);    }    xargv[n]=NULL;    if( pvm_spawn("pvm_foreman",xargv,0,NULL,1,&tid[2]) == 0 ) {      bail(argv[0],ERR_NO_FOREMAN);    }    foreman_id = tid[2];    break;  case DNAML_FOREMAN:    if( (master_id = pvm_parent()) == PvmNoParent) bail(argv[0],ERR_NO_MASTER);    if( monitor_id != INVALID_ID ) {      xargv[n]=(char*)malloc(16*sizeof(char));      sprintf(xargv[n++],"-m%x",monitor_id);    }    xargv[n]=NULL;    nrequest = nworkers;    if( (nworkers=pvm_spawn("pvm_worker",xargv,0,NULL,nrequest,&tid[0])) == 0 ) {      send_msg(NULL,0,master_id,ERR_NO_WORKERS);    }    first_worker_id = tid[0];    for(i=0;i<nworkers;i++) {      proc[i].tid = tid[i];      proc[i].progtype = DNAML_WORKER;      proc[i].state = DNAML_SPAWNED;    }    break;  case 
DNAML_WORKER:    if(foreman_id == INVALID_ID) {      if( (foreman_id = pvm_parent()) == PvmNoParent)        bail(argv[0],ERR_NO_FOREMAN);    }    break;  default:    break;  }  for(i=0;i<n;i++) free(xargv[i]);  return;} /* process_init *//* Spawn ntask instances of program task on host where. * Currently, meant only for starting worker tasks.  */int spawn(char *task, int ntask, char *where){ /* spawn */  char *xargv[12];  int  xtid;  int  i,n=0;  int  numt;  xargv[n]=(char*)malloc(16*sizeof(char));  sprintf(xargv[n++],"-d%d",infol);  xargv[n]=(char*)malloc(128*sizeof(char));  sprintf(xargv[n++],"-n%s",run_id);  xargv[n]=(char*)malloc(256*sizeof(char));  sprintf(xargv[n++],"-w%s",workdir);  xargv[n]=NULL;  if( monitor_id != INVALID_ID ) {    xargv[n]=(char*)malloc(16*sizeof(char));    sprintf(xargv[n++],"-m%x",monitor_id);    xargv[n]=NULL;  }  numt = pvm_spawn("pvm_worker",xargv,ntask,where,1,&tid[0]);  for(i=nworkers;i<nworkers+numt;i++) {    proc[i].tid = tid[i-nworkers];    proc[i].progtype = DNAML_WORKER;    proc[i].state = DNAML_SPAWNED;  }  nworkers += numt;  for(i=0;i<n;i++) free(xargv[i]);  return numt;} /* spawn *//* Finialize PVM and exit. 
*/void bail(char *source, int err_code){ /* bail */  switch(err_code) {    case 0:      break;    case ERR_TIMEOUT:      fprintf(stderr,"%s: timed out\n",source);      break;    case ERR_NO_MASTER:      fprintf(stderr,"no master program running\n");      break;    case ERR_NO_FOREMAN:      fprintf(stderr,"no foreman program running\n");      break;    case ERR_NO_WORKERS:      fprintf(stderr,"no worker program running\n");      break;    case ERR_SEQDATA:      fprintf(stderr,"%s: cannot get sequence data\n",source);      break;    case ERR_OUTFILE:      fprintf(stderr, "%s: cannot open output file\n",source);      break;    case ERR_LOGFILE:      fprintf(stderr,"%s: cannot open log file\n",source);      break;    case ERR_DEBUGFILE:      fprintf(stderr,"%s: cannot open debug file\n",source);      break;    case ERR_BAD_MSG_TAG:      fprintf(stderr,"%s: unexpected message\n",source);      break;    default:      fprintf(stderr,"%s: error code %d\n",source,err_code);  }  pvm_exit();  exit(err_code);} /* bail */void probe_msg(int *from, int *type) { /* probe_msg */  int    bufid,tag,xtid,bytes,i;  if(*from == ANY_SOURCE) *from = -1;  if(*type == ANY_TAG   ) *type = -1;  do {    bufid = pvm_probe(*from,*type);  } while(bufid==0);  pvm_bufinfo(bufid,&bytes,&tag,&xtid);  *from = xtid;  *type = tag;  return;} /* probe_msg */void iprobe_msg(int *from, int *type) { /* iprobe_msg */  int    bufid,tag,xtid,bytes,i;  if(*from == ANY_SOURCE) *from = -1;  if(*type == ANY_TAG   ) *type = -1;  if( (bufid = pvm_probe(*from,*type)) != 0) {    pvm_bufinfo(bufid,&bytes,&tag,&xtid);    *from = xtid;    *type = tag;  }  else {    *from = -1;    *type = DNAML_NOMSG;  }} /* iprobe_msg */void pvm_pkstatdata(stat_data *s, int nitem, int stride){/*DKB-TODO- currently ignore nitem and stride */  pvm_pkdouble(&s->tstart,1,1);  pvm_pkdouble(&s->tinput,1,1);  pvm_pkdouble(&s->t0,1,1);  pvm_pkdouble(&s->t1,1,1);  pvm_pkdouble(&s->utime,1,1);  pvm_pkdouble(&s->stime,1,1);  
pvm_pkint(&s->ntrees,1,1);}void pvm_pkproc(proc_data *p, int nitem, int stride){/*DKB-TODO- currently ignore nitem and stride */  pvm_pkbyte(p->hostname,HOST_NAME_LEN,1);  pvm_pkint(&p->progtype,1,1);  pvm_pkint(&p->tid,1,1);  pvm_pkint(&p->state,1,1);  pvm_pkdouble(&p->t0,1,1);  pvm_pkstatdata(&p->stats,1,1);}void send_msg(void *buf, int size, int dest, int type){ /* send_msg */  char       c;  stat_data  *sd;  switch(type) {    case DNAML_WORK:    case DNAML_RESULT:    case DNAML_SEQ_FILE:    case DNAML_SEQ_DATA:      pvm_initsend(PvmDataDefault);      pvm_pkstr(buf);      pvm_send(dest,type);      break;    case DNAML_SEND_TREE:    case DNAML_RECV_TREE:    case DNAML_ADD_SPECS:    case DNAML_SEQ_DATA_REQUEST:    case ERR_SEQDATA:    case ERR_MALLOC:    case ERR_GENERIC:    case ERR_BADTREE:    case ERR_BADEVAL:    case DNAML_QUIT:    case DNAML_STATS_REQUEST:      pvm_initsend(PvmDataDefault);      pvm_send(dest,type);      break;    case DNAML_DONE:    case DNAML_STATS:      pvm_initsend(PvmDataDefault);      pvm_pkstatdata((stat_data*)buf,1,1);      pvm_send(dest,type);      break;    case DNAML_ADD_TASK:      pvm_initsend(PvmDataDefault);      pvm_pkstr(buf);      pvm_send(dest,type);      break;    case DNAML_TASK_ADDED:      pvm_initsend(PvmDataDefault);      pvm_pkproc((proc_data*)buf,1,1);      pvm_send(dest,type);      break;    case DNAML_INPUT_TIME:      sd = buf;      sd->t1 = sd->tinput = dwalltime00();      pvm_initsend(PvmDataDefault);      pvm_pkstatdata(sd,1,1);      pvm_send(dest,type);      sd->t0 = sd->t1;      break;    case DNAML_STEP_TIME:      sd = buf;      sd->t1 = dwalltime00();      pvm_initsend(PvmDataDefault);      pvm_pkstatdata(sd,1,1);      pvm_send(dest,type);      sd->t0 = sd->t1;      break;    case DNAML_NUM_TREE:    case DNAML_WORKER_READY:    case DNAML_SEQ_DATA_SIZE:    case DNAML_AWOL:    case DNAML_KILL_TASK:      pvm_initsend(PvmDataDefault);      pvm_pkint(buf,1,1);      pvm_send(dest,type);      break;    case 
DNAML_TID_LIST:    default:      break;  }} /* send_msg */void pvm_upkstatdata(stat_data *s, int nitem, int stride){/*DKB-TODO- currently ignore nitem and stride */  pvm_upkdouble(&s->tstart,1,1);  pvm_upkdouble(&s->tinput,1,1);  pvm_upkdouble(&s->t0,1,1);  pvm_upkdouble(&s->t1,1,1);  pvm_upkdouble(&s->utime,1,1);  pvm_upkdouble(&s->stime,1,1);  pvm_upkint(&s->ntrees,1,1);}void pvm_upkproc(proc_data *p, int nitem, int stride){/*DKB-TODO- currently ignore nitem and stride */  pvm_upkbyte(p->hostname,HOST_NAME_LEN,1);  pvm_upkint(&p->progtype,1,1);  pvm_upkint(&p->tid,1,1);  pvm_upkint(&p->state,1,1);  pvm_upkdouble(&p->t0,1,1);  pvm_upkstatdata(&p->stats,1,1);}void recv_msg(void *buf, int size, int from, int type){ /* recv_msg */  int         source,tag;  source = from==ANY_SOURCE ? -1 : from;  tag = type==ANY_TAG ? -1 : type;  switch(tag) {    case DNAML_WORK:    case DNAML_RESULT:    case DNAML_SEQ_FILE:    case DNAML_SEQ_DATA:      pvm_recv(source,tag);      pvm_upkstr(buf);      break;    case DNAML_SEND_TREE:    case DNAML_RECV_TREE:    case DNAML_ADD_SPECS:    case DNAML_SEQ_DATA_REQUEST:    case ERR_SEQDATA:    case ERR_MALLOC:    case ERR_GENERIC:    case ERR_BADTREE:    case ERR_BADEVAL:    case DNAML_QUIT:    case DNAML_STATS_REQUEST:      pvm_recv(source,tag);      break;    case DNAML_DONE:    case DNAML_STATS:      pvm_recv(source,tag);      pvm_upkstatdata((stat_data*)buf,1,1);      break;    case DNAML_ADD_TASK:      pvm_recv(source,tag);      pvm_upkstr(buf);      break;    case DNAML_TASK_ADDED:      pvm_recv(source,tag);      pvm_upkproc((proc_data*)buf,1,1);      break;    case DNAML_INPUT_TIME:    case DNAML_STEP_TIME:      pvm_recv(source,tag);      pvm_upkstatdata((stat_data*)buf,1,1);      break;    case DNAML_NUM_TREE:    case DNAML_WORKER_READY:    case DNAML_SEQ_DATA_SIZE:    case DNAML_AWOL:    case DNAML_KILL_TASK:      pvm_recv(source,tag);      pvm_upkint(buf,size,1);      break;      default:      break;  }} /* recv_msg */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -