📄 mpirun.v2run.c
Font size:
if (testOnly) {} else { if (auxEL->autoLaunch) system(commandLine); else printf("*** You need to launch the following command:\n%s\n",commandLine); } nbAuxTotal ++; auxEL = auxEL->next; } /* Then the CS */ auxCS = theJob.csList; while (auxCS != NULL) { if (strlen(auxCS->debugString) == 0) { sprintf(commandLine, "%s %s %s -g %d -p %d -d %s -i %s -auxid %d -dispatcher %s:%d &", theJob.rshCmd, auxCS->ipAddress, theJob.csCmd, theJob.jobId, auxCS->port, theJob.debugCommand, auxCS->tmp, nbAuxTotal, theJob.dispatcherIP, theJob.dispatcherPort); } else { sprintf(commandLine, "%s %s %s -g %d -p %d %s -i %s -auxid %d -dispatcher %s:%d &", theJob.rshCmd, auxCS->ipAddress, theJob.csCmd, theJob.jobId, auxCS->port, auxCS->debugString, auxCS->tmp, nbAuxTotal, theJob.dispatcherIP, theJob.dispatcherPort); } sprintf(log, "launching checkpoint server on machine %s", auxCS->ipAddress); v2logMessage(log); if (testOnly || (strlen(auxCS->debugString) != 0)) printf("%s\n", commandLine); if (!testOnly) { if (auxCS->autoLaunch) system(commandLine); else printf("#*** You need to launch the following command:\n%s &\n",commandLine); } nbAuxTotal ++; auxCS = auxCS->next; } /* Then the SC */ auxSC = theJob.scList; while (auxSC != NULL) { if (strlen(auxSC->debugString) == 0) { if (theJob.checkpointFrequency == 0) { sprintf(commandLine, "%s %s %s %d %d %d %d %s:%d %s &", theJob.rshCmd, auxSC->ipAddress, theJob.scCmd, auxSC->port, theJob.nprocs, theJob.jobId, nbAuxTotal, theJob.dispatcherIP, theJob.dispatcherPort, theJob.debugCommand); } else { sprintf(commandLine, "%s %s %s %d %d %d %d %s:%d %s -t %d 2> /dev/null &", theJob.rshCmd, auxSC->ipAddress, theJob.scCmd, auxSC->port, theJob.nprocs, theJob.jobId, nbAuxTotal, theJob.dispatcherIP, theJob.dispatcherPort, theJob.debugCommand, theJob.checkpointFrequency); } } else { if (theJob.checkpointFrequency == 0) { sprintf(commandLine, "%s %s %s %d %d %d %d %s:%d %s &", theJob.rshCmd, auxSC->ipAddress, theJob.scCmd, auxSC->port, theJob.nprocs, 
theJob.jobId, nbAuxTotal, theJob.dispatcherIP, theJob.dispatcherPort, auxSC->debugString); } else { sprintf(commandLine, "%s %s %s %d %d %d %d %s:%d %s -t %d 2> /dev/null &", theJob.rshCmd, auxSC->ipAddress, theJob.scCmd, auxSC->port, theJob.nprocs, theJob.jobId, nbAuxTotal, theJob.dispatcherIP, theJob.dispatcherPort, auxSC->debugString, theJob.checkpointFrequency); } } sprintf(log, "launching checkpoint scheduler on machine %s", auxSC->ipAddress); v2logMessage(log); if (testOnly || (strlen(auxSC->debugString) != 0)) printf("%s\n", commandLine); if (!testOnly) { if (auxSC->autoLaunch) system(commandLine); else printf("#*** You need to launch the following command:\n%s &\n",commandLine); } nbAuxTotal ++; auxSC = auxSC->next; } /* We wait until the rsh are completed */ /*while(!checkAuxiliaries(&theJob)); */ if (strlen(theJob.debugFile) != 0) printf("Waiting for auxiliaries to be launched before launching nodes\n"); nbAuxiliaries = 0; auxiliariesPid = (int *)malloc(nbAuxTotal * sizeof(int));/* while (nbAuxiliaries < nbAuxTotal) { if ( (acceptSocket = accept(listenSocket, (struct sockaddr *)&pin, &addrlen)) < 0 ) { printf("Could not accept socket connection from auxiliary\n"); } else { read(acceptSocket, &auxId, sizeof(int)); auxId = ntohl(auxId); read(acceptSocket, &pid1, sizeof(pid_t)); auxiliariesPid[auxId] = ntohl(pid1); close(acceptSocket); nbAuxiliaries ++; } }*/ if (strlen(theJob.debugFile) != 0) printf("All auxiliaries have been identified: nodes can now be launched\n"); /* Now that all the auxiliaries have given their IPs, time to put these IPs in the correct place *//* i = 0; auxEL = theJob.elList; while (auxEL != NULL) { auxEL->pid = auxiliariesPid[i]; auxEL = auxEL->next; i++; } auxCS = theJob.csList; while (auxCS != NULL) { auxCS->pid = auxiliariesPid[i]; auxCS = auxCS->next; i++; } auxSC = theJob.scList; while (auxSC != NULL) { auxSC->pid = auxiliariesPid[i]; auxSC = auxSC->next; i++; }*/ /* We do not need the IP used by the auxiliary servers at this 
point. Let's clean *//* for (i = 0; i < ipNb; i++) { free(ipList[i]); } free(ipList); ipNb = 0;*/ /* And finally, the computation nodes! */ auxCN = theJob.nodeList; while(auxCN != NULL) { nodeCommandLine(&commandLine, CHECKPOINT, &theJob, *auxCN); sprintf(log, "launching rank %d on host %s", auxCN->rank, auxCN->hostName); v2logMessage(log); if (testOnly || (strcmp(auxCN->debugString, "") != 0)) printf("%s\n", commandLine); if (!testOnly) { if (auxCN->autoLaunch) { if ( (forked = fork()) != -1) { if (forked == 0) { system(commandLine); _exit(0); } } else { printf("Error: could not fork(): aborting\n"); exit(1); } } else printf("#*** You need to launch the following command:\n%s &\n",commandLine); } auxCN = auxCN->next; }#define MAX_MSG 10 pid = (pid_t *)malloc(theJob.nprocs * sizeof(pid_t)); for (i = 0; i< theJob.nprocs; i++) { if ( (acceptSocket = accept(listenSocket, (struct sockaddr *)&pin, &addrlen)) < 0 ) { printf("Could not accept socket connection from client\n"); } else { read(acceptSocket, &rank, sizeof(int)); rank = ntohl(rank); read(acceptSocket, &pid1, sizeof(pid_t)); pid[rank] = ntohl(pid1); addConnectedNode(rank, acceptSocket); /* We then send the node list to that node, with the following form: int[5]: ip1.ip2.ip3.ip4:port -> ip1ip2ip3ip4port */ write(acceptSocket, theJob.nodeListArray, theJob.nprocs*sizeof(struct sockaddr_in)); /*close(acceptSocket);*/ } } moving = (boolean *)malloc(theJob.nprocs * sizeof(boolean)); for (i = 0; i < theJob.nprocs; i++) moving[i] = false; /* The main loop, waiting for the disconnection from all clients to end, meaning the program is finalized or aborted */ createFifoFile(&theJob); if( (fdFifo = open(theJob.fifoFile, O_NONBLOCK)) == -1) { printf("Internal problem: could not open FIFO file!\n"); bailout(&theJob); } currentConnected = (Connected *)malloc(sizeof(Connected)); while (connectedNodes != NULL) { FD_ZERO(&setOfCN); currentConnected = connectedNodes; auxCN = theJob.nodeList; while (currentConnected != NULL) { 
auxCN->pid = pid[auxCN->rank]; FD_SET(currentConnected->socket, &setOfCN); currentConnected = currentConnected->next; auxCN = auxCN->next; } FD_SET(fdFifo, &setOfCN); if (finalizing) { if ((ret = select(FD_SETSIZE, &setOfCN, NULL, NULL, selectTimeout)) < 0) { printf("Error in select: irrecoverable error\n"); bailout(&theJob); } } else { if ((ret = select(FD_SETSIZE, &setOfCN, NULL, NULL, NULL)) < 0) { printf("Error in select: irrecoverable error\n"); bailout(&theJob); } } currentConnected = connectedNodes; while(currentConnected != NULL) { if (FD_ISSET(currentConnected->socket, &setOfCN)) { break; } currentConnected = currentConnected->next; } if (currentConnected != NULL) { /* We have a node event! */ if ( read(currentConnected->socket, &msg, sizeof(int)) == 0) { /* If we got no message, it means it's a simple disconnection */ if (!moving[currentConnected->rank]) printf("***** A crash was detected *****\n"); if (autoRelaunch || !auxCN->autoLaunch) { printf("Relaunching rank %d\n", currentConnected->rank); fflush(stdout); relaunchByRank(currentConnected->rank, &theJob, moving[currentConnected->rank]); moving[currentConnected->rank] = false; } else { auxCN = theJob.nodeList; while(auxCN->rank != currentConnected->rank) auxCN = auxCN->next; nodeCommandLine(&commandLine, RESTART, &theJob, *auxCN); printf("In order to continue execution, you need to relaunch the node with the following command:\n\n"); printf(" %s\n\n", commandLine); } if (!moving[currentConnected->rank]) { printf("********************************\n"); fflush(stdout); } /* We reconnect to the node that has previously been disconnected */ if ( (acceptSocket = accept(listenSocket, (struct sockaddr *)&pin, &addrlen)) < 0 ) { printf("Could not accept socket connection from client\n"); } else { read(acceptSocket, &rank, sizeof(int)); rank = ntohl(rank); read(acceptSocket, &pid1, sizeof(pid_t)); pid[rank] = ntohl(pid1); currentConnected->socket = acceptSocket; write(acceptSocket, theJob.nodeListArray, 
theJob.nprocs*sizeof(struct sockaddr_in)); /* End of reconnection of a crashed node */ } /* If we have received a message */ } else { if (msg == FINALIZE_MSG) { finalizing = true; removeConnectedNode(currentConnected); } } } else if (FD_ISSET(fdFifo, &setOfCN)) { /* We have a FIFO file event, ie an order! */ n = read(fdFifo, buff, sizeof(buff)-1); buff[n] = '\0'; close(fdFifo); if (strncmp(buff, DISPATCHER_MOVEALL_MSG, 7) == 0) { sscanf(buff, DISPATCHER_MOVEALL_STRING, fileName); sprintf(log, "MOVEALL instruction received: file %s", fileName); v2logMessage(log); fprintf(stdout, "**** MOVEALL instruction received ****\n"); fflush(stdout); /* In fact this is more a 'move group' than a move all: no real need to move everything... */ /* the format of the file is as such: rank newHost newIP newFastIP:newPort */ if ((fd = fopen(fileName, "r")) != NULL) { while(fgets(line, 256, fd)) { sscanf(line, "%d %s %d.%d.%d.%d %d.%d.%d.%d:%d", &rank, hostName, &ip1, &ip2, &ip3, &ip4, &ip5, &ip6, &ip7, &ip8, &port); sprintf(log, " rank %d on %s (%d.%d.%d.%d/%d.%d.%d.%d:%d)", rank, hostName, ip1, ip2,ip3, ip4, ip5, ip6, ip7, ip8, port); v2logMessage(log); fprintf(stdout, " rank %d on %s (%d.%d.%d.%d/%d.%d.%d.%d:%d)\n", rank, hostName, ip1, ip2,ip3, ip4, ip5, ip6, ip7, ip8, port); fflush(stdout); moving[rank] = true; sprintf(newIP, "%d.%d.%d.%d", ip1, ip2, ip3, ip4); sprintf(newFastIP, "%d.%d.%d.%d", ip5, ip6, ip7, ip8); moveExecutionOneNode(&theJob, rank, hostName, newIP, newFastIP, port); } fclose(fd); } else { sprintf(log, "Unable to open file %s: MOVEALL aborted", fileName); v2logMessage(log); fprintf(stdout, " Unable to open instruction file %s: aborting MOVEALL\n", fileName); fflush(stdout); } } else if (strncmp(buff, DISPATCHER_MOVE_MSG, 4) == 0) { sscanf(buff, DISPATCHER_MOVE_STRING, &rank, hostName, &ip1, &ip2, &ip3, &ip4, &ip5, &ip6, &ip7, &ip8, &port); sprintf(newIP, "%d.%d.%d.%d", ip1, ip2, ip3, ip4); sprintf(newFastIP, "%d.%d.%d.%d", ip5, ip6, ip7, ip8); sprintf(log, "MOVE 
instruction received: rank %d on %s (%d.%d.%d.%d/%d.%d.%d.%d:%d)", rank, hostName, ip1, ip2, ip3, ip4, ip5, ip6, ip7, ip8, port); v2logMessage(log); moving[rank] = true; fprintf(stdout, "**** MOVE'ing rank %d on %s ****\n", rank, hostName); moveExecutionOneNode(&theJob, rank, hostName, newIP, newFastIP, port); } else { printf("Unknown command: %s\n", buff); } if( (fdFifo = open(theJob.fifoFile, O_NONBLOCK)) == -1) { printf("Internal problem: could not open FIFO file!\n"); bailout(&theJob); } } else { /* Time out on the select() */ v2logMessage("Timeout detected on MPI_Finalize(): killing remaining running nodes"); connectedArray = (boolean *)malloc(theJob.nprocs*sizeof(boolean)); for (i = 0; i < theJob.nprocs; i++) connectedArray[i] = false; currentConnected = connectedNodes; while (currentConnected != NULL) { connectedArray[currentConnected->rank] = true; ccAux = currentConnected->next; removeConnectedNode(currentConnected); currentConnected = ccAux; } auxCN = theJob.nodeList; while (auxCN != NULL) { /* We kill only the ones that were still connected */ if (connectedArray[auxCN->rank]) killComputingNode(&theJob, auxCN); auxCN = auxCN->next; } free(connectedArray); } } killAllAuxiliaries(&theJob); cleanAuxiliaryFiles(&theJob); sprintf(log, "Execution of program %s finished", theJob.v2pgFile); v2logMessage(log); free(currentConnected); free(connectedNodes); cleanMemory(); free(commandLine); free(v2availfile); free(v2cmdfile); exit(0); return (0);}void bailout(JS * js) { CN * auxCN; /* This is the moment to clean a bit... */ /* The auxiliaries */ killAllAuxiliaries(js); /* The nodes */ auxCN = js->nodeList; while (auxCN != NULL) { if (auxCN->pid != 0) killComputingNode(js, auxCN); auxCN = auxCN->next; } exit(-1);}void cleanMemory() { AP * ca; int i; /* Arguments, tmp and so on... 
*/ /* Array of IP addresses */ for (i = 0; i < ipNb; i++) { free(ipList[i]); } free(ipList);}void cleanELCS(JS *js) { char * commandLine; char log[1024]; commandLine = (char *)calloc(4096, sizeof(char)); /* We need to clean the CS and EL which are still running */ sprintf(log, "Cleaning EL and CS from the nodes"); v2logMessage(log); sprintf(commandLine, "%s %d %s", js->killCmd, js->jobId, js->v2pgFile); system(commandLine); free(commandLine);}void v2Error(char * msg) { v2logMessage(msg); v2logMessage("Quitting"); exit(0);}
⌨️ Keyboard shortcuts
Copy code
Ctrl + C
Search code
Ctrl + F
Full-screen mode
F11
Toggle theme
Ctrl + Shift + D
Show shortcuts
?
Increase font size
Ctrl + =
Decrease font size
Ctrl + -