⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mpirun.v2run.c

📁 Path MPICH-V for MPICH the MPI Implementation
💻 C
📖 第 1 页 / 共 2 页
字号:
     if (testOnly) {}     else {       if (auxEL->autoLaunch)         system(commandLine);       else 	 printf("*** You need to launch the following command:\n%s\n",commandLine);     }     nbAuxTotal ++;     auxEL = auxEL->next;  }  /*    Then the CS  */  auxCS = theJob.csList;  while (auxCS != NULL) {     if (strlen(auxCS->debugString) == 0) {       sprintf(commandLine, "%s %s %s -g %d -p %d -d %s -i %s -auxid %d -dispatcher %s:%d &",	theJob.rshCmd,	auxCS->ipAddress,	theJob.csCmd,	theJob.jobId,	auxCS->port,	theJob.debugCommand,	auxCS->tmp,	nbAuxTotal,	theJob.dispatcherIP,	theJob.dispatcherPort);     } else {       sprintf(commandLine, "%s %s %s -g %d -p %d %s -i %s -auxid %d -dispatcher %s:%d &",	theJob.rshCmd,	auxCS->ipAddress,	theJob.csCmd,	theJob.jobId,	auxCS->port,	auxCS->debugString,	auxCS->tmp,	nbAuxTotal,	theJob.dispatcherIP,	theJob.dispatcherPort);     }     sprintf(log, "launching checkpoint server on machine %s", auxCS->ipAddress);     v2logMessage(log);     if (testOnly || (strlen(auxCS->debugString) != 0))       printf("%s\n", commandLine);     if (!testOnly) {       if (auxCS->autoLaunch)         system(commandLine);       else	 printf("#*** You need to launch the following command:\n%s &\n",commandLine);     }     nbAuxTotal ++;     auxCS = auxCS->next;  }  /*    Then the SC  */  auxSC = theJob.scList;  while (auxSC != NULL) {    if (strlen(auxSC->debugString) == 0) {      if (theJob.checkpointFrequency == 0) {        sprintf(commandLine, "%s %s %s %d %d %d %d %s:%d %s &",          theJob.rshCmd,	  auxSC->ipAddress, 	  theJob.scCmd,	  auxSC->port,	  theJob.nprocs,	  theJob.jobId,	  nbAuxTotal,	  theJob.dispatcherIP, theJob.dispatcherPort,	  theJob.debugCommand);       } else {        sprintf(commandLine, "%s %s %s %d %d %d %d %s:%d %s -t %d 2> /dev/null &",          theJob.rshCmd,	  auxSC->ipAddress, 	  theJob.scCmd,	  auxSC->port,	  theJob.nprocs,	  theJob.jobId,	  nbAuxTotal,	  theJob.dispatcherIP, theJob.dispatcherPort,	  theJob.debugCommand,	  theJob.checkpointFrequency);       }     } else {       if (theJob.checkpointFrequency == 0) {         sprintf(commandLine, "%s %s %s %d %d %d %d %s:%d %s  &",           theJob.rshCmd,	   auxSC->ipAddress, 	   theJob.scCmd,	   auxSC->port,	   theJob.nprocs,	   theJob.jobId,	   nbAuxTotal,	   theJob.dispatcherIP, theJob.dispatcherPort,	   auxSC->debugString);       } else {         sprintf(commandLine, "%s %s %s %d %d %d %d %s:%d %s -t %d 2> /dev/null &",           theJob.rshCmd,	   auxSC->ipAddress, 	   theJob.scCmd,	   auxSC->port,	   theJob.nprocs,	   theJob.jobId,	   nbAuxTotal,	   theJob.dispatcherIP, theJob.dispatcherPort,	   auxSC->debugString,	   theJob.checkpointFrequency);       }     }     sprintf(log, "launching checkpoint scheduler on machine %s", auxSC->ipAddress);     v2logMessage(log);     if (testOnly || (strlen(auxSC->debugString) != 0))       printf("%s\n", commandLine);     if (!testOnly) {       if (auxSC->autoLaunch)         system(commandLine);       else	 printf("#*** You need to launch the following command:\n%s &\n",commandLine);     }     nbAuxTotal ++;      auxSC = auxSC->next;   }  /* We wait until the rsh are completed */  /*while(!checkAuxiliaries(&theJob)); */  if (strlen(theJob.debugFile) != 0)    printf("Waiting for auxiliaries to be launched before launching nodes\n");  nbAuxiliaries = 0;  auxiliariesPid = (int *)malloc(nbAuxTotal * sizeof(int));/*  while (nbAuxiliaries < nbAuxTotal) {    if ( (acceptSocket = accept(listenSocket, (struct sockaddr *)&pin, &addrlen)) < 0 ) {      printf("Could not accept socket connection from auxiliary\n");    }  else {      read(acceptSocket, &auxId, sizeof(int));      auxId = ntohl(auxId);      read(acceptSocket, &pid1, sizeof(pid_t));      auxiliariesPid[auxId] = ntohl(pid1);      close(acceptSocket);      nbAuxiliaries ++;    }  }*/  if (strlen(theJob.debugFile) != 0)    printf("All auxiliaries have been identified: nodes can now be launched\n");  /* Now that all the auxiliaries have given their IPs, time to put these IPs in the correct place *//*  i = 0;  auxEL = theJob.elList;  while (auxEL != NULL) {    auxEL->pid = auxiliariesPid[i];    auxEL = auxEL->next;    i++;  }  auxCS = theJob.csList;  while (auxCS != NULL) {    auxCS->pid = auxiliariesPid[i];    auxCS = auxCS->next;    i++;  }  auxSC = theJob.scList;  while (auxSC != NULL) {    auxSC->pid = auxiliariesPid[i];    auxSC = auxSC->next;    i++;  }*/  /* We do not need the IP used by the auxiliary servers at this point. Let's clean *//*  for (i = 0; i < ipNb; i++) {     free(ipList[i]);  }  free(ipList);  ipNb = 0;*/  /*    And finally, the computation nodes!  */  auxCN = theJob.nodeList;  while(auxCN != NULL) {    nodeCommandLine(&commandLine, CHECKPOINT, &theJob, *auxCN);    sprintf(log, "launching rank %d on host %s", auxCN->rank, auxCN->hostName);    v2logMessage(log);    if (testOnly || (strcmp(auxCN->debugString, "") != 0))       printf("%s\n", commandLine);    if (!testOnly) {      if (auxCN->autoLaunch) {        if ( (forked = fork()) != -1) {	  if (forked == 0) {            system(commandLine);	    _exit(0);  	  }        } else {	  printf("Error: could not fork(): aborting\n");	  exit(1);        }      } else	 printf("#*** You need to launch the following command:\n%s &\n",commandLine);    }    auxCN = auxCN->next;  }#define MAX_MSG 10  pid = (pid_t *)malloc(theJob.nprocs * sizeof(pid_t));  for (i = 0; i< theJob.nprocs; i++) {    if ( (acceptSocket = accept(listenSocket, (struct sockaddr *)&pin, &addrlen)) < 0 ) {      printf("Could not accept socket connection from client\n");    }  else {      read(acceptSocket, &rank, sizeof(int));      rank = ntohl(rank);      read(acceptSocket, &pid1, sizeof(pid_t));      pid[rank] = ntohl(pid1);      addConnectedNode(rank, acceptSocket);      /*        We then send the node list to that node, with the following form:	int[5]: ip1.ip2.ip3.ip4:port -> ip1ip2ip3ip4port      */            write(acceptSocket, theJob.nodeListArray, theJob.nprocs*sizeof(struct sockaddr_in));      /*close(acceptSocket);*/    }  }   moving = (boolean *)malloc(theJob.nprocs * sizeof(boolean));  for (i = 0; i < theJob.nprocs; i++)    moving[i] = false;  /*    The main loop, waiting for the disconnection from all clients to end,    meaning the program is finalized or aborted  */  createFifoFile(&theJob);  if( (fdFifo = open(theJob.fifoFile, O_NONBLOCK)) == -1) {    printf("Internal problem: could not open FIFO file!\n");    bailout(&theJob);  }  currentConnected = (Connected *)malloc(sizeof(Connected));  while (connectedNodes != NULL) {    FD_ZERO(&setOfCN);    currentConnected = connectedNodes;    auxCN = theJob.nodeList;    while (currentConnected != NULL) {      auxCN->pid = pid[auxCN->rank];       FD_SET(currentConnected->socket, &setOfCN);      currentConnected = currentConnected->next;      auxCN = auxCN->next;    }    FD_SET(fdFifo, &setOfCN);    if (finalizing) {      if ((ret = select(FD_SETSIZE, &setOfCN, NULL, NULL, selectTimeout)) < 0) {        printf("Error in select: irrecoverable error\n");        bailout(&theJob);      }    } else {      if ((ret = select(FD_SETSIZE, &setOfCN, NULL, NULL, NULL)) < 0) {        printf("Error in select: irrecoverable error\n");        bailout(&theJob);      }    }    currentConnected = connectedNodes;    while(currentConnected != NULL) {      if (FD_ISSET(currentConnected->socket, &setOfCN)) {         break;      }      currentConnected = currentConnected->next;    }    if (currentConnected != NULL) { /* We have a node event! */      if ( read(currentConnected->socket, &msg, sizeof(int)) == 0) {        /* If we got no message, it means it's a simple disconnection */	if (!moving[currentConnected->rank])          printf("***** A crash was detected *****\n");                  if (autoRelaunch || !auxCN->autoLaunch) {  	  printf("Relaunching rank %d\n", currentConnected->rank);	  fflush(stdout);          relaunchByRank(currentConnected->rank, &theJob, moving[currentConnected->rank]); 	  moving[currentConnected->rank] = false;        } else {	  auxCN = theJob.nodeList;	  while(auxCN->rank != currentConnected->rank)             auxCN = auxCN->next;          nodeCommandLine(&commandLine, RESTART, &theJob, *auxCN);	  printf("In order to continue execution, you need to relaunch the node with the following command:\n\n");	  printf("  %s\n\n", commandLine);        }	if (!moving[currentConnected->rank]) {          printf("********************************\n");          fflush(stdout);	}        /* We reconnect to the node that has previously been disconnected */        if ( (acceptSocket = accept(listenSocket, (struct sockaddr *)&pin, &addrlen)) < 0 ) {          printf("Could not accept socket connection from client\n");        } else {          read(acceptSocket, &rank, sizeof(int));          rank = ntohl(rank);          read(acceptSocket, &pid1, sizeof(pid_t));          pid[rank] = ntohl(pid1);	  currentConnected->socket = acceptSocket;          write(acceptSocket, theJob.nodeListArray, theJob.nprocs*sizeof(struct sockaddr_in));	  /* End of reconnection of a crashed node */        }      /* If we have received a message */      } else {        if (msg == FINALIZE_MSG) {	  finalizing = true;          removeConnectedNode(currentConnected); 	}      }    } else if (FD_ISSET(fdFifo, &setOfCN)) { /* We have a FIFO file event, ie an order! */      n = read(fdFifo, buff, sizeof(buff)-1);      buff[n] = '\0';      close(fdFifo);      if (strncmp(buff, DISPATCHER_MOVEALL_MSG, 7) == 0) {	sscanf(buff, DISPATCHER_MOVEALL_STRING, fileName);	sprintf(log, "MOVEALL instruction received: file %s", fileName);        v2logMessage(log);	fprintf(stdout, "**** MOVEALL instruction received ****\n");	fflush(stdout);        /* In fact this is more a 'move group' than a move all: no real need to move everything... */	/* the format of the file is as such: rank newHost newIP newFastIP:newPort */	if ((fd = fopen(fileName, "r")) != NULL) {          while(fgets(line, 256, fd)) {            sscanf(line, "%d %s %d.%d.%d.%d %d.%d.%d.%d:%d", &rank, hostName, &ip1, &ip2, &ip3, &ip4, &ip5, &ip6, &ip7, &ip8, &port);	    sprintf(log, " rank %d on %s (%d.%d.%d.%d/%d.%d.%d.%d:%d)", rank, hostName, ip1, ip2,ip3, ip4, ip5, ip6, ip7, ip8, port);	    v2logMessage(log);	    fprintf(stdout, " rank %d on %s (%d.%d.%d.%d/%d.%d.%d.%d:%d)\n", rank, hostName, ip1, ip2,ip3, ip4, ip5, ip6, ip7, ip8, port);	    fflush(stdout);	    moving[rank] = true;	    sprintf(newIP, "%d.%d.%d.%d", ip1, ip2, ip3, ip4);	    sprintf(newFastIP, "%d.%d.%d.%d", ip5, ip6, ip7, ip8);	    moveExecutionOneNode(&theJob, rank, hostName, newIP, newFastIP, port);          }	  fclose(fd);         } else {          sprintf(log, "Unable to open file %s: MOVEALL aborted", fileName);	  v2logMessage(log);	  fprintf(stdout, " Unable to open instruction file %s: aborting MOVEALL\n", fileName);	  fflush(stdout);	}      } else if (strncmp(buff, DISPATCHER_MOVE_MSG, 4) == 0) {	sscanf(buff, DISPATCHER_MOVE_STRING, &rank, hostName, &ip1, &ip2, &ip3, &ip4, &ip5, &ip6, &ip7, &ip8, &port);	sprintf(newIP, "%d.%d.%d.%d", ip1, ip2, ip3, ip4);	sprintf(newFastIP, "%d.%d.%d.%d", ip5, ip6, ip7, ip8);	sprintf(log, "MOVE instruction received: rank %d on %s (%d.%d.%d.%d/%d.%d.%d.%d:%d)", rank, hostName, ip1, ip2, ip3, ip4, ip5, ip6, ip7, ip8, port);        v2logMessage(log);	moving[rank] = true;	fprintf(stdout, "**** MOVE'ing rank %d on %s ****\n", rank, hostName);	moveExecutionOneNode(&theJob, rank, hostName, newIP, newFastIP, port);      } else {        printf("Unknown command: %s\n", buff);      }      if( (fdFifo = open(theJob.fifoFile, O_NONBLOCK)) == -1) {        printf("Internal problem: could not open FIFO file!\n");        bailout(&theJob);      }    } else { /* Time out on the select() */      v2logMessage("Timeout detected on MPI_Finalize(): killing remaining running nodes");      connectedArray = (boolean *)malloc(theJob.nprocs*sizeof(boolean));      for (i = 0; i < theJob.nprocs; i++)	connectedArray[i] = false;      currentConnected = connectedNodes;      while (currentConnected != NULL) {        connectedArray[currentConnected->rank] = true;	ccAux = currentConnected->next;        removeConnectedNode(currentConnected); 	currentConnected = ccAux;      }      auxCN = theJob.nodeList;      while (auxCN != NULL) {	/* We kill only the ones that were still connected */	if (connectedArray[auxCN->rank])	  killComputingNode(&theJob, auxCN);        auxCN = auxCN->next;      }      free(connectedArray);    }  }  killAllAuxiliaries(&theJob);  cleanAuxiliaryFiles(&theJob);  sprintf(log, "Execution of program %s finished", theJob.v2pgFile);  v2logMessage(log);  free(currentConnected);  free(connectedNodes);  cleanMemory();  free(commandLine);  free(v2availfile);  free(v2cmdfile);  exit(0);  return (0);}void bailout(JS * js) {  CN * auxCN;  /* This is the moment to clean a bit... */  /* The auxiliaries */  killAllAuxiliaries(js);  /* The nodes */  auxCN = js->nodeList;  while (auxCN != NULL) {    if (auxCN->pid != 0)      killComputingNode(js, auxCN);    auxCN = auxCN->next;  }  exit(-1);}void cleanMemory() {  AP * ca;  int i;  /* Arguments, tmp and so on... */  /* Array of IP addresses */  for (i = 0; i < ipNb; i++) {    free(ipList[i]);  }  free(ipList);}void cleanELCS(JS *js) {  char * commandLine;  char log[1024];  commandLine = (char *)calloc(4096, sizeof(char));  /*    We need to clean the CS and EL which are still running  */  sprintf(log, "Cleaning EL and CS from the nodes");  v2logMessage(log);  sprintf(commandLine, "%s %d %s", js->killCmd, js->jobId, js->v2pgFile);  system(commandLine);  free(commandLine);}void v2Error(char * msg) {  v2logMessage(msg);  v2logMessage("Quitting");  exit(0);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -