⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mastersched.c

📁 Path MPICH-V for MPICH the MPI Implementation
💻 C
📖 第 1 页 / 共 2 页
字号:
#include <stdio.h>#include <stdlib.h>#include <sys/types.h>#include <unistd.h>#include <sys/socket.h>#include <netdb.h>#include <netinet/tcp.h>#include <string.h>#include <errno.h>#include "masterSched.h"#define MAX_LISTEN_WAIT 10#define RE_ORDER_CKPT_WAIT 20  // wait 20sec for reorder ckpt if impossiblestruct SchedJob * jobs =  NULL;      /* Structure of jobs.*/int nbrJobs = 0;                     /* Total number of jobs.*/int init_nbrJlaunched = 0;    /* Number of jobs to be executed simultaneously.*/int nbrJ_rest = 0;             /* Number of remains jobs.*/int listenPort;                /* Listen port of Master.*/int listenfd = -1;             /* Socket of Master.*/char scriptname[164];          /* Command line for launching jobs.*/int timeSlot = 0;              /* Time slice for jobs.*/int minTimeSlot = 0;           /* Less remain time of jobs.*/int ckpt_time = 0;             /* For prefetching restart jobs.*/    int nextJob = 0;               /* Futur job to execute.*/ int saveNext = 0;              /* To avoid ckpt order for restarted jobs.*/ int end_ckpt_launch = 1;       /* Ckpt/restart sequential policy.*/int begin_ckpt_launch = 0;     /* Ckpt/restart prallel policy.*/int launch_begin_ckpt = 0;     /* Ckpt/restart prefetch policy.*/struct timeval timeout;        /* Counter of time.*/  int nbrJob_on_exec = 0;       /* Number of jobs on execution*/int nbrJob_on_ckpt = 0;       /* Number of jobs on ckpt phase*/void newConnection(){  int acceptSocket = -1;  struct sockaddr_in pin;  int addrlen;  int n;  static int currentJob = -1;  int msg;    if ((acceptSocket = accept(listenfd, (struct sockaddr *)&pin, &addrlen)) < 0)    {      perror("Could not accept socket connection exit");      exit (-1);    }   else     {      n = read(acceptSocket, &msg, sizeof(int));      if ( n != sizeof (int))        perror("Master: Error Receive some value");      else        {                    if (msg == -1)            {              printf("Master: It is a dispatcher\n");              currentJob++;              n = write(acceptSocket, &currentJob, sizeof(int));              if ( n != sizeof (int))                perror("Error: Send currentJob to dispatcher");                            else                                jobs[currentJob].sockDisp = acceptSocket;                   }          else            {              printf("Master: It is a SC %d\n", msg);              jobs[msg].sockSC = acceptSocket;              jobs[msg].on_exec = 0;               jobs[msg].timeSlot = timeSlot;              jobs[msg].justlaunch = 0;	      jobs[msg].launch_next = 0;            }        }    }}int send_launchJ_order (int iJob){  int n;  // sending the identifier (iJob) of job to be launched  n = write(jobs[iJob].sockDisp, &iJob, sizeof(int));  if ( n != sizeof (int))    {      close(jobs[iJob].sockDisp);      jobs[iJob].sockDisp = -1;      perror("Master: Send currentJob to dispatcher for continue order");      return 0;    }  else    {      printf("Master: Sent currentJob = %d to dispatcher for continue\n", iJob);      // initialize the structure of a job      jobs[iJob].on_exec = 1;      jobs[iJob].remaintime = jobs[iJob].timeSlot;           if ( launch_begin_ckpt )        // on prefetch policy, indicate when launching a job before         // checkpointing	jobs[iJob].remaintime = jobs[iJob].remaintime - ckpt_time;      nbrJob_on_exec++;      return 1;          }}/** The next job to launch is the next on the Roud-Robin algorithm  * which can be  : - a) not on exececution, or *                 - b) on checkpoint, or *                 - c) on execution if not a) and b) found*/void setNextJob (){  int i;  nextJob++;  if ( nextJob == nbrJobs )    nextJob = 0;    for ( i = nextJob; i < nbrJobs; i++)    if ( jobs[i].sockSC != -1 && jobs[i].sockDisp != -1 &&          ((!jobs[i].on_exec) || jobs[i].remaintime == 0)) // remains time = 0 if on ckpt      {        nextJob = i;        return;      }    if ( nextJob != 0 )    for (i = 0; i < nextJob; i++)      if ( jobs[i].sockSC != -1 && jobs[i].sockDisp != -1 &&           ((!jobs[i].on_exec) || jobs[i].remaintime == 0))        {          nextJob = i;          return;        }}int finalize_oneJ_checkAll (int iJob){  int i, n;  char req;    n = read(jobs[iJob].sockDisp, &req, sizeof(char));  if (n != sizeof(char))    printf("Master: dispatcher %d disconnected\n", iJob);    else    printf("Master: dispatcher %d end\n", iJob);      close (jobs[iJob].sockDisp);  jobs[iJob].sockDisp = -1;  jobs[iJob].on_exec = 0;  nbrJob_on_exec--;  nbrJ_rest--;    if(jobs[iJob].remaintime == 0) // finalize job during checkpointing    {      nbrJob_on_ckpt--;      printf("Master: finalized job, on_ckpt = %d\n",nbrJob_on_ckpt );    }  for (i = nextJob ; i < nbrJobs; i++)    if (jobs[i].sockDisp != -1)        return 0;            for (i = 0 ; i < nextJob; i++)    if (jobs[i].sockDisp != -1)      return 0;    // last disconnection of SC  printf("TEMPS ECOULEEEEEEEEEE = %d\n",minTimeSlot - timeout.tv_sec );  n = read(jobs[iJob].sockSC, &req, sizeof(char));  printf("Master: SC disconnect %d \n", iJob);  close(jobs[iJob].sockSC);  return 1;}int launchNextJob (){  if (jobs[nextJob].on_exec)    return 0;    if (jobs[nextJob].sockDisp == -1)    {      // case when next job was on execution and finalized      setNextJob();      if (jobs[nextJob].sockDisp == -1 || jobs[nextJob].on_exec)        return 0;    }  if (send_launchJ_order (nextJob))    {      saveNext = nextJob;      jobs[nextJob].remaintime = jobs[nextJob].remaintime + minTimeSlot - timeout.tv_sec;      setNextJob();        return 1;    }    /* set next job to be launched*/  setNextJob ();  return 0;}void on_begin_checkpoint (){  struct master_info req;  int n, i;    if(nbrJ_rest <= init_nbrJlaunched)    return;  // no checkpoint/restart if only init_nbrJlaunched jobs    req.type = 'C';  // ckpt order  i = 0;  while( i < nbrJobs )    {      if ( jobs[i].remaintime > 0 && jobs[i].remaintime == minTimeSlot && jobs[i].on_exec )        { 	  if (launch_begin_ckpt && !(jobs[i].launch_next))	    { // prefetch policy and launch a job phase	      launchNextJob();	      jobs[i].launch_next = 1; // next phase for job i is checkpoint	      jobs[i].remaintime = ckpt_time +  minTimeSlot - timeout.tv_sec;	      i++;	      continue;	    }	  jobs[i].launch_next = 0;                    /* on parallel checkpoint/restart policy, avoiding              send ckpt order for just launched job */          if (jobs[i].justlaunch)             {              jobs[i].justlaunch = 0;              i++;              continue;            }          // sending ckpt order to SC of job i          req.numJob = i;          n = write ( jobs[i].sockSC , &req, sizeof(struct master_info));          if ( n != sizeof(struct master_info))            {              perror("Master, error: send checkpoint order to SC");              close(jobs[i].sockSC);              jobs[i].sockSC = -1;              jobs[i].on_exec = 0;              if (nextJob == i)                setNextJob();            }          else            {              printf("Master: checkpoint order Sent to SC %d \n", i);               jobs[i].remaintime = 0;              nbrJob_on_ckpt++;              printf("MAster: incrementer on_ckpt = %d\n",nbrJob_on_ckpt );              if ( begin_ckpt_launch )                {                  if (launchNextJob() && saveNext > i)                    jobs[saveNext].justlaunch = 1;                }            }        }            jobs[i].justlaunch = 0; // reinit after treating job i      i++;    }  }void on_end_Checkpoint (int iJob){  struct master_info rep;  int n, j;  int diff;   n = read ( jobs[iJob].sockSC, &rep, sizeof(struct master_info));  if ( n != sizeof(struct master_info))    {      printf("Master: SC disconnect %d \n", iJob);      close(jobs[iJob].sockSC);            jobs[iJob].sockSC = -1;      jobs[iJob].on_exec = 0;     }  else    {      switch (rep.type)        {        case 'I': // ckpt impossible (job on restart phase)           jobs[iJob].remaintime = RE_ORDER_CKPT_WAIT + minTimeSlot - timeout.tv_sec;	  jobs[iJob].launch_next = 1;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -