📄 derniermscmpirun.c
字号:
/* MPICH-V Copyright (C) 2002, 2003 Groupe Cluster et Grid, LRI, Universite de Paris Sud This file is part of MPICH-V. MPICH-V is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. MPICH-V is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with MPICH-V; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA $Id: dernierMscmpirun.c,v 1.2 2004/07/22 12:49:16 bouziane Exp $*//* This program takes two arguments : a v2pgfile and a job_id*//* This program takes the filename of a v2pgfile as input. The format is: * The command lines (v2pgfile.commands) - rshcmd=<rsh command> - elcmd=<event logger command> - cscmd=<checkpoint server command> - prog=<program + arguments> * Then the machines (v2pgfile) - For the EL servers: EL eventLoggerIP portEL - For the CS servers: CS checkpointServerIP portCS tmpCS - For the computing nodes: CN MPIrank nodeHostName nodeIP portCommunication associatedELIP portEL associatedCSIP portCS*/#include <stdio.h>#include <stdlib.h>#include <sys/time.h>#include <time.h>#include <sys/types.h>#include <sys/stat.h>#include <unistd.h>#include <netdb.h>#include <netinet/in.h>#include <netinet/tcp.h>#include <unistd.h>#include <getopt.h>#include <string.h>#include <dirent.h>#include <fcntl.h>#include "config.h"#include "v2run.h"#include "v2run_msg.h"#include "debug.h"#include "vrun_protocol.h"#define MAX_LISTEN_WAIT 10static char *options = "f:g:p:d:";static int mscServer = 0;voidprintUsage (){ printf ("Usage: mpirun.vrun -f <v2pgfile> -g <job_id> -p <communication_port> -debug <debug> [-hpnetwork <IP translation file>]\n"); printf (" <v2pgfile>: the program file containing the EL/CS/Nodes specifications\n"); printf (" <job_id>: the Job unique Identifier\n"); printf (" <communication_port>: the port to which the event logger nodes will connect\n"); printf (" -hpnetwork can be used to use IP over a high performance network such as myrinet. The IP translation file gives the correspondance between the normal addresses and the 'HPN' addresses\n");}/* These are read by the program in the 'el*file's*/#define BUFF_SIZE 16000boolean keepXWFile;boolean logging;#define PORT_DISPATCHER 9800/* The structures containing the data of the program*/int totalAM;char **ipList = (char **) NULL;int ipNb = 0;PL **pidList;PL **pidListTail;static int printonly = 0;void cleanELCS (JS *);/* Now come some functions enabling work on the linked list of available machines*/AM *firstAM;AM *lastAM;voidinitializeAMList (){ firstAM = (AM *) NULL; lastAM = (AM *) NULL;}AM *initializeAM (char *host, char *ip, int np, int elp, int csp, int dc){ AM *nam; nam = (AM *) calloc (1, sizeof (AM)); strcpy (nam->hostName, host); strcpy (nam->ipAddress, ip); nam->nprocs = np; nam->eventLoggerPort = elp; nam->checkpointServerPort = csp; nam->dualCapabilities = dc; return nam;}voidaddAvailableMachine (AM * newAM){ if (firstAM == NULL) { lastAM = (AM *) calloc (1, sizeof (AM)); firstAM = newAM; } lastAM->next = newAM; newAM->next = NULL; lastAM = newAM;}voiduseAvailableMachine (){ if (firstAM->nprocs == 1) firstAM = firstAM->next; else firstAM->nprocs--;}voidparseAvailableFile (char *availFilePath){ FILE *fd; char line[BUFF_SIZE]; char hostName[BUFF_SIZE]; char ip[IP_LENGTH]; int nprocs, port1, port2, dc; AM *newAM; initializeAMList (); if ((fd = fopen (availFilePath, "r")) != NULL) { while (fgets (line, BUFF_SIZE, fd)) { sscanf (line, "%s %s %d %d %d %d", hostName, ip, &nprocs, &port1, &port2, &dc); newAM = initializeAM (hostName, ip, nprocs, port1, port2, dc); addAvailableMachine (newAM); } fclose (fd); }}/* If rank <rank> has been disconnected, we relaunch it*/voidrelaunchByRank (int rank, JS * js, boolean moving){ int i; char cl[BUFF_SIZE]; char log[BUFF_SIZE]; CN *currentNode; currentNode = js->nodeList; while (currentNode->rank != rank) currentNode = currentNode->next; nodeCommandLine (cl, RESTART, js, *currentNode); if (js->testOnly) printf ("%s\n", cl); else { if (fork () == 0) { system (cl); _exit (0); } }}booleancheckArguments (int argc, char *argv[], JS * js){ boolean b; int c; static struct option long_options[] = { {"debug", required_argument, NULL, 'd'}, {"debugfile", required_argument, NULL, 'b'}, {"checkpointing", required_argument, NULL, 'c'}, {"no-auto-launch", no_argument, NULL, 'a'}, {"no-auto-launch", no_argument, NULL, 'a'}, {"ckptlocal", no_argument, NULL, 'l'}, {"no-ckpt-server", no_argument, NULL, 'k'}, {"msc", required_argument, NULL, 's'}, // {"memstat", no_argument, NULL, 'z'}, {"slowEl", no_argument, NULL, 'e'} }; js->debugFile[0] = 0; js->checkpointFrequency = 0; js->autoLaunch = true; js->slowEl = false; js->localckpt = false; js->noCkptServer = 0; js->masterString = NULL; //warning memstat is activated by default // js->memstat = 1; if (argc < 7) { printf ("%s error: not enough arguments\n", argv[0]); printUsage (); b = false; } else { b = true; while ((c = getopt_long_only (argc, argv, options, long_options, NULL)) != -1) { switch (c) { case 'f': strcpy (js->v2pgFile, optarg); break; case 'g': js->jobId = atoi (optarg); break; case 'p': js->dispatcherPort = atoi (optarg); break; case 'd': strcpy (js->debugCommand, optarg); break; case 'b': strcpy (js->debugFile, optarg); break; case 'c': js->checkpointFrequency = atoi (optarg); break; case 'a': js->autoLaunch = false; break; case 'e': js->slowEl = true; break; case 'l': js->localckpt = true; break; case 'k': js->noCkptServer = 1; break; // case 'z': // js->memstat = 1; // break; case 's': { int ip1, ip2, ip3, ip4, port; js->masterString = strdup (optarg); printf(js->masterString); sscanf (js->masterString, "%d.%d.%d.%d:%d", &ip1, &ip2, &ip3, &ip4, &port); sprintf (js->masterIP, "%d.%d.%d.%d", ip1, ip2, ip3, ip4); js->masterPort = port; mscServer = 1; } break; default: b = false; } } } return b;}#define HOST_NAME_SIZE 256Connected *connectedNodes = (Connected *) NULL;Connected *connectedNodesTail = (Connected *) NULL;voidaddConnectedNode (int rank, int socket){ Connected *new; new = (Connected *) malloc (sizeof (Connected)); new->rank = rank; new->socket = socket; new->finalized = false; if (connectedNodes == NULL) { connectedNodes = new; connectedNodesTail = (Connected *) malloc (sizeof (Connected)); } connectedNodesTail->next = new; new->next = NULL; connectedNodesTail = new;}voidremoveConnectedNode (Connected * toRemove){ Connected *cc; Connected *tmp; cc = connectedNodes; tmp = toRemove; if (tmp == cc) { connectedNodes = connectedNodes->next; if (connectedNodesTail == tmp) connectedNodesTail = connectedNodesTail->next; } else { while (cc->next != toRemove) cc = cc->next; cc->next = tmp->next; if (connectedNodesTail == tmp) connectedNodesTail = cc; } free (tmp);}static voidcheck_sync_finalize (void){ Connected *currentConnected; int ack = 1; for (currentConnected = connectedNodes; currentConnected; currentConnected = currentConnected->next) { if (currentConnected->finalized != true) return; } for (currentConnected = connectedNodes; currentConnected; currentConnected = currentConnected->next) { write (currentConnected->socket, &ack, sizeof (int)); removeConnectedNode (currentConnected); } if (mscServer) inform_Msc_Finalize ();}/* The main!*/intmain (int argc, char *argv[]){ char *v2cmdfile; char *v2availfile; int i; char commandLine[BUFF_SIZE]; char log[BUFF_SIZE]; char srvName[HOST_NAME_SIZE]; struct hostent *myAddress; Connected *currentConnected; Connected *ccAux; fd_set setOfCN; int fdFifo; int ret, retListen, maxFd; int listenSocket, acceptSocket; struct sockaddr_in address; struct sockaddr_in pin; int addrlen; char buff[BUFF_SIZE]; int rank; int port; int ip1, ip2, ip3, ip4, ip5, ip6, ip7, ip8; char newIP[IP_LENGTH]; char newFastIP[IP_LENGTH]; char fileName[BUFF_SIZE]; char hostName[HOST_NAME_SIZE]; char line[BUFF_SIZE]; FILE *fd; pid_t forked, pid1; pid_t *pid; pid_t *auxiliariesPid; int nbAuxiliaries; int nbAuxTotal; int n;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -