📄 csched.c
字号:
/* MPICH-V Vcl Copyright (C) 2002, 2003, 2004 Groupe Cluster et Grid, LRI, Universite de Paris Sud This file is part of MPICH-V/CL. MPICH-V/CL is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. MPICH-V/CL is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with MPICH-V/CL; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA $Id: csched.c,v 1.21 2005/12/16 14:09:02 herault Exp $*/#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#include <stdio.h>#include <signal.h>#include <sys/time.h>#include <unistd.h>#include <stdlib.h>#include <errno.h>#include <sys/ioctl.h>#include <string.h>#include <netdb.h>#include "config.h"#include "debug.h"#include "utils_socket.h"#include "csched_proto.h"#include "csched.h"#include "server.h"static int fd = -1; /**< socket fd to listen to*/static int npMPI = 0; /**< number of MPI process */static int thetimeout; /**< time interval between 2 checkpoint requests */static int numPhase = -1; /**< num of the last successful global checkpoint */static int netNumphase = -1; /**< num of the last successful global checkpoint in network format */static short state = 0; /**< if in a checkpoint phase then 1, if in restart phase 2, else 0 */static char * ckptend; /**< table of process's checkpointed notification */static int * mpifds; /**< sockets of mpi process by rank */static int socketToDispatcher; /**< Socket to the dispatcher *//** * use when a checkpoint phase is stopped (interrupt or complete) */static void checkpointended() { if (state == 1) /**< true only if all mpi process have been asked for checkpoint */ { state = 0; numPhase--; netNumphase = htonl(numPhase); } bzero(ckptend, (npMPI * sizeof(char)));}static void disconnect(int rank){ printi("SC", "rank %d has broken connection (%s)", rank, strerror(errno)); close(mpifds[rank]); mpifds[rank] = -1; checkpointended();}static void on_quit(int s){ int i; if(socketToDispatcher != -1) close(socketToDispatcher); if(mpifds && npMPI) { for(i = 0; i < npMPI; i++) /**< close mpi sockets */ if(mpifds[i] != -1) close(mpifds[i]); } if(fd != -1) close(fd); /**< close listening socket */ exit(0);}/** * return 1 if everybody is connected, 0 else */static int isAnybodyOutThere() { int i; for ( i = 0 ; i < npMPI ; i++) if (mpifds[i] == -1 ) { printi("SC", "MPI process of rank %d is not here\n", i); return 0; } return 1;}static int atomic_send(int fd, void *buf, int size){ int s; s = _usend(fd, buf, size, 0); if(s <= 0) { printi("SC", "connection broken by peer (fd = %d )", fd); return -1; } return size;}static int new_connection(int fd){ int rlen, tfd, rank; struct sockaddr_in addr; rlen = sizeof(struct sockaddr_in); tfd = accept(fd, (struct sockaddr*)&addr, &rlen); printi("SC", "new connection from %s:%d\n", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port)); rlen = _urecv(tfd, ((char*)&rank), sizeof(int), 0); if(rlen <= 0) { printi("SC", "connection closed from %s:%u before rank sent\n", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port)); close(tfd); return -1; } printi("SC", "it is rank %d\n", rank); mpifds[rank] = tfd; printi("SC", "seqnumber sent %d \n",ntohl(netNumphase)); if (atomic_send(mpifds[rank], &netNumphase, sizeof(int)) < 0) disconnect(rank); /** if restarting, state = 2 */ if ( state != 2 && numPhase > -1) state = 2; return 0;}/** * use to warn daemons about the end of the checkpoint phase */static void send_daemon_notification(){ int i; struct csched_req rep; rep.type = CSCHED_REQ_CKPTEND; for( i = 0 ; i < npMPI ; i++) { if(atomic_send(mpifds[i], &rep, sizeof(struct csched_req) ) < 0) disconnect(i); }}/** * use to ask one MPI process to checkpoint */int send_checkpoint_order(int rank) { int ret; struct csched_req req; req.type = CSCHED_REQ_CHECKPOINT; printi("SC","send checkpoint request to %d", rank); ret = atomic_send(mpifds[rank], &req, sizeof(struct csched_req)); if (ret == -1) { numPhase--; netNumphase = htonl(numPhase); disconnect(rank); return -1; } return 0;}/** * ask everybody to checkpoint */void checkpoint(){ int i, ret; printw("checkpoint request started"); numPhase++; netNumphase = htonl(numPhase); for(i = 0 ; i < npMPI ; i++) { if (mpifds[i] == -1) /**< should not be happening */ return; ret = send_checkpoint_order(i); if (ret == -1) return; } state = 1;}static void ckpt_signal(int s){ if (isAnybodyOutThere() && (!state)) checkpoint(); signal (SIGUSR1,ckpt_signal); signal (SIGTSTP,ckpt_signal); }int main(int argc, char *argv[]) { fd_set rfs; int i, j, maxfd, sv; struct timeval timeout; struct gengetopt_args_info *args; int policy = 1; /* 0 : no checkpoint / 1 : checkpoints */ args = initialize_mpichv_service("SC", argc, argv, &socketToDispatcher, &fd); signal(SIGINT, on_quit); signal(SIGUSR1, ckpt_signal); signal(SIGTSTP, ckpt_signal); if( args->ckpt_timeout_arg ) thetimeout = args->ckpt_timeout_arg; else policy = 0; npMPI = args->n_procs_arg; ckptend = calloc(npMPI , sizeof(char)); mpifds = malloc(npMPI * sizeof(int)); for(i = 0 ; i < npMPI ; i++) mpifds[i] = -1; timeout.tv_sec = thetimeout; timeout.tv_usec = 0; for(;;) { FD_ZERO(&rfs); maxfd = fd; /* listen socket in read set */ FD_SET(fd, &rfs); /* mpi socket in read set */ for(i = 0; i < npMPI; i++) if(mpifds[i] != -1) { FD_SET(mpifds[i], &rfs); if(mpifds[i] > maxfd) maxfd = mpifds[i]; } if (policy) { if(isAnybodyOutThere()) /* execution with checkpoint */ sv = select( maxfd + 1, &rfs, NULL, NULL, &timeout); else { timeout.tv_sec = thetimeout; timeout.tv_usec = 0; sv = select( maxfd + 1, &rfs, NULL, NULL, NULL); } } else /* execution without checkpoint */ sv = select( maxfd + 1, &rfs, NULL, NULL, NULL); if( sv < 0 ) /* something wrong with the select */ { if( (errno == EAGAIN) || (errno == EINTR) ) continue; printe("select"); on_quit(-1); } if(sv == 0) /* timeout for checkpoint */ { printi("SC","timeout"); timeout.tv_sec = thetimeout; timeout.tv_usec = 0; if (isAnybodyOutThere() && policy && (!state)) checkpoint(); } if(FD_ISSET(fd, &rfs)) /* new connection from MPI process */ new_connection(fd); for(i = 0; i < npMPI; i++) /* received something from a MPI process */ if( (mpifds[i] != -1) && FD_ISSET(mpifds[i], &rfs) ) { struct csched_req buf; if (_urecv(mpifds[i], &buf, sizeof(struct csched_req), 0) != sizeof(struct csched_req) ) { disconnect(i); continue ; } switch (buf.type) { case CSCHED_REP_END_CHECKPOINT: printi("csched", "MPI process %d says it has finished phase %d", i, state); ckptend[i] = 1; /* verify if global checkpoint complete */ break; case CSCHED_END_RESTART: ckptend[i] = 1; break; default: printe(" unknown request from peer %d", i); continue; } } for(j = 0 ; (j < npMPI) && ckptend[j] ; j++) /* test if the checkpoint/restart phase is achieved */ printi("csched", "%d of %d MPI processes have finished checkpoint", j+1, npMPI); if (j == npMPI) { if (state == 1) { printw("ckpt phase ended"); timeout.tv_sec = thetimeout; timeout.tv_usec = 0; send_daemon_notification(); state = 0; checkpointended(); } else { printi("csched-complete","restart phase ended"); state = 0; checkpointended(); } } } close(socketToDispatcher); return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -