📄 localmpi.c
字号:
/** @file localmpi.c implements the communication level between local mpi process and the daemon */#include <sys/types.h>#include <sys/stat.h>#include <unistd.h>#include <fcntl.h>#include <string.h>#include <sys/ioctl.h>#include <sys/wait.h>#include "config.h"#include "debug.h"#include "localmpi.h"#include "daemoncom.h"#include "select.h"#include "simple_list.h"#include "utils_socket.h"#include "flitvec.h"#include "otherdaemoncomm.h"#include "checkpoint.h"#include "vtypes.h"//#define AFFMPIMESSAGEstatic int RFmpi = -1;static int WTmpi = -1;static int myrank = -1;static int DEBUG_NEXT;static int hasFinalized;void localMPI_removePipes(int group, int rank){ char *filename, *basename; struct stat s; myrank = rank; basename = localMPI_PipeBaseName(group, rank); filename = (char*)malloc(strlen(basename)+16); sprintf(filename, "%s.Daemon2Driver", basename); if(stat(filename, &s) == 0) { if(unlink(filename) < 0) qerror("could not clean the file %s", filename); } filename = (char*)malloc(strlen(basename)+16); sprintf(filename, "%s.Driver2Daemon", basename); if(stat(filename, &s) == 0) { if(unlink(filename) < 0) qerror ("could not clean the file %s", filename); }}void localMPI_regeneratePipes(int group, int rank){ char *filename, *basename; struct stat s; myrank = rank; basename = localMPI_PipeBaseName(group, rank); filename = (char*)malloc(strlen(basename)+16); sprintf(filename, "%s.Daemon2Driver", basename); if(stat(filename, &s) == 0) { if(unlink(filename) < 0) qerror("could not clean the file %s", filename); } /* reopening Daemon2Driver pipe */ if (mkfifo(filename, 00644) < 0) qerror("could not create FIFO %s", filename); filename = (char*)malloc(strlen(basename)+16); sprintf(filename, "%s.Driver2Daemon", basename); if(stat(filename, &s) == 0) { if(unlink(filename) < 0) qerror ("could not clean the file %s", filename); } /* reopening Driver2Daemon pipe */ if (mkfifo(filename, 00644) < 0) qerror("could not create FIFO %s", filename);}int localMPI_OpenWriteToDriver(int group, int rank){ char *filename, *basename; struct stat s; basename = localMPI_PipeBaseName(group, rank); filename = (char*)malloc(strlen(basename)+16); sprintf(filename, "%s.Daemon2Driver", basename); while( stat(filename, &s) < 0 ) { printi("init", "%s does not exists, waiting for creation (did you fork the MPI process ?)", filename); sleep(1); } printi("init", "opening %s for writing", filename); WTmpi = open(filename, O_WRONLY); if(WTmpi < 0) qerror("opening pipe for writing %s.", filename); free(filename); free(basename); return WTmpi;}int localMPI_OpenReadFromDriver(int group, int rank){ char *filename, *basename; basename = localMPI_PipeBaseName(group, rank); filename = (char*)malloc(strlen(basename)+16); sprintf(filename, "%s.Driver2Daemon", basename); // unlink(filename); RFmpi = open(filename, O_RDONLY); printi("init", "%s opened for reading", filename); if(RFmpi < 0) qerror("could not open FIFO %s for reading", filename); free(filename); free(basename); return RFmpi;}char* localMPI_PipeBaseName(int group, int rank){ char basename[strlen(TMPDIR)+31]; sprintf(basename, TMPDIR"/%d:%d", group, rank); return strdup(basename);}int localMPI_setCommand(int *argc, char ***argv, int g, int r, int np, char *debug){ char *pbn = localMPI_PipeBaseName(g, r); char opt[strlen(debug?debug:"")+strlen(TMPDIR)+128]; sprintf(opt, "-vparam(%d, %d, %d, %s, %s)", g, np, r, pbn, debug?debug:""); (*argv)[(*argc)++] = strdup(opt); return 0;}void on_writeback (pkt_header *pkt){ int n; int cp; pkt_header cp_order; cp = pckpt_atomic_try_and_begin(); if(cp == CP_START || cp == CP_START_NO_FORK ) { memset(&cp_order, htonl(-1), sizeof(pkt_header)); cp_order.iType = htonl(cp); n = _uwrite(WTmpi, &cp_order, sizeof(pkt_header)); if(n != sizeof(pkt_header)) qerror("writing checkpoint order to device"); // if (ftp_wait_ckpt ()) // return; n = _uread(RFmpi, &cp_order, sizeof(pkt_header)); if(n != sizeof(pkt_header)) qerror("re-reading reception request"); printi("Request", "re-read reception request of %s", format_packet(&cp_order)); } printi ("deliver", "deliver of %s", format_packet_long(pkt, 48)); n = _uwrite (WTmpi, pkt, sizeof(pkt_header) + ntohl(pkt->iSize)); if (n != sizeof(pkt_header) + ntohl(pkt->iSize)) qerror ("writing data to device : wrote %d/%d", n, sizeof(pkt_header) + ntohl(pkt->iSize)); pkt->iType = htonl(cp); ftp_readSave(pkt); free (pkt);}/** the local MPI process probe the daemon for a message * local MPI process --> daemon * @param tag : the tag of message probed * @return void */static void on_probe (int tag){ /* * iTag hold the result of probe * iType hold 0 if no checkpoint to do, CP_START if checkpoint */ pkt_header rep; int res; bzero (&rep, sizeof (pkt_header)); res = ftp_probeSaveOrReplay(&rep); if (res == 0) { res = probing_pnd_mesgs(tag); rep.iTag = htonl(res); rep.iType = 0; //htonl(pckpt_atomic_try_and_begin()); } if (_uwrite (WTmpi, &rep, sizeof (pkt_header)) != sizeof (pkt_header)) qerror ("writing the probe result to device"); printi("on_probe","answer probe: %d, %s", ntohl(rep.iType), format_packet(&rep));}/** to read what the local mpi process wants * @param tag : the tag of message wanted * @return void */static void on_read(int tag){ pkt_header* mesg = NULL; if ( ftp_readReplay(tag) ) /* apply function of message re-read after restart */ return; /* if in restart mode, it's over here */ mesg = find_pending(cmp_tag, (void*)tag); /* locate message in the pending list*/ if (mesg) { /* if message found in pending, send it to mpi process */ on_writeback(mesg); } else newpendingtag(tag); /* if message no found in pending, declare the tag */}/** I'm talking aloud to myself! * @param pkt: what I'm saying */static void on_soliloquize(pkt_header *pkt){ ftp_on_soliloquize_send(pkt); /**@todo check this (unsure wrt FT)*/ if ( waiting_for_tag( ntohl(pkt->iTag) ) ) on_writeback(pkt); else pending_add(pkt);}/** called when local process wants to give something to send * (read pipe awakened select); * local MPI process --> Com Daemon (sb) * @param buf : packet to send. * @return 0 or fails */static int on_write (pkt_header *pkt){ pkt_header rep; if(ntohl(pkt->iDst) == myrank) on_soliloquize(pkt); else ftp_newOutgoingMessage(pkt); memset(&rep, 0, sizeof (pkt_header)); rep.iType = 0; // htonl(pckpt_atomic_try_and_begin()); printi ("emit", "emit %s", format_packet_long(pkt, 48)); DEBUG_NEXT = 5; if (_uwrite (WTmpi, &rep, sizeof (pkt_header)) != sizeof (pkt_header)) qerror ("writing the rep of reception to device"); return 0;}void on_request(){ pkt_header pkt; char * buf; int n; int r; if( hasFinalized ) { r = -1; ioctl(RFmpi, FIONREAD, &r); if(r == 0) return; } printi("fdset", "smpi_read is read ready\n"); n = _uread(RFmpi, &pkt, sizeof(pkt_header)); if(n != sizeof(pkt_header)) qerror("read command header from device (received %d/%d)", n, sizeof(pkt_header)); printi("Request", "MPI process request : %s", format_packet(&pkt)); if (ntohl(pkt.iType)==ANY_MSG_AVAIL) { on_probe(ntohl(pkt.iTag)); return; } if ((ntohl(pkt.iType)==RECV_ANY_CONTROL) || (ntohl(pkt.iType)==RECV_FROM_CHANNEL)) { on_read(ntohl(pkt.iTag)); return; } if ((ntohl(pkt.iType)==SEND_CONTROL) || (ntohl(pkt.iType)==SEND_CHANNEL)) { buf=(char *)malloc(sizeof(pkt_header)+ntohl(pkt.iSize)); memcpy(buf,&pkt,sizeof(pkt_header)); n = _uread(RFmpi, buf+sizeof(pkt_header), ntohl(pkt.iSize)); if(n != ntohl(pkt.iSize)) qerror("read data from device"); on_write((pkt_header*)buf); return; } if(ntohl(pkt.iType) == CLIENT_EXIT) { /* finalize computation */ hasFinalized = 1; dispatcher_mpi_exit(); }}void release_driver(void){ pkt_header pkt; int n; n = _uwrite(WTmpi, &pkt, sizeof(pkt_header)); if(n != sizeof(pkt_header)) qerror("write final packet to device");}void ack_endCkpt_localMpi(int ack_type){ /* pkt_header pkt; int n; pkt.iType = htonl(ack_type); n = _uwrite(WTmpi, &pkt, sizeof(pkt_header)); if(n != sizeof(pkt_header)) qerror("write ack for end checkpoint phase to device"); */ printi("exit", "is called"); close_all_sockets(); wait (NULL); exit(0);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -