📄 foreman.c
字号:
} } break; case ERR_SEQDATA: case ERR_MALLOC: case ERR_GENERIC: recv_msg(NULL,0,from,type); LOG_ERRMSG(from); send_msg(NULL,0,from,DNAML_QUIT); for(i=0; i<nworkers && proc[i].tid!=from; i++); nworkers--; for(j=i;j<nworkers;j++) { proc[j] = proc[j+1]; } if(nworkers==0) send_msg(NULL,0,master_id,ERR_NO_WORKERS); break; case ERR_BADTREE: case ERR_BADEVAL: /*DKB-TODO-Should put work from this worker back on the work queue?*/ recv_msg(NULL,0,from,type); LOG_ERRMSG(from); send_msg(NULL,0,from,DNAML_QUIT); for(i=0; i<nworkers && proc[i].tid!=from; i++); nworkers--; for(j=i;j<nworkers;j++) { proc[j] = proc[j+1]; } if(nworkers==0) send_msg(NULL,0,master_id,ERR_NO_WORKERS); break; case DNAML_STATS_REQUEST: recv_msg(NULL,0,from,type); record_times(&myproc.stats); send_msg(&myproc.stats,1,from,DNAML_STATS); break; /* Stuff to do when no event has come in. */ case DNAML_NOMSG: /* Handle accrued SEQ_DATA_REQUESTs from workers. */ for(i=0;i<nworkers;i++) { if(proc[i].state == DNAML_SEQ_DATA_REQUEST) { if( seq_file[0] != '\0' && !ship_seq_data) { send_msg(seq_file,sizeof(seq_file),proc[i].tid,DNAML_SEQ_FILE); LOG_SENT_SEQFILE(seq_file); proc[i].state = DNAML_SEQ_FILE; } else if(seq_data_str) { send_msg(&seq_data_size,1,proc[i].tid,DNAML_SEQ_DATA_SIZE); send_msg(seq_data_str,seq_data_size,proc[i].tid,DNAML_SEQ_DATA); LOG_SENT_SEQDATA(seq_data_size,proc[i].tid); proc[i].state = DNAML_SEQ_DATA; } } } /* If any currently busy workers are overdue, put the trees they are * working on back on the work queue and blacklist them. */ t1 = dwalltime00(); if(t1-t0 > 5.0) { /* check only every 5 sec */ t0 = t1; for(i=0;i<wq.next;i++) { if(t1-(wq.wk[i].time)>120.0) { /* allow a 120 sec timeout */ xtid = wq.wk[i].tid; LOG_OVERDUE(xtid); work = wq.wk[i].work; size = wq.wk[i].size; (wq.next)--; (wq.end)--; for(j=i;j<wq.end;j++) { wq.wk[j] = wq.wk[j+1]; } put_work_queue(&wq,work,size); put_blacklist(xtid); for(j=0; j<nworkers && proc[j].tid!=ready_slave; j++); proc[j].state = DNAML_AWOL; if(infol>3 && monitor_id!=INVALID_ID) { send_msg(&xtid,1,monitor_id,DNAML_AWOL); } } } } break; default: recv_msg(NULL,0,from,type); fprintf(stderr, "foreman: got unexpected message. type=%d\n",type);# ifdef DEBUG fclose(dbgfp); bail(argv[0],ERR_BAD_MSG_TAG);# endif } }/*------------------------- end while(!done) -----------------------------*//* Send a DNAML_QUIT to all workers who are awaiting a message (workers in * states DNAML_SEQ_DATA_REQUEST and DNAML_WORKER_READY.) */ i=0; while(i<nworkers) { state = proc[i].state; if( state == DNAML_SEQ_DATA_REQUEST || state == DNAML_WORKER_READY) { send_msg(NULL,0,proc[i].tid,DNAML_QUIT); nworkers--; for(j=i;j<nworkers;j++) { proc[j] = proc[j+1]; } } else { i++; } } /* All other workers are in states such that we will eventually receive a * message from them. When the message arrives, respond with a DNAML_QUIT. * Timeout after 120 sec, in case any of these workers are AWOL. */ t0 = dwalltime00(); while(nworkers && (t1=dwalltime00())-t0<120.0) { from = ANY_SOURCE; type = ANY_TAG; iprobe_msg(&from,&type); if(type != DNAML_NOMSG) { recv_msg(msg,sizeof(msg),from,type); send_msg(NULL,0,from,DNAML_QUIT); for(i=0; i<nworkers && proc[i].tid!=from; i++); nworkers--; for(j=i;j<nworkers;j++) { proc[j] = proc[j+1]; } } }/* Now we can quit. */# ifdef DEBUG fclose(dbgfp);# endif if(monitor_id!=INVALID_ID) { record_times(&myproc.stats); send_msg(&myproc.stats,1,monitor_id,DNAML_DONE); } bail((char*)NULL,0);} /* main for foreman program *//******************************************************************************/void init_ready_queue(ready_queue *Queue) { /* init_ready_queue */ Queue->next = 0; Queue->length = 0; } /* init_ready_queue *//******************************************************************************/void put_ready_queue(ready_queue *Queue, int id) { /* put_ready_queue */ int i; i = (Queue->next + Queue->length) % MAX_READY_QUEUE_SIZE; if (++(Queue->length) > MAX_READY_QUEUE_SIZE) fprintf(stderr, "put_ready_queue: ready queue overflow !!\n"); else Queue->id[i] = id; } /* put_ready_queue *//******************************************************************************/boolean get_ready_queue(ready_queue *Queue, int *id) { /* get_ready_queue */ if (Queue->length <= 0) { return(FALSE); } else { *id = Queue->id[Queue->next]; Queue->next = (++Queue->next) % MAX_READY_QUEUE_SIZE; (Queue->length)--; return(TRUE); } } /* get_ready_queue *//******************************************************************************/void init_work_queue(work_queue *q) { /* init_work_queue */ q->next = 0; q->end = 0; } /* init_work_queue *//******************************************************************************/void put_work_queue(work_queue *q, char *work, int size) { /* put_work_queue */ if (q->end > MAX_WORK_QUEUE_SIZE) { fprintf(stderr, "put_work_queue: work queue overflow !!\n"); } else { q->wk[q->end].work = work; q->wk[q->end].size = size; q->wk[q->end].tid = 0; q->wk[q->end].time = 0; } (q->end)++; } /* put_work_queue *//******************************************************************************/boolean get_work_queue(work_queue *q, char **work, int *size, int tid) { /* get_work_queue */ if (q->next == q->end) return(FALSE); else { *work = q->wk[q->next].work; *size = q->wk[q->next].size; q->wk[q->next].tid = tid; q->wk[q->next].time = dwalltime00(); (q->next)++; return(TRUE); } } /* get_work_queue *//******************************************************************************/void remove_work_queue(work_queue *q, int tid) { /* remove_work_queue */ int i,j; for(i=0; i<q->next && q->wk[i].tid!=tid ;i++); free(q->wk[i].work); (q->next)--; (q->end)--; for(j=i;j<q->end;j++) { q->wk[j] = q->wk[j+1]; } } /* remove_work_queue *//******************************************************************************* * These functions maintain the Blacklist - the list of workers who fail to * return their work within the timeout period. Put_blacklist puts a task ID * on the blacklist, on_blacklist determines if a TID is on the blacklist and * returns its position if it is, and remove_blacklist removes a TID from the * blacklist. */int blacklist[128];int n=0;void put_blacklist(tid){ blacklist[++n]=tid; return;}int on_blacklist(tid){ int i; i=1; if(n>0) { for(i=1; tid!=blacklist[i] && i<=n; i++); } return (i<=n ? i : 0);}void remove_blacklist(tid){ int i,j; if(j=on_blacklist(tid)) { for(i=j;i<n;i++) { blacklist[i]=blacklist[i+1]; } n--; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -