📄 mastersched.c
字号:
#include <stdio.h>#include <stdlib.h>#include <sys/types.h>#include <unistd.h>#include <sys/socket.h>#include <netdb.h>#include <netinet/tcp.h>#include <string.h>#include <errno.h>#include "masterSched.h"#define MAX_LISTEN_WAIT 10#define RE_ORDER_CKPT_WAIT 20 // wait 20sec for reorder ckpt if impossiblestruct SchedJob * jobs = NULL; /* Structure of jobs.*/int nbrJobs = 0; /* Total number of jobs.*/int init_nbrJlaunched = 0; /* Number of jobs to be executed simultaneously.*/int nbrJ_rest = 0; /* Number of remains jobs.*/int listenPort; /* Listen port of Master.*/int listenfd = -1; /* Socket of Master.*/char scriptname[164]; /* Command line for launching jobs.*/int timeSlot = 0; /* Time slice for jobs.*/int minTimeSlot = 0; /* Less remain time of jobs.*/int ckpt_time = 0; /* For prefetching restart jobs.*/ int nextJob = 0; /* Futur job to execute.*/ int saveNext = 0; /* To avoid ckpt order for restarted jobs.*/ int end_ckpt_launch = 1; /* Ckpt/restart sequential policy.*/int begin_ckpt_launch = 0; /* Ckpt/restart prallel policy.*/int launch_begin_ckpt = 0; /* Ckpt/restart prefetch policy.*/struct timeval timeout; /* Counter of time.*/ int nbrJob_on_exec = 0; /* Number of jobs on execution*/int nbrJob_on_ckpt = 0; /* Number of jobs on ckpt phase*/void newConnection(){ int acceptSocket = -1; struct sockaddr_in pin; int addrlen; int n; static int currentJob = -1; int msg; if ((acceptSocket = accept(listenfd, (struct sockaddr *)&pin, &addrlen)) < 0) { perror("Could not accept socket connection exit"); exit (-1); } else { n = read(acceptSocket, &msg, sizeof(int)); if ( n != sizeof (int)) perror("Master: Error Receive some value"); else { if (msg == -1) { printf("Master: It is a dispatcher\n"); currentJob++; n = write(acceptSocket, ¤tJob, sizeof(int)); if ( n != sizeof (int)) perror("Error: Send currentJob to dispatcher"); else jobs[currentJob].sockDisp = acceptSocket; } else { printf("Master: It is a SC %d\n", msg); jobs[msg].sockSC = acceptSocket; jobs[msg].on_exec = 0; jobs[msg].timeSlot = timeSlot; jobs[msg].justlaunch = 0; jobs[msg].launch_next = 0; } } }}int send_launchJ_order (int iJob){ int n; // sending the identifier (iJob) of job to be launched n = write(jobs[iJob].sockDisp, &iJob, sizeof(int)); if ( n != sizeof (int)) { close(jobs[iJob].sockDisp); jobs[iJob].sockDisp = -1; perror("Master: Send currentJob to dispatcher for continue order"); return 0; } else { printf("Master: Sent currentJob = %d to dispatcher for continue\n", iJob); // initialize the structure of a job jobs[iJob].on_exec = 1; jobs[iJob].remaintime = jobs[iJob].timeSlot; if ( launch_begin_ckpt ) // on prefetch policy, indicate when launching a job before // checkpointing jobs[iJob].remaintime = jobs[iJob].remaintime - ckpt_time; nbrJob_on_exec++; return 1; }}/** The next job to launch is the next on the Roud-Robin algorithm * which can be : - a) not on exececution, or * - b) on checkpoint, or * - c) on execution if not a) and b) found*/void setNextJob (){ int i; nextJob++; if ( nextJob == nbrJobs ) nextJob = 0; for ( i = nextJob; i < nbrJobs; i++) if ( jobs[i].sockSC != -1 && jobs[i].sockDisp != -1 && ((!jobs[i].on_exec) || jobs[i].remaintime == 0)) // remains time = 0 if on ckpt { nextJob = i; return; } if ( nextJob != 0 ) for (i = 0; i < nextJob; i++) if ( jobs[i].sockSC != -1 && jobs[i].sockDisp != -1 && ((!jobs[i].on_exec) || jobs[i].remaintime == 0)) { nextJob = i; return; }}int finalize_oneJ_checkAll (int iJob){ int i, n; char req; n = read(jobs[iJob].sockDisp, &req, sizeof(char)); if (n != sizeof(char)) printf("Master: dispatcher %d disconnected\n", iJob); else printf("Master: dispatcher %d end\n", iJob); close (jobs[iJob].sockDisp); jobs[iJob].sockDisp = -1; jobs[iJob].on_exec = 0; nbrJob_on_exec--; nbrJ_rest--; if(jobs[iJob].remaintime == 0) // finalize job during checkpointing { nbrJob_on_ckpt--; printf("Master: finalized job, on_ckpt = %d\n",nbrJob_on_ckpt ); } for (i = nextJob ; i < nbrJobs; i++) if (jobs[i].sockDisp != -1) return 0; for (i = 0 ; i < nextJob; i++) if (jobs[i].sockDisp != -1) return 0; // last disconnection of SC printf("TEMPS ECOULEEEEEEEEEE = %d\n",minTimeSlot - timeout.tv_sec ); n = read(jobs[iJob].sockSC, &req, sizeof(char)); printf("Master: SC disconnect %d \n", iJob); close(jobs[iJob].sockSC); return 1;}int launchNextJob (){ if (jobs[nextJob].on_exec) return 0; if (jobs[nextJob].sockDisp == -1) { // case when next job was on execution and finalized setNextJob(); if (jobs[nextJob].sockDisp == -1 || jobs[nextJob].on_exec) return 0; } if (send_launchJ_order (nextJob)) { saveNext = nextJob; jobs[nextJob].remaintime = jobs[nextJob].remaintime + minTimeSlot - timeout.tv_sec; setNextJob(); return 1; } /* set next job to be launched*/ setNextJob (); return 0;}void on_begin_checkpoint (){ struct master_info req; int n, i; if(nbrJ_rest <= init_nbrJlaunched) return; // no checkpoint/restart if only init_nbrJlaunched jobs req.type = 'C'; // ckpt order i = 0; while( i < nbrJobs ) { if ( jobs[i].remaintime > 0 && jobs[i].remaintime == minTimeSlot && jobs[i].on_exec ) { if (launch_begin_ckpt && !(jobs[i].launch_next)) { // prefetch policy and launch a job phase launchNextJob(); jobs[i].launch_next = 1; // next phase for job i is checkpoint jobs[i].remaintime = ckpt_time + minTimeSlot - timeout.tv_sec; i++; continue; } jobs[i].launch_next = 0; /* on parallel checkpoint/restart policy, avoiding send ckpt order for just launched job */ if (jobs[i].justlaunch) { jobs[i].justlaunch = 0; i++; continue; } // sending ckpt order to SC of job i req.numJob = i; n = write ( jobs[i].sockSC , &req, sizeof(struct master_info)); if ( n != sizeof(struct master_info)) { perror("Master, error: send checkpoint order to SC"); close(jobs[i].sockSC); jobs[i].sockSC = -1; jobs[i].on_exec = 0; if (nextJob == i) setNextJob(); } else { printf("Master: checkpoint order Sent to SC %d \n", i); jobs[i].remaintime = 0; nbrJob_on_ckpt++; printf("MAster: incrementer on_ckpt = %d\n",nbrJob_on_ckpt ); if ( begin_ckpt_launch ) { if (launchNextJob() && saveNext > i) jobs[saveNext].justlaunch = 1; } } } jobs[i].justlaunch = 0; // reinit after treating job i i++; } }void on_end_Checkpoint (int iJob){ struct master_info rep; int n, j; int diff; n = read ( jobs[iJob].sockSC, &rep, sizeof(struct master_info)); if ( n != sizeof(struct master_info)) { printf("Master: SC disconnect %d \n", iJob); close(jobs[iJob].sockSC); jobs[iJob].sockSC = -1; jobs[iJob].on_exec = 0; } else { switch (rep.type) { case 'I': // ckpt impossible (job on restart phase) jobs[iJob].remaintime = RE_ORDER_CKPT_WAIT + minTimeSlot - timeout.tv_sec; jobs[iJob].launch_next = 1;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -