⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 originempirun.vrun.c

📁 Path MPICH-V for MPICH the MPI Implementation
💻 C
📖 第 1 页 / 共 2 页
字号:
/*  MPICH-V  Copyright (C) 2002, 2003 Groupe Cluster et Grid, LRI, Universite de Paris Sud  This file is part of MPICH-V.  MPICH-V is free software; you can redistribute it and/or modify  it under the terms of the GNU General Public License as published by  the Free Software Foundation; either version 2 of the License, or  (at your option) any later version.  MPICH-V is distributed in the hope that it will be useful,  but WITHOUT ANY WARRANTY; without even the implied warranty of  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the  GNU General Public License for more details.  You should have received a copy of the GNU General Public License  along with MPICH-V; if not, write to the Free Software  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA  $Id: originempirun.vrun.c,v 1.1 2004/06/17 17:35:26 bouziane Exp $*//*  This program takes two arguments : a v2pgfile and a job_id*//*  This program takes the filename of a v2pgfile as input.  The format is:  * The command lines (v2pgfile.commands)  - rshcmd=<rsh command>  - elcmd=<event logger command>  - cscmd=<checkpoint server command>  - prog=<program + arguments>  * Then the machines (v2pgfile)  - For the EL servers:  EL eventLoggerIP portEL  - For the CS servers:  CS checkpointServerIP portCS tmpCS  - For the computing nodes:  CN MPIrank nodeHostName nodeIP portCommunication associatedELIP portEL associatedCSIP portCS*/#include <stdio.h>#include <stdlib.h>#include <sys/time.h>#include <time.h>#include <sys/types.h>#include <sys/stat.h>#include <unistd.h>#include <netdb.h>#include <netinet/in.h>#include <netinet/tcp.h>#include <unistd.h>#include <getopt.h>#include <string.h>#include <dirent.h>#include <fcntl.h>#include "config.h"#include "v2run.h"#include "v2run_msg.h"#include "debug.h"#include "vrun_protocol.h"#define MAX_LISTEN_WAIT 10static char *options = "f:g:p:d:";void printUsage() {  printf("Usage: mpirun.vrun -f <v2pgfile> -g <job_id> -p <communication_port> -debug <debug> [-hpnetwork <IP translation file>]\n");  printf("  <v2pgfile>: the program file containing the EL/CS/Nodes specifications\n");  printf("  <job_id>: the Job unique Identifier\n");  printf("  <communication_port>: the port to which the event logger nodes will connect\n");  printf("  -hpnetwork can be used to use IP over a high performance network such as myrinet. The IP translation file gives the correspondance between the normal addresses and the 'HPN' addresses\n");}/*  These are read by the program in the 'el*file's*/#define BUFF_SIZE 16000boolean keepXWFile;boolean logging;#define PORT_DISPATCHER 9800/*  The structures containing the data of the program*/int totalAM;char ** ipList = (char **)NULL;int ipNb = 0;PL ** pidList;PL ** pidListTail;static int printonly = 0;void cleanELCS(JS *);/*  Now come some functions enabling work on the linked list of available machines*/AM * firstAM;AM * lastAM;void initializeAMList() {  firstAM = (AM *)NULL;  lastAM  = (AM *)NULL;}AM * initializeAM(char * host, char * ip, int np, int elp, int csp, int dc) {  AM * nam;  nam = (AM *)calloc(1, sizeof(AM));  strcpy(nam->hostName, host);   strcpy(nam->ipAddress, ip);  nam->nprocs = np;  nam->eventLoggerPort = elp;  nam->checkpointServerPort = csp;  nam->dualCapabilities = dc;  return nam;}void addAvailableMachine(AM * newAM) {  if (firstAM == NULL) {    lastAM = (AM *)calloc(1,sizeof(AM));    firstAM = newAM;  }  lastAM->next = newAM;  newAM->next  = NULL;  lastAM = newAM;}void useAvailableMachine() {  if (firstAM->nprocs == 1)    firstAM = firstAM->next;  else    firstAM->nprocs --;}void parseAvailableFile(char * availFilePath) {  FILE * fd;  char line[BUFF_SIZE];  char hostName[BUFF_SIZE];  char ip[IP_LENGTH];  int  nprocs, port1, port2, dc;  AM * newAM;  initializeAMList();  if ((fd = fopen(availFilePath, "r")) != NULL ) {    while (fgets(line, BUFF_SIZE, fd) ) {      sscanf(line, "%s %s %d %d %d %d", 	     hostName, 	     ip, 	     &nprocs, 	     &port1,	     &port2,	     &dc);      newAM = initializeAM(hostName, ip, nprocs, port1, port2, dc);      addAvailableMachine(newAM);    }    fclose(fd);  }}/*  If rank <rank> has been disconnected, we relaunch it*/void relaunchByRank(int rank, JS * js, boolean moving) {  int i;  char cl[BUFF_SIZE];  char log [BUFF_SIZE];  CN * currentNode;  currentNode = js->nodeList;  while (currentNode->rank != rank)     currentNode = currentNode->next;  /*    First, we look if there are nodes available  */  /*if (firstAM != NULL) {    sprintf(log, "host %s running rank %d crashed, replaced by host %s", js->nodeList->hostName, rank, firstAM->hostName);    v2logMessage(log);        strcpy(js->nodeList->hostName,firstAM->hostName);    useAvailableMachine(); */    nodeCommandLine(cl, RESTART, js, *currentNode);    /*sprintf(cl, "%s &", cl);*/  /*  if (!moving)      sleep(20);*//*    fflush(stdout);*/    if (js->testOnly)      printf("%s\n", cl);    else {      if (fork() == 0) {        system(cl);	_exit(0);      }    }    /*   } else {    printf("Disconnection detected, but no more nodes available\n");    return;  }  */}boolean checkArguments(int argc, char * argv[], JS * js) {  boolean b;  int c;  static struct option long_options[] = {    {"debug", required_argument, NULL, 'd'},     {"debugfile", required_argument, NULL, 'b'},     {"checkpointing", required_argument, NULL, 'c'},    {"no-auto-launch", no_argument, NULL, 'a'},    {"ckptlocal", no_argument, NULL, 'l'},    //    {"simultaneous-jobs",required_argument, NULL, 's'},    {"no-ckpt-server",no_argument, NULL, 'k'},    {"msc",required_argument, NULL, 'm'}         };  js->debugFile[0] = 0;  js->checkpointFrequency = 0;  js->autoLaunch = true;  js->localckpt = false;  //  js->nbrJobSimultaneous = -1;  js->noCkptServer = 0;  js->masterString = NULL;  if (argc < 7) {    printf("%s error: not enough arguments\n", argv[0]);    printUsage();    b = false;  } else {     b = true;  while ((c = getopt_long_only(argc, argv, options, long_options, NULL)) != -1) {     switch(c) {	case 'f':	  strcpy(js->v2pgFile, optarg);	  break;	case 'g':	  js->jobId = atoi(optarg); 	  break;	case 'p':	  js->dispatcherPort = atoi(optarg);	  break;	case 'd':	  strcpy(js->debugCommand, optarg);	  break;	case 'b':	  strcpy(js->debugFile, optarg);	  break;	case 'c':	  js->checkpointFrequency = atoi(optarg);	  break;        case 'a':	  js->autoLaunch = false;	  break;        case 'l':          js->localckpt = true;          break;          /* 	case 's':	  js->nbrJobSimultaneous = atoi(optarg);	  break;          */         	case 'k':	  js->noCkptServer = 1; 	  break;        case 'm':          {                int ip1, ip2,ip3,ip4, port;            js->masterString= strdup(optarg);            sscanf(js->masterString, "%d.%d.%d.%d:%d", &ip1, &ip2, &ip3, &ip4, &port);            sprintf(js->masterIP, "%d.%d.%d.%d", ip1, ip2, ip3, ip4);            js->masterPort= port;          }            break;       	default:	  b = false;     }  }  }  return b;}#define HOST_NAME_SIZE 256Connected * connectedNodes = (Connected *)NULL;Connected * connectedNodesTail = (Connected *)NULL;void addConnectedNode(int rank, int socket) {  Connected * new;  new = (Connected *)malloc(sizeof(Connected));  new->rank = rank;  new->socket = socket;  new->finalized = false;  if(connectedNodes == NULL) {    connectedNodes = new;    connectedNodesTail = (Connected *)malloc(sizeof(Connected));  }  connectedNodesTail->next = new;  new->next = NULL;  connectedNodesTail = new;}void removeConnectedNode(Connected * toRemove) {  Connected * cc;  Connected * tmp;  cc = connectedNodes;  tmp = toRemove;  if (tmp == cc) {    connectedNodes = connectedNodes->next;    if (connectedNodesTail == tmp)      connectedNodesTail = connectedNodesTail->next;  } else {    while (cc->next != toRemove)      cc = cc->next;    cc->next = tmp->next;    if (connectedNodesTail == tmp)      connectedNodesTail = cc;  }  free(tmp);}static void check_sync_finalize(void){  Connected *currentConnected;  int ack = 1;  for( currentConnected = connectedNodes;        currentConnected;        currentConnected=currentConnected->next)    {      if(currentConnected->finalized != true)	return;    }  for( currentConnected = connectedNodes;        currentConnected;        currentConnected=currentConnected->next)    {      write(currentConnected->socket, &ack, sizeof(int));      removeConnectedNode(currentConnected);    }  /*@todo mettre a null le job courrant puis lancer le job suivant*/}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -