📄 chl.c
字号:
/*
 * chl.c - checkpoint library glue between the worker daemon and BLCR.
 *
 * Creates/removes the named pipes used to move checkpoint images between the
 * native application and the worker, and starts an MPI process either from
 * scratch (fork + exec) or from a downloaded BLCR checkpoint image.
 */
#include "chl.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <sched.h>
#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "config.h"
#include "debug.h"
#include "libcr.h"
#include "protocolCheckpoint.h"

#if CR_RELEASE_MAJOR != 0 || CR_RELEASE_MINOR != 4 || CR_RELEASE_PATCH != 2
# if CR_RELEASE_MAJOR == 0 && (CR_RELEASE_MINOR < 4 || (CR_RELEASE_MINOR == 4 && CR_RELEASE_PATCH < 2 ))
#  error This interface with BLCR assumes BLCR version 0.4.2 at least.
# else
#  warning This interface with BLCR is tested only with BLCR up to version 0.4.2.
#  warning you are using BLCR with a higher version
# endif
#endif

/*
 * Size of a buffer able to hold TMPDIR "/<wid>:<rankid>.<kind>.pipe".
 * TMPDIR is a string literal, so sizeof counts its characters plus the NUL;
 * 64 extra bytes cover two signed decimal ints and the fixed suffix.
 */
enum { PIPE_PATH_SIZE = sizeof(TMPDIR) + 64 };

/* Access mode of the FIFOs: read/write for the owner only. */
#define PIPE_MODE (S_IFIFO | S_IRUSR | S_IWUSR)

/**
 * Format the name of one of the per-process pipes into dst.
 * @param dst: destination buffer
 * @param dstlen: size of dst in bytes
 * @param wid: the group id
 * @param rankid: the rank
 * @param kind: "restart" or "ckpt"
 * @return 0 on success, -1 (errno = ENAMETOOLONG) if the name does not fit.
 */
static int buildPipePath(char *dst, size_t dstlen, int wid, int rankid,
                         const char *kind)
{
    int n = snprintf(dst, dstlen, "%s/%d:%d.%s.pipe", TMPDIR, wid, rankid, kind);

    if (n < 0 || (size_t)n >= dstlen) {
        errno = ENAMETOOLONG;
        return -1;
    }
    return 0;
}

/**
 * Create one FIFO; an already existing node is not an error.
 * @param path: name of the FIFO
 * @return 0 on success (or EEXIST), -1 otherwise with errno set by mknod.
 */
static int makeFifo(const char *path)
{
    if (mknod(path, PIPE_MODE, 0) == -1 && errno != EEXIST)
        return -1;
    return 0;
}

/**
 * Initialise libcr and create the pipes used for transmission of the
 * checkpoint file between native app and the worker.
 * @param wid: the group id
 * @param rankid: the rank
 * @return 0 if no error, -1 otherwise.
 */
int initCheckpointLib(int wid, int rankid)
{
    char restartpipe[PIPE_PATH_SIZE];
    char ckptpipe[PIPE_PATH_SIZE];

    cr_init();

    if (buildPipePath(restartpipe, sizeof restartpipe, wid, rankid, "restart") < 0 ||
        buildPipePath(ckptpipe, sizeof ckptpipe, wid, rankid, "ckpt") < 0) {
        printe("building pipe names between native app and worker");
        return -1;
    }

    if (makeFifo(restartpipe) < 0) {
        printe("creating restart pipe between native app and worker (mknod)");
        return -1;
    }

    if (makeFifo(ckptpipe) < 0) {
        printe("creating checkpoint pipe between native app and worker (mknod)");
        return -1;
    }

    return 0;
}

/**
 * Remove the pipes created by initCheckpointLib().
 * Failures of unlink() are deliberately ignored (best effort cleanup).
 * @param wid: the group id
 * @param rankid: the rank
 * @return 0 if no error, -1 if a pipe name could not be built.
 */
int closeCheckpointLib(int wid, int rankid)
{
    char restartpipe[PIPE_PATH_SIZE];
    char ckptpipe[PIPE_PATH_SIZE];

    if (buildPipePath(restartpipe, sizeof restartpipe, wid, rankid, "restart") < 0 ||
        buildPipePath(ckptpipe, sizeof ckptpipe, wid, rankid, "ckpt") < 0)
        return -1;

    unlink(restartpipe);
    unlink(ckptpipe);

    return 0;
}

/**
 * Open the checkpoint pipe for reading the checkpoint image, creating the
 * FIFO first if it does not exist yet. The pipe is opened O_NONBLOCK, so the
 * open does not wait for a writer.
 * @param wid : the group number of the application.
 * @param rankid : the rank of the MPI process.
 * @return fd of the pipe, -1 on failure.
 */
int openCheckpointPipeRD(int wid, int rankid)
{
    char pipefile[PIPE_PATH_SIZE];
    int pipefd;

    if (buildPipePath(pipefile, sizeof pipefile, wid, rankid, "ckpt") < 0) {
        printe("building checkpoint pipe name");
        return -1;
    }

    if (makeFifo(pipefile) < 0) {
        printe("creating checkpoint pipe between native app and worker (mknod) %s", pipefile);
        return -1;
    }

    printi("ckpt_generic", "Waiting for checkpoint file on %s", pipefile);
    pipefd = open(pipefile, O_RDONLY | O_NONBLOCK);
    if (pipefd == -1) {
        printe("Opening pipefile: %s", pipefile);
        return -1;
    }

    printi("ckpt_generic", "pipe opened");
    return pipefd;
}

/**
 * Open the restart pipe for reading. Unlike openCheckpointPipeRD() the FIFO
 * is not created here and the open is blocking.
 * @param wid : the group number of the application.
 * @param rankid : the rank of the MPI process.
 * @return fd of the pipe, -1 on failure.
 */
static int openRestartPipeRD(int wid, int rankid)
{
    char pipefile[PIPE_PATH_SIZE];
    int pipefd;

    if (buildPipePath(pipefile, sizeof pipefile, wid, rankid, "restart") < 0) {
        printe("building restart pipe name");
        return -1;
    }

    printi("ckpt_generic", "Waiting for checkpoint file on %s", pipefile);
    pipefd = open(pipefile, O_RDONLY);
    if (pipefd == -1) {
        printe("Opening pipefile: %s", pipefile);
        return -1;
    }

    printi("ckpt_generic", "pipe opened");
    return pipefd;
}

/**
 * Start the MPI process from scratch: fork, then exec argv in the child.
 * argv[0] holds the path of the executable on entry; in the child it is
 * replaced by the base name before the exec.
 * @param argv: NULL-terminated array of arguments ([0] is the path)
 * @param argc: unused
 * @param args: unused
 * @param wid: unused
 * @param rankid: unused
 * @return in the parent, the pid of the child or -1 if fork() failed.
 *         The child never returns.
 */
static int invokeForkExec(char **argv, int argc, int args, int wid, int rankid)
{
    char *exepath;
    char *exename;
    char *slash;
    pid_t pid;

    (void)argc;
    (void)args;
    (void)wid;
    (void)rankid;

    pid = fork();
    if (pid == -1) {
        printe("fork() for the MPI process");
        return -1;
    }
    if (pid > 0)
        return pid;

    /* CHILD: exec by full path, but advertise only the base name. */
    exepath = argv[0];
    slash = strrchr(exepath, '/');
    exename = (slash != NULL) ? slash + 1 : exepath;
    argv[0] = exename;

#ifdef DEBUG
    {
        char **param;
        char *cmdline;
        size_t cmdsize = strlen(exepath) + 128;

        for (param = argv; *param; param++)
            cmdsize += strlen(*param) + 1;

        cmdline = malloc(cmdsize);
        if (cmdline != NULL) {
            cmdline[0] = '\0';
            for (param = argv; *param; param++) {
                strcat(cmdline, *param);
                strcat(cmdline, " ");
            }
            printi("param", "vdaemon: executing %s [%s]", exepath, cmdline);
            free(cmdline);
        }
    }
#endif

    execvp(exepath, argv);

    /* Only reached when the exec failed. */
    qerror("vdaemon execvp(%s)", argv[0]);

    /* A child whose exec failed must never fall back into the caller's code. */
    _exit(127);
    /* NOTREACHED */
    return -1;
}

/**
 * Entry point of the process (and of every clone) that gets overlaid with
 * the restarted image. Taken from the child_main part of cr_restart.c of
 * BLCR 4.2; statement order matters and is kept as is.
 * @param arg: opaque argument handed on unchanged to each clone
 * @return not expected to return; 0 to satisfy the prototype.
 */
int child_main(void *arg)
{
    void *new_stack;
    int local_errno;
    int err;

    /* This is from cr_restart.c of BLCR 4.2, child_main part */

    /* start looping to create the required threads/processes */
    while ((err = __cri_syscall(CR_OP_RSTRT_CLONES, (uintptr_t)&new_stack, &local_errno)) > 0) {
        /* the positive return value is used as the clone() flags */
        err = clone(&child_main, new_stack, err, arg); /* XXX: could touch errno!! */
        if (err < 0) {
            /*
             * NOTE(review): clone() reports its failure through errno, not
             * through local_errno; unless __cri_syscall() stored a value on
             * its successful return, local_errno is read unset below.
             * TODO confirm against libcr.
             */
            goto err_out;
        }
    }
    if (err < 0) {
        goto err_out;
    }

    /* Now overlay ourselves with the new image */
    err = __cri_syscall(CR_OP_RSTRT_CHILD, CRI_SYSCALL_NOARG, &local_errno);

err_out:
    /* Not reached unless we've encountered a fatal error.
     * ********** Things are hairy here **********
     * We may have an incomplete thread environment, so we can't use libc fully.
     */

    /* Don't even try to print an error message */

    /* Try to make the entire thread group exit, if supported */
    __cri_exit_group(local_errno, NULL);
    /* We might return here in the case of ENOSYS */

    /* Note that this is really _exit() */
    __cri_exit(local_errno, NULL);

    /* NOTREACHED */
    return 0;
}

/**
 * Download the checkpoint image into a fresh temporary file under TMPDIR and
 * reopen that file read-only.
 * The descriptor returned by mkstemp() is stored in cin->pipe before
 * downloadCheckpoint() is called.
 * NOTE(review): the temporary file is never unlinked here - confirm whether
 * something else removes it.
 * @param cin: checkpoint connection information; cin->pipe is overwritten
 * @return a read-only fd on the image, or a negative value on failure.
 */
static int download_context(CkptInfo *cin)
{
    /* Sized by the initialiser, so the template always fits (NUL included). */
    char filename[] = TMPDIR "/blcrimage.XXXXXX";
    int tmpfd;
    int fd;

    tmpfd = mkstemp(filename);
    cin->pipe = tmpfd;
    if (tmpfd < 0) {
        printe("could not create temporary checkpoint image %s", filename);
        return -1;
    }

    printi("blcr", "downloading checkpoint into %s", filename);
    downloadCheckpoint();

    fd = open(filename, O_RDONLY);
    if (fd < 0)
        printe("could not open %s as checkpoint image", filename);
    else
        printi("blcr", "%s opened as checkpoint image", filename);

    return fd;
}

/**
 * Fork and execute the argv array according to the restart/invoke flag, for
 * this rank and this job id.
 * The localMPI argument is already pushed on the stack.
 * @param argv: the array of arguments ([0] is the path)
 * @param argc: the number of used arguments
 * @param args: the size of the argv array
 * @param wid: the group number of the application
 * @param rankid: the rank of the MPI process
 * @param invoke: not zero iff this is not a restart
 * @return the pid of the new process or -1 if failure
 */
int ckptForkExec(char **argv, int argc, int args, int wid, int rankid, int invoke)
{
    struct cr_rstrt_args req;
    CkptInfo *cin;
    int context_fd;
    int result = -1;
    int err;
    pid_t pid;

    if (invoke)
        return invokeForkExec(argv, argc, args, wid, rankid);

    cin = prestart_begin();
    if (cin == NULL) {
        qerror("unable to connect to a checkpoint server");
        /* never hand a NULL cin to download_context() should qerror return */
        return -1;
    }

    /* This is from cr_restart.c of BLCR 4.2, main parts */

    /* ... connect to the kernel */
    err = cri_connect();
    if (err < 0) {
        if (errno == ENOSYS) {
            printe("Failed cri_connect(): support missing from kernel");
        } else {
            printe("Failed cri_connect(): %s", cr_strerror(errno));
        }
        return -1;
    }

    context_fd = download_context(cin);
    if (context_fd < 0) {
        printe("unable to open Checkpoint Pipe for reading");
        return -1;
    }

    /* ... initialize the request structure (no field is left indeterminate) */
    memset(&req, 0, sizeof req);
    req.cr_fd = context_fd;
    req.cr_opts = NULL;

    /* ... tell the kernel to go off and restart something */
    err = cri_syscall(CR_OP_RSTRT_REQ, (uintptr_t)&req);
    if (err < 0) {
        /* The restart request was unsuccessful. */
        printe("cri_syscall(CR_OP_RSTRT_REQ)");
        goto out;
    }

    pid = fork();
    if (pid == -1) {
        printe("fork() for the MPI process");
        goto out;
    }
    if (pid == 0) {
        /* CHILD */
        child_main(&req);
        /* NOT REACHED */
        _exit(0);
    }

    /* PARENT */

    /* ... wait forever for the restart to complete */
    err = cri_syscall(CR_OP_RSTRT_DONE, (uintptr_t)NULL);
    if (err < 0) {
        /* There was a problem with the restart. */
        printe("cri_syscall(CR_OP_RSTRT_DONE)");
        goto out;
    }

    /* ... reap the checkpoint, collecting a child pid on success */
    err = cri_syscall(CR_OP_RSTRT_REAP, CRI_SYSCALL_NOARG);
    if (err < 0) {
        /* Unable to reap! */
        printe("cri_syscall(CR_OP_RSTRT_REAP)");
        goto out;
    }
    result = err;

out:
    /*
     * The image descriptor is local to this call; release it on every path
     * (after the error has been reported, so errno is not clobbered early).
     */
    close(context_fd);
    return result;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -