📄 protocolcheckpoint.c
字号:
/**
 * @file protocolCheckpoint.c
 * Implements the high-level checkpoint API for the daemon, providing the
 * structures needed to plug in a protocol-specific (FTP) checkpoint
 * implementation.
 *
 * These functions are not thread safe, and calls belonging to different
 * checkpoints must not be interleaved: part of the state is kept in static
 * variables (the current CkptInfo, the image staging buffer, block counters).
 */
#include "config.h"
#include "debug.h"
#include "protocolCheckpoint.h"
#include "checkpoint.h"
#include "chl.h"

#include <errno.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Configuration recorded by pckpt_init(). */
struct sockaddr *addr;  /* checkpoint server address; NOTE(review): NULL seems to mean "local file only" */
int use_localfile;      /* non-zero: checkpoint images also go to / come from a local file */
int group;
int rank;
int np;                 /* number of processes, sent with every checkpoint */
static int seqnumber = -1;

/** When non-NULL, a checkpoint (or restart) is in progress; no other one may
 * begin until the current one has finished. */
CkptInfo *_pckpt_cin = NULL;
#define cin _pckpt_cin

static int _put_ckptnp(CkptSock *s, int np);
static int _get_ckptnp(CkptSock *s);
static inline int _pckpt_rfdset_pipe(void);
static inline int _pckpt_wfdset(void);

#ifndef CKPT_BUFFSIZE
#define CKPT_BUFFSIZE 64000
#endif

/* Staging buffer shared by the image sender and the image receiver. */
static char ckptbuff[CKPT_BUFFSIZE];

/* Allocate a CkptInfo followed by nbfunc function slots and then nbfunc
 * argument slots. Returns NULL when the allocation fails. */
static CkptInfo *pckpt_allocinfo(int nbfunc)
{
    size_t n = nbfunc > 0 ? (size_t) nbfunc : 0;

    return malloc(sizeof(CkptInfo) + n * sizeof(Ftp_CkptFunc) + n * sizeof(void *));
}

/* Argument slots are stored right after the nfunc function slots. Both the
 * setter and the getter go through here so they agree on the slot type. */
static inline void **pckpt_argslots(CkptInfo *ci)
{
    return (void **) (ci->ckpt_func + ci->nfunc);
}

/** Record the daemon configuration and initialise the checkpoint library.
 * @return the result of initCheckpointLib(). */
int pckpt_init(struct sockaddr *_addr, int uselocal, int _group, int _rank, int _np)
{
    use_localfile = uselocal;
    addr = _addr;
    group = _group;
    rank = _rank;
    np = _np;
    return initCheckpointLib(group, rank);
}

/** Release the checkpoint library for this group/rank. */
int pckpt_finalize(void)
{
    return closeCheckpointLib(group, rank);
}

/** Install func as the ifunc-th checkpoint function of the current checkpoint. */
void pckpt_setCkptfunc(int ifunc, Ftp_CkptFunc func)
{
    ASSERT(cin);
    ASSERT(ifunc < cin->nfunc);
    cin->ckpt_func[ifunc] = func;
}

/** Install args as the argument passed to the iargs-th checkpoint function. */
void pckpt_setCkptargs(int iargs, void *args)
{
    ASSERT(cin);
    ASSERT(iargs < cin->nfunc);
    /* Store through void ** (the type pckpt_getCkptargs reads back), not
     * through the function-pointer slot type. */
    pckpt_argslots(cin)[iargs] = args;
}

/** Set the h field of the current checkpoint. */
void pckpt_setH(long h)
{
    ASSERT(cin);
    cin->h = h;
}

/* pckpt_getCkptInfo() used to be defined here; it was replaced by a macro. */

/** Set the sequence number used for the next checkpoint or restart. */
void pckpt_setSeqnumber(int seq)
{
    seqnumber = seq;
}

/** Return the sequence number used for the next checkpoint or restart. */
int pckpt_getSeqnumber(void)
{
    return seqnumber;
}

/** Return the checkpoint function currently being executed. */
Ftp_CkptFunc pckpt_getCkptfunc(void)
{
    ASSERT(cin);
    ASSERT(cin->ifunc < cin->nfunc);
    return cin->ckpt_func[cin->ifunc];
}

/** Return the argument of the checkpoint function currently being executed. */
void *pckpt_getCkptargs(void)
{
    ASSERT(cin);
    ASSERT(cin->ifunc < cin->nfunc);
    return pckpt_argslots(cin)[cin->ifunc];
}

/** File descriptor to watch for writability, or 0 when there is none
 * (no checkpoint in progress, or the needed channels are closed). */
int pckpt_wfdset(void)
{
    if (!cin)
        return 0;
    if (addr) {
        if (cin->sock.data == -1 || cin->sock.proto == -1)
            return 0;
    } else if (cin->sock.file == -1) {
        return 0;
    }
    if (pckpt_getCkptfunc() == pckpt_writeimgfunc)
        return _pckpt_wfdset();
    return ftp_ckpt_wfdset(cin);
}

/** File descriptor to watch for readability (the image pipe), or 0. */
int pckpt_rfdset(void)
{
    if (!cin || cin->pipe == -1)
        return 0;
    return _pckpt_rfdset_pipe();
}

/** Start a checkpoint if none is running and the FTP agrees.
 * @return CP_START when a checkpoint was started, 0 otherwise. */
int pckpt_atomic_try_and_begin(void)
{
    if (cin || !ftp_ckpt_try())
        return 0;
    printi("ckpt", "etape2");
    return pckpt_begin() ? CP_START : 0;
}

/** Open the image pipe and the checkpoint channels, send the protocol header,
 * then let the FTP install its checkpoint functions.
 * @return the new current CkptInfo, or NULL on failure (all resources released). */
CkptInfo *pckpt_begin(void)
{
    CkptSock s;
    int pipefd;
    int nbfunc;

    ASSERT(!cin);
    initCheckpointSock(&s);
    ftp_ckpt_seqnumber(seqnumber);

    if ((pipefd = openCheckpointPipeRD(group, rank)) < 0)
        return NULL;

    if (use_localfile && openWCheckpointLocalFile(&s, group, rank, seqnumber) < 0)
        goto err_pipe;

    if (connectCheckpointServer(&s, addr) < 0) {
        if (s.file != -1)
            close(s.file);
        goto err_pipe;
    }

    if (putCheckpointProto(&s, group, rank, seqnumber,
                           ftp_ckpt_protocolsize(np) + sizeof(int)) < 0)
        goto err_server;
    if (_put_ckptnp(&s, np) < 0)
        goto err_server;

    nbfunc = ftp_ckpt_nbfunc();
    printi("ckpt_begin", "Number of checkpoint functions to call for this Fault Tolerant protocol : %d", nbfunc);

    cin = pckpt_allocinfo(nbfunc);
    if (!cin)
        goto err_server;
    cin->group = group;
    cin->rank = rank;
    cin->seq = seqnumber;
    cin->pipe = pipefd;
    memcpy(&cin->sock, &s, sizeof(CkptSock));
    cin->np = np;
    cin->datasize = 0;
    cin->ifunc = 0;
    cin->nfunc = nbfunc;

    printi("ckpt_begin", "Setting data channel connexion to Checkpoint server in ASYNC mode");
    if (cin->sock.data != -1) {
        /* Add O_NONBLOCK to the existing status flags instead of replacing them. */
        int flags = fcntl(cin->sock.data, F_GETFL);
        if (flags < 0 || fcntl(cin->sock.data, F_SETFL, flags | O_NONBLOCK) < 0)
            qerror("fcntl");
    }

    printi("ckpt_begin", "Calling fault tolerant protocol specific checkpoint begin function");
    if (ftp_ckpt_begin(cin) < 0) {
        free(cin);
        cin = NULL;
        goto err_server;
    }

    printi("ckpt_begin", "Calling fault tolerant protocol specific checkpoint function to set %d functions", cin->nfunc);
    ftp_ckpt_setwritefunc(cin);
    printi("ckpt_begin", "Checkpoint begin performed. Ready to send checkpoint data");
    return cin;

err_server:
    closeCheckpointServer(&s);
err_pipe:
    close(pipefd);
    return NULL;
}

/* Run the current FTP function once and move to the next one when it reports
 * completion. Returns -1 on error, 0 while work remains, 1 once every function
 * has completed, and on_undefined for an undocumented return code. */
static int pckpt_runstep(int on_undefined)
{
    switch (pckpt_getCkptfunc()(cin, pckpt_getCkptargs())) {
    case -1:
        return -1;
    case 0:
        return 0;
    case 1:
        return (++(cin->ifunc) < cin->nfunc) ? 0 : 1;
    default:
        printq("Undefined return code from FTP specific function %d of %d", cin->ifunc, cin->nfunc);
        return on_undefined;
    }
}

/** Advance the checkpoint by one step.
 * @return -1 on error (also for an undefined FTP return code), 0 while more
 * work remains, 1 when all checkpoint functions have completed. */
int pckpt_genericwrite(void)
{
    return pckpt_runstep(-1);
}

/** Confirm the checkpoint to the server, close its channels and release it.
 * @return 0 on success, -1 on failure (the checkpoint is released either way). */
int pckpt_end(void)
{
    ASSERT(cin);
    ASSERT(cin->ifunc == cin->nfunc);
    printi("ckpt", "Checkpoint %d finished", cin->seq);

    if (sendCheckpointConfirm(&(cin->sock), cin->datasize) < 0) {
        printe("Cannot send checkpoint confirmation to server (seq=%d, size=%d)\n", cin->seq, cin->datasize);
        /* NOTE(review): the local file (if any) is not closed here, as before. */
        closeCheckpointServer(&(cin->sock));
        goto fail;
    }
    if (use_localfile)
        closeCheckpointLocalFile(&(cin->sock));
    if (ftp_ckpt_end(cin) < 0) {
        printe("Cannot execute fault tolerant protocol specific action for checkpoint end");
        closeCheckpointServer(&(cin->sock));
        goto fail;
    }
    closeCheckpointServer(&(cin->sock));
    free(cin);
    cin = NULL;
    return 0;

fail:
    free(cin);
    cin = NULL;
    return -1;
}

/** Locate a checkpoint image (local file and/or server), read its header and
 * let the FTP install its restart functions.
 * @return the new current CkptInfo, or NULL when no usable image exists. */
CkptInfo *prestart_begin(void)
{
    int psize;
    int nbfunc;
    int ret;

    printi("Restart", "Restarting from Checkpoint %d:%d(%d)", group, rank, seqnumber);
    nbfunc = ftp_restart_nbfunc();
    cin = pckpt_allocinfo(nbfunc);
    if (!cin)
        return NULL;
    cin->pipe = -1;
    initCheckpointSock(&(cin->sock));
    cin->group = group;
    cin->rank = rank;
    cin->seq = seqnumber;

    if (use_localfile && openRCheckpointLocalFile(&(cin->sock), group, rank, seqnumber) < 0) {
        if (addr == NULL) {
            printi("Restart", "No valid checkpoint file found %d:%d(%d)", group, rank, cin->seq);
            /* Release the info so a later checkpoint/restart can begin. */
            free(cin);
            cin = NULL;
            return NULL;
        }
        printi("Restart", "No valid local checkpoint file found %d:%d(%d)", group, rank, cin->seq);
    }

    if (connectCheckpointServer(&(cin->sock), addr) < 0)
        qerror("could not connect to checkpoint server");

    ret = getCheckpointProto(&(cin->sock), &(cin->group), &(cin->rank), &(cin->seq),
                             &psize, &(cin->datasize));
    if (ret < 0) {
        if (ret == -2)
            printi("Restart", "No suitable image found on server for %d:%d(%d)", group, rank, seqnumber);
        else
            qerror("Could not retrieve checkpoint proto");
        /* Never continue with an unread header (psize would be uninitialised). */
        closeCheckpointServer(&(cin->sock));
        free(cin);
        cin = NULL;
        return NULL;
    }

    if ((cin->np = _get_ckptnp(&(cin->sock))) < 0)
        qerror("Reading total number of processes");
    cin->ifunc = 0;
    cin->nfunc = nbfunc;
    printi("Restart", "Using checkpoint %d:%d(%d) psize=%d dsize=%d",
           cin->group, cin->rank, cin->seq, psize, cin->datasize);
    ASSERT(cin->np == np);
    ASSERT(psize == ftp_ckpt_protocolsize(cin->np) + sizeof(int));

    if (ftp_restart_begin(cin) < 0) {
        closeCheckpointServer(&(cin->sock));
        free(cin);
        cin = NULL;
        return NULL;
    }
    ftp_restart_setreadfunc(cin);
    return cin;
}

/** Advance the restart by one step.
 * @return -1 on error, 0 while more work remains (also, as before, for an
 * undefined FTP return code), 1 when all restart functions have completed. */
int prestart_genericread(void)
{
    return pckpt_runstep(0);
}

/** Run the FTP restart end hook and release the restart info.
 * @return 0 on success, -1 if the FTP hook failed. */
int prestart_end(void)
{
    int ok;

    ASSERT(cin);
    ok = ftp_restart_end(cin) >= 0;
    free(cin);
    cin = NULL;
    return ok ? 0 : -1;
}

/* Bytes of ckptbuff still to be sent; 0 means the buffer must be refilled
 * from the image pipe. */
static int block_size = 0;

/* Data socket to poll for writing while a block is pending, otherwise 0. */
static inline int _pckpt_wfdset(void)
{
    return block_size == 0 ? 0 : cin->sock.data;
}

/* Image pipe to poll for reading when the image writer needs a new block,
 * otherwise 0. */
static inline int _pckpt_rfdset_pipe(void)
{
    if (pckpt_getCkptfunc() == pckpt_writeimgfunc && block_size == 0)
        return cin->pipe;
    return 0;
}

/** Checkpoint function forwarding the process image from the pipe to the
 * checkpoint channels, one buffer at a time.
 * @return 0 while more data remains, 1 once the pipe reached end of file and
 * the total image size was sent, -1 if sending failed and qerror returned. */
int pckpt_writeimgfunc(CkptInfo *cin, void *dummy)
{
    static int datasize = 0;    /* bytes of this image sent so far */
    static int offset = 0;      /* send position inside ckptbuff */
    int writeret;

    ASSERT(dummy == NULL);

    if (block_size == 0) {
        /* The previous block is fully sent: read the next one from the pipe. */
        int read_errno = 0;

        if (cin->pipe != -1) {
            block_size = read(cin->pipe, ckptbuff, CKPT_BUFFSIZE);
            read_errno = errno;     /* printi below may clobber errno */
            printi("ckpt_sendimg", "Reading on ckpt pipe returned %d bytes to send to ckpt server\n", block_size);
        }
        if (block_size == -1) {
            /* Transient conditions are retried on the next call. Other
             * errors are fatal. */
            if (read_errno != EAGAIN && read_errno != EINTR) {
                errno = read_errno;     /* qerror may report errno */
                qerror("reading checkpoint image from pipe");
            }
            block_size = 0;
            return 0;
        }
        if (block_size == 0) {
            /* End of image: send its total size and close the pipe. */
            printi("ckpt_sendimg", "Nothing to send to checkpoint server, sending total image size of %d bytes\n", datasize);
            if (syncSendCheckpointProtoData(&(cin->sock), &datasize, sizeof(int)) != sizeof(int))
                qerror("Cannot send confirmation of img %d size %d to stable Checkpoint server", cin->seq, datasize);
            printi("ckpt_sendimg", "Closing checkpoint pipe");
            if (cin->pipe != -1)
                close(cin->pipe);
            cin->pipe = -1;
            cin->datasize += datasize;
            block_size = 0;
            datasize = 0;
            offset = 0;
            return 1;
        }
        offset = 0;
    }

    printi("ckpt_sendimg", "Sending %d image data\n", block_size);
    writeret = sendCheckpointImageData(&(cin->sock), ckptbuff + offset, block_size);
    if (writeret < 0) {
        if (errno == EAGAIN || errno == EINTR) {
            printi("ckpt", "Slow socket writing to checkpoint %s (%s)",
                   writeret == -1 ? "server" : "file", strerror(errno));
            return 0;
        }
        qerror("Writting to checkpoint %s", writeret == -1 ? "server" : "file");
        return -1;  /* never account a negative byte count */
    }
    offset += writeret;
    block_size -= writeret;
    datasize += writeret;
    return 0;
}

/** Synchronous restart function: receive the image size, then copy that many
 * bytes from the checkpoint channel into cin->pipe (assumed already opened
 * for writing), and finally close the pipe.
 * @return 1 on success, -1 if a fatal error was reported and qerror returned. */
int prestart_readimgfunc(CkptInfo *cin, void *dummy)
{
    int isize;

    ASSERT(dummy == NULL);

    printi("ckpt_restart", "Reading Ckpt Image size");
    if (syncRecvCheckpointProtoData(&(cin->sock), &isize, sizeof(int)) != sizeof(int))
        qerror("Reading Ckpt Image size");

    while (isize > 0) {
        int want = isize < CKPT_BUFFSIZE ? isize : CKPT_BUFFSIZE;
        int nr = recvCheckpointImageData(&(cin->sock), ckptbuff, want);

        if (nr < 0) {
            if (errno != EAGAIN && errno != EINTR) {
                qerror("Reading Ckpt Image data");
                return -1;
            }
            continue;
        }
        if (nr == 0) {
            /* Stream ended before the announced size: retrying would spin forever. */
            qerror("Reading Ckpt Image data: unexpected end of stream (%d bytes missing)", isize);
            return -1;
        }

        printi("ckpt_restart_flood", "Writing %d bytes to local pipe", nr);
        /* write() may be partial: resume from where the previous write stopped. */
        for (int off = 0; off < nr; ) {
            int res = write(cin->pipe, ckptbuff + off, nr - off);

            if (res < 0) {
                if (errno == EINTR)
                    continue;
                qerror("Forwarding Ckpt Image data to local pipe %d of %d writen", off, nr);
                return -1;
            }
            if (res < nr - off)
                printw("cas bizare... %d of %d sent", res, nr - off);
            off += res;
            isize -= res;
        }
    }

    close(cin->pipe);
    cin->pipe = -1;
    return 1;
}

/* Send the number of processes on the protocol channel.
 * Returns sizeof(int) on success, -1 on failure (including a short send). */
static int _put_ckptnp(CkptSock *s, int np)
{
    printi("ckpt_sendnp", "Sending np (%d)\n", np);
    if (syncSendCheckpointProtoData(s, &np, sizeof(int)) != sizeof(int)) {
        printe("Ckpt transaction failed sending np (%d)", np);
        return -1;
    }
    return sizeof(int);
}

/* Receive the number of processes from the protocol channel; -1 on failure. */
static int _get_ckptnp(CkptSock *s)
{
    int np;

    printi("ckpt_recvnp", "Reading np");
    if (syncRecvCheckpointProtoData(s, &np, sizeof(int)) != sizeof(int)) {
        printe("Ckpt transaction failed reading np");
        return -1;
    }
    return np;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -