📄 originempirun.vrun.c
字号:
/* MPICH-V Copyright (C) 2002, 2003 Groupe Cluster et Grid, LRI, Universite de Paris Sud This file is part of MPICH-V. MPICH-V is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. MPICH-V is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with MPICH-V; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA $Id: originempirun.vrun.c,v 1.1 2004/06/17 17:35:26 bouziane Exp $*//* This program takes two arguments : a v2pgfile and a job_id*//* This program takes the filename of a v2pgfile as input. The format is: * The command lines (v2pgfile.commands) - rshcmd=<rsh command> - elcmd=<event logger command> - cscmd=<checkpoint server command> - prog=<program + arguments> * Then the machines (v2pgfile) - For the EL servers: EL eventLoggerIP portEL - For the CS servers: CS checkpointServerIP portCS tmpCS - For the computing nodes: CN MPIrank nodeHostName nodeIP portCommunication associatedELIP portEL associatedCSIP portCS*/#include <stdio.h>#include <stdlib.h>#include <sys/time.h>#include <time.h>#include <sys/types.h>#include <sys/stat.h>#include <unistd.h>#include <netdb.h>#include <netinet/in.h>#include <netinet/tcp.h>#include <unistd.h>#include <getopt.h>#include <string.h>#include <dirent.h>#include <fcntl.h>#include "config.h"#include "v2run.h"#include "v2run_msg.h"#include "debug.h"#include "vrun_protocol.h"#define MAX_LISTEN_WAIT 10static char *options = "f:g:p:d:";void printUsage() { printf("Usage: mpirun.vrun -f <v2pgfile> -g <job_id> -p <communication_port> -debug <debug> [-hpnetwork <IP translation file>]\n"); printf(" <v2pgfile>: the program file containing the EL/CS/Nodes specifications\n"); printf(" <job_id>: the Job unique Identifier\n"); printf(" <communication_port>: the port to which the event logger nodes will connect\n"); printf(" -hpnetwork can be used to use IP over a high performance network such as myrinet. The IP translation file gives the correspondance between the normal addresses and the 'HPN' addresses\n");}/* These are read by the program in the 'el*file's*/#define BUFF_SIZE 16000boolean keepXWFile;boolean logging;#define PORT_DISPATCHER 9800/* The structures containing the data of the program*/int totalAM;char ** ipList = (char **)NULL;int ipNb = 0;PL ** pidList;PL ** pidListTail;static int printonly = 0;void cleanELCS(JS *);/* Now come some functions enabling work on the linked list of available machines*/AM * firstAM;AM * lastAM;void initializeAMList() { firstAM = (AM *)NULL; lastAM = (AM *)NULL;}AM * initializeAM(char * host, char * ip, int np, int elp, int csp, int dc) { AM * nam; nam = (AM *)calloc(1, sizeof(AM)); strcpy(nam->hostName, host); strcpy(nam->ipAddress, ip); nam->nprocs = np; nam->eventLoggerPort = elp; nam->checkpointServerPort = csp; nam->dualCapabilities = dc; return nam;}void addAvailableMachine(AM * newAM) { if (firstAM == NULL) { lastAM = (AM *)calloc(1,sizeof(AM)); firstAM = newAM; } lastAM->next = newAM; newAM->next = NULL; lastAM = newAM;}void useAvailableMachine() { if (firstAM->nprocs == 1) firstAM = firstAM->next; else firstAM->nprocs --;}void parseAvailableFile(char * availFilePath) { FILE * fd; char line[BUFF_SIZE]; char hostName[BUFF_SIZE]; char ip[IP_LENGTH]; int nprocs, port1, port2, dc; AM * newAM; initializeAMList(); if ((fd = fopen(availFilePath, "r")) != NULL ) { while (fgets(line, BUFF_SIZE, fd) ) { sscanf(line, "%s %s %d %d %d %d", hostName, ip, &nprocs, &port1, &port2, &dc); newAM = initializeAM(hostName, ip, nprocs, port1, port2, dc); addAvailableMachine(newAM); } fclose(fd); }}/* If rank <rank> has been disconnected, we relaunch it*/void relaunchByRank(int rank, JS * js, boolean moving) { int i; char cl[BUFF_SIZE]; char log [BUFF_SIZE]; CN * currentNode; currentNode = js->nodeList; while (currentNode->rank != rank) currentNode = currentNode->next; /* First, we look if there are nodes available */ /*if (firstAM != NULL) { sprintf(log, "host %s running rank %d crashed, replaced by host %s", js->nodeList->hostName, rank, firstAM->hostName); v2logMessage(log); strcpy(js->nodeList->hostName,firstAM->hostName); useAvailableMachine(); */ nodeCommandLine(cl, RESTART, js, *currentNode); /*sprintf(cl, "%s &", cl);*/ /* if (!moving) sleep(20);*//* fflush(stdout);*/ if (js->testOnly) printf("%s\n", cl); else { if (fork() == 0) { system(cl); _exit(0); } } /* } else { printf("Disconnection detected, but no more nodes available\n"); return; } */}boolean checkArguments(int argc, char * argv[], JS * js) { boolean b; int c; static struct option long_options[] = { {"debug", required_argument, NULL, 'd'}, {"debugfile", required_argument, NULL, 'b'}, {"checkpointing", required_argument, NULL, 'c'}, {"no-auto-launch", no_argument, NULL, 'a'}, {"ckptlocal", no_argument, NULL, 'l'}, // {"simultaneous-jobs",required_argument, NULL, 's'}, {"no-ckpt-server",no_argument, NULL, 'k'}, {"msc",required_argument, NULL, 'm'} }; js->debugFile[0] = 0; js->checkpointFrequency = 0; js->autoLaunch = true; js->localckpt = false; // js->nbrJobSimultaneous = -1; js->noCkptServer = 0; js->masterString = NULL; if (argc < 7) { printf("%s error: not enough arguments\n", argv[0]); printUsage(); b = false; } else { b = true; while ((c = getopt_long_only(argc, argv, options, long_options, NULL)) != -1) { switch(c) { case 'f': strcpy(js->v2pgFile, optarg); break; case 'g': js->jobId = atoi(optarg); break; case 'p': js->dispatcherPort = atoi(optarg); break; case 'd': strcpy(js->debugCommand, optarg); break; case 'b': strcpy(js->debugFile, optarg); break; case 'c': js->checkpointFrequency = atoi(optarg); break; case 'a': js->autoLaunch = false; break; case 'l': js->localckpt = true; break; /* case 's': js->nbrJobSimultaneous = atoi(optarg); break; */ case 'k': js->noCkptServer = 1; break; case 'm': { int ip1, ip2,ip3,ip4, port; js->masterString= strdup(optarg); sscanf(js->masterString, "%d.%d.%d.%d:%d", &ip1, &ip2, &ip3, &ip4, &port); sprintf(js->masterIP, "%d.%d.%d.%d", ip1, ip2, ip3, ip4); js->masterPort= port; } break; default: b = false; } } } return b;}#define HOST_NAME_SIZE 256Connected * connectedNodes = (Connected *)NULL;Connected * connectedNodesTail = (Connected *)NULL;void addConnectedNode(int rank, int socket) { Connected * new; new = (Connected *)malloc(sizeof(Connected)); new->rank = rank; new->socket = socket; new->finalized = false; if(connectedNodes == NULL) { connectedNodes = new; connectedNodesTail = (Connected *)malloc(sizeof(Connected)); } connectedNodesTail->next = new; new->next = NULL; connectedNodesTail = new;}void removeConnectedNode(Connected * toRemove) { Connected * cc; Connected * tmp; cc = connectedNodes; tmp = toRemove; if (tmp == cc) { connectedNodes = connectedNodes->next; if (connectedNodesTail == tmp) connectedNodesTail = connectedNodesTail->next; } else { while (cc->next != toRemove) cc = cc->next; cc->next = tmp->next; if (connectedNodesTail == tmp) connectedNodesTail = cc; } free(tmp);}static void check_sync_finalize(void){ Connected *currentConnected; int ack = 1; for( currentConnected = connectedNodes; currentConnected; currentConnected=currentConnected->next) { if(currentConnected->finalized != true) return; } for( currentConnected = connectedNodes; currentConnected; currentConnected=currentConnected->next) { write(currentConnected->socket, &ack, sizeof(int)); removeConnectedNode(currentConnected); } /*@todo mettre a null le job courrant puis lancer le job suivant*/}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -