📄 derniermscmpirun.c
字号:
boolean *moving; boolean *connectedArray; CN *auxCN; struct timeval selectTimeout; boolean finalizing = false; boolean autoRelaunch = false; int msg; struct jobSpecifications theJob; logging = true; selectTimeout.tv_sec = FINALIZE_TIMEOUT; selectTimeout.tv_usec = 0; if (!checkArguments (argc, argv, &theJob)) { return 1; } parseProgramFile (&theJob); auxCN = (CN *) malloc (sizeof (CN)); if (strlen (theJob.debugFile) != 0) parseDebugFile (&theJob); /* We get the local IP address */ if (gethostname (srvName, (size_t) HOST_NAME_SIZE) < 0) { printf ("Error: could not get hostname\n"); exit (1); } if ((myAddress = gethostbyname (srvName)) == 0) qerror ("gethostbyname"); strcpy (theJob.dispatcherIP, (char *) inet_ntoa (*(struct in_addr *) myAddress->h_addr)); v2cmdfile = (char *) calloc (strlen (theJob.v2pgFile) + 9, sizeof (char)); sprintf (v2cmdfile, "%s.commands", theJob.v2pgFile); parseCommandsFile (v2cmdfile, &theJob); free (v2cmdfile); v2availfile = (char *) calloc (strlen (theJob.v2pgFile) + 7, sizeof (char)); sprintf (v2availfile, "%s.avail", theJob.v2pgFile); parseAvailableFile (v2availfile); sprintf (log, "Starting program %s", theJob.v2pgFile); v2logMessage (log); /* We already start listening on the connection socket in order to get the PID's from the auxiliary programs */ address.sin_family = AF_INET; address.sin_addr.s_addr = htonl (INADDR_ANY); address.sin_port = htons (theJob.dispatcherPort); if ((listenSocket = socket (AF_INET, SOCK_STREAM, 0)) < 0) { printf ("Socket could not be created\n"); bailout (&theJob); } { int i = !0; if (setsockopt (listenSocket, SOL_SOCKET, SO_REUSEADDR, &i, sizeof (int)) < 0) qerror ("setting reuse of non listening binded addr"); } while (bind (listenSocket, ((struct sockaddr *) &address), sizeof (struct sockaddr_in)) == -1) { fprintf (stderr, "Bind on %s:%d could not be performed: (%s) trying in 1 second\n", inet_ntoa (address.sin_addr), ntohs (address.sin_port), strerror (errno)); sleep (1); } if (listen (listenSocket, MAX_LISTEN_WAIT) == -1) { close (listenSocket); printf ("Could not listen on socket\n"); bailout (&theJob); } if (mscServer) connectToMaster (theJob.masterIP, theJob.masterPort); launchAuxiliaryPrograms (&theJob, &nbAuxTotal); /* We wait until the rsh are completed */ waitConnectionFromAuxiliaries (&theJob, nbAuxTotal, auxiliariesPid, listenSocket); /* And finally, the computation nodes! */ /*@todo dans protocol attendre l'ordre job */ launchComputingNodes (&theJob); pid = (pid_t *) malloc (theJob.nprocs * sizeof (pid_t)); waitConnectionFromComputingNodes (&theJob, listenSocket, pid); moving = (boolean *) malloc (theJob.nprocs * sizeof (boolean)); for (i = 0; i < theJob.nprocs; i++) moving[i] = false; /* The main loop, waiting for the disconnection from all clients to end, meaning the program is finalized or aborted */ createFifoFile (&theJob); if ((fdFifo = open (theJob.fifoFile, O_NONBLOCK)) == -1) { printf ("Internal problem: could not open FIFO file!\n"); bailout (&theJob); } while (connectedNodes != NULL) { FD_ZERO (&setOfCN); currentConnected = connectedNodes; auxCN = theJob.nodeList; while (currentConnected != NULL) { /* auxCN->pid = pid[auxCN->rank]; */ FD_SET (currentConnected->socket, &setOfCN); currentConnected = currentConnected->next; auxCN = auxCN->next; } FD_SET (fdFifo, &setOfCN); /*@todo for gang goto here for disconnection of all nodes */ if (finalizing) { if ((ret = select (FD_SETSIZE, &setOfCN, NULL, NULL, &selectTimeout)) < 0) { printf ("Error in select: irrecoverable error\n"); bailout (&theJob); } } else { if ((ret = select (FD_SETSIZE, &setOfCN, NULL, NULL, NULL)) < 0) { printf ("Error in select: irrecoverable error\n"); bailout (&theJob); } } currentConnected = connectedNodes; while (currentConnected != NULL) { if (FD_ISSET (currentConnected->socket, &setOfCN)) { break; } currentConnected = currentConnected->next; } if (currentConnected != NULL) { /* We have a node event! */ if (read (currentConnected->socket, &msg, sizeof (int)) == 0) { /* If we got no message, it means it's a simple disconnection */ if (!moving[currentConnected->rank]) printf ("***** A crash was detected *****\n"); if (mscServer) { check_all_disconnected(&theJob, listenSocket, pid, connectedNodes); } else { auxCN = theJob.nodeList; while (auxCN->rank != currentConnected->rank) auxCN = auxCN->next; if (autoRelaunch || auxCN->autoLaunch) { fprintf (stdout, "Relaunching rank %d\n", currentConnected->rank); fflush (stdout); relaunchByRank (currentConnected->rank, &theJob, moving[currentConnected->rank]); moving[currentConnected->rank] = false; } else { auxCN = theJob.nodeList; while (auxCN->rank != currentConnected->rank) auxCN = auxCN->next; nodeCommandLine (commandLine, RESTART, &theJob, *auxCN); printf ("In order to continue execution, you need to relaunch the node with the following command:\n\n"); printf (" %s\n\n", commandLine); } if (!moving[currentConnected->rank]) { printf ("********************************\n"); fflush (stdout); } /* We reconnect to the node that has previously been disconnected */ if ((acceptSocket = accept (listenSocket, (struct sockaddr *) &pin, &addrlen)) < 0) { printf ("Could not accept socket connection from client\n"); } else { read (acceptSocket, &rank, sizeof (int)); rank = ntohl (rank); read (acceptSocket, &pid1, sizeof (pid_t)); pid[rank] = ntohl (pid1); currentConnected->socket = acceptSocket; write (acceptSocket, theJob.nodeListArray, theJob.nprocs * sizeof (struct sockaddr_in)); /* End of reconnection of a crashed node */ } } /* If we have received a message */ } else { if (msg == FINALIZE_MSG) { finalizing = true; currentConnected->finalized = true; check_sync_finalize (); } } } else if (FD_ISSET (fdFifo, &setOfCN)) { /* We have a FIFO file event, ie an order! */ n = read (fdFifo, buff, sizeof (buff) - 1); buff[n] = '\0'; close (fdFifo); if (strncmp (buff, DISPATCHER_MOVEALL_MSG, 7) == 0) { sscanf (buff, DISPATCHER_MOVEALL_STRING, fileName); sprintf (log, "MOVEALL instruction received: file %s", fileName); v2logMessage (log); fprintf (stdout, "**** MOVEALL instruction received ****\n"); fflush (stdout); /* In fact this is more a 'move group' than a move all: no real need to move everything... */ /* the format of the file is as such: rank newHost newIP newFastIP:newPort */ if ((fd = fopen (fileName, "r")) != NULL) { while (fgets (line, 256, fd)) { sscanf (line, "%d %s %d.%d.%d.%d %d.%d.%d.%d:%d", &rank, hostName, &ip1, &ip2, &ip3, &ip4, &ip5, &ip6, &ip7, &ip8, &port); sprintf (log, " rank %d on %s (%d.%d.%d.%d/%d.%d.%d.%d:%d)", rank, hostName, ip1, ip2, ip3, ip4, ip5, ip6, ip7, ip8, port); v2logMessage (log); fprintf (stdout, " rank %d on %s (%d.%d.%d.%d/%d.%d.%d.%d:%d)\n", rank, hostName, ip1, ip2, ip3, ip4, ip5, ip6, ip7, ip8, port); fflush (stdout); moving[rank] = true; sprintf (newIP, "%d.%d.%d.%d", ip1, ip2, ip3, ip4); sprintf (newFastIP, "%d.%d.%d.%d", ip5, ip6, ip7, ip8); moveExecutionOneNode (&theJob, rank, hostName, newIP, newFastIP, port); } fclose (fd); } else { sprintf (log, "Unable to open file %s: MOVEALL aborted", fileName); v2logMessage (log); fprintf (stdout, " Unable to open instruction file %s: aborting MOVEALL\n", fileName); fflush (stdout); } } else if (strncmp (buff, DISPATCHER_MOVE_MSG, 4) == 0) { sscanf (buff, DISPATCHER_MOVE_STRING, &rank, hostName, &ip1, &ip2, &ip3, &ip4, &ip5, &ip6, &ip7, &ip8, &port); sprintf (newIP, "%d.%d.%d.%d", ip1, ip2, ip3, ip4); sprintf (newFastIP, "%d.%d.%d.%d", ip5, ip6, ip7, ip8); sprintf (log, "MOVE instruction received: rank %d on %s (%d.%d.%d.%d/%d.%d.%d.%d:%d)", rank, hostName, ip1, ip2, ip3, ip4, ip5, ip6, ip7, ip8, port); v2logMessage (log); moving[rank] = true; fprintf (stdout, "**** MOVE'ing rank %d on %s ****\n", rank, hostName); moveExecutionOneNode (&theJob, rank, hostName, newIP, newFastIP, port); } else { printf ("Unknown command: %s\n", buff); } if ((fdFifo = open (theJob.fifoFile, O_NONBLOCK)) == -1) { printf ("Internal problem: could not open FIFO file!\n"); bailout (&theJob); } } else { /* Time out on the select() */ v2logMessage ("Timeout detected on MPI_Finalize(): killing remaining running nodes"); connectedArray = (boolean *) malloc (theJob.nprocs * sizeof (boolean)); for (i = 0; i < theJob.nprocs; i++) connectedArray[i] = false; currentConnected = connectedNodes; while (currentConnected != NULL) { connectedArray[currentConnected->rank] = true; ccAux = currentConnected->next; removeConnectedNode (currentConnected); currentConnected = ccAux; } auxCN = theJob.nodeList; while (auxCN != NULL) { /* We kill only the ones that were still connected */ if (connectedArray[auxCN->rank]) killComputingNode (&theJob, auxCN); auxCN = auxCN->next; } free (connectedArray); } } killAllAuxiliaries (&theJob); cleanAuxiliaryFiles (&theJob); sprintf (log, "Execution of program %s finished", theJob.v2pgFile); v2logMessage (log); exit (0); return (0);}voidbailout (JS * js){ CN *auxCN; /* This is the moment to clean a bit... */ /* The auxiliaries */ killAllAuxiliaries (js); /* The nodes */ auxCN = js->nodeList; while (auxCN != NULL) { if (auxCN->pid != 0) killComputingNode (js, auxCN); auxCN = auxCN->next; } exit (-1);}voidcleanMemory (){ AP *ca; int i; /* Arguments, tmp and so on... */ /* Array of IP addresses */ for (i = 0; i < ipNb; i++) { free (ipList[i]); } free (ipList);}voidcleanELCS (JS * js){ char *commandLine; char log[1024]; commandLine = (char *) calloc (4096, sizeof (char)); /* We need to clean the CS and EL which are still running */ sprintf (log, "Cleaning EL and CS from the nodes"); v2logMessage (log); sprintf (commandLine, "%s %d %s", js->killCmd, js->jobId, js->v2pgFile); system (commandLine); free (commandLine);}voidv2Error (char *msg){ v2logMessage (msg); v2logMessage ("Quitting"); exit (0);}void setcurrentConnected ( int rank, int acceptSocket){ Connected * currentNode; currentNode = connectedNodes; while (currentNode->rank != rank) currentNode = currentNode->next; currentNode->socket = acceptSocket;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -