📄 foreman.c
字号:
/* foreman.c */#define MAIN#include <stdlib.h>#include <stdio.h>#include <string.h>#include <sys/types.h>#include <sys/times.h>#include <time.h>#include "fastDNAml_types.h"#include "fastDNAml_funcs.h"#include "fastDNAml_globals.h"#include "foreman.h"#ifdef DEBUG# define LOG_SEQ_DATA_REQUEST(from)\ fprintf(dbgfp,"DNAML_SEQ_DATA_REQUEST from %8.8x\n",from);fflush(dbgfp)# define LOG_SEQ_FILE(seq_file) \ fprintf(dbgfp,"DNAML_SEQ_FILE **%s**\n",seq_file);fflush(dbgfp)# define LOG_SENT_SEQFILE(seq_file)\ fprintf(dbgfp,"sent filename **%s**\n",seq_file);fflush(dbgfp)# define LOG_SENT_SEQDATA(num,to)\ fprintf(dbgfp,"sent %d bytes to %8.8x\n",num,to);fflush(dbgfp)# define LOG_NUM_TREE(num) \ fprintf(dbgfp, "foreman: received n_tree_sent = %d from master\n", num);\ fflush(dbgfp)# define LOG_RECV_TREE(from) \ fprintf(dbgfp, "foreman: result from %8.8x\n", from);fflush(dbgfp)# define LOG_SEND_TREE \ fprintf(dbgfp, "foreman: sent the best tree to master\n");fflush(dbgfp)# define LOG_REQUEST(from) \ fprintf(dbgfp, "foreman: READY from %8.8x\n",from);fflush(dbgfp)# define LOG_SENT_WORK(to) \ fprintf(dbgfp, "foreman: work to %8.8x\n",to);fflush(dbgfp)# define LOG_OVERDUE(xtid) \ fprintf(dbgfp, "foreman: task %8.8x overdue\n",xtid);fflush(dbgfp)# define LOG_RECV_WORK(from) \ fprintf(dbgfp, "foreman: WORK FROM %8.8x\n",from);fflush(dbgfp)# define LOG_RECV_QUIT(from) \ fprintf(dbgfp, "foreman: QUIT FROM %8.8x\n",from);fflush(dbgfp)# define LOG_ERRMSG(from) \ fprintf(dbgfp,"got error msg %d from %8.8x\n",type,from);fflush(dbgfp)#else# define LOG_SEQ_DATA_REQUEST(seq_file)# define LOG_SEQ_FILE(seq_file)# define LOG_SENT_SEQFILE(seq_file)# define LOG_SENT_SEQDATA(num,to)# define LOG_NUM_TREE(num)# define LOG_RECV_TREE(from)# define LOG_SEND_TREE# define LOG_REQUEST(from)# define LOG_SENT_WORK(to)# define LOG_OVERDUE(xtid)# define LOG_RECV_WORK(from)# define LOG_RECV_QUIT(from)# define LOG_ERRMSG(from)#endifint myprogtype = DNAML_FOREMAN;ready_queue rdq;work_queue wq;/*DKB-TODO- Resize treestr and msg dynamically. */#define MAXSP 15000char treestr[MAXSP*(nmlngth+32)+256];char msg[MAXSP*(nmlngth+32)+256];#define MAXPROCS 1024proc_data proc[MAXPROCS];/******************************************************************************* */main (int argc, char *argv[]){ /* main for foreman program */ int i,j,n; int size, type, from, xtid; double t0, t1; struct tms stoptms; int ready_slave; int state; char *bp,c; char *work; char xhost[128]; int n_tree_sent, n_tree_received; int bufid; double best_likelihood, likelihood; char buf[2048]; char filename[2048] = ""; boolean done; void *p; int got_msg; process_init(argc,argv,&myproc); if(monitor_id!=INVALID_ID) { send_msg(&myproc,1,monitor_id,DNAML_TASK_ADDED); }# ifdef DEBUG if( realpath(workdir,buf) == NULL ) { fprintf(stderr,"Bad path name: %s\n",workdir); } make_filename(filename,buf,run_id,".foreman.dbg",myproc.tid); if ((dbgfp = fopen(filename,"w")) == NULL) { bail(argv[0],ERR_DEBUGFILE); }# endif done = FALSE; init_ready_queue(&rdq); init_work_queue(&wq); n_tree_sent = n_tree_received = 0; best_likelihood = unlikely; t0 = dwalltime00();/* If we do not have a sequence data filename, or if ship_seq_data==TRUE, then * send a sequence data request to the master. We will need either the filename * or the data to send the workers when they send DNAML_SEQ_DATA_REQUESTs. */ if(seq_file[0] == '\0' || ship_seq_data) send_msg(NULL,0,master_id,DNAML_SEQ_DATA_REQUEST);/*----------------------------------------------------------------------------*/ while(!done) { if( wq.next < wq.end && rdq.length !=0 ) { type = DNAML_IDLING; get_ready_queue(&rdq, &from); } else { from = ANY_SOURCE; type = ANY_TAG; iprobe_msg(&from,&type); } switch(type) { case DNAML_SEQ_DATA_REQUEST: /* from a worker */ recv_msg(NULL,0,from,type); LOG_SEQ_DATA_REQUEST(from); for(n=0;n<nworkers;n++) { if(proc[n].tid == from) { proc[n].state = DNAML_SEQ_DATA_REQUEST; break; } } break; case DNAML_SEQ_FILE: /* sequence data filename from master */ recv_msg(seq_file,sizeof(seq_file),from,type); LOG_SEQ_FILE(seq_file); break; case DNAML_SEQ_DATA_SIZE: /* sequence data from master */ recv_msg(&seq_data_size,1,from,DNAML_SEQ_DATA_SIZE); seq_data_str = (char*)realloc(seq_data_str,seq_data_size*sizeof(char)); recv_msg(seq_data_str,seq_data_size,from,DNAML_SEQ_DATA); break; case DNAML_ADD_TASK: /* request to spawn a worker */ recv_msg(xhost,sizeof(xhost),from,type); spawn("pvm_worker",1,xhost); break; case DNAML_RESULT: /* an evaluated tree returned from worker */ recv_msg(msg,sizeof(msg),from,type); if(on_blacklist(from)) { remove_blacklist(from); for(i=0; i<nworkers && proc[i].tid!=from; i++); proc[i].state = DNAML_WORKER_READY; if(infol>3 && monitor_id!=INVALID_ID) { send_msg(&from,1,monitor_id,DNAML_WORKER_READY); } } else { LOG_RECV_TREE(from); if(infol>3 && monitor_id!=INVALID_ID) { send_msg(&c,0,monitor_id,DNAML_RECV_TREE); } n_tree_received++; remove_work_queue(&wq,from); if((likelihood=str_readTreeLikelihood(msg)) > best_likelihood) { /*DKB-TODO-handle badEval returned from str_readTreeLikelihood*/ best_likelihood = likelihood; strcpy(treestr,msg); } if (n_tree_sent == n_tree_received) { send_msg(treestr,strlen(treestr),master_id,DNAML_RESULT); LOG_SEND_TREE; n_tree_sent = n_tree_received = 0; best_likelihood = unlikely; } } /* fall through to case DNAML_IDLING: */ case DNAML_WORKER_READY: /* new workers send this message */ case DNAML_IDLING: /* if we have idle workers and work to be done */ for(i=0; i<nworkers && proc[i].tid!=from; i++); if(type==DNAML_WORKER_READY) { recv_msg(&xtid,1,from,type); send_msg(&from,1,master_id,DNAML_WORKER_READY); /*notify master of a new worker*/ LOG_REQUEST(from); proc[i].state = DNAML_WORKER_READY; } /* Check to see if worker is slated to be killed ... */ ready_slave = from; if( proc[i].state == DNAML_KILL_TASK) { send_msg(NULL,0,ready_slave,DNAML_QUIT); nworkers--; for(j=i;j<nworkers;j++) { proc[j] = proc[j+1]; } } /* ... if not, it is ready for more work */ else { if(get_work_queue(&wq, &work, &size, ready_slave)) { send_msg(work,strlen(work),ready_slave,DNAML_WORK); LOG_SENT_WORK(ready_slave); proc[i].state = DNAML_WORK; } else { put_ready_queue(&rdq,ready_slave); proc[i].state = DNAML_WORKER_READY; } } break; case DNAML_WORK: /* A tree to be evaluated. From the master. */ recv_msg(msg,sizeof(msg),from,type); LOG_RECV_WORK(from); size = strlen(msg)+1; work = malloc(size); /* save work */ strcpy(work,msg); put_work_queue(&wq,work,size); if(get_ready_queue(&rdq,&ready_slave)) { get_work_queue(&wq, &work, &size, ready_slave); send_msg(work,strlen(work),ready_slave,DNAML_WORK); LOG_SENT_WORK(ready_slave); for(i=0; i<nworkers && proc[i].tid!=from; i++); proc[i].state = DNAML_WORK; } else { } break; case DNAML_NUM_TREE: recv_msg(&n_tree_sent,1,from,type); LOG_NUM_TREE(n_tree_sent); if (n_tree_sent == n_tree_received) { send_msg(treestr,strlen(treestr),master_id,DNAML_RESULT); LOG_SEND_TREE; n_tree_sent = n_tree_received = 0; best_likelihood = unlikely; } break; /* The only way to get out of this while(!done) loop is to get a DNAML_QUIT * message. */ case DNAML_QUIT: recv_msg(msg,sizeof(msg),from,type); LOG_RECV_QUIT(from); done = TRUE; break; /* Mark a worker to be killed. A DNAML_QUIT message will be sent once * it returns any work it is doing. See case DNAML_RESULT. */ case DNAML_KILL_TASK: recv_msg(&xtid,1,from,type); for(i=0;i<nworkers;i++) { if(proc[i].tid == xtid) { proc[i].state = DNAML_KILL_TASK; break;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -