📄 mastersched.c
字号:
nbrJob_on_ckpt--; printf("Master: SC sent ckpt impossible %d, n_onckpt=%d\n", iJob, nbrJob_on_ckpt); break; case 'E': // ckpt phase end successfully printf("Master: SC sent ckpt ended %d \n", iJob); if (jobs[iJob].remaintime != 0) // should not be return; jobs[iJob].on_exec = 0; jobs[iJob].remaintime = jobs[iJob].timeSlot; nbrJob_on_exec--; nbrJob_on_ckpt--; printf("Master: on_exec = %d,on_ckpt= %d, temp ecoule= %d\n", nbrJob_on_exec,nbrJob_on_ckpt, minTimeSlot - timeout.tv_sec ); diff = nbrJob_on_exec - nbrJob_on_ckpt; if((!end_ckpt_launch) && diff < init_nbrJlaunched) {// case of end ckpt not in order and next jobs not launched int relaunch = init_nbrJlaunched - diff; for(j=0; j < relaunch ; j++) launchNextJob (); } // in sequential checkpoint/restart policy launch next jobs //diff not used, it will be equal 0 for (j = 0; j < nbrJobs && nbrJob_on_exec < init_nbrJlaunched; j++ ) launchNextJob (); default: } }}void setTimeout(){ int i, t; t = minTimeSlot - timeout.tv_sec; //elapsed time in select loop printf("TEMPS ECOULEEEEEEEEEE = %d\n", t); minTimeSlot = timeSlot; for ( i = 0; i < nbrJobs; i++) { if ( !jobs[i].on_exec) continue; if(jobs[i].remaintime > 0 ) { if ( jobs[i].remaintime > t ) jobs[i].remaintime = jobs[i].remaintime - t; if ( jobs[i].remaintime < minTimeSlot ) minTimeSlot = jobs[i].remaintime; } } printf("temps d'attente = %d\n", minTimeSlot ); timeout.tv_sec = minTimeSlot; timeout.tv_usec = 0; }void parse_args(int argc, char *argv[]){ int i; for(i = 1; i < argc; i++) { if ( !strcmp( argv[i], "-nJobs" ) ) { nbrJobs = atoi(argv[++i]); nbrJ_rest = nbrJobs; continue; } if( !strcmp( argv[i], "-nSimultaneous") ) { init_nbrJlaunched = atoi(argv[++i]); continue; } if( !strcmp( argv[i], "-timeSlot") ) { timeSlot = atoi(argv[++i]); continue; } if( !strcmp( argv[i], "-ckpt_time") ) { ckpt_time = atoi(argv[++i]); continue; } if ( !strcmp( argv[i], "-port") ) { listenPort = atoi(argv[++i]); continue; } if ( !strcmp(argv[i], "-begin_ckpt_launch")) { begin_ckpt_launch = 1; end_ckpt_launch = 0; continue; } if ( !strcmp(argv[i], "-launch_begin_ckpt")) { launch_begin_ckpt = 1; end_ckpt_launch = 0; continue; } if ( !strcmp(argv[i], "-script")) { char * name = NULL; name = strdup(argv[++i]); sprintf (scriptname, "%s", name); continue; } } if(( timeSlot <= 0 ) || (ckpt_time > 0 && ckpt_time >= timeSlot) || (nbrJobs < 1) || ( init_nbrJlaunched < 1) || ( init_nbrJlaunched > nbrJobs ) || (launch_begin_ckpt && begin_ckpt_launch) || (launch_begin_ckpt && ckpt_time == 0 )) //@todo tester l'existance du script { fprintf(stderr,"Usage: %s <-nJobs nbrJobs> <-nSimultaneous nbsimultaneous > <-timeSlot timeSlot> <-port listenPort> [[-begin_ckpt_launch] or [-launch_begin_ckpt] + [-ckpt_time ckpt_time]] [-script filename] \n\n\where\n\ <nbrJobs> - Total number of jobs to be launched\n\ <nbsimultaneous> - Number of jobs launched simultaneously\n\ <timeSlot> - time slot for each job. Must be superior than 0\n\ <port> - Listening Port for master\n\ <ckpt_time> - Time estimated for checkpoint phase. Used with launch_begin_ckpt option\n\ and be less than timeSlot\n\ <begin_ckpt_launch> - Specify the prallel checkpoint/restart policy. Not used with other policy\n\ <launch_begin_ckpt> - Specify the prefetch checkpoint/restart policy. Not used with other policy\n\ <filename> - File name of the script that launch the jobs. if in local directory, it must start with './' \n\\n", argv[0]); exit (0); } }int main (int argc, char *argv[]){ struct sockaddr_in addr; struct hostent *mscAddr; char hostName [256]; char mscIP [25]; char * scriptCmd [6]; char argv0[164]; char argv1[164]; char argv2[164]; char argv3[164]; pid_t forked; fd_set readSet; int maxfd, i,j, s; int time_offset; parse_args(argc, argv); jobs = calloc( nbrJobs , sizeof(struct SchedJob)); addr.sin_family = AF_INET; addr.sin_port = htons(listenPort); addr.sin_addr.s_addr = INADDR_ANY; if( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { perror("socket"); return -1; } if( bind(listenfd, (struct sockaddr *)&addr, sizeof(struct sockaddr_in)) < 0) { perror("bind"); return -1; } if( listen(listenfd, MAX_LISTEN_WAIT) < 0) { perror("listen"); return -1; } if (gethostname(hostName, (size_t)256) < 0) { perror("Error: could not get hostname"); exit(1); } if((mscAddr = gethostbyname(hostName)) == 0) perror("gethostbyname"); strcpy(mscIP, (char *)inet_ntoa(*(struct in_addr *)mscAddr->h_addr)); scriptCmd[0] = scriptname; sprintf (argv0, "%s", mscIP); scriptCmd[1] = argv0; sprintf (argv1, "%d", listenPort); scriptCmd[2] = argv1; sprintf (argv2, "%d", nbrJobs); scriptCmd[3] = argv2; sprintf (argv3, "%d", init_nbrJlaunched); scriptCmd[4] = argv3; scriptCmd[5]= NULL; if ((forked = fork()) != -1) { if (forked == 0) execv(scriptname, scriptCmd); } else { perror("Error: could not fork(): aborting"); exit(1); } /* /!\ne jamais mettre time_offset = timeSlot/init_nbrJlaunched, pour le job 0 apres la boucle, le temps restant sera 0, que j'utilise pour dire que le job est en phase de ckpt. et dans ce cas il ne fera pas de ckpt */ time_offset = 20; printf("time_offset = %d\n", time_offset); for (i = 1; i <= (2 * nbrJobs ); i++) newConnection(); /* send launch order for initial specified number of jobs*/ for (i = 0; i < init_nbrJlaunched; i++) { send_launchJ_order (i); sleep(time_offset); for(j=0; j <= i; j++) jobs[j].remaintime-= time_offset; } if ( i == nbrJobs ) nextJob = 0; else nextJob = i; maxfd = listenfd; for (i = 0; i < nbrJobs; i++) { if (jobs[i].sockSC > maxfd) maxfd = jobs[i].sockSC; if (jobs[i].sockDisp > maxfd ) maxfd = jobs[i].sockDisp; } minTimeSlot = jobs[0].remaintime; if ( launch_begin_ckpt ) // prefetch checkpoint/restart policy minTimeSlot = minTimeSlot - ckpt_time; if ( minTimeSlot < 0 ) minTimeSlot = 0; timeout.tv_sec = minTimeSlot; timeout.tv_usec = 0; for (;;) { FD_ZERO(&readSet); for (i = 0; i < nbrJobs; i++) { if ( jobs[i].sockSC != -1 ) FD_SET(jobs[i].sockSC, &readSet); if ( jobs[i].sockDisp != -1 ) FD_SET(jobs[i].sockDisp, &readSet); } s = select( maxfd + 1, &readSet, NULL, NULL, &timeout); if( s < 0 ) { if( (errno == EAGAIN) || (errno == EINTR) ) continue; perror("Master select"); } if ( s == 0 ) on_begin_checkpoint (); for ( i = 0; i < nbrJobs; i++ ) { if (jobs[i].sockDisp != -1 && FD_ISSET ( jobs[i].sockDisp, &readSet)) { /* check the end of job i and if all jobs have finished */ if ( finalize_oneJ_checkAll(i)) return 0; launchNextJob (); } if ( jobs[i].sockSC != -1 && FD_ISSET ( jobs[i].sockSC, &readSet)) on_end_Checkpoint(i); } setTimeout(); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -