⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 csched.c

📁 Path MPICH-V for MPICH the MPI Implementation
💻 C
字号:
/*  MPICH-V Vcl  Copyright (C) 2002, 2003, 2004 Groupe Cluster et Grid, LRI, Universite de Paris Sud  This file is part of MPICH-V/CL.  MPICH-V/CL is free software; you can redistribute it and/or modify  it under the terms of the GNU General Public License as published by  the Free Software Foundation; either version 2 of the License, or  (at your option) any later version.  MPICH-V/CL is distributed in the hope that it will be useful,  but WITHOUT ANY WARRANTY; without even the implied warranty of  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the  GNU General Public License for more details.  You should have received a copy of the GNU General Public License  along with MPICH-V/CL; if not, write to the Free Software  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA  $Id: csched.c,v 1.21 2005/12/16 14:09:02 herault Exp $*/#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#include <stdio.h>#include <signal.h>#include <sys/time.h>#include <unistd.h>#include <stdlib.h>#include <errno.h>#include <sys/ioctl.h>#include <string.h>#include <netdb.h>#include "config.h"#include "debug.h"#include "utils_socket.h"#include "csched_proto.h"#include "csched.h"#include "server.h"static int fd = -1;                 /**< socket fd to listen to*/static int npMPI = 0;               /**< number of MPI process */static int thetimeout;              /**< time interval between 2 checkpoint requests */static int numPhase = -1;           /**< num of the last successful global checkpoint */static int netNumphase = -1;        /**< num of the last successful global checkpoint in network format */static short state = 0;             /**< if in a checkpoint phase then 1, if in                                         restart phase 2, else 0 */static char * ckptend;              /**< table of process's checkpointed notification */static int * mpifds;                /**< sockets of mpi process by rank */static int socketToDispatcher;      /**< Socket to the dispatcher *//** * use when a checkpoint phase is stopped (interrupt or complete)  */static void checkpointended() {  if (state == 1) /**< true only if all mpi process have been asked for checkpoint */    {      state = 0;      numPhase--;      netNumphase = htonl(numPhase);    }  bzero(ckptend, (npMPI * sizeof(char)));}static void disconnect(int rank){  printi("SC", "rank %d has broken connection (%s)", rank, strerror(errno));  close(mpifds[rank]);  mpifds[rank] = -1;  checkpointended();}static void on_quit(int s){  int i;  if(socketToDispatcher != -1)    close(socketToDispatcher);  if(mpifds && npMPI)    {      for(i = 0; i < npMPI; i++) /**< close mpi sockets */	if(mpifds[i] != -1)	  close(mpifds[i]);    }  if(fd != -1)    close(fd); /**< close listening socket */  exit(0);}/** * return 1 if everybody is connected, 0 else  */static int isAnybodyOutThere() {  int i;  for ( i = 0 ; i < npMPI ; i++)    if (mpifds[i] == -1 )      {        printi("SC", "MPI process of rank %d is not here\n", i);        return 0;      }  return 1;}static int atomic_send(int fd, void *buf, int size){  int s;  s  = _usend(fd, buf, size, 0);    if(s <= 0)      {        printi("SC", "connection broken by peer (fd = %d )", fd);        return -1;      }  return size;}static int new_connection(int fd){  int rlen, tfd,  rank;  struct sockaddr_in addr;  rlen = sizeof(struct sockaddr_in);  tfd = accept(fd, (struct sockaddr*)&addr, &rlen);  printi("SC", "new connection from %s:%d\n", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));  rlen = _urecv(tfd, ((char*)&rank), sizeof(int), 0);  if(rlen <= 0)    {      printi("SC", "connection closed from %s:%u before rank sent\n",             inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));      close(tfd);      return -1;    }  printi("SC", "it is rank %d\n", rank);  mpifds[rank] = tfd;  printi("SC", "seqnumber sent %d \n",ntohl(netNumphase));  if (atomic_send(mpifds[rank], &netNumphase, sizeof(int)) < 0)    disconnect(rank);  /** if restarting, state = 2 */  if ( state != 2 && numPhase > -1)    state = 2;  return 0;}/** * use to warn daemons about the end of the checkpoint phase  */static void send_daemon_notification(){  int i;  struct csched_req rep;  rep.type = CSCHED_REQ_CKPTEND;  for( i = 0 ; i < npMPI ; i++)    {      if(atomic_send(mpifds[i], &rep, sizeof(struct csched_req) ) < 0)        disconnect(i);    }}/** * use to ask one MPI process to checkpoint  */int send_checkpoint_order(int rank) {  int ret;  struct csched_req req;  req.type = CSCHED_REQ_CHECKPOINT;  printi("SC","send checkpoint request to %d", rank);  ret = atomic_send(mpifds[rank], &req, sizeof(struct csched_req));  if (ret == -1)    {      numPhase--;      netNumphase = htonl(numPhase);      disconnect(rank);      return -1;    }  return 0;}/** * ask everybody to checkpoint  */void checkpoint(){  int i, ret;  printw("checkpoint request started");  numPhase++;  netNumphase = htonl(numPhase);  for(i = 0 ; i < npMPI ; i++)    {      if (mpifds[i] == -1)  /**< should not be happening */	return;      ret = send_checkpoint_order(i);      if (ret == -1)        return;    }  state = 1;}static void ckpt_signal(int s){  if (isAnybodyOutThere()  && (!state))            checkpoint();  signal (SIGUSR1,ckpt_signal);   signal (SIGTSTP,ckpt_signal); }int main(int argc, char *argv[]) {  fd_set rfs;  int i, j, maxfd, sv;  struct timeval timeout;  struct gengetopt_args_info *args;    int policy = 1; /* 0 : no checkpoint / 1 : checkpoints */  args = initialize_mpichv_service("SC", argc, argv, &socketToDispatcher,				   &fd);  signal(SIGINT, on_quit);  signal(SIGUSR1, ckpt_signal);  signal(SIGTSTP, ckpt_signal);  if( args->ckpt_timeout_arg )    thetimeout = args->ckpt_timeout_arg;  else    policy = 0;  npMPI = args->n_procs_arg;  ckptend = calloc(npMPI , sizeof(char));  mpifds  = malloc(npMPI * sizeof(int));  for(i = 0 ; i < npMPI ; i++)    mpifds[i] = -1;  timeout.tv_sec = thetimeout;  timeout.tv_usec = 0;  for(;;)    {      FD_ZERO(&rfs);      maxfd = fd;      /* listen socket in read set */      FD_SET(fd, &rfs);      /* mpi socket in read set */      for(i = 0; i < npMPI; i++)	if(mpifds[i] != -1)	  {            FD_SET(mpifds[i], &rfs);	    if(mpifds[i] > maxfd)	      maxfd = mpifds[i];	  }            if (policy)	{	  if(isAnybodyOutThere()) /* execution with checkpoint  */	    sv = select( maxfd + 1, &rfs, NULL, NULL, &timeout);	  else	    {	      timeout.tv_sec = thetimeout;	      timeout.tv_usec = 0;	      sv = select( maxfd + 1, &rfs, NULL, NULL, NULL);	    }	}      else        /* execution without checkpoint */        sv = select( maxfd + 1, &rfs, NULL, NULL, NULL);      if( sv < 0 ) /* something wrong with the select */	{	  if( (errno == EAGAIN) || (errno == EINTR) )	    continue;	  printe("select");	  on_quit(-1);	}      if(sv == 0)  /* timeout for checkpoint */	{	  printi("SC","timeout");          timeout.tv_sec  = thetimeout;          timeout.tv_usec = 0;          if (isAnybodyOutThere() && policy && (!state))            checkpoint();        }      if(FD_ISSET(fd, &rfs))            /* new connection from MPI process */	new_connection(fd);      for(i = 0; i < npMPI; i++)        /* received something from a MPI process */	if( (mpifds[i] != -1) && FD_ISSET(mpifds[i], &rfs) )	  {            struct csched_req buf;            if (_urecv(mpifds[i], &buf, sizeof(struct csched_req), 0) != sizeof(struct csched_req) )              {                disconnect(i);                continue ;              }            switch (buf.type)              {              case CSCHED_REP_END_CHECKPOINT:                printi("csched", "MPI process %d says it has finished phase %d", i, state);                ckptend[i] = 1;                /* verify if global checkpoint complete */		                break;              case CSCHED_END_RESTART:                ckptend[i] = 1;                break;              default:                printe(" unknown request from peer %d", i);                continue;              }          }      for(j = 0 ; (j < npMPI) && ckptend[j] ; j++) /* test if the checkpoint/restart phase is achieved */        printi("csched", "%d of %d MPI processes have finished checkpoint", j+1, npMPI);      if (j == npMPI)	{          if (state == 1)            {              printw("ckpt phase ended");  	      timeout.tv_sec = thetimeout;	      timeout.tv_usec = 0;              send_daemon_notification();              state = 0;              checkpointended();            }          else            {              printi("csched-complete","restart phase ended");              state = 0;              checkpointended();            }	}    }  close(socketToDispatcher);  return 0;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -