⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 localmpi.c

📁 Path MPICH-V for MPICH the MPI Implementation
💻 C
字号:
/** @file localmpi.c implements the communication level between local mpi process and the daemon */#include <sys/types.h>#include <sys/stat.h>#include <unistd.h>#include <fcntl.h>#include <string.h>#include <sys/ioctl.h>#include <sys/wait.h>#include "config.h"#include "debug.h"#include "localmpi.h"#include "daemoncom.h"#include "select.h"#include "simple_list.h"#include "utils_socket.h"#include "flitvec.h"#include "otherdaemoncomm.h"#include "checkpoint.h"#include "vtypes.h"//#define AFFMPIMESSAGEstatic int RFmpi = -1;static int WTmpi = -1;static int myrank = -1;static int DEBUG_NEXT;static int hasFinalized;void localMPI_removePipes(int group, int rank){  char *filename, *basename;  struct stat s;  myrank = rank;  basename = localMPI_PipeBaseName(group, rank);  filename = (char*)malloc(strlen(basename)+16);  sprintf(filename, "%s.Daemon2Driver", basename);  if(stat(filename, &s) == 0)    {      if(unlink(filename) < 0)        qerror("could not clean the file %s", filename);    }  filename = (char*)malloc(strlen(basename)+16);  sprintf(filename, "%s.Driver2Daemon", basename);  if(stat(filename, &s) == 0)    {      if(unlink(filename) < 0)        qerror ("could not clean the file %s", filename);    }}void localMPI_regeneratePipes(int group, int rank){  char *filename, *basename;  struct stat s;  myrank = rank;  basename = localMPI_PipeBaseName(group, rank);  filename = (char*)malloc(strlen(basename)+16);  sprintf(filename, "%s.Daemon2Driver", basename);  if(stat(filename, &s) == 0)    {      if(unlink(filename) < 0)        qerror("could not clean the file %s", filename);    }  /* reopening Daemon2Driver pipe */  if (mkfifo(filename, 00644) < 0)    qerror("could not create FIFO %s", filename);    filename = (char*)malloc(strlen(basename)+16);  sprintf(filename, "%s.Driver2Daemon", basename);  if(stat(filename, &s) == 0)    {      if(unlink(filename) < 0)        qerror ("could not clean the file %s", filename);    }  /* reopening Driver2Daemon pipe */  if (mkfifo(filename, 00644) < 0)    qerror("could not create FIFO %s", filename);}int localMPI_OpenWriteToDriver(int group, int rank){  char *filename, *basename;  struct stat s;  basename = localMPI_PipeBaseName(group, rank);  filename = (char*)malloc(strlen(basename)+16);  sprintf(filename, "%s.Daemon2Driver", basename);  while( stat(filename, &s) < 0 )    {      printi("init", "%s does not exists, waiting for creation (did you fork the MPI process ?)", filename);      sleep(1);    }  printi("init", "opening %s for writing", filename);  WTmpi = open(filename, O_WRONLY);  if(WTmpi < 0)    qerror("opening pipe for writing %s.", filename);  free(filename);  free(basename);  return WTmpi;}int localMPI_OpenReadFromDriver(int group, int rank){  char *filename, *basename;  basename = localMPI_PipeBaseName(group, rank);  filename = (char*)malloc(strlen(basename)+16);  sprintf(filename, "%s.Driver2Daemon", basename);  // unlink(filename);   RFmpi = open(filename, O_RDONLY);  printi("init", "%s opened for reading", filename);  if(RFmpi < 0)    qerror("could not open FIFO %s for reading", filename);  free(filename);  free(basename);  return RFmpi;}char* localMPI_PipeBaseName(int group, int rank){  char basename[strlen(TMPDIR)+31];  sprintf(basename, TMPDIR"/%d:%d", group, rank);  return strdup(basename);}int localMPI_setCommand(int *argc, char ***argv, int g, int r, int np, char *debug){  char *pbn = localMPI_PipeBaseName(g, r);  char opt[strlen(debug?debug:"")+strlen(TMPDIR)+128];  sprintf(opt, "-vparam(%d, %d, %d, %s, %s)",	  g, np, r, pbn, debug?debug:"");  (*argv)[(*argc)++] = strdup(opt);  return 0;}void on_writeback (pkt_header *pkt){  int n;  int cp;  pkt_header cp_order;  cp = pckpt_atomic_try_and_begin();  if(cp == CP_START || cp == CP_START_NO_FORK )    {      memset(&cp_order, htonl(-1), sizeof(pkt_header));      cp_order.iType = htonl(cp);      n = _uwrite(WTmpi, &cp_order, sizeof(pkt_header));      if(n != sizeof(pkt_header))	qerror("writing checkpoint order to device");      //      if (ftp_wait_ckpt ())      //    return;      n = _uread(RFmpi, &cp_order, sizeof(pkt_header));      if(n != sizeof(pkt_header))	qerror("re-reading reception request");      printi("Request", "re-read reception request of %s", format_packet(&cp_order));    }  printi ("deliver", "deliver of %s", format_packet_long(pkt, 48));  n = _uwrite (WTmpi, pkt, sizeof(pkt_header) + ntohl(pkt->iSize));  if (n != sizeof(pkt_header) + ntohl(pkt->iSize))    qerror ("writing data to device : wrote %d/%d", n, sizeof(pkt_header) + ntohl(pkt->iSize));    pkt->iType = htonl(cp);  ftp_readSave(pkt);  free (pkt);}/** the local MPI process probe the daemon for a message * local MPI process --> daemon * @param tag : the tag of message probed * @return void */static void on_probe (int tag){  /*   *  iTag hold the result of probe   *  iType hold 0 if no checkpoint to do, CP_START if checkpoint   */  pkt_header rep;  int res;  bzero (&rep, sizeof (pkt_header));  res = ftp_probeSaveOrReplay(&rep);  if (res == 0)  {    res = probing_pnd_mesgs(tag);    rep.iTag = htonl(res);    rep.iType =  0; //htonl(pckpt_atomic_try_and_begin());  }  if (_uwrite (WTmpi, &rep, sizeof (pkt_header)) != sizeof (pkt_header))    qerror ("writing the probe result to device");  printi("on_probe","answer probe: %d, %s", ntohl(rep.iType), format_packet(&rep));}/** to read what the local mpi process wants * @param tag : the tag of message wanted * @return void */static void on_read(int tag){  pkt_header* mesg = NULL;  if ( ftp_readReplay(tag) )     /* apply function of message re-read after restart */    return;                      /* if in restart mode, it's over here */    mesg = find_pending(cmp_tag, (void*)tag); /* locate message in the pending list*/  if (mesg)    {      /* if message found in pending, send it to mpi process */      on_writeback(mesg);    }  else    newpendingtag(tag);     /* if message no found in pending, declare the tag */}/** I'm talking aloud to myself! * @param pkt: what I'm saying */static void on_soliloquize(pkt_header *pkt){  ftp_on_soliloquize_send(pkt);  /**@todo check this (unsure wrt FT)*/  if ( waiting_for_tag( ntohl(pkt->iTag) ) )    on_writeback(pkt);  else    pending_add(pkt);}/** called when local process wants to give something to send * (read pipe awakened select); * local MPI process --> Com Daemon (sb) * @param buf : packet to send. * @return 0 or fails */static int on_write (pkt_header *pkt){  pkt_header rep;  if(ntohl(pkt->iDst) == myrank)    on_soliloquize(pkt);  else    ftp_newOutgoingMessage(pkt);  memset(&rep, 0, sizeof (pkt_header));  rep.iType = 0; // htonl(pckpt_atomic_try_and_begin());  printi ("emit", "emit %s", format_packet_long(pkt, 48));  DEBUG_NEXT = 5;  if (_uwrite (WTmpi, &rep, sizeof (pkt_header)) != sizeof (pkt_header))    qerror ("writing the rep of reception to device");  return 0;}void on_request(){  pkt_header pkt;  char * buf;  int n;  int r;  if( hasFinalized )    {      r = -1;      ioctl(RFmpi, FIONREAD, &r);      if(r == 0)	return;    }  printi("fdset", "smpi_read is read ready\n");  n = _uread(RFmpi, &pkt, sizeof(pkt_header));  if(n != sizeof(pkt_header))    qerror("read command header from device (received %d/%d)", n, sizeof(pkt_header));  printi("Request", "MPI process request : %s", format_packet(&pkt));  if (ntohl(pkt.iType)==ANY_MSG_AVAIL)    {      on_probe(ntohl(pkt.iTag));      return;    }  if ((ntohl(pkt.iType)==RECV_ANY_CONTROL) || (ntohl(pkt.iType)==RECV_FROM_CHANNEL))    {      on_read(ntohl(pkt.iTag));      return;    }  if ((ntohl(pkt.iType)==SEND_CONTROL) || (ntohl(pkt.iType)==SEND_CHANNEL))    {      buf=(char *)malloc(sizeof(pkt_header)+ntohl(pkt.iSize));      memcpy(buf,&pkt,sizeof(pkt_header));      n = _uread(RFmpi, buf+sizeof(pkt_header), ntohl(pkt.iSize));      if(n != ntohl(pkt.iSize))        qerror("read data from device");      on_write((pkt_header*)buf);      return;    }  if(ntohl(pkt.iType) == CLIENT_EXIT)     {      /* finalize computation */      hasFinalized = 1;      dispatcher_mpi_exit();          }}void release_driver(void){  pkt_header pkt;  int n;  n = _uwrite(WTmpi, &pkt, sizeof(pkt_header));  if(n != sizeof(pkt_header))    qerror("write final packet to device");}void ack_endCkpt_localMpi(int ack_type){  /*  pkt_header pkt;  int n;  pkt.iType = htonl(ack_type);  n = _uwrite(WTmpi, &pkt, sizeof(pkt_header));  if(n != sizeof(pkt_header))    qerror("write ack for end checkpoint phase to device");  */  printi("exit", "is called");  close_all_sockets();   wait (NULL);  exit(0);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -