📄 genericcheckpoint.c
字号:
int closeCheckpointLocalFile(CkptSock *s){ int ret = 0; if(s->file != -1) { ret = close(s->file); s->file = -1; } return ret;}/** * Send PUT protocol to the checkpoint server. * Initialise any space required on the server to store implementation specific * protocol data's * @return -1 on checkpoint server comm error, -2 on local file storing error, 0 when no errors. */int putCheckpointProto(CkptSock *s, int mygroup, int myrank, int seqnumber, int protosize){#define N (1 + 4 * sizeof(int)) char buff[N]; *buff = 'P'; *((int *)(buff + 1)) = htonl(mygroup); *((int *)(buff + 1 + sizeof(int))) = htonl(myrank); *((int *)(buff + 1 + 2 * sizeof(int))) = htonl(seqnumber); *((int *)(buff + 1 + 3 * sizeof(int))) = htonl(protosize); if(s->proto != -1) { printi("ckpt_generic", "Sending PUT Ckpt protocol Header group=%d rank=%d seq=%d protosize=%d", mygroup, myrank, seqnumber, protosize); if (_uwrite (s->proto, buff, N) != N) { printe ("PUT %d %d %d %d was not accepted by server", mygroup, myrank, seqnumber, protosize); return -1; } else printi ("ckpt_generic", "Ckpt protocol header sent g=%d r=%d s=%d ps=%d, waiting for reply ", mygroup, myrank, seqnumber, protosize); if (_uread (s->proto, buff, 1) != 1) { printe("Ckpt transaction (g=%d r=%d s=%d ps=%d) failed reading answer", mygroup, myrank, seqnumber, protosize); return -1; } else if (buff[0] != 'A') { printw("Ckpt transaction failed, server refused to recieve image (g=%d r=%d s=%d ps=%d)", mygroup, myrank, seqnumber, protosize); return -1; } printi ("ckpt_generic", "Ckpt protocol header accepted (g=%d r=%d s=%d ps=%d)", mygroup, myrank, seqnumber, protosize); }#undef N#define N (4 * sizeof(int)) if(s->file != -1) { printi("ckpt_generic", "Localy storing PUT Ckpt protocol Header group=%d rank=%d seq=%d protosize=%d\n", mygroup, myrank, seqnumber, protosize); if(write(s->file, buff + 1, N) != N) /* bad trick, not using first byte modified by reply */ { printe("PUT %d %d %d %d could not be commited on local file", mygroup, myrank, seqnumber, protosize); s->file = -1; return -2; } else { printi("ckpt_generic", "Ckpt protocol header commited to file (g=%d r=%d s=%d ps=%d)\n", mygroup, myrank, seqnumber, protosize); s->protocur += N + sizeof(int); s->datacur = protosize + N + sizeof(int); }#undef N } return 0;}/** * Send GET checkpoint protocol and retrieve implementation independant protocol * datas. */int getCheckpointProto(CkptSock *s, int *mygroup, int *myrank, int *seqnumber, int *protosize, int *totalsize){#define N (5 * sizeof(int)) char buff[N + 1]; *buff = 'G'; *((int *)(buff + 1)) = htonl(*mygroup); *((int *)(buff + 1 + sizeof(int))) = htonl(*myrank); *((int *)(buff + 1 + 2 * sizeof(int))) = htonl(*seqnumber); printi("ckpt_generic", "GET Ckpt protocol Header group=%d rank=%d seqnumber=%d\n", *mygroup, *myrank, *seqnumber); printi("ckpt_generic", "file name %s", s-> name); if(s->file != -1) { printi("ckpt_generic", "Trying from local checkpoint file"); if(read(s->file, buff, N) != N) printe ("GET %d %d %d from local file failed", *mygroup, *myrank, *seqnumber); else { if(*mygroup != ntohl(*((int *) buff))) printw("Ckpt read from file failed, requested checkpoint %d:%d:%d and returned group is %d\n", *mygroup, *myrank, *seqnumber, ntohl(*((int *) buff))); else if(*myrank != ntohl(*((int *) (buff + sizeof (int))))) printw("Ckpt read from file failed, requested checkpoint %d:%d:%d and returned rank is %d\n", *mygroup, *myrank, *seqnumber, ntohl(*((int *) (buff + sizeof (int))))); else if((*seqnumber != -1) && (*seqnumber != ntohl(*((int *) (buff + 2 * sizeof (int)))))) printw("Ckpt read from file failed, requested checkpoint %d:%d:%d and returned sequence number is %d\n", *mygroup, *myrank, *seqnumber, ntohl (*((int *) (buff + 2 * sizeof (int))))); else if((*totalsize = ntohl(*((int *) (buff + 4 * sizeof (int))))) == 0) printw("Ckpt protocol header retrieved from file g=%d r=%d s=%d proved incomplete checkpoint file \n", *mygroup, *myrank, *seqnumber); else { *protosize = ntohl(*((int *) (buff + 3 * sizeof (int)))); s->datacur = N + *protosize; s->protocur += N; if(s->proto != -1) { printi("ckpt_generic", "Local checkpoint file matches. Aborting checkpoint server request"); close(s->proto); close(s->data); s->proto = s->data = -1; } printi ("ckpt_generic", "Ckpt protocol header retrieved from file g=%d r=%d s=%d proto=%d total=%d\n", *mygroup, *myrank, *seqnumber, *protosize, *totalsize); /* the only return, as if we fail localy, try to get it from checkpoint server */ return 0; } } closeCheckpointLocalFile(s); printi("ckpt_generic", "Failed to read checkpoint from local file"); } if(s->proto != -1) { printi("ckpt_generic", "Trying from checkpoint server"); if(_uwrite(s->proto, buff, 3 * sizeof(int) + 1) != (3 * sizeof(int) + 1)) { printe("GET g=%d r=%d s=%d was not accepted by server", *mygroup, *myrank, *seqnumber); return -1; } else { printi("ckpt_generic", "Ckpt protocol header sent g=%d r=%d s=%d, waiting for reply\n", *mygroup, *myrank, *seqnumber); } if(_uread(s->proto, buff, 1) != 1) { printe("Ckpt transaction (g=%d r=%d s=%d) failed reading answer\n", *mygroup, *myrank, *seqnumber); return -1; } else if(buff[0] != 'A') { printw("Ckpt transaction failed, server refused to send image (g=%d r=%d s=%d)\n", *mygroup, *myrank, *seqnumber); return -2; } if(_uread(s->proto, buff, N) != N) { printe("Ckpt transaction (g=%d r=%d s=%d) failed reading answer\n", *mygroup, *myrank, *seqnumber); return -1; }#undef N if(*mygroup != ntohl(*((int *)buff))) { printw("Ckpt transaction failed, requested checkpoint %d:%d and returned group is %d\n", *mygroup, *myrank, ntohl(*((int *)buff))); return -1; } if(*myrank != ntohl(*((int *)(buff + sizeof(int))))) { printw("Ckpt transaction failed, requested checkpoint %d:%d and returned rank is %d\n", *mygroup, *myrank, ntohl(*((int *)(buff + sizeof(int))))); return -1; } if((*seqnumber != -1) && (*seqnumber != ntohl(*((int *) (buff + 2 * sizeof (int)))))) { printw("Ckpt transaction failed, requested checkpoint g=%d r=%d s=%d and returned sequence number is %d\n", *mygroup, *myrank, *seqnumber, ntohl (*((int *) (buff + 2 * sizeof (int))))); return -1; } *protosize = ntohl(*((int *)(buff + 3 * sizeof(int)))); *totalsize = ntohl(*((int *)(buff + 4 * sizeof(int)))); printi("ckpt_generic", "GET Ckpt %d:%d Request returned info (seq=%d, protosize=%d, datasize=%d)\n", *mygroup, *myrank, *seqnumber, *protosize, *totalsize); return 0; } return -1;}/** * sends data to implementation dependant protocol's buffer */int sendCheckpointProtoData(CkptSock *s, const void *buffer, int size){ int ret = size; printi("ckpt_generic", "sending implementation dependant protocol data (size=%d)\n", size); if(s->proto != -1) { if((ret = _sendckpt (s->proto, buffer, size)) == -1) return -1; } if(s->file != -1) { if(lseek(s->file, s->protocur, SEEK_SET) != s->protocur) return -2; if(write(s->file, buffer, ret) != ret) return -2; /* synchronous write to file, may be bottleneck for very high perfs networks */ s->protocur += ret; } return ret;}_UCREATE_SYNCIO(syncSendCheckpointProtoData, sendCheckpointProtoData, CkptSock *, const void *, int)/** * sends data to image data buffer */int sendCheckpointImageData(CkptSock *s, const void *buffer, int size){ int ret = size; printi("ckpt_Generic", "sending image data (size=%d)\n", size); if(s->data != -1) { if((ret = _sendckpt (s->data, buffer, size)) == -1) return -1; } if(s->file != -1) { if(lseek(s->file, s->datacur, SEEK_SET) != s->datacur) return -2; if(write(s->file, buffer, ret) != ret) return -2; /* synchronous write to file, may be bottleneck for very high perfs networks */ s->datacur += ret; } return ret;}_UCREATE_SYNCIO(syncSendCheckpointImageData, sendCheckpointImageData, CkptSock *, const void *, int)static inline int _sendckpt(int s, const void *buffer, int size){ return write(s, buffer, size);}/** * recieves data from implementation dependant protocol's buffer */int recvCheckpointProtoData(CkptSock *s, void *buffer, int size){ printi("ckpt_Generic", "recieving implementation dependant protocol data (size=%d)\n", size); if(s->file != -1) { if(lseek(s->file, s->protocur, SEEK_SET) != s->protocur) return -2; if(read(s->file, buffer, size) != size) return -2; s->protocur += size; } else return _recvckpt (s->proto, buffer, size); return size;}_UCREATE_SYNCIO(syncRecvCheckpointProtoData, recvCheckpointProtoData, CkptSock *, void *, int)/** * recieves data from image data buffer */int recvCheckpointImageData(CkptSock *s, void *buffer, int size){ printi("ckpt_Generic", "recieving image data (size=%d)\n", size); if(s->file != -1) { if(lseek(s->file, s->datacur, SEEK_SET) != s->datacur) return -2; if(read(s->file, buffer, size) != size) return -2; s->datacur += size; } else return _recvckpt (s->data, buffer, size); return size;}_UCREATE_SYNCIO(syncRecvCheckpointImageData, recvCheckpointImageData, CkptSock *, void *, int)static inline int _recvckpt(int s, void *buffer, int size){ return read(s, buffer, size);}/************************************************************************//* Helpers specific code *//************************************************************************/#if 0#ifdef CKPT_HELPER_IS_CKPT/** returns a newly allocated string holding the command * to restart. Should point to the full path to restart, using * environment variables like PATH, and MPIRUN_HOME * @todo * @return see up! */static char *get_restart_cmd(){ /*char *rs; rs = getenv("RESTART"); if(rs) return strdup(rs);*/ return strdup("ckpt_restart");}int checkpoint_set_command(int *argc, char ***argv, int rank, int group, int restarting){ char pipename[strlen(TMPDIR)+48]; sprintf(pipename, TMPDIR"/%d:%d.%s.pipe", group, rank, restarting?"restart":"ckpt"); printi("GenericCheckpoint", "ckpt_set_command **+*"); if(restarting) { (*argv)[0] = get_restart_cmd(); (*argv)[1] = strdup(pipename); (*argv)[2] = NULL; *argc = 2; return 0; } else { /** assumes that the environment is inherited */ setenv("CKPT_FILENAME", pipename, 1); return 0; }}#else /* CKPT_HELPER_IS_CKPT */# ifdef CKPT_HELPER_IS_CONDORint checkpoint_set_command(int *argc, char ***argv, int rank, int group, int restarting){ char pipename[strlen(TMPDIR)+48]; sprintf(pipename, TMPDIR"/%d:%d.%s.pipe", group, rank, restarting?"restart":"ckpt"); (*argv)[(*argc)++] = strdup(restarting?"-_condor_restart":"-_condor_ckpt"); (*argv)[(*argc)++] = strdup(pipename); return 0;}# else /* CKPT_HELPER_IS_CONDOR */#ifdef CKPT_HELPER_IS_BLCRstatic char *get_restart_cmd(){ char *rs; rs = getenv("RESTART"); if(rs) return strdup(rs); return strdup("/home/orsay/erodriguez/mpich-1.2.7p1/bin/ckpt_restart");}int checkpoint_set_command(int *argc, char ***argv, int rank, int group, int restarting){ FILE* f_debug; char pipename[strlen(TMPDIR)+48]; f_debug = fopen("/tmp/debug.log","a"); fprintf(f_debug,"%sRESTART %s %d:%d.%s.pipe\n", restarting?"":"(not)", TMPDIR,group, rank, restarting?"restart":"ckpt"); sprintf(pipename, TMPDIR"/%d:%d.%s.pipe", group, rank, restarting?"restart":"ckpt"); printi("GenericCheckpoint", "ckpt_set_command **+*: restarting = %d pipename=%s",restarting,pipename); fclose(f_debug); if(restarting) { (*argv)[0] = get_restart_cmd(); (*argv)[1] = strdup(pipename); (*argv)[2] = NULL; *argc = 2; return 0; } else { /** assumes that the environment is inherited */ setenv("CKPT_FILENAME", pipename, 1); return 0; } return 0;}# else /* CKPT_HELPER_IS_BLCR # */# endif /* CKPT_HELPER_IS_BLCR */# endif /*CKPT_HELPER_IS_CONDOR */#endif /* CKPT_HELPER_IS_CKPT */#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -