/* originempirun.vrun.c */
/* Font size: (web code-viewer control, not part of the source) */
/* The main!*/int main (int argc, char* argv[]) { char * v2cmdfile; char * v2availfile; int i; char commandLine[BUFF_SIZE]; char log[BUFF_SIZE]; char srvName[HOST_NAME_SIZE]; struct hostent * myAddress; Connected * currentConnected; Connected * ccAux; fd_set setOfCN; int fdFifo; int ret, retListen, maxFd; int listenSocket, acceptSocket; struct sockaddr_in address; struct sockaddr_in pin; int addrlen; char buff[BUFF_SIZE]; int rank; int port; int ip1, ip2, ip3, ip4, ip5, ip6, ip7, ip8; char newIP[IP_LENGTH]; char newFastIP[IP_LENGTH]; char fileName[BUFF_SIZE]; char hostName[HOST_NAME_SIZE]; char line[BUFF_SIZE]; FILE * fd; pid_t forked, pid1; pid_t * pid; pid_t * auxiliariesPid; int nbAuxiliaries; int nbAuxTotal; int n; boolean * moving; boolean * connectedArray; CN * auxCN; struct timeval selectTimeout; boolean finalizing = false; boolean autoRelaunch = false; int msg; struct jobSpecifications theJob; logging = true; selectTimeout.tv_sec = FINALIZE_TIMEOUT; selectTimeout.tv_usec = 0; if (!checkArguments(argc, argv, &theJob)) { return 1; } parseProgramFile(&theJob); auxCN = (CN *)malloc(sizeof(CN)); if (strlen(theJob.debugFile) != 0) parseDebugFile(&theJob); /* We get the local IP address */ if (gethostname(srvName, (size_t)HOST_NAME_SIZE) < 0) { printf("Error: could not get hostname\n"); exit(1); } if((myAddress = gethostbyname(srvName)) == 0) qerror("gethostbyname"); /*printf("struct hostent {\n\ char *h_name; =%s\n\ int h_length; =%d\n\ char **h_addr_list; =%s\n\ }\n", myAddress->h_name, myAddress->h_length, myAddress->h_addr); */ strcpy(theJob.dispatcherIP, (char *)inet_ntoa(*(struct in_addr *)myAddress->h_addr)); v2cmdfile = (char *)calloc(strlen(theJob.v2pgFile)+9,sizeof(char)); sprintf(v2cmdfile,"%s.commands",theJob.v2pgFile); parseCommandsFile(v2cmdfile, &theJob); free(v2cmdfile); v2availfile = (char *)calloc(strlen(theJob.v2pgFile)+7,sizeof(char)); sprintf(v2availfile,"%s.avail", theJob.v2pgFile); parseAvailableFile(v2availfile); sprintf(log, "Starting 
program %s", theJob.v2pgFile); v2logMessage(log); /* We already start listening on the connection socket in order to get the PID's from the auxiliary programs */ address.sin_family = AF_INET; address.sin_addr.s_addr = htonl(INADDR_ANY); address.sin_port = htons(theJob.dispatcherPort); if ((listenSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) { printf("Socket could not be created\n"); bailout(&theJob); } { int i = !0; if (setsockopt (listenSocket, SOL_SOCKET, SO_REUSEADDR, &i, sizeof (int)) < 0) qerror ("setting reuse of non listening binded addr"); } while (bind(listenSocket, ((struct sockaddr*)&address), sizeof(struct sockaddr_in)) == -1) { fprintf(stderr, "Bind on %s:%d could not be performed: (%s) trying in 1 second\n", inet_ntoa(address.sin_addr), ntohs(address.sin_port), strerror(errno) ); sleep(1); } if (listen(listenSocket, MAX_LISTEN_WAIT) == -1) { close(listenSocket); printf("Could not listen on socket\n"); bailout(&theJob); } launchAuxiliaryPrograms(&theJob, &nbAuxTotal); /* We wait until the rsh are completed */ waitConnectionFromAuxiliaries(&theJob, nbAuxTotal, auxiliariesPid, listenSocket); /* And finally, the computation nodes! 
*/ /*@todo for gang goto here for launch next job*/ launchComputingNodes(&theJob); pid = (pid_t *)malloc(theJob.nprocs * sizeof(pid_t)); waitConnectionFromComputingNodes(&theJob, listenSocket, pid); moving = (boolean *)malloc(theJob.nprocs * sizeof(boolean)); for (i = 0; i < theJob.nprocs; i++) moving[i] = false; /* The main loop, waiting for the disconnection from all clients to end, meaning the program is finalized or aborted */ createFifoFile(&theJob); if( (fdFifo = open(theJob.fifoFile, O_NONBLOCK)) == -1) { printf("Internal problem: could not open FIFO file!\n"); bailout(&theJob); } while (connectedNodes != NULL) { FD_ZERO(&setOfCN); currentConnected = connectedNodes; auxCN = theJob.nodeList; while (currentConnected != NULL) { /* auxCN->pid = pid[auxCN->rank]; */ FD_SET(currentConnected->socket, &setOfCN); currentConnected = currentConnected->next; auxCN = auxCN->next; } FD_SET(fdFifo, &setOfCN); /*@todo for gang goto here for disconnection of all nodes*/ if (finalizing) { if ((ret = select(FD_SETSIZE, &setOfCN, NULL, NULL, &selectTimeout)) < 0) { printf("Error in select: irrecoverable error\n"); bailout(&theJob); } } else { if ((ret = select(FD_SETSIZE, &setOfCN, NULL, NULL, NULL)) < 0) { printf("Error in select: irrecoverable error\n"); bailout(&theJob); } } currentConnected = connectedNodes; while(currentConnected != NULL) { if (FD_ISSET(currentConnected->socket, &setOfCN)) { break; } currentConnected = currentConnected->next; } if (currentConnected != NULL) { /* We have a node event! 
*/ if ( read(currentConnected->socket, &msg, sizeof(int)) == 0) { /* If we got no message, it means it's a simple disconnection */ if (!moving[currentConnected->rank]) printf("***** A crash was detected *****\n"); /*@todo check if all nodes are disconnected for gangSched if yes goto de begining of loop */ auxCN = theJob.nodeList; while(auxCN->rank != currentConnected->rank) auxCN = auxCN->next; if (autoRelaunch || auxCN->autoLaunch) { fprintf(stdout, "Relaunching rank %d\n", currentConnected->rank); fflush(stdout); relaunchByRank(currentConnected->rank, &theJob, moving[currentConnected->rank]); moving[currentConnected->rank] = false; } else { auxCN = theJob.nodeList; while(auxCN->rank != currentConnected->rank) auxCN = auxCN->next; nodeCommandLine(commandLine, RESTART, &theJob, *auxCN); printf("In order to continue execution, you need to relaunch the node with the following command:\n\n"); printf(" %s\n\n", commandLine); } if (!moving[currentConnected->rank]) { printf("********************************\n"); fflush(stdout); } /* We reconnect to the node that has previously been disconnected */ if ( (acceptSocket = accept(listenSocket, (struct sockaddr *)&pin, &addrlen)) < 0 ) { printf("Could not accept socket connection from client\n"); } else { read(acceptSocket, &rank, sizeof(int)); rank = ntohl(rank); read(acceptSocket, &pid1, sizeof(pid_t)); pid[rank] = ntohl(pid1); currentConnected->socket = acceptSocket; write(acceptSocket, theJob.nodeListArray, theJob.nprocs*sizeof(struct sockaddr_in)); /* End of reconnection of a crashed node */ } /* If we have received a message */ } else { if (msg == FINALIZE_MSG) { finalizing = true; currentConnected->finalized = true; check_sync_finalize(); } } } else if (FD_ISSET(fdFifo, &setOfCN)) { /* We have a FIFO file event, ie an order! 
*/ n = read(fdFifo, buff, sizeof(buff)-1); buff[n] = '\0'; close(fdFifo); if (strncmp(buff, DISPATCHER_MOVEALL_MSG, 7) == 0) { sscanf(buff, DISPATCHER_MOVEALL_STRING, fileName); sprintf(log, "MOVEALL instruction received: file %s", fileName); v2logMessage(log); fprintf(stdout, "**** MOVEALL instruction received ****\n"); fflush(stdout); /* In fact this is more a 'move group' than a move all: no real need to move everything... */ /* the format of the file is as such: rank newHost newIP newFastIP:newPort */ if ((fd = fopen(fileName, "r")) != NULL) { while(fgets(line, 256, fd)) { sscanf(line, "%d %s %d.%d.%d.%d %d.%d.%d.%d:%d", &rank, hostName, &ip1, &ip2, &ip3, &ip4, &ip5, &ip6, &ip7, &ip8, &port); sprintf(log, " rank %d on %s (%d.%d.%d.%d/%d.%d.%d.%d:%d)", rank, hostName, ip1, ip2,ip3, ip4, ip5, ip6, ip7, ip8, port); v2logMessage(log); fprintf(stdout, " rank %d on %s (%d.%d.%d.%d/%d.%d.%d.%d:%d)\n", rank, hostName, ip1, ip2,ip3, ip4, ip5, ip6, ip7, ip8, port); fflush(stdout); moving[rank] = true; sprintf(newIP, "%d.%d.%d.%d", ip1, ip2, ip3, ip4); sprintf(newFastIP, "%d.%d.%d.%d", ip5, ip6, ip7, ip8); moveExecutionOneNode(&theJob, rank, hostName, newIP, newFastIP, port); } fclose(fd); } else { sprintf(log, "Unable to open file %s: MOVEALL aborted", fileName); v2logMessage(log); fprintf(stdout, " Unable to open instruction file %s: aborting MOVEALL\n", fileName); fflush(stdout); } } else if (strncmp(buff, DISPATCHER_MOVE_MSG, 4) == 0) { sscanf(buff, DISPATCHER_MOVE_STRING, &rank, hostName, &ip1, &ip2, &ip3, &ip4, &ip5, &ip6, &ip7, &ip8, &port); sprintf(newIP, "%d.%d.%d.%d", ip1, ip2, ip3, ip4); sprintf(newFastIP, "%d.%d.%d.%d", ip5, ip6, ip7, ip8); sprintf(log, "MOVE instruction received: rank %d on %s (%d.%d.%d.%d/%d.%d.%d.%d:%d)", rank, hostName, ip1, ip2, ip3, ip4, ip5, ip6, ip7, ip8, port); v2logMessage(log); moving[rank] = true; fprintf(stdout, "**** MOVE'ing rank %d on %s ****\n", rank, hostName); moveExecutionOneNode(&theJob, rank, hostName, newIP, newFastIP, 
port); } else { printf("Unknown command: %s\n", buff); } if( (fdFifo = open(theJob.fifoFile, O_NONBLOCK)) == -1) { printf("Internal problem: could not open FIFO file!\n"); bailout(&theJob); } } else { /* Time out on the select() */ v2logMessage("Timeout detected on MPI_Finalize(): killing remaining running nodes"); connectedArray = (boolean *)malloc(theJob.nprocs*sizeof(boolean)); for (i = 0; i < theJob.nprocs; i++) connectedArray[i] = false; currentConnected = connectedNodes; while (currentConnected != NULL) { connectedArray[currentConnected->rank] = true; ccAux = currentConnected->next; removeConnectedNode(currentConnected); currentConnected = ccAux; } auxCN = theJob.nodeList; while (auxCN != NULL) { /* We kill only the ones that were still connected */ if (connectedArray[auxCN->rank]) killComputingNode(&theJob, auxCN); auxCN = auxCN->next; } free(connectedArray); } } killAllAuxiliaries(&theJob); cleanAuxiliaryFiles(&theJob); sprintf(log, "Execution of program %s finished", theJob.v2pgFile); v2logMessage(log); exit(0); return (0);}void bailout(JS * js) { CN * auxCN; /* This is the moment to clean a bit... */ /* The auxiliaries */ killAllAuxiliaries(js); /* The nodes */ auxCN = js->nodeList; while (auxCN != NULL) { if (auxCN->pid != 0) killComputingNode(js, auxCN); auxCN = auxCN->next; } exit(-1);}void cleanMemory() { AP * ca; int i; /* Arguments, tmp and so on... */ /* Array of IP addresses */ for (i = 0; i < ipNb; i++) { free(ipList[i]); } free(ipList);}void cleanELCS(JS *js) { char * commandLine; char log[1024]; commandLine = (char *)calloc(4096, sizeof(char)); /* We need to clean the CS and EL which are still running */ sprintf(log, "Cleaning EL and CS from the nodes"); v2logMessage(log); sprintf(commandLine, "%s %d %s", js->killCmd, js->jobId, js->v2pgFile); system(commandLine); free(commandLine);}void v2Error(char * msg) { v2logMessage(msg); v2logMessage("Quitting"); exit(0);}
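/*
 * Keyboard shortcuts (web code-viewer help, not part of the source):
 *   Copy code            Ctrl + C
 *   Search code          Ctrl + F
 *   Full-screen mode     F11
 *   Toggle theme         Ctrl + Shift + D
 *   Show shortcuts       ?
 *   Increase font size   Ctrl + =
 *   Decrease font size   Ctrl + -
 */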