📄 daemoncsched.c
字号:
/* MPICH-V/CL Copyright (C) 2002, 2003 Groupe Cluster et Grid, LRI, Universite de Paris Sud This file is part of MPICH-V/CL. MPICH-V/CL is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. MPICH-V/CL is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with MPICH-V/CL; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA $Id: daemonCsched.c,v 1.17 2004/06/17 17:35:25 bouziane Exp $*/#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#include <sys/ioctl.h>#include <unistd.h>#include <stdlib.h>#include <assert.h>#include "protocolCheckpoint.h"#include "config.h"#include "debug.h"#include "daemoncom.h"#include "utils_socket.h"#include "daemonCsched.h"#include "csched_proto.h"static int status = STATUS_NOT_CONNECTED;static int np;static int tosend = 0;void cp_request(){ if(status == STATUS_CONNECTED) { status = STATUS_CP_REQUESTED; printi("ckpt-request","checkpoint requested"); }}intcsched_can_cp_now (){ return (status == STATUS_CP_REQUESTED);}void csched_on_begin_cp (){ /* struct csched_req r; */ if (status != STATUS_CP_REQUESTED) printw ("begin cp on its own decision!"); status = STATUS_CP_RUNNING; launch_ckpt(); printi ("CSCHED", "warned other daemon of my new cp");}void csched_on_end_cp (){ tosend = 1;}void set_ckpt_status(int statu){ status = statu;}int ckpt_status(){ return status;}int csched_ckpt_type(){ return CP_START;}/******************************* * API * *******************************/int ftp_getcschedfdset(){ return tosend;}intftp_init_csched(struct sockaddr_in *saddr, int myrank, int worldsize){ int fd; int seq; if (status != STATUS_NOT_CONNECTED) { printw("connect requested but already connected (connection call ignored)"); return -1; } fd = _usocket (); if (fd < 0) qerror ("unable to allocate a TCP socket"); if (connect (fd, (struct sockaddr *) saddr, sizeof (struct sockaddr_in)) < 0) qerror ("unable to connect to CSCHED server %s:%d", inet_ntoa (saddr->sin_addr), ntohs (saddr->sin_port)); if (fd < 0) return fd; if (_usend (fd, &myrank, sizeof (int), 0) != sizeof (int)) { close (fd); fd = -1; return fd; } np = worldsize; if (_urecv (fd, &seq, sizeof (int), 0) != sizeof (int)) { close (fd); fd = -1; return fd; } seq = ntohl ( seq ); pckpt_setSeqnumber(seq); printi("CSCHED", "Sequence number initialization is %d", seq); status = STATUS_CONNECTED; return fd;}int ftp_csched_write (int fd){ int ret; struct csched_req r; if (status == STATUS_CONNECTED) printw ("end cp but no cp running! (status is %d)", status); if (status == STATUS_CP_RUNNING) { printi ("CSCHED", "CP ended. Informing csched"); /* have to inform csched */ r.type = CSCHED_REP_END_CHECKPOINT; ret = _usend(fd, &r, sizeof(struct csched_req), 0); if (ret != sizeof(struct csched_req)) { qerror("writing to csched"); } /* re-initialize checkpoint var */ tosend = 0; ckpt_info_reinit(); status = STATUS_CONNECTED; return 0; } if (status == STATUS_RESTARTING) { printi ("CSCHED", "restart ended. Informing csched"); /* have to inform csched */ r.type = CSCHED_END_RESTART; ret = _usend(fd, &r, sizeof(struct csched_req), 0); if (ret != sizeof(struct csched_req)) { qerror("writing to csched"); } /* re-initialize checkpoint var */ tosend = 0; ckpt_info_reinit(); status = STATUS_CONNECTED; return 0; } printw("status where no write is possible",status); return -1;}int ftp_csched_read (int fd){ static char * buf = NULL; int toread; struct csched_req *r; buf = (char *) malloc (sizeof(struct csched_req)); toread = _urecv(fd, buf, sizeof(struct csched_req), 0); if (toread != sizeof(struct csched_req) ) { printe ("read on sockCSCHED (%d) broken", fd); close (fd); return -1; } r = (struct csched_req *) buf; printi ("CSCHED", "handling some request : %d", r->type); switch (r->type) { case CSCHED_REQ_CHECKPOINT: if (status == STATUS_CONNECTED) { ckpt_csched_warn(); cp_request(); } else if (status == STATUS_CP_RUNNING || status == STATUS_CP_REQUESTED) { ckpt_csched_warn(); } else printw ("checkpoint scheduler asked for checkpoint while in state (%d). Ignoring.", status); break; case CSCHED_REQ_CKPTEND: assert( status == STATUS_CONNECTED); /* remove past checkpoint image on local disk */ ckpt_info_reinit(); break; default: printe ("unknown scheduling request : %u", r->type); break; } free(buf); buf=NULL; return 0;}void csched_begin_restart(){ status = STATUS_RESTARTING;}void csched_end_restart( ){ tosend = 1;}int ftp_wait_ckpt (){ return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -