⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 originempirun.vrun.c

📁 Path MPICH-V for MPICH the MPI Implementation
💻 C
📖 第 1 页 / 共 2 页
字号:
/*  The main!*/int main (int argc, char* argv[]) {  char * v2cmdfile;  char * v2availfile;   int i;  char commandLine[BUFF_SIZE];  char log[BUFF_SIZE];  char srvName[HOST_NAME_SIZE];  struct hostent * myAddress;  Connected * currentConnected;  Connected * ccAux;  fd_set setOfCN;  int fdFifo;  int ret, retListen, maxFd;  int listenSocket, acceptSocket;  struct sockaddr_in address;  struct sockaddr_in pin;  int addrlen;  char buff[BUFF_SIZE];  int rank;  int port;  int ip1, ip2, ip3, ip4, ip5, ip6, ip7, ip8;  char newIP[IP_LENGTH];  char newFastIP[IP_LENGTH];  char fileName[BUFF_SIZE];  char hostName[HOST_NAME_SIZE];  char line[BUFF_SIZE];  FILE * fd;  pid_t forked, pid1;  pid_t * pid;  pid_t * auxiliariesPid;  int nbAuxiliaries;  int nbAuxTotal;  int n;  boolean * moving;  boolean * connectedArray;  CN * auxCN;  struct timeval selectTimeout;  boolean finalizing = false;  boolean autoRelaunch = false;  int msg;  struct jobSpecifications theJob;  logging = true;  selectTimeout.tv_sec = FINALIZE_TIMEOUT;  selectTimeout.tv_usec = 0;  if (!checkArguments(argc, argv, &theJob)) {    return 1;  }      parseProgramFile(&theJob);  auxCN = (CN *)malloc(sizeof(CN));   if (strlen(theJob.debugFile) != 0)    parseDebugFile(&theJob);  /* We get the local IP address */  if (gethostname(srvName, (size_t)HOST_NAME_SIZE) < 0) {    printf("Error: could not get hostname\n");    exit(1);  }  if((myAddress = gethostbyname(srvName)) == 0) qerror("gethostbyname");  /*printf("struct hostent {\n\                       char    *h_name;        =%s\n\                      int     h_length;       =%d\n\                      char    **h_addr_list;  =%s\n\              }\n", myAddress->h_name, myAddress->h_length, myAddress->h_addr);	      */  strcpy(theJob.dispatcherIP, (char *)inet_ntoa(*(struct in_addr *)myAddress->h_addr));  v2cmdfile = (char *)calloc(strlen(theJob.v2pgFile)+9,sizeof(char));  sprintf(v2cmdfile,"%s.commands",theJob.v2pgFile);  parseCommandsFile(v2cmdfile, &theJob);  free(v2cmdfile);  v2availfile = (char *)calloc(strlen(theJob.v2pgFile)+7,sizeof(char));  sprintf(v2availfile,"%s.avail", theJob.v2pgFile);  parseAvailableFile(v2availfile);  sprintf(log, "Starting program %s", theJob.v2pgFile);  v2logMessage(log);  /* We already start listening on the connection socket in order to get the PID's from the auxiliary programs */  address.sin_family = AF_INET;  address.sin_addr.s_addr = htonl(INADDR_ANY);  address.sin_port = htons(theJob.dispatcherPort);  if ((listenSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {    printf("Socket could not be created\n");    bailout(&theJob);  }  {    int i = !0;    if (setsockopt (listenSocket, SOL_SOCKET, SO_REUSEADDR, &i, sizeof (int)) < 0) qerror ("setting reuse of non listening binded addr");  }  while (bind(listenSocket, ((struct sockaddr*)&address), sizeof(struct sockaddr_in)) == -1) {    fprintf(stderr, "Bind on %s:%d could not be performed: (%s) trying in 1 second\n",	    inet_ntoa(address.sin_addr), ntohs(address.sin_port), strerror(errno) );    sleep(1);  }  if (listen(listenSocket, MAX_LISTEN_WAIT) == -1) {    close(listenSocket);    printf("Could not listen on socket\n");    bailout(&theJob);  }  launchAuxiliaryPrograms(&theJob, &nbAuxTotal);  /* We wait until the rsh are completed */  waitConnectionFromAuxiliaries(&theJob, nbAuxTotal, auxiliariesPid, listenSocket);   /*    And finally, the computation nodes!  */  /*@todo for gang goto here for launch next job*/  launchComputingNodes(&theJob);  pid = (pid_t *)malloc(theJob.nprocs * sizeof(pid_t));  waitConnectionFromComputingNodes(&theJob, listenSocket, pid);    moving = (boolean *)malloc(theJob.nprocs * sizeof(boolean));  for (i = 0; i < theJob.nprocs; i++)    moving[i] = false;  /*    The main loop, waiting for the disconnection from all clients to end,    meaning the program is finalized or aborted  */  createFifoFile(&theJob);  if( (fdFifo = open(theJob.fifoFile, O_NONBLOCK)) == -1) {    printf("Internal problem: could not open FIFO file!\n");    bailout(&theJob);  }  while (connectedNodes != NULL) {    FD_ZERO(&setOfCN);    currentConnected = connectedNodes;    auxCN = theJob.nodeList;    while (currentConnected != NULL) {    /*  auxCN->pid = pid[auxCN->rank]; */      FD_SET(currentConnected->socket, &setOfCN);      currentConnected = currentConnected->next;      auxCN = auxCN->next;    }    FD_SET(fdFifo, &setOfCN);    /*@todo for gang goto here for disconnection of all nodes*/    if (finalizing) {      if ((ret = select(FD_SETSIZE, &setOfCN, NULL, NULL, &selectTimeout)) < 0) {        printf("Error in select: irrecoverable error\n");        bailout(&theJob);      }    } else {      if ((ret = select(FD_SETSIZE, &setOfCN, NULL, NULL, NULL)) < 0) {        printf("Error in select: irrecoverable error\n");        bailout(&theJob);      }    }    currentConnected = connectedNodes;    while(currentConnected != NULL) {      if (FD_ISSET(currentConnected->socket, &setOfCN)) {         break;      }      currentConnected = currentConnected->next;    }    if (currentConnected != NULL) { /* We have a node event! */      if ( read(currentConnected->socket, &msg, sizeof(int)) == 0) {        /* If we got no message, it means it's a simple disconnection */	if (!moving[currentConnected->rank])          printf("***** A crash was detected *****\n");        /*@todo check if all nodes are disconnected for gangSched if yes         goto de begining of loop */	auxCN = theJob.nodeList;	while(auxCN->rank != currentConnected->rank)	  auxCN = auxCN->next;	        if (autoRelaunch || auxCN->autoLaunch) {  	  fprintf(stdout, "Relaunching rank %d\n", currentConnected->rank);	  fflush(stdout);          relaunchByRank(currentConnected->rank, &theJob, moving[currentConnected->rank]); 	  moving[currentConnected->rank] = false;        } else {	  auxCN = theJob.nodeList;	  while(auxCN->rank != currentConnected->rank)             auxCN = auxCN->next;          nodeCommandLine(commandLine, RESTART, &theJob, *auxCN);	  printf("In order to continue execution, you need to relaunch the node with the following command:\n\n");	  printf("  %s\n\n", commandLine);        }	if (!moving[currentConnected->rank]) {          printf("********************************\n");          fflush(stdout);	}        /* We reconnect to the node that has previously been disconnected */        if ( (acceptSocket = accept(listenSocket, (struct sockaddr *)&pin, &addrlen)) < 0 ) {          printf("Could not accept socket connection from client\n");        } else {          read(acceptSocket, &rank, sizeof(int));          rank = ntohl(rank);          read(acceptSocket, &pid1, sizeof(pid_t));          pid[rank] = ntohl(pid1);	  currentConnected->socket = acceptSocket;          write(acceptSocket, theJob.nodeListArray, theJob.nprocs*sizeof(struct sockaddr_in));	  /* End of reconnection of a crashed node */        }      /* If we have received a message */      } else {        if (msg == FINALIZE_MSG) {	  finalizing = true;	  currentConnected->finalized = true;	  check_sync_finalize();	}      }    } else if (FD_ISSET(fdFifo, &setOfCN)) { /* We have a FIFO file event, ie an order! */      n = read(fdFifo, buff, sizeof(buff)-1);      buff[n] = '\0';      close(fdFifo);      if (strncmp(buff, DISPATCHER_MOVEALL_MSG, 7) == 0) {	sscanf(buff, DISPATCHER_MOVEALL_STRING, fileName);	sprintf(log, "MOVEALL instruction received: file %s", fileName);        v2logMessage(log);	fprintf(stdout, "**** MOVEALL instruction received ****\n");	fflush(stdout);        /* In fact this is more a 'move group' than a move all: no real need to move everything... */	/* the format of the file is as such: rank newHost newIP newFastIP:newPort */	if ((fd = fopen(fileName, "r")) != NULL) {          while(fgets(line, 256, fd)) {            sscanf(line, "%d %s %d.%d.%d.%d %d.%d.%d.%d:%d", &rank, hostName, &ip1, &ip2, &ip3, &ip4, &ip5, &ip6, &ip7, &ip8, &port);	    sprintf(log, " rank %d on %s (%d.%d.%d.%d/%d.%d.%d.%d:%d)", rank, hostName, ip1, ip2,ip3, ip4, ip5, ip6, ip7, ip8, port);	    v2logMessage(log);	    fprintf(stdout, " rank %d on %s (%d.%d.%d.%d/%d.%d.%d.%d:%d)\n", rank, hostName, ip1, ip2,ip3, ip4, ip5, ip6, ip7, ip8, port);	    fflush(stdout);	    moving[rank] = true;	    sprintf(newIP, "%d.%d.%d.%d", ip1, ip2, ip3, ip4);	    sprintf(newFastIP, "%d.%d.%d.%d", ip5, ip6, ip7, ip8);	    moveExecutionOneNode(&theJob, rank, hostName, newIP, newFastIP, port);          }	  fclose(fd);         } else {          sprintf(log, "Unable to open file %s: MOVEALL aborted", fileName);	  v2logMessage(log);	  fprintf(stdout, " Unable to open instruction file %s: aborting MOVEALL\n", fileName);	  fflush(stdout);	}      } else if (strncmp(buff, DISPATCHER_MOVE_MSG, 4) == 0) {	sscanf(buff, DISPATCHER_MOVE_STRING, &rank, hostName, &ip1, &ip2, &ip3, &ip4, &ip5, &ip6, &ip7, &ip8, &port);	sprintf(newIP, "%d.%d.%d.%d", ip1, ip2, ip3, ip4);	sprintf(newFastIP, "%d.%d.%d.%d", ip5, ip6, ip7, ip8);	sprintf(log, "MOVE instruction received: rank %d on %s (%d.%d.%d.%d/%d.%d.%d.%d:%d)", rank, hostName, ip1, ip2, ip3, ip4, ip5, ip6, ip7, ip8, port);        v2logMessage(log);	moving[rank] = true;	fprintf(stdout, "**** MOVE'ing rank %d on %s ****\n", rank, hostName);	moveExecutionOneNode(&theJob, rank, hostName, newIP, newFastIP, port);      } else {        printf("Unknown command: %s\n", buff);      }      if( (fdFifo = open(theJob.fifoFile, O_NONBLOCK)) == -1) {        printf("Internal problem: could not open FIFO file!\n");        bailout(&theJob);      }    } else { /* Time out on the select() */      v2logMessage("Timeout detected on MPI_Finalize(): killing remaining running nodes");      connectedArray = (boolean *)malloc(theJob.nprocs*sizeof(boolean));      for (i = 0; i < theJob.nprocs; i++)	connectedArray[i] = false;      currentConnected = connectedNodes;      while (currentConnected != NULL) {        connectedArray[currentConnected->rank] = true;	ccAux = currentConnected->next;        removeConnectedNode(currentConnected); 	currentConnected = ccAux;      }      auxCN = theJob.nodeList;      while (auxCN != NULL) {	/* We kill only the ones that were still connected */	if (connectedArray[auxCN->rank])	  killComputingNode(&theJob, auxCN);        auxCN = auxCN->next;      }      free(connectedArray);    }  }  killAllAuxiliaries(&theJob);  cleanAuxiliaryFiles(&theJob);  sprintf(log, "Execution of program %s finished", theJob.v2pgFile);  v2logMessage(log);  exit(0);  return (0);}void bailout(JS * js) {  CN * auxCN;  /* This is the moment to clean a bit... */  /* The auxiliaries */  killAllAuxiliaries(js);  /* The nodes */  auxCN = js->nodeList;  while (auxCN != NULL) {    if (auxCN->pid != 0)      killComputingNode(js, auxCN);    auxCN = auxCN->next;  }  exit(-1);}void cleanMemory() {  AP * ca;  int i;  /* Arguments, tmp and so on... */  /* Array of IP addresses */  for (i = 0; i < ipNb; i++) {    free(ipList[i]);  }  free(ipList);}void cleanELCS(JS *js) {  char * commandLine;  char log[1024];  commandLine = (char *)calloc(4096, sizeof(char));  /*    We need to clean the CS and EL which are still running  */  sprintf(log, "Cleaning EL and CS from the nodes");  v2logMessage(log);  sprintf(commandLine, "%s %d %s", js->killCmd, js->jobId, js->v2pgFile);  system(commandLine);  free(commandLine);}void v2Error(char * msg) {  v2logMessage(msg);  v2logMessage("Quitting");  exit(0);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -