📄 mpichvrun_proprietary.c
/* MPICH-V2
 * Copyright (C) 2002, 2003 Groupe Cluster et Grid, LRI, Universite de Paris Sud
 *
 * This file is part of MPICH-V2.
 *
 * MPICH-V2 is free software; you can redistribute it and/or modify it under
 * the terms of the GNU General Public License as published by the Free
 * Software Foundation; either version 2 of the License, or (at your option)
 * any later version.
 *
 * MPICH-V2 is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 * more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with MPICH-V2; if not, write to the Free Software Foundation, Inc.,
 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * $Id: mpichvrun_proprietary.c,v 1.3 2004/04/26 09:35:04 lemarini Exp $
 */

/* The purpose of this program is to run MPICH-V2 programs on a machine
 * administered by its own MPI system (typically SCore, Condor...).
 *
 * To use it, you need to:
 *   - compile this program WITH THE PROPRIETARY MPI LIBRARY,
 *   - compile your program with the MPICH-V library,
 *   - run this program with the SCrun program.
 *
 * Command line:
 *   mpirun -np <total number of processors> score_mpichv_run <np> <nel> <ncs> <program> [ <program arguments> ]
 */
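/* Example invocation (illustrative only; the binary name, host counts and
 * commands-file name below are assumptions, not taken from this file).
 * With 8 compute nodes, 2 Event Loggers and 1 Checkpoint Server, the
 * proprietary mpirun must start 8 + 2 + 1 + 1 = 12 processes: rank 0 is the
 * dispatcher, and the single Checkpoint Scheduler shares rank 1 with the
 * first Event Logger:
 *
 *   mpirun -np 12 ./mpichvrun_proprietary 8 2 1 my_app.commands
 */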
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <libgen.h>
#include <netdb.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <sys/time.h>

#include "mpi.h"
#include "v2run.h"
#include "v2run_msg.h"

/* List of compute nodes currently connected to the dispatcher */
Connected * connectedNodes = (Connected *)NULL;

void addConnectedNode(int, int);
void removeConnectedNode(Connected *);

int np, nEL, nCS;
int nmachines;
int rank;
FILE *fd;

#define HOST_NAME_SIZE 128
#define V2LINE_SIZE 512
#define MPI_ROOT 0
#define CMD_LENGTH 256

char commandLine[CMD_LENGTH];
char commandLine1[CMD_LENGTH];
char userCommandLine[CMD_LENGTH];
char * v2cmdfile;

#define PORT_EL 4000
#define PORT_CS 5000
#define PORT_SC 6000
#define PORT_CN 7000
#define PORT_DISPATCHER 8900

/* The MPI_ROOT (rank 0) is the dispatcher! */
#define OFFSET_EL 1
#define OFFSET_CS 1 + nEL
#define OFFSET_SC 1 + nEL + nCS
#define OFFSET_CN 1 + nEL + nCS + nSC

static char *machineFile = "machines";

void usage(char * prog) {
    printf("Usage:\n");
    printf(" <mpirun> -np <total np> %s <np> <nel> <ncs> <program>\n", prog);
    printf(" <mpirun>: the PROPRIETARY mpirun of the system\n");
    printf(" <total np>: the total number of processors you will use: np + nel + ncs + 1\n");
    printf(" <np>: the number of compute nodes (MPI processes)\n");
    printf(" <nel>: the number of Event Loggers used\n");
    printf(" <ncs>: the number of Checkpoint Servers used\n");
    printf(" <program>: your MPI program commands file\n");
}

int main(int argc, char ** argv) {
    pid_t job_id, forked;
    char ** allNames;
    char * allNamesFlat;
    int i;
    char srvName[HOST_NAME_SIZE];
    struct hostent *myAddress;
    int my_type, my_index, my_EL, my_CS, my_SC;
    int ratio_EL, ratio_SC, ratio_CS;
    EL* auxEL;
    CS* auxCS;
    SC* auxSC;
    CN* auxCN;
    char *myv2Line;
    char * v2LinesFlat;
    char ** v2Lines;
    FILE * fd;
    char v2progFile[256];
    char finishedFile[256];
    int * recvSizes;
    int * disps;
    int siz;
    MPI_Status status;
    int remoteRank, nbAuxTotal;
    struct jobSpecifications theJob;
    int listenSocket, acceptSocket;
    struct sockaddr_in address;
    struct sockaddr_in pin;
    socklen_t addrlen;
#define MAX_LISTEN_WAIT 10
    pid_t *pid;
    pid_t * auxiliariesPid;
    pid_t pid1;
    int nbAuxiliaries, auxId;
    int nodeRank;
    Connected *currentConnected;
    fd_set setOfCN;
    boolean finalizing = false;
    int msg;
    int ret;
    struct timeval selectTimeOut;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if ((argc < 5) && (rank == 0)) {
        printf("Not enough arguments:\n");
        usage(argv[0]);
        MPI_Abort(MPI_COMM_WORLD, 1);
    }
    np = atoi(argv[1]);
    nEL = atoi(argv[2]);
    nCS = atoi(argv[3]);
    strcpy(userCommandLine, argv[4]);

    MPI_Comm_size(MPI_COMM_WORLD, &nmachines);
    job_id = getpid();

    if (nmachines != (np+nEL+nCS+1)) {
        printf("Error: incorrect number of machines!\n");
        printf("Found: %d Needed: %d\n", nmachines, np+nEL+nCS+1);
        usage(argv[0]);
        MPI_Abort(MPI_COMM_WORLD, 1);
    }
    if (gethostname(srvName, (size_t)HOST_NAME_SIZE) < 0) {
        printf("Fatal error: could not get hostname\n");
        MPI_Abort(MPI_COMM_WORLD, 1);
    }

    /* Gather every host name on rank 0 to build the machine file */
    allNames = (char **)malloc(nmachines*sizeof(char *));
    for (i=0; i < nmachines; i++) {
        allNames[i] = (char *)calloc((size_t)HOST_NAME_SIZE, sizeof(char));
    }
    allNamesFlat = (char *)calloc((size_t)nmachines*HOST_NAME_SIZE, sizeof(char));
    recvSizes = (int *)malloc(nmachines * sizeof(int));
    disps = (int *)malloc(nmachines * sizeof(int));
    siz = strlen(srvName);
    MPI_Gather(&siz, 1, MPI_INT, recvSizes, 1, MPI_INT, 0, MPI_COMM_WORLD);
    for (i=0; i<nmachines; i++)
        disps[i] = (HOST_NAME_SIZE-1) * i;
    MPI_Gatherv(srvName, siz, MPI_CHAR, allNamesFlat, recvSizes, disps, MPI_CHAR,
                0, MPI_COMM_WORLD);

    if (rank == 0) {
        if ((fd = fopen(machineFile, "w")) != NULL) {
            fprintf(fd, "#machines used by job %d\n", job_id);
            for (i=1; i < nmachines; i++) {
                strncpy(allNames[i], allNamesFlat+(HOST_NAME_SIZE-1)*i,
                        strlen(allNamesFlat+(HOST_NAME_SIZE-1)*i)+1);
                fprintf(fd, "%s\n", allNames[i]);
            }
            fclose(fd);
        }
    }
    free(allNamesFlat);

    /* Rank 0 invokes the regular MPICH-V2 mpirun to generate the program file,
       which it reads back just below */
    if (rank == 0) {
        sprintf(commandLine,
                "/opt/mpich/gk/V2/bin/mpirun -np %d -eln %d -csn %d -xw-jobid %d -xw-norun -machinefile %s %s",
                np, nEL, nCS, job_id, machineFile, userCommandLine);
        system(commandLine);
    }
    MPI_Barrier(MPI_COMM_WORLD);

    /* Rank 0 will read the program file that it has just generated, then give
       each 'node' the command line that it needs to execute */
    if (rank == 0) {
        /* We start by reading the program files */
        theJob.checkpointFrequency = 0;
        strcpy(theJob.debugFile, "");
        strcpy(theJob.debugCommand, "129.175.7.27:+@");
        sprintf(theJob.v2pgFile, "v2pgfile.%d", job_id);
        theJob.dispatcherPort = 1978;
        theJob.jobId = job_id;
        myAddress = gethostbyname(srvName);
        strcpy(theJob.dispatcherIP,
               (char *)inet_ntoa(*(struct in_addr *)myAddress->h_addr));
        parseProgramFile(&theJob);
        /* Room for the ".commands" suffix plus the terminating NUL */
        v2cmdfile = (char *)calloc(strlen(theJob.v2pgFile)+10, sizeof(char));
        sprintf(v2cmdfile, "%s.commands", theJob.v2pgFile);
        parseCommandsFile(v2cmdfile, &theJob);
        free(v2cmdfile);

        /* As with a normal startup, we need to listen on the socket before
           launching the other programs */
        address.sin_family = AF_INET;
        address.sin_addr.s_addr = htonl(INADDR_ANY);
        address.sin_port = htons(theJob.dispatcherPort);
        if ((listenSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
            printf("Socket could not be created!\n");
            MPI_Abort(MPI_COMM_WORLD, 1);
        }
        while (bind(listenSocket, ((struct sockaddr*)&address),
                    sizeof(struct sockaddr_in)) == -1) {
            fprintf(stdout, "Bind could not be performed!\n");
            sleep(1); /* retry until the port becomes available */
        }
        if (listen(listenSocket, MAX_LISTEN_WAIT) == -1) {
            close(listenSocket);
            printf("Could not listen on socket\n");
            MPI_Abort(MPI_COMM_WORLD, 1);
        }

        /* Now we tell the auxiliary programs what to do */
        nbAuxTotal = 0;

        /* First, the Event Loggers */
        auxEL = theJob.elList;
        while (auxEL != NULL) {
            sprintf(commandLine, "%s -p %d -np %d -g %d -auxid %d -dispatcher %s:%d -debug %s",
                    theJob.elCmd, auxEL->port, theJob.nprocs, theJob.jobId, nbAuxTotal,
                    theJob.dispatcherIP, theJob.dispatcherPort, theJob.debugCommand);
            remoteRank = nbAuxTotal+1;
            siz = strlen(commandLine);
            MPI_Send(&siz, 1, MPI_INT, remoteRank, remoteRank, MPI_COMM_WORLD);
            MPI_Send(commandLine, siz, MPI_CHAR, remoteRank, remoteRank, MPI_COMM_WORLD);
            auxEL = auxEL->next;
            nbAuxTotal++;
        }

        /* Now the Checkpoint Servers */
        auxCS = theJob.csList;
        while (auxCS != NULL) {
            sprintf(commandLine, "%s -g %d -p %d -d %s -i %s -auxid %d -dispatcher %s:%d",
                    theJob.csCmd, theJob.jobId, auxCS->port, theJob.debugCommand, auxCS->tmp,
                    nbAuxTotal, theJob.dispatcherIP, theJob.dispatcherPort);
            remoteRank = nbAuxTotal+1;
            siz = strlen(commandLine);
            MPI_Send(&siz, 1, MPI_INT, remoteRank, remoteRank, MPI_COMM_WORLD);
            MPI_Send(commandLine, siz, MPI_CHAR, remoteRank, remoteRank, MPI_COMM_WORLD);
            auxCS = auxCS->next;
            nbAuxTotal++;
        }

        /* The Checkpoint Scheduler: it is sent to rank 1, which also runs the
           first Event Logger */
        auxSC = theJob.scList;
        sprintf(commandLine, "%s %d %d %d %d %s:%d %s ",
                theJob.scCmd, auxSC->port, theJob.nprocs, theJob.jobId, nbAuxTotal,
                theJob.dispatcherIP, theJob.dispatcherPort, theJob.debugCommand);
        remoteRank = 1;
        siz = strlen(commandLine);
        MPI_Send(&siz, 1, MPI_INT, remoteRank, remoteRank, MPI_COMM_WORLD);
        MPI_Send(commandLine, siz, MPI_CHAR, remoteRank, remoteRank, MPI_COMM_WORLD);
        nbAuxTotal++;

        /* We wait until they are all connected */
        nbAuxiliaries = 0;
        auxiliariesPid = (pid_t *)malloc(nbAuxTotal * sizeof(pid_t));
        while (nbAuxiliaries < nbAuxTotal) {
            addrlen = sizeof(pin);
            if ((acceptSocket = accept(listenSocket, (struct sockaddr *)&pin, &addrlen)) < 0) {
                printf("Could not accept socket connection from auxiliary\n");
            } else {
                read(acceptSocket, &auxId, sizeof(int));
                auxId = ntohl(auxId);
                read(acceptSocket, &pid1, sizeof(pid_t));
                auxiliariesPid[auxId] = ntohl(pid1);
                close(acceptSocket);
                nbAuxiliaries++;
            }
        }

        /* Record the pid of each auxiliary in the job description */
        i = 0;
        auxEL = theJob.elList;
        while (auxEL != NULL) {
            auxEL->pid = auxiliariesPid[i];
            auxEL = auxEL->next;
            i++;
        }
        auxCS = theJob.csList;
        while (auxCS != NULL) {
            auxCS->pid = auxiliariesPid[i];
            auxCS = auxCS->next;
            i++;
        }
        auxSC = theJob.scList;
        while (auxSC != NULL) {
            auxSC->pid = auxiliariesPid[i];
            auxSC = auxSC->next;
            i++;
        }
        /* Now we can tell the computing nodes to go... */
        remoteRank = nbAuxTotal;
        auxCN = theJob.nodeList;
        while (auxCN != NULL) {
            sprintf(commandLine,
                    "%s -g %d -r %d -np %d -p %d -el %s:%d -cs %s:%d -sc %s:%d -v2tmp %s/%s/ -dispatcher %s:%d %s -debug %s -cmd %s",
                    theJob.v2dCmd, theJob.jobId, auxCN->rank, theJob.nprocs,
                    auxCN->communicationPort,
                    auxCN->eventLogger, auxCN->eventLoggerPort,
                    auxCN->checkpointServer, auxCN->checkpointServerPort,
                    auxCN->checkpointScheduler, auxCN->checkpointSchedulerPort,
                    theJob.v2tmp, auxCN->ipAddress,
                    theJob.dispatcherIP, theJob.dispatcherPort,
                    "", /* restartFlag */
                    theJob.debugCommand, theJob.progCmd);
            siz = strlen(commandLine);
            MPI_Send(&siz, 1, MPI_INT, remoteRank, remoteRank, MPI_COMM_WORLD);
            MPI_Send(commandLine, siz, MPI_CHAR, remoteRank, remoteRank, MPI_COMM_WORLD);
            remoteRank++;
            auxCN = auxCN->next;
        }

        /* Now we wait for the connections from the nodes */
        pid = (pid_t *)malloc(theJob.nprocs * sizeof(pid_t));
        for (i=0; i < theJob.nprocs; i++) {
            addrlen = sizeof(pin);
            if ((acceptSocket = accept(listenSocket, (struct sockaddr *)&pin, &addrlen)) < 0) {
                printf("Could not accept socket connection from client\n");
            } else {
                read(acceptSocket, &nodeRank, sizeof(int));
                nodeRank = ntohl(nodeRank);
                read(acceptSocket, &pid1, sizeof(pid_t));
                pid[nodeRank] = ntohl(pid1);
                addConnectedNode(nodeRank, acceptSocket);
                /* We send the node list... */
                write(acceptSocket, theJob.nodeListArray,
                      theJob.nprocs*sizeof(struct sockaddr_in));
            }
        }

        /* Now we just need to wait for the finalize signal to wrap everything up */
        while (connectedNodes != NULL) {
            FD_ZERO(&setOfCN);
            currentConnected = connectedNodes;
            auxCN = theJob.nodeList;
            while (currentConnected != NULL) {
                auxCN->pid = pid[auxCN->rank];
                FD_SET(currentConnected->socket, &setOfCN);
                currentConnected = currentConnected->next;
                auxCN = auxCN->next;
            }
            if (finalizing) {
                /* Once a node has asked to finalize, poll the remaining ones
                   with a short timeout (the 1 second value is an arbitrary choice) */
                selectTimeOut.tv_sec = 1;
                selectTimeOut.tv_usec = 0;
                if ((ret = select(FD_SETSIZE, &setOfCN, NULL, NULL, &selectTimeOut)) < 0) {
                    fprintf(stdout, "Error in select: irrecoverable error\n");
                    fflush(stdout);
                    MPI_Abort(MPI_COMM_WORLD, 1);
                }
            } else {
                if ((ret = select(FD_SETSIZE, &setOfCN, NULL, NULL, NULL)) < 0) {
                    fprintf(stdout, "Error in select: irrecoverable error\n");
                    fflush(stdout);
                    MPI_Abort(MPI_COMM_WORLD, 1);
                }
            }
            /* Find the first node that has something to say */
            currentConnected = connectedNodes;
            while (currentConnected != NULL) {
                if (FD_ISSET(currentConnected->socket, &setOfCN)) {
                    break;
                }
                currentConnected = currentConnected->next;
            }
            if (currentConnected != NULL) {
                if (read(currentConnected->socket, &msg, sizeof(int)) == 0) {
                    fprintf(stdout, "Disconnection detected, but not handled in this version!\n");
                    fflush(stdout);
                    MPI_Abort(MPI_COMM_WORLD, 1);
                } else {
                    if (msg == FINALIZE_MSG) {
                        finalizing = true;
                        removeConnectedNode(currentConnected);
                    }
                }
            }
        }

        sprintf(finishedFile, "finished.%s", userCommandLine);
        if ((fd = fopen(finishedFile, "w")) != NULL) {
            fprintf(fd, "Done!\n");
            fclose(fd);
        }
        fprintf(stdout, "Everything seems to be finished: we may proceed to the ugly abort\n");
        fflush(stdout);
        MPI_Abort(MPI_COMM_WORLD, 1);
    } else {
        /* The work done by the processes other than 0: receive a command line
           from the dispatcher and execute it */
        MPI_Recv(&siz, 1, MPI_INT, 0, rank, MPI_COMM_WORLD, &status);
        MPI_Recv(commandLine, siz, MPI_CHAR, 0, rank, MPI_COMM_WORLD, &status);
        commandLine[siz] = '\0';
        if (rank != 1) {
            system(commandLine);
        } else {
            /* Rank 1 will run the first EL but also the SC */
            if ((forked = fork()) != -1) {
                if (forked == 0) {
                    system(commandLine);
                } else {
                    MPI_Recv(&siz, 1, MPI_INT, 0, rank, MPI_COMM_WORLD, &status);
                    MPI_Recv(commandLine1, siz, MPI_CHAR, 0, rank, MPI_COMM_WORLD, &status);
                    commandLine1[siz] = '\0';
                    system(commandLine1);
                }
            } else {
                printf("ERROR(1): Could not perform fork!\n");
                MPI_Abort(MPI_COMM_WORLD, 1);
            }
        }
    }

    MPI_Barrier(MPI_COMM_WORLD);
    MPI_Finalize();
    return (0);
}

Connected * connectedNodesTail = (Connected *)NULL;

/* Append a newly connected node to the list of connected nodes */
void addConnectedNode(int rank, int socket) {
    Connected * new;

    new = (Connected *)malloc(sizeof(Connected));
    new->rank = rank;
    new->socket = socket;
    fprintf(stdout, "adding node %d [%d]\n", rank, socket);
    fflush(stdout);
    new->next = NULL;
    if (connectedNodes == NULL)
        connectedNodes = new;
    else
        connectedNodesTail->next = new;
    connectedNodesTail = new;
}

/* Unlink a node from the list of connected nodes and free it */
void removeConnectedNode(Connected * toRemove) {
    Connected * cc;
    Connected * tmp;

    cc = connectedNodes;
    tmp = toRemove;
    fprintf(stdout, "removing node %d [%d]\n", toRemove->rank, toRemove->socket);
    fflush(stdout);
    if (tmp == cc) {
        connectedNodes = connectedNodes->next;
        if (connectedNodesTail == tmp)
            connectedNodesTail = connectedNodesTail->next;
    } else {
        while (cc->next != toRemove)
            cc = cc->next;
        cc->next = tmp->next;
        if (connectedNodesTail == tmp)
            connectedNodesTail = cc;
    }
    free(tmp);
}
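The dispatcher's two accept loops imply a small registration protocol: each process connects to the dispatcher's port, sends its identifier and its pid as network-byte-order integers, and (for compute nodes) reads back the array of node addresses while keeping the socket open so the dispatcher can later detect FINALIZE_MSG. The helper below is a minimal client-side sketch of that exchange, not part of MPICH-V2; the function name, parameters and error handling are assumptions for illustration.

/* Hypothetical node-side registration helper, mirroring the dispatcher's
 * node accept loop above: send rank and pid, then receive the node list. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

static int registerWithDispatcher(const char *dispatcherIP, int dispatcherPort,
                                  int myRank, int nprocs,
                                  struct sockaddr_in *nodeListArray) {
    struct sockaddr_in dispatcher;
    int sock, netRank, netPid;

    if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
        return -1;
    memset(&dispatcher, 0, sizeof(dispatcher));
    dispatcher.sin_family = AF_INET;
    dispatcher.sin_port = htons(dispatcherPort);
    dispatcher.sin_addr.s_addr = inet_addr(dispatcherIP);
    if (connect(sock, (struct sockaddr *)&dispatcher, sizeof(dispatcher)) < 0) {
        close(sock);
        return -1;
    }
    /* Same layout as the dispatcher's reads: rank, then pid, in network order */
    netRank = htonl(myRank);
    netPid  = htonl((int)getpid());
    write(sock, &netRank, sizeof(int));
    write(sock, &netPid, sizeof(int));
    /* The dispatcher answers with the sockaddr_in of every compute node */
    read(sock, nodeListArray, nprocs * sizeof(struct sockaddr_in));
    return sock; /* kept open: the dispatcher select()s on it for FINALIZE_MSG */
}

Keeping the registration socket open is what lets the dispatcher's select() loop notice both the finalize message and unexpected disconnections, which it currently answers with an MPI_Abort.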