📄 daemoncom.c
字号:
/** @file daemoncom.c implements the communication between daemons implied in the
 * fault tolerant protocol */

#include "config.h"
#include "debug.h"
#include "vtypes.h"
#include "flitvec.h"
#include "utils_socket.h"
#include "connect.h"
#include "daemoncom.h"
#include "receiverbased.h"
#include "checkpoint.h"
#include "localmpi.h"
#include "otherdaemoncomm.h"
#include "daemonCsched.h"
#include "select.h"
#include <assert.h>
#include <stdlib.h> /* calloc, malloc, free, exit */
#include <string.h> /* memset, memcpy */

static int worldsize;               /**< total number of mpi processes */
static int myrank;                  /**< rank of this daemon */
static listeMessages receiverbased; /**< receiver based message log, set up by initReceiverBased() in ftp_init() */
static int *ckpt_warn;              /**< indexed by rank, 1 if ckpt_info received from rank */

/*****************************************************
 *                    non-API                        *
 *****************************************************/

/** Abort the daemon after a failed allocation.
 *
 * Logs the failing location and exits with the same status (-1) that
 * ftp_on_disconnect() uses; there is no sensible way to continue the
 * protocol without the buffer.
 *
 * @param where name of the function in which the allocation failed
 */
static void ftp_out_of_memory(const char *where)
{
    printi("VCL", "out of memory in %s : bye!", where);
    exit(-1);
}

/** Tell whether a rank read from a packet header can index ckpt_warn[].
 *
 * The rank comes from the network, so it is validated against worldsize
 * before being used as an array index. An invalid rank is logged.
 *
 * @param rank source rank, already converted to host byte order
 * @return 1 if 0 <= rank < worldsize, 0 otherwise
 */
static int ckpt_warn_rank_ok(int rank)
{
    if (rank >= 0 && rank < worldsize) {
        return 1;
    }
    printi("ckpt_warn", "ignoring out of range source rank %d (worldsize = %d)",
           rank, worldsize);
    return 0;
}

/** Check whether every rank has its ckpt_warn flag set.
 *
 * @return 1 if ckpt_warn[i] is non-zero for all i in [0, worldsize),
 *         0 as soon as one entry equal to 0 is found (that index is logged)
 */
int cl_ckptmark_status()
{
    int i;

    for (i = 0; i < worldsize; i++) {
        if (ckpt_warn[i] == 0) {
            printi("ckpt_warn", "ckpt_warn[%d] = 0", i);
            return 0;
        }
    }
    printi("ckpt_warn", "every peer in the same wave");
    return 1;
}

/** Record a warning coming from the checkpoint scheduler: sets the flag
 * of the local rank in ckpt_warn[]. */
void ckpt_csched_warn()
{
    printi("ckpt_warn", "received a csched warning : ckpt_warn[%d] = 1", myrank);
    ckpt_warn[myrank] = 1;
}

/** Reset every ckpt_warn[] flag to zero and call remove_message_from_log()
 * on the receiver based log. */
void ckpt_info_reinit()
{
    printi("ckpt_warn", "reinit of the ckpt_warn to zero");
    memset(ckpt_warn, 0, worldsize * sizeof(int));
    remove_message_from_log(&receiverbased);
}

/** Export RB (used by ftp checkpoint module)
 *
 * @return address of the file-local receiver based log
 */
listeMessages *daemon_exportrb(void)
{
    return &receiverbased;
}

extern LinkedList_t * getpending();

/** Callback for sent flit: releases the buffer passed as parameter.
 *
 * @param f the flit that was sent (unused)
 * @param p heap buffer to free
 */
static void data_sendcomplete(flit *f, void *p)
{
    (void)f;
    free(p);
}

/** Start a checkpoint wave.
 *
 * Queues one header-only FORCE_CHECKPOINT packet, stamped with the current
 * checkpoint sequence number, for every rank but the local one, then adds
 * to the receiver based log every pending packet whose checkpoint sequence
 * number is lower than the current one.
 *
 * WARN : call just after the deliver of CP_START message to local mpi process
 *        and before any more message is deliver to it
 */
void launch_ckpt()
{
    LinkedList_t *thepending;
    sElement_t *elem;
    flitvec *fv;
    pkt_header *p;
    int i;
    int myseqnum = pckpt_getSeqnumber();

    thepending = getpending();

    printi("launchckpt", "myseqnum = %d, mystatus = %d", myseqnum, ckpt_status());
    debugpndpkt();

    for (i = 0; i < worldsize; i++) {
        if (i == myrank) {
            continue;
        }

        /* zeroed header; released by data_sendcomplete() once sent */
        p = calloc(1, sizeof *p);
        if (p == NULL) {
            ftp_out_of_memory("launch_ckpt");
        }
        p->iSrc = htonl(myrank);
        p->iDst = htonl(i);
        p->iType = htonl( FORCE_CHECKPOINT );
        p->ftp.ckpt = htonl(myseqnum);

        fv = newflitvec(1);
        fv->parameter = p;
        newflit(&(fv->flittab[0]), p, sizeof(pkt_header), data_sendcomplete);
        addtopeersend(fv);
    }

    printi("launchckpt", "pendingsize = %d ", thepending->size);

    for (elem = thepending->head; elem; elem = elem->pNext) {
        pkt_header *pkt = (pkt_header *)elem->data;
        int pktseqnum = (int)ntohl(pkt->ftp.ckpt);

        if (pktseqnum < myseqnum) {
            /* more than one wave behind: only warn, the packet is still logged */
            if (pktseqnum < myseqnum - 1) {
                printw("it seems that two checkpoint waves are running concurently. Cross your fingers.");
            }
            add_message_to_log(&receiverbased, pkt);
        }
    }
}

/*****************************************************
 *                      API                          *
 *****************************************************/

/** Handle a packet read from the local side.
 *
 * Packets whose type is not CP_START are ignored. For a CP_START packet the
 * type field is reset to 0 and, when its checkpoint sequence number is lower
 * than the current one, the packet is pushed to the receiver based log.
 *
 * @param pkt packet header (fields in network byte order); iType may be modified
 */
void ftp_readSave(pkt_header *pkt)
{
    if ( ntohl(pkt->iType) != CP_START ) {
        return;
    }

    pkt->iType = htonl(0);

    if ( ((int)ntohl(pkt->ftp.ckpt)) < pckpt_getSeqnumber() ) {
        push_message_to_log(&receiverbased, pkt);
    }
}

/** Protocol hook with no work to do in this implementation.
 *
 * @param rep unused
 * @return always 0
 */
int ftp_probeSaveOrReplay(pkt_header * rep)
{
    (void)rep;
    return 0;
}

/** Initialise the module state.
 *
 * Allocates the zeroed ckpt_warn[] table (one int per peer), stores the
 * local rank and world size, and initialises the receiver based log.
 * Exits the process if the table cannot be allocated.
 *
 * @param sizeofmpipeer total number of mpi processes
 * @param rank          rank of this daemon
 */
void ftp_init(int sizeofmpipeer, int rank)
{
    ckpt_warn = calloc(sizeofmpipeer, sizeof(int));
    /* calloc may legitimately return NULL for a zero-sized request */
    if (ckpt_warn == NULL && sizeofmpipeer != 0) {
        ftp_out_of_memory("ftp_init");
    }
    printi("ckpt_warn", "init of the ckpt_warn to zero");

    myrank = rank;
    worldsize = sizeofmpipeer;

    /* initialize receiver based log */
    initReceiverBased(&receiverbased, sizeofmpipeer);
}

/** Queue an outgoing message for sending to a peer.
 *
 * The message is stamped with the current checkpoint sequence number and
 * split into two flits: the header, then the payload that directly follows
 * the header in the same buffer (iSize bytes).
 *
 * @param msg heap buffer holding a pkt_header immediately followed by the
 *            data; it is handed to data_sendcomplete(), which frees it
 */
void ftp_newOutgoingMessage(pkt_header *msg)
{
    flitvec *fv;

    printi("clsend", "will send %s", format_packet(msg));

    /** we have 2 flits */
    fv = newflitvec(2);

    /** assert this message is NOT a checkpoint information */
    msg->ftp.ckpt = htonl(pckpt_getSeqnumber());

    /** flit 0 : the pkt header */
    newflit(&(fv->flittab[0]), msg, sizeof(pkt_header), NULL);
    /** flit 1 : the data */
    newflit(&(fv->flittab[1]), ((char*)msg)+sizeof(pkt_header), ntohl(msg->iSize), data_sendcomplete);

    /** we need to send everything, so we let cflitindex and cflitpos
     * be zero */

    /** we set parameter to be the message, in order to free it in
     * data_sendcomplete (presumably the completion callback is handed
     * fv->parameter - see flitvec.h) */
    fv->parameter = msg;

    /** finally we add the flitvec to message ready to be send */
    addtopeersend(fv);
}

/** Reception callback: process a packet received from a peer daemon.
 *
 * Steps, in order:
 *  - log an error (and call printq(), presumably fatal - confirm in debug.h)
 *    when the packet and local sequence numbers / checkpoint status are
 *    not in one of the three accepted combinations;
 *  - if the packet is ahead of us, request a checkpoint and flag the sender;
 *  - if the packet is in our wave while a checkpoint is running, flag the sender;
 *  - FORCE_CHECKPOINT packets stop here;
 *  - packets from an older wave received while a checkpoint is running are
 *    added to the receiver based log;
 *  - the packet is written back if its tag is the awaited one, otherwise it
 *    is added to the pending list.
 *
 * @param f     the received flit (unused; NULL when called from ftp_create_newfv)
 * @param param the pkt_header of the received packet
 */
static void data_receive(flit *f, void *param)
{
    pkt_header *pkt = (pkt_header *)param;
    int pktseqnum = ntohl(pkt->ftp.ckpt);
    int myseqnum = pckpt_getSeqnumber();
    int mystatus = ckpt_status();
    /* sender rank comes from the wire: convert once, validate before indexing */
    int src = (int)ntohl(pkt->iSrc);

    (void)f;

    printi("clrecv", "myseqnum = %d, mystatus = %d, receive packet %s", myseqnum, mystatus, format_packet(pkt));

    if (!( (pktseqnum == myseqnum) ||
           ((pktseqnum < myseqnum) && (mystatus == STATUS_CP_RUNNING) ) ||
           ((pktseqnum > myseqnum) && (mystatus != STATUS_CP_RUNNING) ) ) ) {
        printi("ckpt_seqnum", " Error : %d != %d &&", pktseqnum, myseqnum);
        printi("ckpt_seqnum", " && %d >= %d || %d != STATUS_CP_RUNNING(%d)", pktseqnum, myseqnum, mystatus, STATUS_CP_RUNNING);
        printi("ckpt_seqnum", " && %d <= %d || %d == STATUS_CP_RUNNING(%d)", pktseqnum, myseqnum, mystatus, STATUS_CP_RUNNING);
        printq("ckpt_seqnum");
    }

    if (pktseqnum > myseqnum) {
        cp_request();
        if (ckpt_warn_rank_ok(src)) {
            printi("ckpt_warn", "ckpt_warn[%d] = 1 (first checkpoint request)", src);
            ckpt_warn[src] = 1;
        }
    }

    if ( (pktseqnum == myseqnum) && (mystatus == STATUS_CP_RUNNING) ) {
        if (ckpt_warn_rank_ok(src)) {
            /* only log the first time this sender is flagged */
            if (!ckpt_warn[src]) {
                printi("ckpt_warn", "ckpt_warn[%d] = 1 (acknowledge in cp running", src);
            }
            ckpt_warn[src] = 1;
        }
    }

    /* 0 length messages are used to force communication between peers */
    if ( ntohl(pkt->iType) == FORCE_CHECKPOINT ) {
        return;
    }

    if ( (mystatus == STATUS_CP_RUNNING) && (pktseqnum < myseqnum) ) {
        add_message_to_log(&receiverbased, pkt);
    }

    if (waiting_for_tag(ntohl(pkt->iTag))) {
        printi ("Recv", "this is the pending message");
        clearpendingtag();
        on_writeback(pkt);
    } else {
        pending_add(pkt);
    }
}

/** Build the flit vector used to receive the rest of an incoming packet.
 *
 * FORCE_CHECKPOINT packets carry no data: they are processed immediately
 * through data_receive() and no flit vector is created. For any other
 * packet, a buffer holding a copy of the header followed by room for
 * iSize bytes of data is allocated and a 2-flit vector positioned on the
 * data flit is returned. Exits the process if the buffer cannot be allocated.
 *
 * @param pkt header already received for this packet
 * @return the new flit vector, or NULL for a FORCE_CHECKPOINT packet
 */
flitvec *ftp_create_newfv( pkt_header *pkt )
{
    flitvec *res;
    pkt_header *fv0;
    char *data;

    if ( ntohl(pkt->iType) == FORCE_CHECKPOINT ) {
        data_receive(NULL, pkt);
        return NULL;
    }

    /** pkt must be copied if further use is needed
     * (it is currently allocated in stack)
     * Moreover, we allocate the buffer of reception
     * @todo if you are brave: add a pipeline with driver here */
    fv0 = malloc(sizeof(pkt_header) + ntohl(pkt->iSize));
    if (fv0 == NULL) {
        /* NULL is already the "no flit vector" return value, so fail hard */
        ftp_out_of_memory("ftp_create_newfv");
    }
    data = ((char *) fv0) + sizeof(pkt_header);
    memcpy(fv0, pkt, sizeof(pkt_header));

    /** we use 2 flits */
    res = newflitvec(2);

    /** first flit is always the pkt_header */
    newflit(&(res->flittab[0]), fv0, sizeof(pkt_header), NULL);

    /** second flit is the data */
    /** we attach the data to the flit */
    newflit(&(res->flittab[1]), data, ntohl(pkt->iSize), data_receive);

    /** Since we already have received the first flit,
     * we don't receive it again ! */
    res->cflitindex = 1;
    res->cflitpos = 0;

    /** Last : fix the parameter given to data_receive, in addition to
     * the flit */
    res->parameter = fv0;

    return res;
}

/** Initial state of a connection for this protocol.
 *
 * @param connection unused
 * @return always CO_DONE
 */
int ftp_connection_starting_state(int connection )
{
    (void)connection;
    return CO_DONE;
}

/* Called when something to read during connection phase
 * to stop connection phase and begin communication phase,
 * set mpipeer[rank].hstate to CO_DONE
 * This implementation does nothing.
 * @param mpipeer : tab of all peer socket infos
 * @param rank : rank of the local mpi process
 * @return <0 on error, 0 on success (always 0 here)
 */
int ftp_connection_read(SockInfo mpipeer[], int rank)
{
    (void)mpipeer;
    (void)rank;
    return 0;
}

/* Called when something to write during connection phase
 * to stop connection phase and begin communication phase,
 * set mpipeer[rank].hstate to CO_DONE
 * This implementation does nothing.
 * @param mpipeer : tab of all peer socket infos
 * @param rank : rank of the local mpi process
 * @return <0 on error, 0 on success (always 0 here)
 */
int ftp_connection_write(SockInfo mpipeer[], int rank)
{
    (void)mpipeer;
    (void)rank;
    return 0;
}

/** Called when a peer disconnection is detected: logs, closes all
 * sockets and exits the process with status -1.
 *
 * @param mpipeer unused
 * @param rank    unused
 */
void ftp_on_disconnect(SockInfo mpipeer[], int rank)
{
    (void)mpipeer;
    (void)rank;
    printi("VCL", "detected some disctonnection : bye!");
    close_all_sockets();
    exit (-1);
}

/** Stamp a packet sent to ourselves with the current checkpoint
 * sequence number (network byte order).
 *
 * @param pkt packet header to stamp
 */
void ftp_on_soliloquize_send(pkt_header *pkt)
{
    pkt->ftp.ckpt = htonl(pckpt_getSeqnumber());
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -