⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 derniermscmpirun.c

📁 Path MPICH-V for MPICH the MPI Implementation
💻 C
📖 第 1 页 / 共 2 页
字号:
  boolean *moving;  boolean *connectedArray;  CN *auxCN;  struct timeval selectTimeout;  boolean finalizing = false;  boolean autoRelaunch = false;  int msg;  struct jobSpecifications theJob;  logging = true;  selectTimeout.tv_sec = FINALIZE_TIMEOUT;  selectTimeout.tv_usec = 0;  if (!checkArguments (argc, argv, &theJob))    {      return 1;    }  parseProgramFile (&theJob);  auxCN = (CN *) malloc (sizeof (CN));  if (strlen (theJob.debugFile) != 0)    parseDebugFile (&theJob);  /* We get the local IP address */  if (gethostname (srvName, (size_t) HOST_NAME_SIZE) < 0)    {      printf ("Error: could not get hostname\n");      exit (1);    }  if ((myAddress = gethostbyname (srvName)) == 0)    qerror ("gethostbyname");  strcpy (theJob.dispatcherIP,	  (char *) inet_ntoa (*(struct in_addr *) myAddress->h_addr));  v2cmdfile = (char *) calloc (strlen (theJob.v2pgFile) + 9, sizeof (char));  sprintf (v2cmdfile, "%s.commands", theJob.v2pgFile);  parseCommandsFile (v2cmdfile, &theJob);  free (v2cmdfile);  v2availfile = (char *) calloc (strlen (theJob.v2pgFile) + 7, sizeof (char));  sprintf (v2availfile, "%s.avail", theJob.v2pgFile);  parseAvailableFile (v2availfile);  sprintf (log, "Starting program %s", theJob.v2pgFile);  v2logMessage (log);  /* We already start listening on the connection socket in order to get the PID's from the auxiliary programs */  address.sin_family = AF_INET;  address.sin_addr.s_addr = htonl (INADDR_ANY);  address.sin_port = htons (theJob.dispatcherPort);  if ((listenSocket = socket (AF_INET, SOCK_STREAM, 0)) < 0)    {      printf ("Socket could not be created\n");      bailout (&theJob);    }  {    int i = !0;    if (setsockopt (listenSocket, SOL_SOCKET, SO_REUSEADDR, &i, sizeof (int))	< 0)      qerror ("setting reuse of non listening binded addr");  }  while (bind	 (listenSocket, ((struct sockaddr *) &address),	  sizeof (struct sockaddr_in)) == -1)    {      fprintf (stderr,	       "Bind on %s:%d could not be performed: (%s) trying in 1 second\n",	       inet_ntoa (address.sin_addr), ntohs (address.sin_port),	       strerror (errno));      sleep (1);    }  if (listen (listenSocket, MAX_LISTEN_WAIT) == -1)    {      close (listenSocket);      printf ("Could not listen on socket\n");      bailout (&theJob);    }  if (mscServer)    connectToMaster (theJob.masterIP, theJob.masterPort);  launchAuxiliaryPrograms (&theJob, &nbAuxTotal);  /* We wait until the rsh are completed */  waitConnectionFromAuxiliaries (&theJob, nbAuxTotal, auxiliariesPid,				 listenSocket);  /*     And finally, the computation nodes!   */  /*@todo dans protocol attendre l'ordre job */  launchComputingNodes (&theJob);  pid = (pid_t *) malloc (theJob.nprocs * sizeof (pid_t));  waitConnectionFromComputingNodes (&theJob, listenSocket, pid);  moving = (boolean *) malloc (theJob.nprocs * sizeof (boolean));  for (i = 0; i < theJob.nprocs; i++)    moving[i] = false;  /*     The main loop, waiting for the disconnection from all clients to end,     meaning the program is finalized or aborted   */  createFifoFile (&theJob);  if ((fdFifo = open (theJob.fifoFile, O_NONBLOCK)) == -1)    {      printf ("Internal problem: could not open FIFO file!\n");      bailout (&theJob);    }  while (connectedNodes != NULL)    {      FD_ZERO (&setOfCN);      currentConnected = connectedNodes;      auxCN = theJob.nodeList;      while (currentConnected != NULL)	{	  /*  auxCN->pid = pid[auxCN->rank]; */	  FD_SET (currentConnected->socket, &setOfCN);	  currentConnected = currentConnected->next;	  auxCN = auxCN->next;	}      FD_SET (fdFifo, &setOfCN);      /*@todo for gang goto here for disconnection of all nodes */      if (finalizing)	{	  if ((ret =	       select (FD_SETSIZE, &setOfCN, NULL, NULL, &selectTimeout)) < 0)	    {	      printf ("Error in select: irrecoverable error\n");	      bailout (&theJob);	    }	}      else	{	  if ((ret = select (FD_SETSIZE, &setOfCN, NULL, NULL, NULL)) < 0)	    {	      printf ("Error in select: irrecoverable error\n");	      bailout (&theJob);	    }	}      currentConnected = connectedNodes;      while (currentConnected != NULL)	{	  if (FD_ISSET (currentConnected->socket, &setOfCN))	    {	      break;	    }	  currentConnected = currentConnected->next;	}      if (currentConnected != NULL)	{			/* We have a node event! */	  if (read (currentConnected->socket, &msg, sizeof (int)) == 0)	    {	      /* If we got no message, it means it's a simple disconnection */	      if (!moving[currentConnected->rank])                printf ("***** A crash was detected *****\n");                            if (mscServer)                {                                   check_all_disconnected(&theJob, listenSocket, pid, connectedNodes);                }                             else                {                  auxCN = theJob.nodeList;                  while (auxCN->rank != currentConnected->rank)                    auxCN = auxCN->next;                                    if (autoRelaunch || auxCN->autoLaunch)                    {                      fprintf (stdout, "Relaunching rank %d\n",                               currentConnected->rank);                      fflush (stdout);                      relaunchByRank (currentConnected->rank, &theJob,                                      moving[currentConnected->rank]);                      moving[currentConnected->rank] = false;                    }                  else                    {                      auxCN = theJob.nodeList;                      while (auxCN->rank != currentConnected->rank)                        auxCN = auxCN->next;                      nodeCommandLine (commandLine, RESTART, &theJob, *auxCN);                      printf                        ("In order to continue execution, you need to relaunch the node with the following command:\n\n");                      printf ("  %s\n\n", commandLine);                    }                  if (!moving[currentConnected->rank])                    {                      printf ("********************************\n");                      fflush (stdout);                    }                  /* We reconnect to the node that has previously been disconnected */                  if ((acceptSocket =                       accept (listenSocket, (struct sockaddr *) &pin,                               &addrlen)) < 0)                    {                      printf ("Could not accept socket connection from client\n");                    }                  else                    {                      read (acceptSocket, &rank, sizeof (int));                      rank = ntohl (rank);                      read (acceptSocket, &pid1, sizeof (pid_t));                      pid[rank] = ntohl (pid1);                      currentConnected->socket = acceptSocket;                      write (acceptSocket, theJob.nodeListArray,                             theJob.nprocs * sizeof (struct sockaddr_in));                      /* End of reconnection of a crashed node */                    }                }                  /* If we have received a message */	    }	  else	    {	      if (msg == FINALIZE_MSG)		{		  finalizing = true;		  currentConnected->finalized = true;		  check_sync_finalize ();		}	    }	}      else if (FD_ISSET (fdFifo, &setOfCN))	{			/* We have a FIFO file event, ie an order! */	  n = read (fdFifo, buff, sizeof (buff) - 1);	  buff[n] = '\0';	  close (fdFifo);	  if (strncmp (buff, DISPATCHER_MOVEALL_MSG, 7) == 0)	    {	      sscanf (buff, DISPATCHER_MOVEALL_STRING, fileName);	      sprintf (log, "MOVEALL instruction received: file %s",		       fileName);	      v2logMessage (log);	      fprintf (stdout, "**** MOVEALL instruction received ****\n");	      fflush (stdout);	      /* In fact this is more a 'move group' than a move all: no real need to move everything... */	      /* the format of the file is as such: rank newHost newIP newFastIP:newPort */	      if ((fd = fopen (fileName, "r")) != NULL)		{		  while (fgets (line, 256, fd))		    {		      sscanf (line, "%d %s %d.%d.%d.%d %d.%d.%d.%d:%d", &rank,			      hostName, &ip1, &ip2, &ip3, &ip4, &ip5, &ip6,			      &ip7, &ip8, &port);		      sprintf (log,			       " rank %d on %s (%d.%d.%d.%d/%d.%d.%d.%d:%d)",			       rank, hostName, ip1, ip2, ip3, ip4, ip5, ip6,			       ip7, ip8, port);		      v2logMessage (log);		      fprintf (stdout,			       " rank %d on %s (%d.%d.%d.%d/%d.%d.%d.%d:%d)\n",			       rank, hostName, ip1, ip2, ip3, ip4, ip5, ip6,			       ip7, ip8, port);		      fflush (stdout);		      moving[rank] = true;		      sprintf (newIP, "%d.%d.%d.%d", ip1, ip2, ip3, ip4);		      sprintf (newFastIP, "%d.%d.%d.%d", ip5, ip6, ip7, ip8);		      moveExecutionOneNode (&theJob, rank, hostName, newIP,					    newFastIP, port);		    }		  fclose (fd);		}	      else		{		  sprintf (log, "Unable to open file %s: MOVEALL aborted",			   fileName);		  v2logMessage (log);		  fprintf (stdout,			   " Unable to open instruction file %s: aborting MOVEALL\n",			   fileName);		  fflush (stdout);		}	    }	  else if (strncmp (buff, DISPATCHER_MOVE_MSG, 4) == 0)	    {	      sscanf (buff, DISPATCHER_MOVE_STRING, &rank, hostName, &ip1,		      &ip2, &ip3, &ip4, &ip5, &ip6, &ip7, &ip8, &port);	      sprintf (newIP, "%d.%d.%d.%d", ip1, ip2, ip3, ip4);	      sprintf (newFastIP, "%d.%d.%d.%d", ip5, ip6, ip7, ip8);	      sprintf (log,		       "MOVE instruction received: rank %d on %s (%d.%d.%d.%d/%d.%d.%d.%d:%d)",		       rank, hostName, ip1, ip2, ip3, ip4, ip5, ip6, ip7, ip8,		       port);	      v2logMessage (log);	      moving[rank] = true;	      fprintf (stdout, "**** MOVE'ing rank %d on %s ****\n", rank,		       hostName);	      moveExecutionOneNode (&theJob, rank, hostName, newIP, newFastIP,				    port);	    }	  else	    {	      printf ("Unknown command: %s\n", buff);	    }	  if ((fdFifo = open (theJob.fifoFile, O_NONBLOCK)) == -1)	    {	      printf ("Internal problem: could not open FIFO file!\n");	      bailout (&theJob);	    }	}      else	{			/* Time out on the select() */	  v2logMessage	    ("Timeout detected on MPI_Finalize(): killing remaining running nodes");	  connectedArray =	    (boolean *) malloc (theJob.nprocs * sizeof (boolean));	  for (i = 0; i < theJob.nprocs; i++)	    connectedArray[i] = false;	  currentConnected = connectedNodes;	  while (currentConnected != NULL)	    {	      connectedArray[currentConnected->rank] = true;	      ccAux = currentConnected->next;	      removeConnectedNode (currentConnected);	      currentConnected = ccAux;	    }	  auxCN = theJob.nodeList;	  while (auxCN != NULL)	    {	      /* We kill only the ones that were still connected */	      if (connectedArray[auxCN->rank])		killComputingNode (&theJob, auxCN);	      auxCN = auxCN->next;	    }	  free (connectedArray);	}    }  killAllAuxiliaries (&theJob);  cleanAuxiliaryFiles (&theJob);  sprintf (log, "Execution of program %s finished", theJob.v2pgFile);  v2logMessage (log);  exit (0);  return (0);}voidbailout (JS * js){  CN *auxCN;  /* This is the moment to clean a bit... */  /* The auxiliaries */  killAllAuxiliaries (js);  /* The nodes */  auxCN = js->nodeList;  while (auxCN != NULL)    {      if (auxCN->pid != 0)	killComputingNode (js, auxCN);      auxCN = auxCN->next;    }  exit (-1);}voidcleanMemory (){  AP *ca;  int i;  /* Arguments, tmp and so on... */  /* Array of IP addresses */  for (i = 0; i < ipNb; i++)    {      free (ipList[i]);    }  free (ipList);}voidcleanELCS (JS * js){  char *commandLine;  char log[1024];  commandLine = (char *) calloc (4096, sizeof (char));  /*     We need to clean the CS and EL which are still running   */  sprintf (log, "Cleaning EL and CS from the nodes");  v2logMessage (log);  sprintf (commandLine, "%s %d %s", js->killCmd, js->jobId, js->v2pgFile);  system (commandLine);  free (commandLine);}voidv2Error (char *msg){  v2logMessage (msg);  v2logMessage ("Quitting");  exit (0);}void  setcurrentConnected ( int rank, int acceptSocket){  Connected * currentNode;  currentNode = connectedNodes;   while (currentNode->rank != rank)    currentNode = currentNode->next;            currentNode->socket = acceptSocket;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -