⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mpirun_protocol.c

📁 Path MPICH-V for MPICH the MPI Implementation
💻 C
字号:
/*  MPICH-V  Copyright (C) 2002, 2003,2004 Groupe Cluster et Grid, LRI, Universite de Paris Sud  This file is part of MPICH-V.  MPICH-V is free software; you can redistribute it and/or modify  it under the terms of the GNU General Public License as published by  the Free Software Foundation; either version 2 of the License, or  (at your option) any later version.  MPICH-V is distributed in the hope that it will be useful,  but WITHOUT ANY WARRANTY; without even the implied warranty of  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the  GNU General Public License for more details.  You should have received a copy of the GNU General Public License  along with MPICH-V; if not, write to the Free Software  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA  $Id: mpirun_protocol.c,v 1.9 2004/04/06 16:27:19 bouteill Exp $*/#include <stdio.h>#include <unistd.h>#include <stdlib.h>#include <string.h>#include <sys/types.h>#include <sys/socket.h>#include "v2run.h"#include "vrun_protocol.h"#include "debug.h"#define MAX_MSG 10#define BUFF_SIZE 4096/* This file contains the protocol specific actions */void launchAuxiliaryPrograms(JS * js, int *nbAuxTotal) {  char commandLine[BUFF_SIZE];  char log[BUFF_SIZE];  CS * auxCS;  EL * auxEL;  SC * auxSC;  *nbAuxTotal = 0;  /*    We launch the EL  */  //  auxEL = js->elList;  // while (auxEL != NULL) {  //  if (strlen(auxEL->debugString) == 0) {  //    sprintf(commandLine,"%s %s %s -p %d -np %d -g %d -auxid %d -dispatcher %s:%d -debug %s &",  //    js->rshCmd,  //auxEL->ipAddress,  //js->elCmd,  //auxEL->port,  //js->nprocs,  //js->jobId,  //nbAuxTotal,  //js->dispatcherIP, js->dispatcherPort,  //js->debugCommand);  // } else {  //   sprintf(commandLine,"%s %s %s -p %d -np %d -g %d -auxid %d -dispatcher %s:%d %s &",  //    js->rshCmd,  //auxEL->ipAddress,  //js->elCmd,  //auxEL->port,  //js->nprocs,  //js->jobId,  //nbAuxTotal,  //js->dispatcherIP, js->dispatcherPort,  //auxEL->debugString);  // }  // sprintf(log, "launching event logger server on machine %s",auxEL->ipAddress);  // v2logMessage(log);  // if (js->testOnly || (strlen(auxEL->debugString)!=0))  //   printf("%s\n",commandLine);  //  // if (js->testOnly) {}   //else {   //  if (auxEL->autoLaunch)  //    system(commandLine);  //   else  // printf("#*** You need to launch the following command:\n%s\n",commandLine);  // }  // nbAuxTotal ++;  // auxEL = auxEL->next;  // }  /*    Then the CS  */  auxCS = js->csList;  while (auxCS != NULL) {     if (strlen(auxCS->debugString) == 0) {       sprintf(commandLine, "%s %s %s -g %d -p %d -d %s -i %s -auxid %d -dispatcher %s:%d &",	js->rshCmd,	auxCS->ipAddress,	js->csCmd,	js->jobId,	auxCS->port,	js->debugCommand,	auxCS->tmp,	*nbAuxTotal,	js->dispatcherIP,	js->dispatcherPort);     } else {       sprintf(commandLine, "%s %s %s -g %d -p %d %s -i %s -auxid %d -dispatcher %s:%d &",	js->rshCmd,	auxCS->ipAddress,	js->csCmd,	js->jobId,	auxCS->port,	auxCS->debugString,	auxCS->tmp,	*nbAuxTotal,	js->dispatcherIP,	js->dispatcherPort);     }     sprintf(log, "launching checkpoint server on machine %s", auxCS->ipAddress);     v2logMessage(log);     if (js->testOnly || (strlen(auxCS->debugString) != 0))       printf("%s\n", commandLine);     if (!js->testOnly) {       if (auxCS->autoLaunch)         system(commandLine);       else	 printf("#*** You need to launch the following command:\n%s\n",commandLine);     }     (*nbAuxTotal)++;     auxCS = auxCS->next;  }  /*    Then the SC  */  auxSC = js->scList;  while (auxSC != NULL) {    if (strlen(auxSC->debugString) == 0) {      if (js->checkpointFrequency == 0) {        sprintf(commandLine, "%s %s %s %d %d %d %d %s:%d %s -nockpt 2> /dev/null &",          js->rshCmd,	  auxSC->ipAddress, 	  js->scCmd,	  auxSC->port,	  js->nprocs,	  js->jobId,	  *nbAuxTotal,	  js->dispatcherIP, js->dispatcherPort,	  js->debugCommand);       } else {        sprintf(commandLine, "%s %s %s %d %d %d %d %s:%d %s -t %d 2> /dev/null &",          js->rshCmd,	  auxSC->ipAddress, 	  js->scCmd,	  auxSC->port,	  js->nprocs,	  js->jobId,	  *nbAuxTotal,	  js->dispatcherIP, js->dispatcherPort,	  js->debugCommand,	  js->checkpointFrequency);       }     } else {       if (js->checkpointFrequency == 0) {         sprintf(commandLine, "%s %s %s %d %d %d %d %s:%d %s -nockpt 2> /dev/null &",           js->rshCmd,	   auxSC->ipAddress, 	   js->scCmd,	   auxSC->port,	   js->nprocs,	   js->jobId,	   *nbAuxTotal,	   js->dispatcherIP, js->dispatcherPort,	   auxSC->debugString);       } else {         sprintf(commandLine, "%s %s %s %d %d %d %d %s:%d %s -t %d 2> /dev/null &",           js->rshCmd,	   auxSC->ipAddress, 	   js->scCmd,	   auxSC->port,	   js->nprocs,	   js->jobId,	   *nbAuxTotal,	   js->dispatcherIP, js->dispatcherPort,	   auxSC->debugString,	   js->checkpointFrequency);       }     }     sprintf(log, "launching checkpoint scheduler on machine %s", auxSC->ipAddress);     v2logMessage(log);     if (js->testOnly || (strlen(auxSC->debugString) != 0))       printf("%s\n", commandLine);     if (!js->testOnly) {       if (auxSC->autoLaunch)         system(commandLine);       else	 printf("#*** You need to launch the following command:\n%s\n",commandLine);     }     (*nbAuxTotal)++;     auxSC = auxSC->next;  }}void waitConnectionFromAuxiliaries(JS * js, int nbAuxTotal, pid_t *auxiliariesPid, int listenSocket) {  int nbAuxiliaries;  int acceptSocket;  int auxId;  struct sockaddr_in pin;  int addrlen;  pid_t pid1;  int i;  int ipNb;  CS * auxCS;  EL * auxEL;  SC * auxSC;  if (strlen(js->debugFile) != 0)    printf("Waiting for auxiliaries to be launched before launching nodes\n");  nbAuxiliaries = 0;  auxiliariesPid = (pid_t *)malloc(nbAuxTotal * sizeof(pid_t));  while (nbAuxiliaries < nbAuxTotal) {    if ( (acceptSocket = accept(listenSocket, (struct sockaddr *)&pin, &addrlen)) < 0 ) {      printe("Could not accept socket connection from auxiliary");    }  else {      read(acceptSocket, &auxId, sizeof(int));      auxId = ntohl(auxId);      read(acceptSocket, &pid1, sizeof(pid_t));      auxiliariesPid[auxId] = ntohl(pid1);      close(acceptSocket);      nbAuxiliaries ++;    }  }  /* Now that all the auxiliaries have given their IPs, time to put these IPs in the correct place */  i = 0;  auxEL = js->elList;  while (auxEL != NULL) {    auxEL->pid = auxiliariesPid[i];    auxEL = auxEL->next;    i++;  }  auxCS = js->csList;  while (auxCS != NULL) {    auxCS->pid = auxiliariesPid[i];    auxCS = auxCS->next;    i++;  }  auxSC = js->scList;  while (auxSC != NULL) {    auxSC->pid = auxiliariesPid[i];    auxSC = auxSC->next;    i++;  }  /* We do not need the IP used by the auxiliary servers at this point. Let's clean *//*  for (i = 0; i < ipNb; i++) {     free(ipList[i]);  }  free(ipList);  ipNb = 0;  */  if (strlen(js->debugFile) != 0)    printf("All auxiliaries have been identified: nodes can now be launched\n");}void launchComputingNodes(JS * js) {  CN * auxCN;  char commandLine[BUFF_SIZE];  char log[BUFF_SIZE];  pid_t forked;  auxCN = js->nodeList;  while(auxCN != NULL) {    nodeCommandLine(commandLine, CHECKPOINT, js, *auxCN);    sprintf(log, "launching rank %d on host %s", auxCN->rank, auxCN->hostName);    v2logMessage(log);    if (js->testOnly || (strcmp(auxCN->debugString, "") != 0))      printf("%s\n", commandLine);    if (!js->testOnly) {      if (auxCN->autoLaunch) {        if ((forked = fork()) != -1) {          if (forked == 0) {            system(commandLine);            _exit(0);          }        } else {          printf("Error: could not fork(): aborting\n");          exit(1);        }      } else	printf("#*** You need to launch the following command:\n%s &\n", commandLine);    }    auxCN = auxCN->next;  }}void waitConnectionFromComputingNodes(JS * js, int listenSocket, pid_t *pid) {  int acceptSocket;  int rank;  struct sockaddr_in pin;  int addrlen;  pid_t pid1;  int i;  for (i = 0; i< js->nprocs; i++) {    if ( (acceptSocket = accept(listenSocket, (struct sockaddr *)&pin, &addrlen)) < 0 ) {      printf("Could not accept socket connection from client\n");    }  else {      read(acceptSocket, &rank, sizeof(int));      rank = ntohl(rank);      read(acceptSocket, &pid1, sizeof(pid_t));      pid[rank] = ntohl(pid1);      addConnectedNode(rank, acceptSocket);      /*        We then send the node list to that node, with the following form:	int[5]: ip1.ip2.ip3.ip4:port -> ip1ip2ip3ip4port      */      write(acceptSocket, js->nodeListArray, js->nprocs*sizeof(struct sockaddr_in));      /*close(acceptSocket);*/    }  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -