📄 mpirun_protocol.c
字号:
/* MPICH-V Copyright (C) 2002, 2003,2004 Groupe Cluster et Grid, LRI, Universite de Paris Sud This file is part of MPICH-V. MPICH-V is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. MPICH-V is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with MPICH-V; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA $Id: mpirun_protocol.c,v 1.9 2004/04/06 16:27:19 bouteill Exp $*/#include <stdio.h>#include <unistd.h>#include <stdlib.h>#include <string.h>#include <sys/types.h>#include <sys/socket.h>#include "v2run.h"#include "vrun_protocol.h"#include "debug.h"#define MAX_MSG 10#define BUFF_SIZE 4096/* This file contains the protocol specific actions */void launchAuxiliaryPrograms(JS * js, int *nbAuxTotal) { char commandLine[BUFF_SIZE]; char log[BUFF_SIZE]; CS * auxCS; EL * auxEL; SC * auxSC; *nbAuxTotal = 0; /* We launch the EL */ // auxEL = js->elList; // while (auxEL != NULL) { // if (strlen(auxEL->debugString) == 0) { // sprintf(commandLine,"%s %s %s -p %d -np %d -g %d -auxid %d -dispatcher %s:%d -debug %s &", // js->rshCmd, //auxEL->ipAddress, //js->elCmd, //auxEL->port, //js->nprocs, //js->jobId, //nbAuxTotal, //js->dispatcherIP, js->dispatcherPort, //js->debugCommand); // } else { // sprintf(commandLine,"%s %s %s -p %d -np %d -g %d -auxid %d -dispatcher %s:%d %s &", // js->rshCmd, //auxEL->ipAddress, //js->elCmd, //auxEL->port, //js->nprocs, //js->jobId, //nbAuxTotal, //js->dispatcherIP, js->dispatcherPort, //auxEL->debugString); // } // sprintf(log, "launching event logger server on machine %s",auxEL->ipAddress); // v2logMessage(log); // if (js->testOnly || (strlen(auxEL->debugString)!=0)) // printf("%s\n",commandLine); // // if (js->testOnly) {} //else { // if (auxEL->autoLaunch) // system(commandLine); // else // printf("#*** You need to launch the following command:\n%s\n",commandLine); // } // nbAuxTotal ++; // auxEL = auxEL->next; // } /* Then the CS */ auxCS = js->csList; while (auxCS != NULL) { if (strlen(auxCS->debugString) == 0) { sprintf(commandLine, "%s %s %s -g %d -p %d -d %s -i %s -auxid %d -dispatcher %s:%d &", js->rshCmd, auxCS->ipAddress, js->csCmd, js->jobId, auxCS->port, js->debugCommand, auxCS->tmp, *nbAuxTotal, js->dispatcherIP, js->dispatcherPort); } else { sprintf(commandLine, "%s %s %s -g %d -p %d %s -i %s -auxid %d -dispatcher %s:%d &", js->rshCmd, auxCS->ipAddress, js->csCmd, js->jobId, auxCS->port, auxCS->debugString, auxCS->tmp, *nbAuxTotal, js->dispatcherIP, js->dispatcherPort); } sprintf(log, "launching checkpoint server on machine %s", auxCS->ipAddress); v2logMessage(log); if (js->testOnly || (strlen(auxCS->debugString) != 0)) printf("%s\n", commandLine); if (!js->testOnly) { if (auxCS->autoLaunch) system(commandLine); else printf("#*** You need to launch the following command:\n%s\n",commandLine); } (*nbAuxTotal)++; auxCS = auxCS->next; } /* Then the SC */ auxSC = js->scList; while (auxSC != NULL) { if (strlen(auxSC->debugString) == 0) { if (js->checkpointFrequency == 0) { sprintf(commandLine, "%s %s %s %d %d %d %d %s:%d %s -nockpt 2> /dev/null &", js->rshCmd, auxSC->ipAddress, js->scCmd, auxSC->port, js->nprocs, js->jobId, *nbAuxTotal, js->dispatcherIP, js->dispatcherPort, js->debugCommand); } else { sprintf(commandLine, "%s %s %s %d %d %d %d %s:%d %s -t %d 2> /dev/null &", js->rshCmd, auxSC->ipAddress, js->scCmd, auxSC->port, js->nprocs, js->jobId, *nbAuxTotal, js->dispatcherIP, js->dispatcherPort, js->debugCommand, js->checkpointFrequency); } } else { if (js->checkpointFrequency == 0) { sprintf(commandLine, "%s %s %s %d %d %d %d %s:%d %s -nockpt 2> /dev/null &", js->rshCmd, auxSC->ipAddress, js->scCmd, auxSC->port, js->nprocs, js->jobId, *nbAuxTotal, js->dispatcherIP, js->dispatcherPort, auxSC->debugString); } else { sprintf(commandLine, "%s %s %s %d %d %d %d %s:%d %s -t %d 2> /dev/null &", js->rshCmd, auxSC->ipAddress, js->scCmd, auxSC->port, js->nprocs, js->jobId, *nbAuxTotal, js->dispatcherIP, js->dispatcherPort, auxSC->debugString, js->checkpointFrequency); } } sprintf(log, "launching checkpoint scheduler on machine %s", auxSC->ipAddress); v2logMessage(log); if (js->testOnly || (strlen(auxSC->debugString) != 0)) printf("%s\n", commandLine); if (!js->testOnly) { if (auxSC->autoLaunch) system(commandLine); else printf("#*** You need to launch the following command:\n%s\n",commandLine); } (*nbAuxTotal)++; auxSC = auxSC->next; }}void waitConnectionFromAuxiliaries(JS * js, int nbAuxTotal, pid_t *auxiliariesPid, int listenSocket) { int nbAuxiliaries; int acceptSocket; int auxId; struct sockaddr_in pin; int addrlen; pid_t pid1; int i; int ipNb; CS * auxCS; EL * auxEL; SC * auxSC; if (strlen(js->debugFile) != 0) printf("Waiting for auxiliaries to be launched before launching nodes\n"); nbAuxiliaries = 0; auxiliariesPid = (pid_t *)malloc(nbAuxTotal * sizeof(pid_t)); while (nbAuxiliaries < nbAuxTotal) { if ( (acceptSocket = accept(listenSocket, (struct sockaddr *)&pin, &addrlen)) < 0 ) { printe("Could not accept socket connection from auxiliary"); } else { read(acceptSocket, &auxId, sizeof(int)); auxId = ntohl(auxId); read(acceptSocket, &pid1, sizeof(pid_t)); auxiliariesPid[auxId] = ntohl(pid1); close(acceptSocket); nbAuxiliaries ++; } } /* Now that all the auxiliaries have given their IPs, time to put these IPs in the correct place */ i = 0; auxEL = js->elList; while (auxEL != NULL) { auxEL->pid = auxiliariesPid[i]; auxEL = auxEL->next; i++; } auxCS = js->csList; while (auxCS != NULL) { auxCS->pid = auxiliariesPid[i]; auxCS = auxCS->next; i++; } auxSC = js->scList; while (auxSC != NULL) { auxSC->pid = auxiliariesPid[i]; auxSC = auxSC->next; i++; } /* We do not need the IP used by the auxiliary servers at this point. Let's clean *//* for (i = 0; i < ipNb; i++) { free(ipList[i]); } free(ipList); ipNb = 0; */ if (strlen(js->debugFile) != 0) printf("All auxiliaries have been identified: nodes can now be launched\n");}void launchComputingNodes(JS * js) { CN * auxCN; char commandLine[BUFF_SIZE]; char log[BUFF_SIZE]; pid_t forked; auxCN = js->nodeList; while(auxCN != NULL) { nodeCommandLine(commandLine, CHECKPOINT, js, *auxCN); sprintf(log, "launching rank %d on host %s", auxCN->rank, auxCN->hostName); v2logMessage(log); if (js->testOnly || (strcmp(auxCN->debugString, "") != 0)) printf("%s\n", commandLine); if (!js->testOnly) { if (auxCN->autoLaunch) { if ((forked = fork()) != -1) { if (forked == 0) { system(commandLine); _exit(0); } } else { printf("Error: could not fork(): aborting\n"); exit(1); } } else printf("#*** You need to launch the following command:\n%s &\n", commandLine); } auxCN = auxCN->next; }}void waitConnectionFromComputingNodes(JS * js, int listenSocket, pid_t *pid) { int acceptSocket; int rank; struct sockaddr_in pin; int addrlen; pid_t pid1; int i; for (i = 0; i< js->nprocs; i++) { if ( (acceptSocket = accept(listenSocket, (struct sockaddr *)&pin, &addrlen)) < 0 ) { printf("Could not accept socket connection from client\n"); } else { read(acceptSocket, &rank, sizeof(int)); rank = ntohl(rank); read(acceptSocket, &pid1, sizeof(pid_t)); pid[rank] = ntohl(pid1); addConnectedNode(rank, acceptSocket); /* We then send the node list to that node, with the following form: int[5]: ip1.ip2.ip3.ip4:port -> ip1ip2ip3ip4port */ write(acceptSocket, js->nodeListArray, js->nprocs*sizeof(struct sockaddr_in)); /*close(acceptSocket);*/ } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -