📄 genericcheckpoint.c
字号:
/** @file genericCheckpoint.c implements the checkpoint API to access checkpoint server */
#include "genericCheckpoint.h"
#include "utils_socket.h"
#include "config.h"
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <signal.h>
#include <limits.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#ifndef BUFFSIZE
#define BUFFSIZE PIPE_BUF
#endif

/******************************************************************************
 * Init and misc functions of the ckptlib.
 * Almost everything that has to deal with pipes is here.
 ******************************************************************************/
#if 0
#ifdef CKPT_HELPER_IS_CONDOR
# define CKPT_PIPE_EXT ".tmp"
#else
# ifdef CKPT_HELPER_IS_CKPT
#  define CKPT_PIPE_EXT ""
# else /* CKPT_HELPER_IS_BLCR */
#  define CKPT_PIPE_EXT ""
#  include "libcr.h"
# endif /* CKPT_HELPER_IS_CKPT */
#endif /* CKPT_HELPER_IS_CONDOR */
#endif

#include "protocolCheckpoint.h"

/**
 * Read a checkpoint image and finish the pre-start phase.
 *
 * prestart_genericread() is called again and again for as long as it returns
 * 0. Once it returns something else, any value other than 1 is reported
 * through qerror(). prestart_end() is then called and a negative result is
 * reported through qerror() as well.
 *
 * @return always 0 when the function returns.
 */
int downloadCheckpoint(void)
{
    int status;

    /* Keep reading until the reader stops answering 0. */
    do {
        status = prestart_genericread();
    } while (status == 0);

    if (status != 1)
        qerror("reading checkpoint image failed");

    if (prestart_end() < 0)
        qerror("finishing initialisation after checkpoint image read failed");

    return 0;
}

/**
 * Create pipe used for transmission of the checkpoint file between native app
 * and the worker.
 * @param wid: the group id
 * @param rankid: the rank
 * @return 0 if no error.
*//*int initCheckpointLib(int wid,int rankid){ char *restartpipe; char *ckptpipe; char buff[strlen(TMPDIR)+128];#ifndef NO_ADOC# ifdef ADOC_MIN adoc_set_min_level(ADOC_MIN);# endif# ifdef ADOC_MAX adoc_set_max_level(ADOC_MAX);# endif#endif sprintf(buff, TMPDIR"/%d:%d.restart.pipe", wid, rankid); restartpipe = strdup(buff); sprintf(buff, TMPDIR"/%d:%d.ckpt.pipe"CKPT_PIPE_EXT, wid, rankid); ckptpipe = strdup(buff); if(mknod(restartpipe,S_IFIFO | S_IREAD | S_IWRITE,0) == -1) { if(errno != EEXIST) { printe("creating restart pipe between native app and worker (mknod)"); free(restartpipe);free(ckptpipe); return -1; } } if(mknod(ckptpipe,S_IFIFO | S_IREAD | S_IWRITE,0) == -1) { if(errno != EEXIST) { printe("creating checkpoint pipe between native app and worker (mknod)"); free(restartpipe);free(ckptpipe); return -1; } } free(restartpipe);free(ckptpipe); return 0;}*//* Removes created pipes *//*int closeCheckpointLib(int wid,int rankid){ char *restartpipe; char *ckptpipe; char buff[512]; sprintf(buff, TMPDIR"/%d:%d.restart.pipe", wid, rankid); restartpipe = strdup(buff); sprintf(buff, TMPDIR"/%d:%d.ckpt.pipe"CKPT_PIPE_EXT, wid, rankid); ckptpipe = strdup(buff); unlink(restartpipe); unlink(ckptpipe); free(restartpipe);free(ckptpipe); return 0;}*//** * open the pipe for reading checkpoint image * @return fd of the pipe, -1 on failure. 
*//*int openCheckpointPipeRD(int wid, int rankid){ char pipefile[256]; int pipefd; sprintf(pipefile, TMPDIR"/%d:%d.ckpt.pipe"CKPT_PIPE_EXT, wid, rankid); if(mknod(pipefile, S_IFIFO | S_IREAD | S_IWRITE,0) == -1) { if(errno != EEXIST) { printe("creating checkpoint pipe between native app and worker (mknod) %s", pipefile); return -1; } } printi("ckpt_generic", "Waiting for checkpoint file on %s\n",pipefile); if((pipefd = open(pipefile,O_RDONLY | O_NONBLOCK)) == -1) { printe("Opening pipefile: %s",pipefile); return -1; } printi("ckpt_generic", "pipe opened\n"); return pipefd;}*//** * open the pipe for writting checkpoint image * @return fd of the pipe, -1 on failure. *//*int openCheckpointPipeWR(int wid, int rankid){ char pipefile[256]; int pipefd; sprintf(pipefile, TMPDIR"/%d:%d.restart.pipe", wid, rankid); if(mknod(pipefile, S_IFIFO | S_IREAD | S_IWRITE,0) == -1) { if(errno != EEXIST) { printe("creating checkpoint pipe between native app and worker (mknod) %s", pipefile); return -1; } } printi("ckpt_generic", "Waiting for checkpoint file on %s\n",pipefile); if((pipefd = open(pipefile,O_WRONLY | O_APPEND)) == -1) { printe("Opening pipefile: %s",pipefile); return -1; } printi("ckpt_generic", "pipe opened\n"); return pipefd;}*//******************************************************************************* All that deals with checkpoint protocol*******************************************************************************/static inline int _sendckpt(int s, const void *buffer, int size);static inline int _recvckpt(int s, void *buffer, int size);void initCheckpointSock(CkptSock *s){ s->proto = -1; s->data = -1; s->file = -1; s->protocur = 0; s->datacur = 0;}int connectCheckpointServer(CkptSock *s, const struct sockaddr *addr){ struct sockaddr_in cpaddr;# define addr ((struct sockaddr_in *) addr) ASSERT(s); s->proto = -1; s->data = -1; if(addr) { memcpy(&cpaddr, addr, sizeof(struct sockaddr_in)); printi("ckpt_generic", "try to connect to proto channel %s:%d\n", 
inet_ntoa(addr->sin_addr), ntohs(addr->sin_port)); if((s->proto = socket(PF_INET,SOCK_STREAM,0)) == -1) { printe("Connecting checkpoint server (proto channel socket)"); s->proto = s->data = -1; return -1; } if(connect(s->proto, (struct sockaddr*) &cpaddr, sizeof(struct sockaddr_in)) == -1) { printe("Connecting checkpoint server %s:%d (proto channel connect)", inet_ntoa(addr->sin_addr), ntohs(addr->sin_port)); close(s->proto); s->proto = s->data = -1; return -1; } if(_uread(s->proto, &(cpaddr.sin_port), sizeof(in_port_t)) != sizeof(in_port_t)) { printe("Recieving data channel port from server %s (_uread)", inet_ntoa(addr->sin_addr)); close(s->proto); s->proto = s->data = -1; return -1; } printi("ckpt_generic", "try to connect to data channel %s:%d\n", inet_ntoa(cpaddr.sin_addr), ntohs(cpaddr.sin_port)); if((s->data = socket(PF_INET,SOCK_STREAM,0)) == -1) { printe("Connecting checkpoint server (data channel socket)"); close(s->proto); s->proto = s->data = -1; return -1; } if(connect(s->data, (struct sockaddr *) &cpaddr, sizeof(struct sockaddr_in)) == -1) { printe("Connecting checkpoint server %s:%d (data channel connect)", inet_ntoa(cpaddr.sin_addr), ntohs(cpaddr.sin_port)); close(s->proto); close(s->data); s->proto = s->data = -1; return -1; } return 1; } else { printi("ckpt_generic", "Checkpoint server adress not filled, dismissing connect"); return 0; }# undef addr}#ifndef DEF_PERM#define DEF_PERM (S_IREAD | S_IWRITE | S_IRGRP | S_IWGRP)#endifint openWCheckpointLocalFile(CkptSock *s, int group, int rank, int seq){ ASSERT(s); if(seq >= 0) snprintf(s->name, 64, TMPDIR"/ckptimg-g%d-r%d-s%d", group, rank, seq); else snprintf(s->name, 64, TMPDIR"/ckptimg-g%d-r%d.tmp", group, rank); if((s->file = creat(s->name, DEF_PERM)) == -1) printe("Cannot open checkpoint local file %s", s->name); return s->file;}int openRCheckpointLocalFile(CkptSock *s, int group, int rank, int seq){ ASSERT(s); if(seq >= 0) snprintf(s->name, 64, TMPDIR"/ckptimg-g%d-r%d-s%d", group, rank, seq); else 
snprintf(s->name, 64, TMPDIR"/ckptimg-g%d-r%d", group, rank); if((s->file = open(s->name, O_RDONLY)) == -1) printe("Cannot open checkpoint local file %s", s->name); return s->file;}int sendCheckpointConfirm(CkptSock *s, int totalsize){ char buff[sizeof (int) + 1]; char *dstname; char *extchar; printi ("ckpt_confirm", "Sending final checkpoint confirmation (totaldatasize=%d)\n", totalsize); /* if some checkpoint server is connected, ACK the local copy has no meaning when distant ack failed */ if(s->proto != -1) { printi("ckpt_generic", "Linked to checkpoint server, sending confirmation"); *buff = 'C'; *((int *) (buff + 1)) = htonl (totalsize); if (_uwrite (s->proto, buff, sizeof (int) + 1) != (sizeof (int) + 1)) { printe ("Confirmation could not be sent ton checkpoint server\n"); return -1; } if (shutdown (s->proto, SHUT_WR) < 0) { printe ("Checkpoint protocol connection shutdown (WR) failed"); return -1; } if (_uread (s->proto, buff, 1) != 1) { printe ("Checkpoint confirmation could not be acknoledged\n"); return -1; } if (buff[0] != 'A') { printw ("checkpoint confirmation refused by server\n"); return -1; } } if(s->file != -1) { printi("ckpt_confirm", "Linked to file, confirming local file"); if(lseek(s->file, 4 * sizeof(int), SEEK_SET) != 4 * sizeof(int)) { printe("Ckpt: lseek to confirmation protocol area on local file failed"); return -2; } *((int *) buff) = htonl (totalsize); if(write(s->file, buff, sizeof(int)) != sizeof(int)) { printe("Ckpt: write checkpoint confirmation on local file failed"); return -2; } if(fsync(s->file) == -1) { printe("Flushing local checkpoint to disk"); return -2; } if(close(s->file) == -1) { printe("Closing local checkpoint file"); return -2; } s->file = -1; /* Do whe use seqnumbers ? 
If we don't we have to replace previous checkpoint file with this one */ dstname = strdup(s->name); if((extchar = strstr(dstname, ".tmp"))) { *extchar = '\0'; printi("ckpt_confirm", "Rename %s to %s", s->name, dstname); if(rename(s->name, dstname)) { printe("Replacing previous checkpoint file (%s) with current terminated checkpoint (%s)", s->name, dstname); free(dstname); return -2; } } free(dstname); } return 0;}int closeCheckpointServer(CkptSock *s){ int ret1, ret2; if(s->proto == -1) return 0; shutdown(s->proto, 2); shutdown(s->data, 2); ret1 = close(s->proto); ret2 = close(s->data); s->proto = -1; s->data = -1; if((ret1 == -1) || (ret2 == -1)) return -1; else return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -