⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mpichvrun_proprietary.c

📁 Path MPICH-V for MPICH the MPI Implementation
💻 C
字号:
/*  MPICH-V2   Copyright (C) 2002, 2003 Groupe Cluster et Grid, LRI, Universite de Paris Sud  This file is part of MPICH-V2.  MPICH-V2 is free software; you can redistribute it and/or modify  it under the terms of the GNU General Public License as published by  the Free Software Foundation; either version 2 of the License, or  (at your option) any later version.  MPICH-V2 is distributed in the hope that it will be useful,  but WITHOUT ANY WARRANTY; without even the implied warranty of  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the  GNU General Public License for more details.  You should have received a copy of the GNU General Public License  along with MPICH-V2; if not, write to the Free Software  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA  $Id: mpichvrun_proprietary.c,v 1.3 2004/04/26 09:35:04 lemarini Exp $*//*  The purpose of this program is to be able to run MPICH-V2 programs inside a    machine administrated by its own MPI system (typically SCore, Condor...).   To use it, you need to:   - Compile it WITH THE PROPRIETARY MPI LIBRARY   - Compile your program with the MPICH-V library   - Run this program with the SCrun program.   Command line:    mpirun -np <Total number of processors> score_mpichv_run <np> <nel> <ncs> <nsc> <program> [ <program arguments> ] */#include <stdio.h>#include "mpi.h"#include <netdb.h>#include <string.h>#include "v2run.h"#include "v2run_msg.h"#include <libgen.h>#include <unistd.h>Connected * connectedNodes = (Connected *)NULL;void addConnectedNode(int, int);void removeConnectedNode(Connected *);int np, nEL, nCS;int nmachines;int rank;FILE *fd;#define HOST_NAME_SIZE 128#define V2LINE_SIZE 512#define MPI_ROOT 0#define CMD_LENGTH 256char commandLine[CMD_LENGTH];char commandLine1[CMD_LENGTH];char userCommandLine[CMD_LENGTH];char * v2cmdfile;#define PORT_EL 4000#define PORT_CS 5000#define PORT_SC 6000#define PORT_CN 7000#define PORT_DISPATCHER 8900#define OFFSET_EL 1 /* The MPI_ROOT is the dispatcher! */#define OFFSET_CS 1 + nEL#define OFFSET_SC 1 + nEL + nCS#define OFFSET_CN 1 + nEL + nCS + nSCstatic char *machineFile = "machines";void usage(char * prog) {  printf("Usage:\n");  printf("  <mpirun> -np <total np> %s <np> <nel> <ncs> <program>\n", prog);  printf("   <mpirun>: the PROPRIETARY mpirun of the system\n");  printf("   <total np>: The total number of processors you will use: np + nel + ncs + nsc + 1\n");  printf("   <np>: the number of processors, MPI definition of it: nodes\n");  printf("   <nel>: the number of Event Loggers used\n");  printf("   <ncs>: the number of Checkpoint Servers used\n");  printf("   <program>: your MPI program  commands file\n");}int main(int argc, char ** argv) {  pid_t job_id, forked;  char ** allNames;  char * allNamesFlat;  int i;  char srvName[HOST_NAME_SIZE];  struct hostent *myAddress;  int my_type, my_index, my_EL, my_CS, my_SC;  int ratio_EL, ratio_SC, ratio_CS;  EL* auxEL;  CS* auxCS;  SC* auxSC;  CN* auxCN;  char *myv2Line;  char * v2LinesFlat;  char ** v2Lines;  FILE * fd;  char v2progFile[256];  char finishedFile[256];  int * recvSizes;  int * disps;  int siz;  MPI_Status * status;  int remoteRank, nbAuxTotal;  struct jobSpecifications theJob;  int listenSocket, acceptSocket;  struct sockaddr_in address;  struct sockaddr_in pin;  int addrlen;#define MAX_LISTEN_WAIT 10  pid_t *pid;  pid_t * auxiliariesPid;  pid_t pid1;  int nbAuxiliaries, auxId;  Connected *currentConnected;  fd_set setOfCN;  boolean finalizing = false;  int msg;  int ret;  struct timeval *selectTimeOut;  MPI_Init(&argc, &argv);  MPI_Comm_rank(MPI_COMM_WORLD, &rank);    if ((argc < 5) && (rank == 0)) {    printf("Not enough arguments:\n");    usage(argv[0]);    MPI_Abort(MPI_COMM_WORLD, 1);  }  np  = atoi(argv[1]);  nEL = atoi(argv[2]);  nCS = atoi(argv[3]);  strcpy(userCommandLine, argv[4]);  MPI_Comm_size(MPI_COMM_WORLD, &nmachines);  job_id = getpid();  if (nmachines != (np+nEL+nCS+1)) {    printf("Error: incorrect number of machines!\n");    printf("Found: %d Needed: %d\n", nmachines, np+nEL+nCS+1);    usage(argv[0]);    MPI_Abort(MPI_COMM_WORLD, 1);  }  if (gethostname(srvName, (size_t)HOST_NAME_SIZE) < 0) {    printf("Fatal error: could not get hostname\n");    MPI_Abort(MPI_COMM_WORLD, 1);  }  allNames = (char **)malloc(nmachines*sizeof(char *));  for (i=0; i < nmachines; i++) {    allNames[i] = (char *)calloc((size_t)HOST_NAME_SIZE, sizeof(char));  }  allNamesFlat = (char *)calloc(nmachines*HOST_NAME_SIZE*sizeof(char));  recvSizes = (int *)malloc(nmachines * sizeof(int));  disps = (int *)malloc(nmachines * sizeof(int));  siz = strlen(srvName);  MPI_Gather(&siz, 1, MPI_INT, recvSizes, 1, MPI_INT, 0, MPI_COMM_WORLD);  for (i=0; i<nmachines; i++)    disps[i] = (HOST_NAME_SIZE-1) * i;  MPI_Gatherv(srvName, siz, MPI_CHAR, allNamesFlat, recvSizes, disps, MPI_CHAR, 0, MPI_COMM_WORLD);   if (rank == 0) {    if ((fd = fopen(machineFile,"w")) > 0) {      fprintf(fd, "#machines used by job %d\n", job_id);      for (i=1; i < nmachines; i++) {        strncpy(allNames[i], allNamesFlat+(HOST_NAME_SIZE-1)*i, strlen(allNamesFlat+(HOST_NAME_SIZE-1)*i)+1);        fprintf(fd, "%s\n", allNames[i]);      }       fclose(fd);    }  }   free(allNamesFlat);   if (rank == 0) {    sprintf(commandLine,"/opt/mpich/gk/V2/bin/mpirun -np %d -eln %d -csn %d -xw-jobid %d -xw-norun -machinefile %s %s", np, nEL, nCS, job_id, machineFile, userCommandLine);    system(commandLine);  }  MPI_Barrier(MPI_COMM_WORLD);  /* The rank 0 will read the program file that it has just generated, then give each 'node' the line that it needs to execute */  if (rank == 0) {    /* We start by reading the program files */    theJob.checkpointFrequency = 0;    strcpy(theJob.debugFile, "");    strcpy(theJob.debugCommand, "129.175.7.27:+@");    sprintf(theJob.v2pgFile, "v2pgfile.%d", job_id);    theJob.dispatcherPort = 1978;    theJob.jobId = job_id;    myAddress = gethostbyname(srvName);    strcpy(theJob.dispatcherIP, (char *)inet_ntoa(*(struct in_addr *)myAddress->h_addr));    parseProgramFile(&theJob);    v2cmdfile = (char *)calloc(strlen(theJob.v2pgFile)+9,sizeof(char));    sprintf(v2cmdfile, "%s.commands", theJob.v2pgFile);    parseCommandsFile(v2cmdfile, &theJob);    free(v2cmdfile);    /* As with normal startup, we need to listen to the socket before launching the other programs */    address.sin_family = AF_INET;    address.sin_addr.s_addr = htonl(INADDR_ANY);    address.sin_port = htons(theJob.dispatcherPort);    if ((listenSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {      printf("Socket could not be created!\n");      MPI_Abort(MPI_COMM_WORLD, 1);    }    while (bind(listenSocket, ((struct sockaddr*)&address), sizeof(struct sockaddr_in)) == -1) {      fprintf(stdout, "Bind could not be performed!\n");    }    if (listen(listenSocket, MAX_LISTEN_WAIT) == -1) {      close(listenSocket);      printf("Could not listen on socket\n");      MPI_Abort(MPI_COMM_WORLD, 1);    }    /* Now we tell the auxiliary programs what to do */    nbAuxTotal = 0;    /* First, the ELs */     auxEL = theJob.elList;    while (auxEL != NULL) {      sprintf(commandLine, "%s -p %d -np %d -g %d -auxid %d -dispatcher %s:%d -debug %s",		      theJob.elCmd,		      auxEL->port,		      theJob.nprocs,		      theJob.jobId,		      nbAuxTotal,		      theJob.dispatcherIP, theJob.dispatcherPort,		      theJob.debugCommand);      remoteRank = nbAuxTotal+1;      siz = strlen(commandLine);      MPI_Send(&siz, 1, MPI_INT, remoteRank, remoteRank, MPI_COMM_WORLD);      MPI_Send(commandLine, siz, MPI_CHAR, remoteRank, remoteRank, MPI_COMM_WORLD);      auxEL = auxEL->next;      nbAuxTotal++;    }    /* Now the CSs */    auxCS = theJob.csList;    while (auxCS != NULL) {      sprintf(commandLine,"%s -g %d -p %d -d %s -i %s -auxid %d -dispatcher %s:%d",		      theJob.csCmd,		      theJob.jobId,		      auxCS->port,		      theJob.debugCommand,		      auxCS->tmp,		      nbAuxTotal,		      theJob.dispatcherIP,		      theJob.dispatcherPort);      remoteRank = nbAuxTotal+1;      siz = strlen(commandLine);      MPI_Send(&siz, 1, MPI_INT, remoteRank, remoteRank, MPI_COMM_WORLD);      MPI_Send(commandLine, siz, MPI_CHAR, remoteRank, remoteRank, MPI_COMM_WORLD);      auxCS = auxCS->next;      nbAuxTotal++;    }    /* The SC */    auxSC = theJob.scList;    sprintf(commandLine, "%s %d %d %d %d %s:%d %s ",		    theJob.scCmd,		    auxSC->port,		    theJob.nprocs,		    theJob.jobId,		    nbAuxTotal,		    theJob.dispatcherIP, theJob.dispatcherPort,		    theJob.debugCommand);    remoteRank = 1;    siz = strlen(commandLine);    MPI_Send(&siz, 1, MPI_INT, remoteRank, remoteRank, MPI_COMM_WORLD);    MPI_Send(commandLine, siz, MPI_CHAR, remoteRank, remoteRank, MPI_COMM_WORLD);    nbAuxTotal++;    /* We wait until they are all connected */    nbAuxiliaries = 0;    auxiliariesPid = (int *)malloc(nbAuxTotal * sizeof(int));    while (nbAuxiliaries < nbAuxTotal) {      if((acceptSocket = accept(listenSocket, (struct sockaddr *)&pin, &addrlen)) < 0) {        printf("Could not accept socket connection from auxiliary\n");      } else {	read(acceptSocket, &auxId, sizeof(int));	auxId = ntohl(auxId);	read(acceptSocket, &pid1, sizeof(pid_t));	auxiliariesPid[auxId] = ntohl(pid1);	close(acceptSocket);	nbAuxiliaries++;      }    }    i = 0;    auxEL - theJob.elList;    while (auxEL != NULL) {      auxEL->pid = auxiliariesPid[i];      auxEL = auxEL->next;      i++;    }    auxCS = theJob.csList;    while (auxCS != NULL) {      auxCS->pid = auxiliariesPid[i];      auxCS = auxCS->next;      i++;    }    auxSC = theJob.scList;    while (auxSC != NULL) {      auxSC->pid = auxiliariesPid[i];      auxSC = auxSC->next;      i++;    }    /* Now we can tell the nodes to go... */    remoteRank = nbAuxTotal;    auxCN = theJob.nodeList;    while (auxCN != NULL) {      sprintf(commandLine, "%s -g %d -r %d -np %d -p %d -el %s:%d -cs %s:%d -sc %s:%d -v2tmp %s/%s/ -dispatcher %s:%d %s -debug %s -cmd %s",		      theJob.v2dCmd,		      theJob.jobId,		      auxCN->rank,		      theJob.nprocs,		      auxCN->communicationPort,		      auxCN->eventLogger, auxCN->eventLoggerPort,		      auxCN->checkpointServer, auxCN->checkpointServerPort,		      auxCN->checkpointScheduler, auxCN->checkpointSchedulerPort,		      theJob.v2tmp, auxCN->ipAddress,		      theJob.dispatcherIP, theJob.dispatcherPort,		      "", /*restartFlag*/		      theJob.debugCommand,		      theJob.progCmd);      siz = strlen(commandLine);      MPI_Send(&siz, 1, MPI_INT, remoteRank, remoteRank, MPI_COMM_WORLD);      MPI_Send(commandLine, siz, MPI_CHAR, remoteRank, remoteRank, MPI_COMM_WORLD);      remoteRank++;      auxCN = auxCN->next;    }    /* Now we wait for the connexions from the nodes */    pid = (pid_t *)malloc(theJob.nprocs * sizeof(pid_t));    for (i=0; i < theJob.nprocs; i++) {      if( (acceptSocket = accept(listenSocket, (struct sockaddr *)&pin, &addrlen)) < 0) {	printf("Could not accept socket connection from client\n");      } else {	read(acceptSocket, &rank, sizeof(int));	rank = ntohl(rank);	read(acceptSocket, &pid1, sizeof(pid_t));	pid[rank] = ntohl(pid1);	addConnectedNode(rank, acceptSocket);	/* We send the node list... */	write(acceptSocket, theJob.nodeListArray, theJob.nprocs*sizeof(struct sockaddr_in));      }    }    /* Now we just need to wait for the finalize signal to end up everything */    currentConnected = (Connected *)malloc(sizeof(Connected));    while (connectedNodes != NULL) {      FD_ZERO(&setOfCN);      currentConnected = connectedNodes;      auxCN = theJob.nodeList;      while (currentConnected != NULL) {	auxCN->pid = pid[auxCN->rank];	FD_SET(currentConnected->socket, &setOfCN);	currentConnected = currentConnected->next;	auxCN = auxCN->next;      }      if (finalizing) {	if ((ret = select(FD_SETSIZE, &setOfCN, NULL, NULL, selectTimeOut)) < 0) {          fprintf(stdout, "Error in select: irrecoverable error\n");	  fflush(stdout);	  MPI_Abort(MPI_COMM_WORLD, 1);	}      } else {	if ((ret = select(FD_SETSIZE, &setOfCN, NULL, NULL, NULL)) < 0) {          fprintf(stdout, "Error in select: irrecoverable error\n");	  fflush(stdout);	  MPI_Abort(MPI_COMM_WORLD, 1);	}      }      currentConnected = connectedNodes;      while (currentConnected != NULL) {	if (FD_ISSET(currentConnected->socket, &setOfCN)) {          break;	}	currentConnected = currentConnected->next;      }      if (currentConnected != NULL) {        if (read(currentConnected->socket, &msg, sizeof(int)) == 0) {          fprintf(stdout, "Disconnection detected, but not handled in that version!\n");	  fflush(stdout);	  MPI_Abort(MPI_COMM_WORLD, 1);	} else {          if (msg == FINALIZE_MSG) {	    finalizing = true;	    removeConnectedNode(currentConnected);	  }	}      }    }    sprintf(finishedFile, "finished.%s", userCommandLine);    if (fd = fopen(finishedFile, "w")) {      fprintf(fd, "Fini!\n");      fclose(fd);    }    fprintf(stdout, "Everything seems to be finished: we may proceed to the ugly abort\n");    fflush(stdout);    MPI_Abort(MPI_COMM_WORLD, 1);  } else { /* The work done by the other processes but 0 */    MPI_Recv(&siz, 1, MPI_INT, 0, rank, MPI_COMM_WORLD, status);    MPI_Recv(commandLine, siz, MPI_CHAR, 0, rank, MPI_COMM_WORLD, status);    if (rank != 1)      system(commandLine);    else { /* rank 1 will be the first EL but also the SC */      if ((forked = fork()) != -1) {        if (forked == 0) {          system(commandLine);	} else {          MPI_Recv(&siz, 1, MPI_INT, 0, rank, MPI_COMM_WORLD, status);          MPI_Recv(commandLine1, siz, MPI_CHAR, 0, rank, MPI_COMM_WORLD, status);          system(commandLine1);	}      } else {        printf("ERROR(1): Could not perform fork!\n");	MPI_Abort(MPI_COMM_WORLD, 1);      }    }  }  MPI_Barrier(MPI_COMM_WORLD);  MPI_Finalize();  return (0);}Connected * connectedNodesTail = (Connected *)NULL;void addConnectedNode(int rank, int socket) {  Connected * new;  new = (Connected *)malloc(sizeof(Connected));  new->rank = rank;  new->socket = socket;  fprintf(stdout, "on ajoute %d [%d]\n", rank, socket);  fflush(stdout);  if (connectedNodes == NULL) {    connectedNodes = new;    connectedNodesTail = (Connected *)malloc(sizeof(Connected));  }  connectedNodesTail->next = new;  new->next = NULL;  connectedNodesTail = new;}void removeConnectedNode(Connected * toRemove) {  Connected * cc;  Connected * tmp;  cc = connectedNodes;  tmp = toRemove;  fprintf(stdout,"on enleve %d [%d]\n", toRemove->rank, toRemove->socket);  fflush(stdout);  if (tmp == cc) {    connectedNodes = connectedNodes->next;    if (connectedNodesTail == tmp)      connectedNodesTail = connectedNodesTail->next;  } else {    while (cc->next != toRemove)      cc = cc->next;    cc->next = tmp->next;    if (connectedNodesTail == tmp)      connectedNodesTail = cc;  }  free(tmp);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -