⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mpirun.v2run.c

📁 Path MPICH-V for MPICH the MPI Implementation
💻 C
📖 第 1 页 / 共 2 页
字号:
/*  MPICH-V  Copyright (C) 2002, 2003 Groupe Cluster et Grid, LRI, Universite de Paris Sud  This file is part of MPICH-V.  MPICH-V is free software; you can redistribute it and/or modify  it under the terms of the GNU General Public License as published by  the Free Software Foundation; either version 2 of the License, or  (at your option) any later version.  MPICH-V is distributed in the hope that it will be useful,  but WITHOUT ANY WARRANTY; without even the implied warranty of  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the  GNU General Public License for more details.  You should have received a copy of the GNU General Public License  along with MPICH-V; if not, write to the Free Software  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA  $Id: mpirun.v2run.c,v 1.9 2004/05/17 13:40:14 collin Exp $*//*  This program takes two arguments : a v2pgfile and a job_id*//*  This program takes the filename of a v2pgfile as input.  The format is:  * The command lines (v2pgfile.commands)  - rshcmd=<rsh command>  - elcmd=<event logger command>  - cscmd=<checkpoint server command>  - prog=<program + arguments>  * Then the machines (v2pgfile)  - For the EL servers:  EL eventLoggerIP portEL  - For the CS servers:  CS checkpointServerIP portCS tmpCS  - For the computing nodes:  CN MPIrank nodeHostName nodeIP portCommunication associatedELIP portEL associatedCSIP portCS*/#include <stdio.h>#include <stdlib.h>#include <sys/time.h>#include <time.h>#include <netdb.h>#include <netinet/in.h>#include <netinet/tcp.h>#include <unistd.h>#include <getopt.h>#include <string.h>#include <dirent.h>#include <fcntl.h>#include "config.h"#include "v2run.h"#include "v2run_msg.h"#include "debug.h"#define MAX_LISTEN_WAIT 10static char *options = "f:g:p:d:";void printUsage() {  printf("Usage: mpirun.v2run -f <v2pgfile> -g <job_id> -p <communication_port> -debug <debug> [-hpnetwork <IP translation file>]\n");  printf("  <v2pgfile>: the program file containing the EL/CS/Nodes specifications\n");  printf("  <job_id>: the Job unique Identifier\n");  printf("  <communication_port>: the port to which the event logger nodes will connect\n");  printf("  -hpnetwork can be used to use IP over a high performance network such as myrinet. The IP translation file gives the correspondance between the normal addresses and the 'HPN' addresses\n");}/*  These are read by the program in the 'el*file's*/#define CMD_LENGTH 256boolean testOnly;boolean keepXWFile;boolean logging;#define PORT_DISPATCHER 9800/*  The structures containing the data of the program*/int totalAM;char ** ipList = (char **)NULL;int ipNb = 0;PL ** pidList;PL ** pidListTail;static int printonly = 0;void cleanELCS(JS *);/*  Now come some functions enabling work on the linked list of available machines*/AM * firstAM;AM * lastAM;void initializeAMList() {  firstAM = (AM *)NULL;  lastAM  = (AM *)NULL;}AM * initializeAM(char * host, char * ip, int np, int elp, int csp, int dc) {  AM * nam;  nam = (AM *)calloc(1, sizeof(AM));  strcpy(nam->hostName, host);   strcpy(nam->ipAddress, ip);  nam->nprocs = np;  nam->eventLoggerPort = elp;  nam->checkpointServerPort = csp;  nam->dualCapabilities = dc;  return nam;}void addAvailableMachine(AM * newAM) {  if (firstAM == NULL) {    lastAM = (AM *)calloc(1,sizeof(AM));    firstAM = newAM;  }  lastAM->next = newAM;  newAM->next  = NULL;  lastAM = newAM;}void useAvailableMachine() {  if (firstAM->nprocs == 1)    firstAM = firstAM->next;  else    firstAM->nprocs --;}void parseAvailableFile(char * availFilePath) {  FILE * fd;  char line[4096];  char hostName[128];  char ip[15];  int  nprocs, port1, port2, dc;  AM * newAM;  initializeAMList();  if ((fd = fopen(availFilePath, "r")) != NULL ) {    while (fgets(line, 4096, fd) ) {      sscanf(line, "%s %s %d %d %d %d", 	     hostName, 	     ip, 	     &nprocs, 	     &port1,	     &port2,	     &dc);      newAM = initializeAM(hostName, ip, nprocs, port1, port2, dc);      addAvailableMachine(newAM);    }    fclose(fd);  }}/*  If rank <rank> has been disconnected, we relaunch it*/void relaunchByRank(int rank, JS * js, boolean moving) {  int i;  char * cl;  char log [1024];  CN * currentNode;  currentNode = js->nodeList;  while (currentNode->rank != rank)     currentNode = currentNode->next;  /*    First, we look if there are nodes available  */  /*if (firstAM != NULL) {    sprintf(log, "host %s running rank %d crashed, replaced by host %s", js->nodeList->hostName, rank, firstAM->hostName);    v2logMessage(log);        strcpy(js->nodeList->hostName,firstAM->hostName);    useAvailableMachine(); */    cl = (char *)calloc(4096, sizeof(char));    nodeCommandLine(&cl, RESTART, js, *currentNode);    /*sprintf(cl, "%s &", cl);*/  /*  if (!moving)      sleep(20);*/    fflush(stdout);    if (testOnly)      printf("%s\n", cl);    else {      if (fork() == 0) {        system(cl);	_exit(0);      }    }    free(cl);   /*   } else {    printf("Disconnection detected, but no more nodes available\n");    return;  }  */}boolean checkArguments(int argc, char * argv[], JS * js1) {  boolean b;  int c;  JS * js;  static struct option long_options[] = {    {"debug", required_argument, NULL, 'd'},     {"debugfile", required_argument, NULL, 'b'},     {"checkpointing", required_argument, NULL, 'c'},    {"no-auto-launch", no_argument, NULL, 'a'},    {"slowEl", no_argument, NULL, 'e'}  };  js = (JS *)malloc(sizeof(JS));  strcpy(js->debugFile, "");  js->checkpointFrequency = 0;  js->autoLaunch = true;  js->slowEl = false;  if (argc < 7) {    printf("%s error: not enough arguments\n", argv[0]);    printUsage();    b = false;  } else {     b = true;  while ((c = getopt_long_only(argc, argv, options, long_options, NULL)) != -1) {     switch(c) {	case 'f':	  strcpy(js->v2pgFile, optarg);	  break;	case 'g':	  js->jobId = atoi(optarg); 	  break;	case 'p':	  js->dispatcherPort = atoi(optarg);	  break;	case 'd':	  strcpy(js->debugCommand, optarg);	  break;	case 'b':	  strcpy(js->debugFile, optarg);	  break;	case 'c':	  js->checkpointFrequency = atoi(optarg);	  break;	case 'a':	  js->autoLaunch = false;	  break;        case 'e':	  js->slowEl = true;	  break;	default:	  b = false;     }  }  }  memcpy((void *)js1, (void *)js, sizeof(JS));  return b;}#define HOST_NAME_SIZE 256Connected * connectedNodes = (Connected *)NULL;Connected * connectedNodesTail = (Connected *)NULL;void addConnectedNode(int rank, int socket) {  Connected * new;  new = (Connected *)malloc(sizeof(Connected));  new->rank = rank;  new->socket = socket;  if(connectedNodes == NULL) {    connectedNodes = new;    connectedNodesTail = (Connected *)malloc(sizeof(Connected));  }  connectedNodesTail->next = new;  new->next = NULL;  connectedNodesTail = new;}void removeConnectedNode(Connected * toRemove) {  Connected * cc;  Connected * tmp;  cc = connectedNodes;  tmp = toRemove;  if (tmp == cc) {    connectedNodes = connectedNodes->next;    if (connectedNodesTail == tmp)      connectedNodesTail = connectedNodesTail->next;  } else {    while (cc->next != toRemove)      cc = cc->next;    cc->next = tmp->next;    if (connectedNodesTail == tmp)      connectedNodesTail = cc;  }  free(tmp);}/*  The main!*/int main (int argc, char* argv[]) {  char * v2cmdfile;  char * v2availfile;   int i;  char * commandLine;  char log[1024];  char srvName[HOST_NAME_SIZE];  struct hostent * myAddress;  Connected * currentConnected;  Connected * ccAux;  fd_set setOfCN;  int fdFifo;  int ret, retListen, maxFd;  int listenSocket, acceptSocket;  struct sockaddr_in address;  struct sockaddr_in pin;  int addrlen;  char buff[256];  int rank;  int port;  int ip1, ip2, ip3, ip4, ip5, ip6, ip7, ip8;  char newIP[15];  char newFastIP[15];  char fileName[256];  char hostName[HOST_NAME_SIZE];  char line[256];  FILE * fd;  pid_t forked, pid1;  pid_t * pid;  pid_t * auxiliariesPid;  int nbAuxiliaries;  int nbAuxTotal;  int auxId;  int n;  boolean * moving;  boolean * connectedArray;  CN * auxCN;  EL * auxEL;  CS * auxCS;  SC * auxSC;  struct timeval * selectTimeout;  boolean autoRelaunch = true;   boolean finalizing = false;  /*boolean autoRelaunch = false;*/  int msg;  struct jobSpecifications theJob;  logging = true;  selectTimeout = (struct timeval *)malloc(sizeof(struct timeval));  selectTimeout->tv_sec = FINALIZE_TIMEOUT;  if (!checkArguments(argc, argv, &theJob)) {    return 1;  }      parseProgramFile(&theJob);  if (strlen(theJob.debugFile) != 0)    parseDebugFile(&theJob);  /* We get the local IP address */  if (gethostname(srvName, (size_t)HOST_NAME_SIZE) < 0) {    printf("Error: could not get hostname\n");    exit(1);  }  if((myAddress = gethostbyname(srvName)) == 0) qerror("gethostbyname");  /*printf("struct hostent {\n\                      char    *h_name;      */  /* official name of host */ /*=%s\n\                      int     h_length;      */ /* length of address */ /*=%d\n\                      char    **h_addr_list; */ /* list of addresses */ /*=%s\n\              }\n", myAddress->h_name, myAddress->h_length, myAddress->h_addr);	      */  strcpy(theJob.dispatcherIP, (char *)inet_ntoa(*(struct in_addr *)myAddress->h_addr));  v2cmdfile = (char *)calloc(strlen(theJob.v2pgFile)+9,sizeof(char));  sprintf(v2cmdfile,"%s.commands",theJob.v2pgFile);  parseCommandsFile(v2cmdfile, &theJob);  free(v2cmdfile);  v2availfile = (char *)calloc(strlen(theJob.v2pgFile)+7,sizeof(char));  sprintf(v2availfile,"%s.avail", theJob.v2pgFile);  parseAvailableFile(v2availfile);  commandLine = (char *)calloc(1024, sizeof(char));  sprintf(log, "Starting program %s", theJob.v2pgFile);  v2logMessage(log);  /* We already start listening on the connection socket in order to get the PID's from the auxiliary programs */  address.sin_family = AF_INET;  address.sin_addr.s_addr = htonl(INADDR_ANY);  address.sin_port = htons(theJob.dispatcherPort);  if ((listenSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {    printf("Socket could not be created\n");    bailout(&theJob);  }  while (bind(listenSocket, ((struct sockaddr*)&address), sizeof(struct sockaddr_in)) == -1) {    printf("Bind could not be performed: trying in 1 second\n");    sleep(1);  }  if (listen(listenSocket, MAX_LISTEN_WAIT) == -1) {    close(listenSocket);    printf("Could not listen on socket\n");    bailout(&theJob);  }  nbAuxTotal = 0;  /*    We launch the EL  */  auxEL = theJob.elList;  while (auxEL != NULL) {     if (strlen(auxEL->debugString) == 0) {       sprintf(commandLine,"%s %s %s -p %d -np %d -g %d -auxid %d -dispatcher %s:%d -debug %s &",         theJob.rshCmd,	auxEL->ipAddress,	theJob.elCmd, 	auxEL->port,	theJob.nprocs,	theJob.jobId,	nbAuxTotal,	theJob.dispatcherIP, theJob.dispatcherPort,	theJob.debugCommand);     } else {       sprintf(commandLine,"%s %s %s -p %d -np %d -g %d -auxid %d -dispatcher %s:%d %s &",         theJob.rshCmd,	auxEL->ipAddress,	theJob.elCmd, 	auxEL->port,	theJob.nprocs,	theJob.jobId,	nbAuxTotal,	theJob.dispatcherIP, theJob.dispatcherPort,	auxEL->debugString);     }     sprintf(log, "launching event logger server on machine %s",auxEL->ipAddress);     v2logMessage(log);     if (testOnly || (strlen(auxEL->debugString)!=0))       printf("%s\n",commandLine);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -