⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 daemoncom.c

📁 Path MPICH-V for MPICH the MPI Implementation
💻 C
字号:
/** @file daemoncom.c
 * Implements the communication between daemons involved in the fault
 * tolerant protocol: checkpoint-wave bookkeeping (ckpt_warn) and the
 * receiver-based log of messages that belong to an earlier checkpoint wave.
 */
#include "config.h"
#include "debug.h"
#include "vtypes.h"
#include "flitvec.h"
#include "utils_socket.h"
#include "connect.h"
#include "daemoncom.h"
#include "receiverbased.h"
#include "checkpoint.h"
#include "localmpi.h"
#include "otherdaemoncomm.h"
#include "daemonCsched.h"
#include "select.h"

#include <assert.h>
#include <stdlib.h> /* malloc, calloc, free, exit */
#include <string.h> /* memset, memcpy */

static int worldsize;               /**< total number of MPI processes */
static int myrank;                  /**< rank of this daemon */
static listeMessages receiverbased; /**< receiver-based message log, initialised in ftp_init() */
static int *ckpt_warn;              /**< indexed by rank: 1 once a checkpoint mark of the
                                     *   current wave was seen from that rank (for myrank:
                                     *   set by ckpt_csched_warn()) */

/*****************************************************
 *                       non-API                     *
 *****************************************************/

/** Report whether a checkpoint mark has been recorded for every rank.
 * @return 1 if ckpt_warn[i] != 0 for all ranks, 0 otherwise
 *         (the first rank still missing is logged).
 */
int cl_ckptmark_status(void)
{
  int i;

  for (i = 0; i < worldsize; i++)
    {
      if (ckpt_warn[i] == 0)
        {
          printi("ckpt_warn", "ckpt_warn[%d] = 0", i);
          return 0;
        }
    }
  printi("ckpt_warn", "every peer in the same wave");
  return 1;
}

/** Record that the checkpoint scheduler warned this daemon:
 * marks this daemon's own rank in ckpt_warn. */
void ckpt_csched_warn(void)
{
  printi("ckpt_warn", "received a csched warning : ckpt_warn[%d] = 1", myrank);
  ckpt_warn[myrank] = 1;
}

/** Reset every ckpt_warn entry to 0 and remove messages from the
 * receiver-based log (via remove_message_from_log). */
void ckpt_info_reinit(void)
{
  printi("ckpt_warn", "reinit of the ckpt_warn to zero");
  memset(ckpt_warn, 0, worldsize * sizeof *ckpt_warn);
  remove_message_from_log(&receiverbased);
}

/** Export the receiver-based log (used by the ftp checkpoint module). */
listeMessages *daemon_exportrb(void)
{
  return &receiverbased;
}

extern LinkedList_t *getpending(void);

/* Completion callback for sent flits: frees p, the buffer that was stored as
 * the flitvec parameter (header + data allocated as one block). */
static void data_sendcomplete(flit *f, void *p)
{
  free(p);
}

/* Return p unchanged, or abort the daemon when an allocation returned NULL:
 * every caller would otherwise dereference the NULL pointer, and the
 * protocol cannot make progress without the buffer. */
static void *ftp_alloc_check(void *p)
{
  if (p == NULL)
    {
      printw("daemoncom: out of memory");
      exit(-1);
    }
  return p;
}

/** Start this daemon's part of a checkpoint wave.
 * Sends a FORCE_CHECKPOINT packet, stamped with the current checkpoint
 * sequence number, to every other rank. It then adds every pending packet
 * stamped with an older sequence number to the receiver-based log.
 * WARN: call just after the delivery of the CP_START message to the local
 * MPI process and before any further message is delivered to it.
 */
void launch_ckpt(void)
{
  LinkedList_t *thepending;
  sElement_t *elem;
  int i;
  int myseqnum = pckpt_getSeqnumber();

  thepending = getpending();
  printi("launchckpt", "myseqnum = %d, mystatus = %d", myseqnum, ckpt_status());
  debugpndpkt();

  for (i = 0; i < worldsize; i++)
    {
      pkt_header *p;
      flitvec *fv;

      if (i == myrank)
        continue;

      /* zeroed header: the packet carries no data (iSize == 0) */
      p = ftp_alloc_check(calloc(1, sizeof *p));
      p->iSrc = htonl(myrank);
      p->iDst = htonl(i);
      p->iType = htonl(FORCE_CHECKPOINT);
      p->ftp.ckpt = htonl(myseqnum);

      fv = newflitvec(1);
      fv->parameter = p;
      /* data_sendcomplete frees p once the header flit has been sent */
      newflit(&(fv->flittab[0]), p, sizeof(pkt_header), data_sendcomplete);
      addtopeersend(fv);
    }

  printi("launchckpt", "pendingsize = %d ", thepending->size);
  for (elem = thepending->head; elem; elem = elem->pNext)
    {
      pkt_header *pkt = (pkt_header *)elem->data;
      int pktseqnum = (int)ntohl(pkt->ftp.ckpt);

      if (pktseqnum < myseqnum)
        {
          if (pktseqnum < myseqnum - 1)
            printw("it seems that two checkpoint waves are running concurently. Cross your fingers.");
          add_message_to_log(&receiverbased, pkt);
        }
    }
}

/*****************************************************
 *                         API                       *
 *****************************************************/

/** Handle a packet read back for saving.
 * Only CP_START packets are considered. Their type is reset to 0 and, when
 * they are stamped with a sequence number older than the current one, they
 * are pushed onto the receiver-based log. Other packets are ignored.
 */
void ftp_readSave(pkt_header *pkt)
{
  if (ntohl(pkt->iType) != CP_START)
    return;
  pkt->iType = htonl(0);
  if (((int)ntohl(pkt->ftp.ckpt)) < pckpt_getSeqnumber())
    push_message_to_log(&receiverbased, pkt);
}

/** This protocol has nothing to save or replay here: always returns 0. */
int ftp_probeSaveOrReplay(pkt_header *rep)
{
  return 0;
}

/** Initialise the protocol state.
 * @param sizeofmpipeer number of MPI processes (size of ckpt_warn)
 * @param rank          rank of this daemon
 */
void ftp_init(int sizeofmpipeer, int rank)
{
  ckpt_warn = calloc(sizeofmpipeer, sizeof *ckpt_warn);
  /* calloc(0, ...) may legitimately return NULL; any other NULL is an OOM */
  if (sizeofmpipeer > 0)
    (void)ftp_alloc_check(ckpt_warn);
  printi("ckpt_warn", "init of the ckpt_warn to zero");
  myrank = rank;
  worldsize = sizeofmpipeer;
  initReceiverBased(&receiverbased, sizeofmpipeer); /* initialize receiver-based log */
}

/** Queue an application message for sending to its peer daemon.
 * The message is stamped with the current checkpoint sequence number. It is
 * sent as two flits: the pkt_header, then the ntohl(msg->iSize) data bytes
 * stored right after the header in the same buffer.
 * Ownership of msg passes to the send path: data_sendcomplete() frees it
 * once the data flit has been sent.
 * @param msg header immediately followed by its payload
 */
void ftp_newOutgoingMessage(pkt_header *msg)
{
  flitvec *fv;

  printi("clsend", "will send %s", format_packet(msg));

  /* two flits: header + data */
  fv = newflitvec(2);

  /* stamp the message with the sender's current checkpoint wave; the
   * receiving daemon compares it with its own (see data_receive()) */
  msg->ftp.ckpt = htonl(pckpt_getSeqnumber());

  /* flit 0: the pkt header (no completion callback) */
  newflit(&(fv->flittab[0]), msg, sizeof(pkt_header), NULL);
  /* flit 1: the data, which follows the header in the same buffer */
  newflit(&(fv->flittab[1]), ((char *)msg) + sizeof(pkt_header),
          ntohl(msg->iSize), data_sendcomplete);

  /* everything must be sent, so cflitindex and cflitpos are left at zero.
   * The parameter is handed to data_sendcomplete, which frees msg. */
  fv->parameter = msg;

  /* finally hand the flitvec to the peer send queue */
  addtopeersend(fv);
}

/* Reception callback for a complete packet (header + data in param).
 * It first updates the checkpoint bookkeeping from the packet's sequence
 * number:
 *  - a newer wave triggers cp_request() and marks the source rank;
 *  - the same wave while a checkpoint is running marks the source rank.
 * FORCE_CHECKPOINT packets stop there. Other packets from an older wave that
 * arrive while a checkpoint is running are added to the receiver-based log.
 * The packet is then either written back (if its tag is awaited) or queued
 * as pending. */
static void data_receive(flit *f, void *param)
{
  pkt_header *pkt = (pkt_header *)param;
  int pktseqnum = (int)ntohl(pkt->ftp.ckpt);
  int myseqnum  = pckpt_getSeqnumber();
  int mystatus  = ckpt_status();
  /* iSrc comes from the network: only use it as an index once range-checked */
  unsigned long src = ntohl(pkt->iSrc);
  int src_ok = (worldsize > 0) && (src < (unsigned long)worldsize);

  printi("clrecv", "myseqnum = %d, mystatus = %d, receive packet %s", myseqnum, mystatus, format_packet(pkt));

  /* Accepted combinations: same wave; an older wave while a checkpoint is
   * running; a newer wave while no checkpoint is running. Anything else is
   * reported through printq (presumably fatal -- confirm in debug.h). */
  if (!((pktseqnum == myseqnum) ||
        ((pktseqnum < myseqnum) && (mystatus == STATUS_CP_RUNNING)) ||
        ((pktseqnum > myseqnum) && (mystatus != STATUS_CP_RUNNING))))
    {
      printi("ckpt_seqnum", " Error : %d != %d &&", pktseqnum, myseqnum);
      printi("ckpt_seqnum", "      && %d >= %d || %d != STATUS_CP_RUNNING(%d)", pktseqnum, myseqnum, mystatus, STATUS_CP_RUNNING);
      printi("ckpt_seqnum", "      && %d <= %d || %d == STATUS_CP_RUNNING(%d)", pktseqnum, myseqnum, mystatus, STATUS_CP_RUNNING);
      printq("ckpt_seqnum");
    }

  if (pktseqnum > myseqnum)
    {
      cp_request();
      if (src_ok)
        {
          printi("ckpt_warn", "ckpt_warn[%d] = 1 (first checkpoint request)", (int)src);
          ckpt_warn[src] = 1;
        }
      else
        printw("data_receive: source rank out of range, checkpoint mark ignored");
    }

  if ((pktseqnum == myseqnum) && (mystatus == STATUS_CP_RUNNING))
    {
      if (src_ok)
        {
          if (!ckpt_warn[src])
            printi("ckpt_warn", "ckpt_warn[%d] = 1 (acknowledge in cp running", (int)src);
          ckpt_warn[src] = 1;
        }
      else
        printw("data_receive: source rank out of range, checkpoint mark ignored");
    }

  /* 0 length messages are used to force communication between peers */
  if (ntohl(pkt->iType) == FORCE_CHECKPOINT)
    return;

  /* a message from an older wave received during a checkpoint is logged */
  if ((mystatus == STATUS_CP_RUNNING) && (pktseqnum < myseqnum))
    add_message_to_log(&receiverbased, pkt);

  if (waiting_for_tag(ntohl(pkt->iTag)))
    {
      printi("Recv", "this is the pending message");
      clearpendingtag();
      on_writeback(pkt);
    }
  else
    pending_add(pkt);
}

/** Build the reception flitvec for a packet whose header has been read.
 * FORCE_CHECKPOINT packets are processed immediately and NULL is returned.
 * Otherwise a buffer for header + data is allocated. The header is copied
 * into it, and a two-flit vector is returned with flit 0 marked as already
 * received, so only the data flit remains; data_receive() runs when it
 * completes.
 * @param pkt header just read (copied, so the caller keeps ownership)
 */
flitvec *ftp_create_newfv(pkt_header *pkt)
{
  flitvec *res;
  pkt_header *fv0;
  char *data;
  size_t datasize;

  if (ntohl(pkt->iType) == FORCE_CHECKPOINT)
    {
      data_receive(NULL, pkt);
      return NULL;
    }

  datasize = ntohl(pkt->iSize);
  /* iSize comes from the network: refuse a size whose buffer length would
   * wrap around size_t (possible with a 32-bit size_t) */
  if (datasize > (size_t)-1 - sizeof(pkt_header))
    {
      printw("ftp_create_newfv: packet size too large");
      exit(-1);
    }

  /* we use 2 flits */
  res = newflitvec(2);

  /* pkt must be copied if further use is needed (the original notes it is
   * currently allocated on the caller's stack). The same block also holds
   * the reception buffer for the data.
   * @todo if you are brave: add a pipeline with driver here */
  fv0 = ftp_alloc_check(malloc(sizeof(pkt_header) + datasize));
  data = ((char *)fv0) + sizeof(pkt_header);
  memcpy(fv0, pkt, sizeof(pkt_header));

  /* flit 0: the header */
  newflit(&(res->flittab[0]), fv0, sizeof(pkt_header), NULL);
  /* flit 1: the data, completed by data_receive */
  newflit(&(res->flittab[1]), data, ntohl(pkt->iSize), data_receive);

  /* the header has already been received: resume at the data flit */
  res->cflitindex = 1;
  res->cflitpos  = 0;

  /* the parameter given to data_receive, in addition to the flit */
  res->parameter = fv0;
  return res;
}

/** Connections of this protocol need no handshake: they start in CO_DONE. */
int ftp_connection_starting_state(int connection)
{
  return CO_DONE;
}

/* Called when something is readable during the connection phase.
 * Connections start in CO_DONE (see ftp_connection_starting_state), so this
 * protocol does nothing here.
 * @param mpipeer : tab of all peer socket infos
 * @param rank : rank of the local mpi process
 * @return always 0 (success) */
int ftp_connection_read(SockInfo mpipeer[], int rank)
{
  return 0;
}

/* Called when something is writable during the connection phase.
 * Connections start in CO_DONE (see ftp_connection_starting_state), so this
 * protocol does nothing here.
 * @param mpipeer : tab of all peer socket infos
 * @param rank : rank of the local mpi process
 * @return always 0 (success) */
int ftp_connection_write(SockInfo mpipeer[], int rank)
{
  return 0;
}

/** Any peer disconnection is fatal: close every socket and exit(-1). */
void ftp_on_disconnect(SockInfo mpipeer[], int rank)
{
  printi("VCL", "detected some disctonnection : bye!");
  close_all_sockets();
  exit(-1);
}

/** Stamp pkt with the current checkpoint sequence number. */
void ftp_on_soliloquize_send(pkt_header *pkt)
{
  pkt->ftp.ckpt = htonl(pckpt_getSeqnumber());
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -