⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 genericcheckpoint.c

📁 Path MPICH-V for MPICH the MPI Implementation
💻 C
📖 第 1 页 / 共 2 页
字号:
/** @file genericCheckpoint.c implements the checkpoint API to access checkpoint server */
#include "genericCheckpoint.h"
#include "utils_socket.h"
#include "config.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <signal.h>
#include <limits.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#ifndef BUFFSIZE
#define BUFFSIZE PIPE_BUF
#endif

/******************************************************************************
  Init and misc functions of the ckptlib
  Almost all that have to deal with pipes is here
******************************************************************************/

#if 0
#ifdef CKPT_HELPER_IS_CONDOR
# define CKPT_PIPE_EXT ".tmp"
#else
# ifdef CKPT_HELPER_IS_CKPT
#  define CKPT_PIPE_EXT ""
# else /* CKPT_HELPER_IS_BLCR */
#  define CKPT_PIPE_EXT ""
#  include "libcr.h"
# endif /* CKPT_HELPER_IS_CKPT */
#endif /* CKPT_HELPER_IS_CONDOR */
#endif

#include "protocolCheckpoint.h"

/**
 * Read a checkpoint image and finish the restart initialisation.
 *
 * prestart_genericread() is called repeatedly for as long as it returns 0.
 * Any final value other than 1 is reported through qerror(); a negative
 * result of prestart_end() is reported the same way.
 * NOTE(review): qerror() is declared elsewhere and presumably does not
 * return -- confirm.
 *
 * @return always 0.
 */
int downloadCheckpoint(void)
{
  int ret;

  do
  {
    ret = prestart_genericread();
  } while(ret == 0);

  if(ret != 1)
    qerror("reading checkpoint image failed");

  if(prestart_end() < 0)
    qerror("finishing initialisation after checkpoint image read failed");

  return 0;
}

/**
 *  Create pipe used for transmission of the checkpoint file between native app
 *  and the worker.
 *  @param wid: the group id
 *  @param rankid: the rank
 *  @return 0 if no error.
 */
/*
int initCheckpointLib(int wid,int rankid)
{
  char *restartpipe;
  char *ckptpipe;
  char buff[strlen(TMPDIR)+128];

#ifndef NO_ADOC
# ifdef ADOC_MIN
  adoc_set_min_level(ADOC_MIN);
# endif
# ifdef ADOC_MAX
  adoc_set_max_level(ADOC_MAX);
# endif
#endif

  sprintf(buff, TMPDIR"/%d:%d.restart.pipe", wid, rankid);
  restartpipe = strdup(buff);
  sprintf(buff, TMPDIR"/%d:%d.ckpt.pipe"CKPT_PIPE_EXT, wid, rankid);
  ckptpipe = strdup(buff);

  if(mknod(restartpipe,S_IFIFO | S_IREAD | S_IWRITE,0) == -1)
  {
    if(errno != EEXIST)
    {
      printe("creating restart pipe between native app and worker (mknod)");
      free(restartpipe);free(ckptpipe);
      return -1;
    }
  }
  if(mknod(ckptpipe,S_IFIFO | S_IREAD | S_IWRITE,0) == -1)
  {
    if(errno != EEXIST)
    {
      printe("creating checkpoint pipe between native app and worker (mknod)");
      free(restartpipe);free(ckptpipe);
      return -1;
    }
  }
  free(restartpipe);free(ckptpipe);
  return 0;
}
*/

/*  Removes created pipes  */
/*
int closeCheckpointLib(int wid,int rankid)
{
  char *restartpipe;
  char *ckptpipe;
  char buff[512];

  sprintf(buff, TMPDIR"/%d:%d.restart.pipe", wid, rankid);
  restartpipe = strdup(buff);
  sprintf(buff, TMPDIR"/%d:%d.ckpt.pipe"CKPT_PIPE_EXT, wid, rankid);
  ckptpipe = strdup(buff);

  unlink(restartpipe);
  unlink(ckptpipe);
  free(restartpipe);free(ckptpipe);
  return 0;
}
*/

/**
 * open the pipe for reading checkpoint image
 * @return fd of the pipe, -1 on failure.
 */
/*
int openCheckpointPipeRD(int wid, int rankid)
{
  char pipefile[256];
  int pipefd;

  sprintf(pipefile, TMPDIR"/%d:%d.ckpt.pipe"CKPT_PIPE_EXT, wid, rankid);
  if(mknod(pipefile, S_IFIFO | S_IREAD | S_IWRITE,0) == -1)
  {
    if(errno != EEXIST)
    {
      printe("creating checkpoint pipe between native app and worker (mknod) %s", pipefile);
      return -1;
    }
  }
  printi("ckpt_generic", "Waiting for checkpoint file on %s\n",pipefile);
  if((pipefd = open(pipefile,O_RDONLY | O_NONBLOCK)) == -1)
  {
    printe("Opening pipefile: %s",pipefile);
    return -1;
  }
  printi("ckpt_generic", "pipe opened\n");
  return pipefd;
}
*/

/**
 * open the pipe for writing checkpoint image
 * @return fd of the pipe, -1 on failure.
 */
/*
int openCheckpointPipeWR(int wid, int rankid)
{
  char pipefile[256];
  int pipefd;

  sprintf(pipefile, TMPDIR"/%d:%d.restart.pipe", wid, rankid);
  if(mknod(pipefile, S_IFIFO | S_IREAD | S_IWRITE,0) == -1)
  {
    if(errno != EEXIST)
    {
      printe("creating checkpoint pipe between native app and worker (mknod) %s", pipefile);
      return -1;
    }
  }
  printi("ckpt_generic", "Waiting for checkpoint file on %s\n",pipefile);
  if((pipefd = open(pipefile,O_WRONLY | O_APPEND)) == -1)
  {
    printe("Opening pipefile: %s",pipefile);
    return -1;
  }
  printi("ckpt_generic", "pipe opened\n");
  return pipefd;
}
*/

/*******************************************************************************
  All that deals with checkpoint protocol
*******************************************************************************/

static inline int _sendckpt(int s, const void *buffer, int size);
static inline int _recvckpt(int s, void *buffer, int size);

/*
 * Size passed to snprintf() when building CkptSock.name.
 * NOTE(review): 64 is the value this file has always used; the real capacity
 * of CkptSock.name is declared elsewhere -- confirm they agree.
 */
enum { CKPT_LOCAL_NAME_LEN = 64 };

/**
 * Put a CkptSock into its "nothing open" state: the three descriptors are
 * set to -1 and both cursors to 0.
 *
 * @param s socket bundle to reset.
 */
void initCheckpointSock(CkptSock *s)
{
  s->proto = -1;
  s->data = -1;
  s->file = -1;
  s->protocur = 0;
  s->datacur = 0;
}

/**
 * Open the protocol and data channels to a checkpoint server.
 *
 * The protocol channel is connected to @p addr first. The server then sends
 * a port number (stored as-is into sin_port, i.e. already in network byte
 * order) and the data channel is connected to the same host on that port.
 *
 * @param s    socket bundle to fill; proto and data are both -1 on any
 *             non-success return.
 * @param addr IPv4 address of the server (read as a struct sockaddr_in),
 *             or NULL when no server is configured.
 * @return 1 when both channels are connected, 0 when @p addr is NULL,
 *         -1 on failure.
 */
int connectCheckpointServer(CkptSock *s, const struct sockaddr *addr)
{
  const struct sockaddr_in *srv = (const struct sockaddr_in *) addr;
  struct sockaddr_in cpaddr;

  ASSERT(s);
  s->proto = -1;
  s->data = -1;

  if(!addr)
  {
    printi("ckpt_generic", "Checkpoint server adress not filled, dismissing connect");
    return 0;
  }

  memcpy(&cpaddr, srv, sizeof(struct sockaddr_in));

  printi("ckpt_generic", "try to connect to proto channel %s:%d\n", inet_ntoa(srv->sin_addr), ntohs(srv->sin_port));
  if((s->proto = socket(PF_INET, SOCK_STREAM, 0)) == -1)
  {
    printe("Connecting checkpoint server (proto channel socket)");
    goto fail;
  }
  if(connect(s->proto, (struct sockaddr *) &cpaddr, sizeof(struct sockaddr_in)) == -1)
  {
    printe("Connecting checkpoint server %s:%d (proto channel connect)", inet_ntoa(srv->sin_addr), ntohs(srv->sin_port));
    goto fail;
  }

  /* The server answers with the port of its data channel. */
  if(_uread(s->proto, &(cpaddr.sin_port), sizeof(in_port_t)) != sizeof(in_port_t))
  {
    printe("Recieving data channel port from server %s (_uread)", inet_ntoa(srv->sin_addr));
    goto fail;
  }

  printi("ckpt_generic", "try to connect to data channel %s:%d\n", inet_ntoa(cpaddr.sin_addr), ntohs(cpaddr.sin_port));
  if((s->data = socket(PF_INET, SOCK_STREAM, 0)) == -1)
  {
    printe("Connecting checkpoint server (data channel socket)");
    goto fail;
  }
  if(connect(s->data, (struct sockaddr *) &cpaddr, sizeof(struct sockaddr_in)) == -1)
  {
    printe("Connecting checkpoint server %s:%d (data channel connect)", inet_ntoa(cpaddr.sin_addr), ntohs(cpaddr.sin_port));
    goto fail;
  }
  return 1;

fail:
  /* Errors are reported before jumping here, so close() cannot disturb
     the errno seen by printe(). */
  if(s->proto != -1) close(s->proto);
  if(s->data != -1) close(s->data);
  s->proto = s->data = -1;
  return -1;
}

#ifndef DEF_PERM
#define DEF_PERM      (S_IREAD | S_IWRITE | S_IRGRP | S_IWGRP)
#endif

/**
 * Create the local checkpoint image file for writing.
 *
 * The name is TMPDIR"/ckptimg-g<group>-r<rank>-s<seq>" when @p seq is
 * non-negative, otherwise TMPDIR"/ckptimg-g<group>-r<rank>.tmp". The name is
 * stored in s->name and the descriptor in s->file.
 *
 * @return the new file descriptor, or -1 on failure (including a name that
 *         does not fit in CKPT_LOCAL_NAME_LEN bytes).
 */
int openWCheckpointLocalFile(CkptSock *s, int group, int rank, int seq)
{
  int len;

  ASSERT(s);

  if(seq >= 0)
    len = snprintf(s->name, CKPT_LOCAL_NAME_LEN, TMPDIR"/ckptimg-g%d-r%d-s%d", group, rank, seq);
  else
    len = snprintf(s->name, CKPT_LOCAL_NAME_LEN, TMPDIR"/ckptimg-g%d-r%d.tmp", group, rank);

  /* A truncated name would create the wrong file and could lose the ".tmp"
     suffix that sendCheckpointConfirm() relies on. */
  if(len < 0 || len >= CKPT_LOCAL_NAME_LEN)
  {
    errno = ENAMETOOLONG;
    s->file = -1;
    printe("Cannot open checkpoint local file %s", s->name);
    return s->file;
  }

  if((s->file = creat(s->name, DEF_PERM)) == -1)
    printe("Cannot open checkpoint local file %s", s->name);
  return s->file;
}

/**
 * Open an existing local checkpoint image file read-only.
 *
 * The name is TMPDIR"/ckptimg-g<group>-r<rank>-s<seq>" when @p seq is
 * non-negative, otherwise TMPDIR"/ckptimg-g<group>-r<rank>". The name is
 * stored in s->name and the descriptor in s->file.
 *
 * @return the file descriptor, or -1 on failure (including a name that does
 *         not fit in CKPT_LOCAL_NAME_LEN bytes).
 */
int openRCheckpointLocalFile(CkptSock *s, int group, int rank, int seq)
{
  int len;

  ASSERT(s);

  if(seq >= 0)
    len = snprintf(s->name, CKPT_LOCAL_NAME_LEN, TMPDIR"/ckptimg-g%d-r%d-s%d", group, rank, seq);
  else
    len = snprintf(s->name, CKPT_LOCAL_NAME_LEN, TMPDIR"/ckptimg-g%d-r%d", group, rank);

  /* Refuse a truncated name rather than open some other file. */
  if(len < 0 || len >= CKPT_LOCAL_NAME_LEN)
  {
    errno = ENAMETOOLONG;
    s->file = -1;
    printe("Cannot open checkpoint local file %s", s->name);
    return s->file;
  }

  if((s->file = open(s->name, O_RDONLY)) == -1)
    printe("Cannot open checkpoint local file %s", s->name);
  return s->file;
}

/**
 * Confirm a finished checkpoint to the server and/or the local file.
 *
 * Server (when s->proto is open): sends 'C' followed by @p totalsize in
 * network byte order, shuts down the write side, and expects a single 'A'
 * byte back. If this step fails the local file is deliberately not
 * confirmed.
 *
 * Local file (when s->file is open): writes @p totalsize in network byte
 * order at byte offset 4 * sizeof(int), syncs and closes the file, and, when
 * the file name ends in ".tmp", renames it to the same name without that
 * suffix.
 *
 * @param s         socket bundle; s->file is reset to -1 once closed.
 * @param totalsize total checkpoint data size to record.
 * @return 0 on success, -1 when the server confirmation failed,
 *         -2 when the local file confirmation failed.
 */
int sendCheckpointConfirm(CkptSock *s, int totalsize)
{
  static const char tmpext[] = ".tmp";
  const size_t extlen = sizeof tmpext - 1;
  /* NOTE(review): the confirmation field is assumed to follow four ints of
     header; the file layout is defined elsewhere -- confirm. */
  const off_t confirm_off = (off_t) (4 * sizeof(int));
  char buff[sizeof (int) + 1];
  unsigned int netsize;
  size_t namelen;
  char *dstname;

  printi ("ckpt_confirm", "Sending final checkpoint confirmation (totaldatasize=%d)\n", totalsize);

  netsize = htonl (totalsize);

  /* if some checkpoint server is connected, ACK the local copy has no meaning
     when distant ack failed
    */
  if(s->proto != -1)
  {
    printi("ckpt_generic", "Linked to checkpoint server, sending confirmation");
    buff[0] = 'C';
    /* memcpy: buff + 1 is not suitably aligned for a direct int store. */
    memcpy (buff + 1, &netsize, sizeof netsize);
    if (_uwrite (s->proto, buff, sizeof (int) + 1) != (sizeof (int) + 1))
    {
      printe ("Confirmation could not be sent ton checkpoint server\n");
      return -1;
    }
    if (shutdown (s->proto, SHUT_WR) < 0)
    {
      printe ("Checkpoint protocol connection shutdown (WR) failed");
      return -1;
    }
    if (_uread (s->proto, buff, 1) != 1)
    {
      printe ("Checkpoint confirmation could not be acknoledged\n");
      return -1;
    }
    if (buff[0] != 'A')
    {
      printw ("checkpoint confirmation refused by server\n");
      return -1;
    }
  }

  if(s->file == -1)
    return 0;

  printi("ckpt_confirm", "Linked to file, confirming local file");
  if(lseek(s->file, confirm_off, SEEK_SET) != confirm_off)
  {
    printe("Ckpt: lseek to confirmation protocol area on local file failed");
    return -2;
  }
  if(write(s->file, &netsize, sizeof netsize) != (ssize_t) sizeof netsize)
  {
    printe("Ckpt: write checkpoint confirmation on local file failed");
    return -2;
  }
  if(fsync(s->file) == -1)
  {
    printe("Flushing local checkpoint to disk");
    return -2;
  }
  if(close(s->file) == -1)
  {
    printe("Closing local checkpoint file");
    return -2;
  }
  s->file = -1;

  /* Do we use seqnumbers ? If we don't we have to replace previous
     checkpoint file with this one.
     Only a trailing ".tmp" is stripped: searching the whole path would
     cut it short whenever TMPDIR itself contains ".tmp". */
  namelen = strlen(s->name);
  if(namelen > extlen && strcmp(s->name + namelen - extlen, tmpext) == 0)
  {
    dstname = strdup(s->name);
    if(!dstname)
    {
      printe("Replacing previous checkpoint file (%s) with current terminated checkpoint (%s)", s->name, s->name);
      return -2;
    }
    dstname[namelen - extlen] = '\0';

    printi("ckpt_confirm", "Rename %s to %s", s->name, dstname);
    if(rename(s->name, dstname))
    {
      /* dstname is the previous checkpoint, s->name the one just written. */
      printe("Replacing previous checkpoint file (%s) with current terminated checkpoint (%s)", dstname, s->name);
      free(dstname);
      return -2;
    }
    free(dstname);
  }
  return 0;
}

/**
 * Shut down and close both channels to the checkpoint server.
 *
 * Does nothing when the protocol channel is not open. Otherwise both
 * descriptors are shut down, closed and reset to -1.
 *
 * @return 0 on success or when nothing was open, -1 if either close() failed.
 */
int closeCheckpointServer(CkptSock *s)
{
  int protoret, dataret;

  if(s->proto == -1)
    return 0;

  shutdown(s->proto, SHUT_RDWR);
  shutdown(s->data, SHUT_RDWR);
  protoret = close(s->proto);
  dataret = close(s->data);
  s->proto = -1;
  s->data = -1;

  return (protoret == -1 || dataret == -1) ? -1 : 0;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -