📄 checkpoint.c
字号:
/** @file checkpoint.c implements the checkpointing method for a particular protocol */#include "config.h"#include "debug.h"#include "checkpoint.h"#include "utils_socket.h"#include "otherdaemoncomm.h"#include "daemonCsched.h"#include "daemoncom.h"#include <stdlib.h>#include <string.h>#include <errno.h>#include <unistd.h>#include <signal.h>#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <netdb.h>#include <sys/stat.h>#include <fcntl.h>#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>static int ckpt_writerbfunc(CkptInfo *cin, void *args);int ftp_ckpt_wfdset(CkptInfo *cin){ if(pckpt_getCkptfunc() == ckpt_writerbfunc) { if(cl_ckptmark_status()) return cin->sock.data; else return 0; } else return cin->sock.data;}int ftp_ckpt_protocolsize(int np){ return sizeof(int) /* size of checkpoint image */ + sizeof(int) /* size of receiverbased */ ;}void ftp_ckpt_seqnumber(int seq){ ++seq; printi("ckpt", "Setting this checkpoint sequence number to %d", seq); pckpt_setSeqnumber(seq);}int ftp_ckpt_try(void){ return csched_can_cp_now();}int ftp_ckpt_begin(CkptInfo *cin){ /* do some cshed related action */ printi("ftp_ckpt", "Calling Csched related stuff"); csched_on_begin_cp(); return 0;}int ftp_ckpt_start() /* return a dependent protocol ckpt order */{ return csched_ckpt_type();}int ftp_ckpt_end(CkptInfo *cin){ /* do some cshed related action */ csched_on_end_cp(); return 0;}int ftp_ckpt_nbfunc(void){ return 2;}void ftp_ckpt_setwritefunc(CkptInfo *cin){ pckpt_setCkptfunc(0, pckpt_writeimgfunc); pckpt_setCkptargs(0, NULL); pckpt_setCkptfunc(1, ckpt_writerbfunc); pckpt_setCkptargs(1, daemon_exportrb());}static int _rbsend_one_message(CkptInfo *cin, char *data){ static int sent; static int size = -1; int n;#define hpkt ((pkt_header *) data) ASSERT(data); if(size == -1) { /* previous message was totally sent, load a new one. */ size = ntohl( hpkt->iSize ) + sizeof(pkt_header); sent = 0; } printi("ckpt_sbsend", "Sending SenderBased stored packet %s, from byte %d", format_packet(hpkt), sent); if((n = sendCheckpointImageData(&(cin->sock), data + sent, size - sent)) < 0) { if(errno == EAGAIN) { printw("Sending SenderBased stored packect %s",format_packet(hpkt)); return 0; } else { printe("Sending SenderBased stored packet %s",format_packet(hpkt)); return -1; } } if((sent += n) >= size) { cin->datasize += size; size = -1; return 1; } return 0;#undef hpkt}static int _rbsend_one_list(CkptInfo *cin, sElement_t *head){ static int sent = 0; static sElement_t *l = (sElement_t *) -1;#define hpkt ((pkt_header *)(l->data)) if(l == (sElement_t *) -1) l = head; /* dirty trick due to non constant initializer */ if(l == NULL) { printi("ckpt_rbsend", "List completed, sending its size %d trough protocol channel", sent); if(syncSendCheckpointProtoData(&(cin->sock), &sent, sizeof(int)) != sizeof(int)) qerror("Cannot send size %d of SenderBased list trough protocol data channel to checkpoint reliable server", sent); sent = 0; l = (sElement_t *) -1; return 1; } switch(_rbsend_one_message(cin, (char *) l->data)) { case 0: return 0; case 1: printi("ckpt_rbsend","Receiver based num: %d, packet %s successfully sent, proceding to next message", sent + 1, format_packet(hpkt)); l = l->pNext; sent++; return 0; default: return -1; }#undef hpkt}static int ckpt_writerbfunc(CkptInfo *cin, void *data){# define rb ((listeMessages *) data) switch(_rbsend_one_list(cin, rb->messages.head)) { case 0 : return 0; case 1 : return 1; default : ASSERT(0); }# undef rb return 1;}/****************************************************************************** * restart functions. All are synchronous. */static int restart_readrbfunc(CkptInfo *cin, void *data);int ftp_restart_begin(CkptInfo *cin){ //printi("ckpt_recvsb", " received a Sender Based payload message for : %s", format_packet(&pkt)); csched_begin_restart(); return 0;}int ftp_restart_end(CkptInfo *cin){ csched_end_restart(); return 0;}int ftp_restart_nbfunc(void){ return 2;}int ftp_restart_setreadfunc(CkptInfo *cin){ pckpt_setCkptfunc(0, prestart_readimgfunc); pckpt_setCkptargs(0, NULL); pckpt_setCkptfunc(1, restart_readrbfunc); pckpt_setCkptargs(1, NULL); return 0;}static int unserialize_one_rb_one_message(CkptSock *s){ pkt_header pkt; char *m; int n; if(syncRecvCheckpointImageData(s, &pkt, sizeof(pkt_header)) != sizeof(pkt_header)) qerror("Recieving Sender Based payload, cannot retrieve packet header"); printi("ckpt_recvsb", " received a Sender Based payload message for : %s", format_packet(&pkt)); m = (char*)malloc(sizeof(pkt_header) + ntohl(pkt.iSize)); memcpy(m, &pkt, sizeof(pkt_header)); if((n = syncRecvCheckpointImageData(s, m + sizeof(pkt_header), ntohl(pkt.iSize))) != ntohl(pkt.iSize)) qerror("recieving Sender Based message payload for message %s", format_packet(&pkt)); printi("ckpt_recvsb","data received"); pending_add((pkt_header *) m); return 0;}static int unserialize_one_rb(CkptSock *s){ int nbentry, i; if(syncRecvCheckpointProtoData(s, &nbentry, sizeof(int)) != sizeof(int)) qerror("Receiving number of entries for Receiver Based payload "); printi("ckpt_recvsb", "Recieving Receiver Based payload : waiting %d entries", nbentry); for(i = 0; i < nbentry; i++) if(unserialize_one_rb_one_message(s) < 0) { printw("could not unserialize message %d of pending ", i); return -1; } { LinkedList_t *p; pkt_header *pkt; if(nbentry > 0) { p = getpending(); pkt = (pkt_header*)p->head->data; printi("deliver", "deliver : CP_RESTART first packet = %s", format_packet_long(pkt, 48)); } else printi("deliver", "nbentry = 0!"); } debugpndpkt(); return 0;}static int restart_readrbfunc(CkptInfo *cin, void *data){ if(unserialize_one_rb(&(cin->sock)) < 0) { printw("could not unserialize Receiver Based message log"); return -1; } return 1;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -