⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 protocolcheckpoint.c

📁 Path MPICH-V for MPICH the MPI Implementation
💻 C
字号:
/** @file protocolCheckpoint.c implements the checkpoint high level API for daemon
 * giving all necessary structures to define a protocol specific checkpoint
 * implementation.
 * Not thread safe functions, no mixed calls between multiple checkpoints as
 * some states are stored in static variables.
 */
#include "config.h"
#include "debug.h"
#include "protocolCheckpoint.h"
#include "checkpoint.h"
#include "chl.h"
#include <errno.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Module configuration, recorded by pckpt_init(). */
struct sockaddr *addr;   /* checkpoint server address; when NULL only the local file is consulted */
int use_localfile;       /* non-zero: also write/read a local checkpoint file */
int group;
int rank;
int np;                  /* total number of processes, stored in each checkpoint */

/* Sequence number used by the next pckpt_begin() / prestart_begin(). */
static int seqnumber = -1;

/** If this variable is set, a checkpoint (or restart) is currently taking place;
 * no other checkpoint can begin until the current one is finished.
 * The descriptor is allocated with nfunc function slots (ckpt_func[0..nfunc-1])
 * immediately followed by nfunc argument slots read/written through void **.
 */
CkptInfo *_pckpt_cin = NULL;
#define cin _pckpt_cin

static int _put_ckptnp(CkptSock *s, int np);
static int _get_ckptnp(CkptSock *s);
static inline int _pckpt_rfdset_pipe(void);
static inline int _pckpt_wfdset(void);
static int _pckpt_step(void);
static void _pckpt_discard(void);

#ifndef CKPT_BUFFSIZE
#define CKPT_BUFFSIZE  64000
#endif

/* Staging buffer shared by image send (pipe -> server) and receive (server -> pipe). */
static char ckptbuff[CKPT_BUFFSIZE];

/** Record the module configuration and initialise the checkpoint library.
 * @return the result of initCheckpointLib(group, rank).
 */
int pckpt_init(struct sockaddr *_addr, int uselocal, int _group, int _rank, int _np)
{
  use_localfile = uselocal;
  addr = _addr;
  group = _group;
  rank = _rank;
  np = _np;
  return initCheckpointLib(group, rank);
}

/** Release the checkpoint library for this group/rank. */
int pckpt_finalize(void)
{
  return closeCheckpointLib(group, rank);
}

/** Register function number @p ifunc of the checkpoint in progress. */
void pckpt_setCkptfunc(int ifunc, Ftp_CkptFunc func)
{
  ASSERT(cin);
  ASSERT((ifunc >= 0) && (ifunc < cin->nfunc));
  cin->ckpt_func[ifunc] = func;
}

/** Register the argument passed to function number @p iargs.
 * Argument slots follow the nfunc function slots and are addressed as void *,
 * exactly as pckpt_getCkptargs() reads them.
 */
void pckpt_setCkptargs(int iargs, void *args)
{
  ASSERT(cin);
  ASSERT((iargs >= 0) && (iargs < cin->nfunc));
  ((void **) (cin->ckpt_func + cin->nfunc))[iargs] = args;
}

/** Store @p h in the checkpoint in progress. */
void pckpt_setH(long h)
{
  ASSERT(cin);
  cin->h = h;
}

/* pckpt_getCkptInfo() used to be an inline function returning cin here;
 * it has been replaced by a #define located outside this file. */

/** Set the sequence number used by the next checkpoint or restart. */
void pckpt_setSeqnumber(int seq)
{
  seqnumber = seq;
}

/** @return the current checkpoint sequence number. */
int pckpt_getSeqnumber(void)
{
  return seqnumber;
}

/** @return the function currently being executed (index cin->ifunc). */
Ftp_CkptFunc pckpt_getCkptfunc(void)
{
  ASSERT(cin);
  ASSERT(cin->ifunc < cin->nfunc);
  return cin->ckpt_func[cin->ifunc];
}

/** @return the argument registered for the current function (index cin->ifunc). */
void *pckpt_getCkptargs(void)
{
  ASSERT(cin);
  ASSERT(cin->ifunc < cin->nfunc);
  return ((void **) (cin->ckpt_func + cin->nfunc))[cin->ifunc];
}

/** @return the descriptor to watch for writability, or 0 when there is none.
 * With a server address both the data and proto sockets must be open;
 * otherwise the local file must be open.
 */
int pckpt_wfdset(void)
{
  if (addr)
  {
    if ((!cin) || (cin->sock.data == -1) || (cin->sock.proto == -1)) return 0;
  }
  else if ((!cin) || (cin->sock.file == -1)) return 0;

  if (pckpt_getCkptfunc() == pckpt_writeimgfunc) return _pckpt_wfdset();
  return ftp_ckpt_wfdset(cin);
}

/** @return the image pipe to watch for readability, or 0 when there is none. */
int pckpt_rfdset(void)
{
  if (!cin || (cin->pipe == -1)) return 0;
  return _pckpt_rfdset_pipe();
}

/** Start a checkpoint if none is running and the protocol agrees (ftp_ckpt_try()).
 * @return CP_START when a checkpoint was started, 0 otherwise.
 */
int pckpt_atomic_try_and_begin(void)
{
  if ((!cin) && ftp_ckpt_try())
  {
    printi("ckpt","etape2");
    if (pckpt_begin()) return CP_START;
  }
  return 0;
}

/** Open the image pipe (and optional local file), connect to the checkpoint
 * server, send the protocol header and np, then build the global descriptor.
 * @return the new descriptor, or NULL on failure (every resource opened here
 *         is released on failure).
 */
CkptInfo *pckpt_begin(void)
{
  CkptSock s;
  int pipefd;
  int nbfunc;
  int flags;

  ASSERT(!cin);
  initCheckpointSock(&s);
  ftp_ckpt_seqnumber(seqnumber);

  if ((pipefd = openCheckpointPipeRD(group, rank)) < 0) return NULL;
  if (use_localfile && (openWCheckpointLocalFile(&s, group, rank, seqnumber) < 0))
  {
    close(pipefd);
    return NULL;
  }
  if (connectCheckpointServer(&s, addr) < 0)
  {
    if (s.file != -1) close(s.file);
    close(pipefd);
    return NULL;
  }
  if (putCheckpointProto(&s, group, rank, seqnumber, ftp_ckpt_protocolsize(np) + sizeof(int)) < 0)
    goto fail_server;
  if (_put_ckptnp(&s, np) < 0)
    goto fail_server;

  nbfunc = ftp_ckpt_nbfunc();
  printi("ckpt_begin", "Number of checkpoint functions to call for this Fault Tolerant protocol : %d", nbfunc);
  /* header + nbfunc function slots + nbfunc argument slots */
  cin = malloc(sizeof(CkptInfo) + nbfunc * sizeof(Ftp_CkptFunc) + nbfunc * sizeof(void *));
  if (cin == NULL)
  {
    printe("Cannot allocate checkpoint descriptor for %d functions", nbfunc);
    goto fail_server;
  }
  cin->group = group;
  cin->rank = rank;
  cin->seq = seqnumber;
  cin->pipe = pipefd;
  memcpy(&cin->sock, &s, sizeof(CkptSock));
  cin->np = np;
  cin->datasize = 0;
  cin->ifunc = 0;
  cin->nfunc = nbfunc;

  printi("ckpt_begin", "Setting data channel connexion to Checkpoint server in ASYNC mode");
  if (cin->sock.data != -1)
  {
    /* add O_NONBLOCK without clobbering the other file status flags */
    if (((flags = fcntl(cin->sock.data, F_GETFL, 0)) < 0)
        || (fcntl(cin->sock.data, F_SETFL, flags | O_NONBLOCK) < 0))
      qerror("fcntl");
  }

  printi("ckpt_begin", "Calling fault tolerant protocol specific checkpoint begin function");
  if (ftp_ckpt_begin(cin) < 0)
  {
    free(cin);
    cin = NULL;
    goto fail_server;
  }
  printi("ckpt_begin", "Calling fault tolerant protocol specific checkpoint function to set %d functions", cin->nfunc);
  ftp_ckpt_setwritefunc(cin);
  printi("ckpt_begin", "Checkpoint begin performed. Ready to send checkpoint data");
  return cin;

fail_server:
  closeCheckpointServer(&s);
  close(pipefd);
  return NULL;
}

/** Run the current registered function once and advance to the next one when
 * it reports completion.
 * @return -1 on error (including an undefined return code), 0 when more work
 *         remains, 1 when every function has completed.
 */
static int _pckpt_step(void)
{
  switch (pckpt_getCkptfunc()(cin, pckpt_getCkptargs()))
  {
    case -1:
      return -1;
    case 0:
      return 0;
    case 1:
      return (++(cin->ifunc) < cin->nfunc) ? 0 : 1;
    default:
      printq("Undefined return code from FTP specific function %d of %d", cin->ifunc, cin->nfunc);
      return -1;
  }
}

/** Advance the checkpoint write sequence by one step (see _pckpt_step()). */
int pckpt_genericwrite(void)
{
  return _pckpt_step();
}

/** Close the image pipe if still open and the server connection, then free and
 * clear the in-progress descriptor. */
static void _pckpt_discard(void)
{
  if (cin->pipe != -1) close(cin->pipe);
  closeCheckpointServer(&(cin->sock));
  free(cin);
  cin = NULL;
}

/** Confirm the checkpoint to the server, close the local file, run the
 * protocol-specific end action and release the descriptor.
 * @return 0 on success, -1 on failure (the descriptor is released either way).
 */
int pckpt_end(void)
{
  ASSERT(cin);
  ASSERT(cin->ifunc == cin->nfunc);
  printi("ckpt", "Checkpoint %d finished", cin->seq);
  if (sendCheckpointConfirm(&(cin->sock), cin->datasize) < 0)
  {
    printe("Cannot send checkpoint confirmation to server (seq=%d, size=%d)\n", cin->seq, cin->datasize);
    _pckpt_discard();
    return -1;
  }
  if (use_localfile)
    closeCheckpointLocalFile(&(cin->sock));

  if (ftp_ckpt_end(cin) < 0)
  {
    printe("Cannot execute fault tolerant protocol specific action for checkpoint end");
    _pckpt_discard();
    return -1;
  }
  _pckpt_discard();
  return 0;
}

/** Locate a checkpoint image (local file and/or server), read its header and
 * np, and build the global descriptor for restart.
 * @return the new descriptor, or NULL when no usable image exists (cin is
 *         cleared in that case).
 */
CkptInfo *prestart_begin(void)
{
  int psize;
  int nbfunc;
  int ret;

  printi("Restart", "Restarting from Checkpoint %d:%d(%d)", group, rank, seqnumber);

  nbfunc = ftp_restart_nbfunc();
  cin = malloc(sizeof(CkptInfo) + nbfunc * sizeof(Ftp_CkptFunc) + nbfunc * sizeof(void *));
  if (cin == NULL) qerror("Cannot allocate restart descriptor for %d functions", nbfunc);
  cin->pipe = -1;
  initCheckpointSock(&(cin->sock));
  cin->group = group;
  cin->rank = rank;
  cin->seq = seqnumber;

  if (use_localfile && (openRCheckpointLocalFile(&(cin->sock), group, rank, seqnumber) < 0))
  {
    if (addr == NULL)
    {
      /* no server to fall back on: give up without leaking the descriptor */
      printi("Restart", "No valid checkpoint file found %d:%d(%d)", group, rank, cin->seq);
      free(cin);
      cin = NULL;
      return NULL;
    }
    printi("Restart", "No valid local checkpoint file found %d:%d(%d)", group, rank, cin->seq);
  }

  if (connectCheckpointServer(&(cin->sock), addr) < 0) qerror("could not connect to checkpoint server");
  if ((ret = getCheckpointProto(&(cin->sock), &(cin->group), &(cin->rank), &(cin->seq), &psize, &(cin->datasize))) < 0)
  {
    switch (ret)
    {
      case -1:
        qerror("Could not retrieve checkpoint proto");
        /* fallthrough: only reached if qerror returns */
      case -2:
        printi("Restart", "No suitable image found on server for %d:%d(%d)", group, rank, seqnumber);
        break;
      default:
        printe("Unexpected error %d retrieving checkpoint proto for %d:%d(%d)", ret, group, rank, seqnumber);
        break;
    }
    closeCheckpointServer(&(cin->sock));
    free(cin);
    return cin = NULL;
  }
  if ((cin->np = _get_ckptnp(&(cin->sock))) < 0) qerror("Reading total number of processes");
  cin->ifunc = 0;
  cin->nfunc = nbfunc;
  printi("Restart", "Using checkpoint %d:%d(%d) psize=%d dsize=%d", cin->group, cin->rank, cin->seq, psize, cin->datasize);
  ASSERT(cin->np == np);
  ASSERT(psize == ftp_ckpt_protocolsize(cin->np) + sizeof(int));

  if (ftp_restart_begin(cin) < 0)
  {
    closeCheckpointServer(&(cin->sock));
    free(cin);
    return cin = NULL;
  }
  ftp_restart_setreadfunc(cin);
  return cin;
}

/** Advance the restart read sequence by one step (see _pckpt_step()). */
int prestart_genericread(void)
{
  return _pckpt_step();
}

/** Run the protocol-specific restart end action and release the descriptor.
 * @return 0 on success, -1 if ftp_restart_end() failed.
 */
int prestart_end(void)
{
  int ret = (ftp_restart_end(cin) < 0) ? -1 : 0;

  free(cin);
  cin = NULL;
  return ret;
}

/* Bytes of the current pipe block still waiting to be sent; 0 = nothing pending. */
static int block_size = 0;

/** @return the data socket while a block is pending, 0 otherwise. */
static inline int _pckpt_wfdset(void)
{
  if (block_size == 0) return 0;
  else return cin->sock.data;
}

/** @return the image pipe when writeimgfunc is active and has no pending block. */
static inline int _pckpt_rfdset_pipe(void)
{
  if ((pckpt_getCkptfunc() == pckpt_writeimgfunc) && (block_size == 0)) return cin->pipe;
  else return 0;
}

/** Forward one step of the checkpoint image from the pipe to the server.
 * Reads a block when none is pending, otherwise sends (part of) the pending
 * block. At end of pipe, sends the total image size and closes the pipe.
 * @return 1 when the whole image has been sent, 0 when more work remains.
 */
int pckpt_writeimgfunc(CkptInfo *cin, void *dummy)
{
  static int datasize = 0;  /* image bytes sent so far for this checkpoint */
  static int offset = 0;    /* position in ckptbuff of the next unsent byte */
  int writeret;

  ASSERT(dummy == NULL);
  if (block_size == 0)
  {
    /* the last write is finished: read the next block from the pipe */
    if (cin->pipe != -1)
    {
      block_size = read(cin->pipe, ckptbuff, CKPT_BUFFSIZE);
      printi("ckpt_sendimg", "Reading on ckpt pipe returned %d bytes to send to ckpt server\n",
             block_size);
    }
    if (block_size == -1)
    {
      /* NOTE(review): every read error is treated as transient and retried on
       * the next call; a persistent error would keep this function spinning. */
      block_size = 0;
      return 0;
    }
    if (block_size == 0)
    {
      printi("ckpt_sendimg", "Nothing to send to checkpoint server, sending total image size of %d bytes\n", datasize);
      if (syncSendCheckpointProtoData(&(cin->sock), &datasize, sizeof(int)) != sizeof(int)) qerror("Cannot send confirmation of img %d size %d to stable Checkpoint server", cin->seq, datasize);
      printi("ckpt_sendimg", "Closing checkpoint pipe");
      if (cin->pipe != -1) close(cin->pipe);
      cin->pipe = -1;
      cin->datasize += datasize;
      datasize = 0;
      offset = 0;
      return 1;
    }
    offset = 0;
  }

  printi("ckpt_sendimg", "Sending %d image data\n", block_size);
  if ((writeret = sendCheckpointImageData(&(cin->sock), ckptbuff + offset, block_size)) < 0)
  {
    if (errno == EAGAIN)
    {
      printi("ckpt", "Slow socket writing to checkpoint %s (%s)",
             writeret == -1 ? "server" : "file",
             strerror(errno));
      return 0;
    }
    else qerror("Writting to checkpoint %s", writeret == -1 ? "server" : "file");
  }
  offset += writeret;
  block_size -= writeret;
  datasize += writeret;
  return 0;
}

/** Synchronous implementation.
 * Receive the checkpoint image (size prefix, then data) and forward every byte
 * to cin->pipe, which must already be open for writing. Partial pipe writes
 * are resumed at the correct offset so no byte is lost or duplicated.
 * @return 1 once the whole image has been forwarded and the pipe closed.
 */
int prestart_readimgfunc(CkptInfo *cin, void *dummy)
{
  int isize;
  int nr;
  int off;
  int res;

  ASSERT(dummy == NULL);
  printi("ckpt_restart", "Reading Ckpt Image size");
  if (syncRecvCheckpointProtoData(&(cin->sock), &isize, sizeof(int)) != sizeof(int)) qerror("Reading Ckpt Image size");

  while (isize > 0)
  {
    nr = recvCheckpointImageData(&(cin->sock), ckptbuff, (isize < CKPT_BUFFSIZE) ? isize : CKPT_BUFFSIZE);
    if (nr < 0)
    {
      if ((errno != EAGAIN) && (errno != EINTR)) qerror("Reading Ckpt Image data");
      continue;
    }
    if (nr == 0)
    {
      /* no data although bytes are still expected: retrying would spin forever */
      qerror("Reading Ckpt Image data: stream ended with %d bytes missing", isize);
      return -1;
    }

    printi("ckpt_restart_flood", "Writing %d bytes to local pipe", nr);
    for (off = 0; off < nr; off += res)
    {
      if ((res = write(cin->pipe, ckptbuff + off, nr - off)) < 0)
      {
        if (errno == EINTR)
        {
          res = 0;
          continue;
        }
        qerror("Forwarding Ckpt Image data to local pipe %d of %d writen", res, nr);
        return -1;
      }
      if (res < nr - off) printw("cas bizare... %d of %d sent", res, nr - off);
    }
    isize -= nr;
  }
  close(cin->pipe);
  cin->pipe = -1;
  return 1;
}

/** Send the total number of processes on the proto channel.
 * @return sizeof(int) on success, -1 on failure or short send.
 */
static int _put_ckptnp(CkptSock *s, int np)
{
  printi("ckpt_sendnp", "Sending np (%d)\n", np);
  if (syncSendCheckpointProtoData(s, &np, sizeof(int)) != sizeof(int))
  {
    printe("Ckpt transaction failed sending np (%d)", np);
    return -1;
  }
  return sizeof(int);
}

/** Read the total number of processes from the proto channel.
 * @return np, or -1 on failure or short read.
 */
static int _get_ckptnp(CkptSock *s)
{
  int np;

  printi("ckpt_recvnp", "Reading np");
  if (syncRecvCheckpointProtoData(s, &np, sizeof(int)) != sizeof(int))
  {
    printe("Ckpt transaction failed reading np");
    return -1;
  }
  return np;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -