📄 mpirun.v2run.c
字号:
/* MPICH-V Copyright (C) 2002, 2003 Groupe Cluster et Grid, LRI, Universite de Paris Sud This file is part of MPICH-V. MPICH-V is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. MPICH-V is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with MPICH-V; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA $Id: mpirun.v2run.c,v 1.9 2004/05/17 13:40:14 collin Exp $*//* This program takes two arguments : a v2pgfile and a job_id*//* This program takes the filename of a v2pgfile as input. The format is: * The command lines (v2pgfile.commands) - rshcmd=<rsh command> - elcmd=<event logger command> - cscmd=<checkpoint server command> - prog=<program + arguments> * Then the machines (v2pgfile) - For the EL servers: EL eventLoggerIP portEL - For the CS servers: CS checkpointServerIP portCS tmpCS - For the computing nodes: CN MPIrank nodeHostName nodeIP portCommunication associatedELIP portEL associatedCSIP portCS*/#include <stdio.h>#include <stdlib.h>#include <sys/time.h>#include <time.h>#include <netdb.h>#include <netinet/in.h>#include <netinet/tcp.h>#include <unistd.h>#include <getopt.h>#include <string.h>#include <dirent.h>#include <fcntl.h>#include "config.h"#include "v2run.h"#include "v2run_msg.h"#include "debug.h"#define MAX_LISTEN_WAIT 10static char *options = "f:g:p:d:";void printUsage() { printf("Usage: mpirun.v2run -f <v2pgfile> -g <job_id> -p <communication_port> -debug <debug> [-hpnetwork <IP translation file>]\n"); printf(" <v2pgfile>: the program file containing the EL/CS/Nodes specifications\n"); printf(" <job_id>: the Job unique Identifier\n"); printf(" <communication_port>: the port to which the event logger nodes will connect\n"); printf(" -hpnetwork can be used to use IP over a high performance network such as myrinet. The IP translation file gives the correspondance between the normal addresses and the 'HPN' addresses\n");}/* These are read by the program in the 'el*file's*/#define CMD_LENGTH 256boolean testOnly;boolean keepXWFile;boolean logging;#define PORT_DISPATCHER 9800/* The structures containing the data of the program*/int totalAM;char ** ipList = (char **)NULL;int ipNb = 0;PL ** pidList;PL ** pidListTail;static int printonly = 0;void cleanELCS(JS *);/* Now come some functions enabling work on the linked list of available machines*/AM * firstAM;AM * lastAM;void initializeAMList() { firstAM = (AM *)NULL; lastAM = (AM *)NULL;}AM * initializeAM(char * host, char * ip, int np, int elp, int csp, int dc) { AM * nam; nam = (AM *)calloc(1, sizeof(AM)); strcpy(nam->hostName, host); strcpy(nam->ipAddress, ip); nam->nprocs = np; nam->eventLoggerPort = elp; nam->checkpointServerPort = csp; nam->dualCapabilities = dc; return nam;}void addAvailableMachine(AM * newAM) { if (firstAM == NULL) { lastAM = (AM *)calloc(1,sizeof(AM)); firstAM = newAM; } lastAM->next = newAM; newAM->next = NULL; lastAM = newAM;}void useAvailableMachine() { if (firstAM->nprocs == 1) firstAM = firstAM->next; else firstAM->nprocs --;}void parseAvailableFile(char * availFilePath) { FILE * fd; char line[4096]; char hostName[128]; char ip[15]; int nprocs, port1, port2, dc; AM * newAM; initializeAMList(); if ((fd = fopen(availFilePath, "r")) != NULL ) { while (fgets(line, 4096, fd) ) { sscanf(line, "%s %s %d %d %d %d", hostName, ip, &nprocs, &port1, &port2, &dc); newAM = initializeAM(hostName, ip, nprocs, port1, port2, dc); addAvailableMachine(newAM); } fclose(fd); }}/* If rank <rank> has been disconnected, we relaunch it*/void relaunchByRank(int rank, JS * js, boolean moving) { int i; char * cl; char log [1024]; CN * currentNode; currentNode = js->nodeList; while (currentNode->rank != rank) currentNode = currentNode->next; /* First, we look if there are nodes available */ /*if (firstAM != NULL) { sprintf(log, "host %s running rank %d crashed, replaced by host %s", js->nodeList->hostName, rank, firstAM->hostName); v2logMessage(log); strcpy(js->nodeList->hostName,firstAM->hostName); useAvailableMachine(); */ cl = (char *)calloc(4096, sizeof(char)); nodeCommandLine(&cl, RESTART, js, *currentNode); /*sprintf(cl, "%s &", cl);*/ /* if (!moving) sleep(20);*/ fflush(stdout); if (testOnly) printf("%s\n", cl); else { if (fork() == 0) { system(cl); _exit(0); } } free(cl); /* } else { printf("Disconnection detected, but no more nodes available\n"); return; } */}boolean checkArguments(int argc, char * argv[], JS * js1) { boolean b; int c; JS * js; static struct option long_options[] = { {"debug", required_argument, NULL, 'd'}, {"debugfile", required_argument, NULL, 'b'}, {"checkpointing", required_argument, NULL, 'c'}, {"no-auto-launch", no_argument, NULL, 'a'}, {"slowEl", no_argument, NULL, 'e'} }; js = (JS *)malloc(sizeof(JS)); strcpy(js->debugFile, ""); js->checkpointFrequency = 0; js->autoLaunch = true; js->slowEl = false; if (argc < 7) { printf("%s error: not enough arguments\n", argv[0]); printUsage(); b = false; } else { b = true; while ((c = getopt_long_only(argc, argv, options, long_options, NULL)) != -1) { switch(c) { case 'f': strcpy(js->v2pgFile, optarg); break; case 'g': js->jobId = atoi(optarg); break; case 'p': js->dispatcherPort = atoi(optarg); break; case 'd': strcpy(js->debugCommand, optarg); break; case 'b': strcpy(js->debugFile, optarg); break; case 'c': js->checkpointFrequency = atoi(optarg); break; case 'a': js->autoLaunch = false; break; case 'e': js->slowEl = true; break; default: b = false; } } } memcpy((void *)js1, (void *)js, sizeof(JS)); return b;}#define HOST_NAME_SIZE 256Connected * connectedNodes = (Connected *)NULL;Connected * connectedNodesTail = (Connected *)NULL;void addConnectedNode(int rank, int socket) { Connected * new; new = (Connected *)malloc(sizeof(Connected)); new->rank = rank; new->socket = socket; if(connectedNodes == NULL) { connectedNodes = new; connectedNodesTail = (Connected *)malloc(sizeof(Connected)); } connectedNodesTail->next = new; new->next = NULL; connectedNodesTail = new;}void removeConnectedNode(Connected * toRemove) { Connected * cc; Connected * tmp; cc = connectedNodes; tmp = toRemove; if (tmp == cc) { connectedNodes = connectedNodes->next; if (connectedNodesTail == tmp) connectedNodesTail = connectedNodesTail->next; } else { while (cc->next != toRemove) cc = cc->next; cc->next = tmp->next; if (connectedNodesTail == tmp) connectedNodesTail = cc; } free(tmp);}/* The main!*/int main (int argc, char* argv[]) { char * v2cmdfile; char * v2availfile; int i; char * commandLine; char log[1024]; char srvName[HOST_NAME_SIZE]; struct hostent * myAddress; Connected * currentConnected; Connected * ccAux; fd_set setOfCN; int fdFifo; int ret, retListen, maxFd; int listenSocket, acceptSocket; struct sockaddr_in address; struct sockaddr_in pin; int addrlen; char buff[256]; int rank; int port; int ip1, ip2, ip3, ip4, ip5, ip6, ip7, ip8; char newIP[15]; char newFastIP[15]; char fileName[256]; char hostName[HOST_NAME_SIZE]; char line[256]; FILE * fd; pid_t forked, pid1; pid_t * pid; pid_t * auxiliariesPid; int nbAuxiliaries; int nbAuxTotal; int auxId; int n; boolean * moving; boolean * connectedArray; CN * auxCN; EL * auxEL; CS * auxCS; SC * auxSC; struct timeval * selectTimeout; boolean autoRelaunch = true; boolean finalizing = false; /*boolean autoRelaunch = false;*/ int msg; struct jobSpecifications theJob; logging = true; selectTimeout = (struct timeval *)malloc(sizeof(struct timeval)); selectTimeout->tv_sec = FINALIZE_TIMEOUT; if (!checkArguments(argc, argv, &theJob)) { return 1; } parseProgramFile(&theJob); if (strlen(theJob.debugFile) != 0) parseDebugFile(&theJob); /* We get the local IP address */ if (gethostname(srvName, (size_t)HOST_NAME_SIZE) < 0) { printf("Error: could not get hostname\n"); exit(1); } if((myAddress = gethostbyname(srvName)) == 0) qerror("gethostbyname"); /*printf("struct hostent {\n\ char *h_name; */ /* official name of host */ /*=%s\n\ int h_length; */ /* length of address */ /*=%d\n\ char **h_addr_list; */ /* list of addresses */ /*=%s\n\ }\n", myAddress->h_name, myAddress->h_length, myAddress->h_addr); */ strcpy(theJob.dispatcherIP, (char *)inet_ntoa(*(struct in_addr *)myAddress->h_addr)); v2cmdfile = (char *)calloc(strlen(theJob.v2pgFile)+9,sizeof(char)); sprintf(v2cmdfile,"%s.commands",theJob.v2pgFile); parseCommandsFile(v2cmdfile, &theJob); free(v2cmdfile); v2availfile = (char *)calloc(strlen(theJob.v2pgFile)+7,sizeof(char)); sprintf(v2availfile,"%s.avail", theJob.v2pgFile); parseAvailableFile(v2availfile); commandLine = (char *)calloc(1024, sizeof(char)); sprintf(log, "Starting program %s", theJob.v2pgFile); v2logMessage(log); /* We already start listening on the connection socket in order to get the PID's from the auxiliary programs */ address.sin_family = AF_INET; address.sin_addr.s_addr = htonl(INADDR_ANY); address.sin_port = htons(theJob.dispatcherPort); if ((listenSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) { printf("Socket could not be created\n"); bailout(&theJob); } while (bind(listenSocket, ((struct sockaddr*)&address), sizeof(struct sockaddr_in)) == -1) { printf("Bind could not be performed: trying in 1 second\n"); sleep(1); } if (listen(listenSocket, MAX_LISTEN_WAIT) == -1) { close(listenSocket); printf("Could not listen on socket\n"); bailout(&theJob); } nbAuxTotal = 0; /* We launch the EL */ auxEL = theJob.elList; while (auxEL != NULL) { if (strlen(auxEL->debugString) == 0) { sprintf(commandLine,"%s %s %s -p %d -np %d -g %d -auxid %d -dispatcher %s:%d -debug %s &", theJob.rshCmd, auxEL->ipAddress, theJob.elCmd, auxEL->port, theJob.nprocs, theJob.jobId, nbAuxTotal, theJob.dispatcherIP, theJob.dispatcherPort, theJob.debugCommand); } else { sprintf(commandLine,"%s %s %s -p %d -np %d -g %d -auxid %d -dispatcher %s:%d %s &", theJob.rshCmd, auxEL->ipAddress, theJob.elCmd, auxEL->port, theJob.nprocs, theJob.jobId, nbAuxTotal, theJob.dispatcherIP, theJob.dispatcherPort, auxEL->debugString); } sprintf(log, "launching event logger server on machine %s",auxEL->ipAddress); v2logMessage(log); if (testOnly || (strlen(auxEL->debugString)!=0)) printf("%s\n",commandLine);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -