📄 mpirun.cpp
字号:
g_bOutputExitCodes = true; } else if (stricmp(&argv[1][1], "priority") == 0) { char *str; nPriorityClass = atoi(argv[2]); str = strchr(argv[2], ':'); if (str) { str++; nPriority = atoi(str); } //printf("priorities = %d:%d\n", nPriorityClass, nPriority);fflush(stdout); bUsePriorities = true; nArgsToStrip = 2; } else { printf("Unknown option: %s\n", argv[1]); } StripArgs(argc, argv, nArgsToStrip); } if (argc < 2) { printf("Error: no executable or configuration file specified\n"); return 0; } // The next argument is the executable or a configuration file strncpy(g_pszExe, argv[1], MAX_CMD_LENGTH); g_pszExe[MAX_CMD_LENGTH-1] = '\0'; // All the rest of the arguments are passed to the application g_pszArgs[0] = '\0'; for (i = 2; i<argc; i++) { strncat(g_pszArgs, argv[i], MAX_CMD_LENGTH - 1 - strlen(g_pszArgs)); if (i < argc-1) { strncat(g_pszArgs, " ", MAX_CMD_LENGTH - 1 - strlen(g_pszArgs)); } } if (g_nHosts == 0) { // If -np or -localonly options have not been specified, check if the first // parameter is an executable or a configuration file if (GetBinaryType(g_pszExe, &dwType) || (ParseConfigFile(g_pszExe) == PARSE_ERR_NO_FILE)) { g_nHosts = 1; bRunLocal = true; } } // Fix up the executable name char pszTempExe[MAX_CMD_LENGTH], *namepart; if (g_pszExe[0] == '\\' && g_pszExe[1] == '\\') { strncpy(pszTempExe, g_pszExe, MAX_CMD_LENGTH); pszTempExe[MAX_CMD_LENGTH-1] = '\0'; } else GetFullPathName(g_pszExe, MAX_PATH, pszTempExe, &namepart); // Quote the executable in case there are spaces in the path sprintf(g_pszExe, "\"%s\"", pszTempExe); easy_socket_init(); if (!bRunLocal && g_pHosts == NULL) { // Save the original file name in case we end up running locally strncpy(pszTempExe, g_pszExe, MAX_CMD_LENGTH); pszTempExe[MAX_CMD_LENGTH-1] = '\0'; // Convert the executable to its unc equivalent. This negates // the need to map network drives on remote machines just to locate // the executable. ExeToUnc(g_pszExe); // If we are not running locally and the hosts haven't been set up with a configuration file, // create the host list now if (bUseMachineFile) { if (!GetHostsFromFile(pszMachineFileName)) { printf("Error parsing the machine file '%s'\n", pszMachineFileName); return 0; } } else if (!GetAvailableHosts()) { strncpy(g_pszExe, pszTempExe, MAX_CMD_LENGTH); g_pszExe[MAX_CMD_LENGTH-1] = '\0'; bRunLocal = true; } } // Setup multi-color output if (g_bDoMultiColorOutput) { char pszTemp[10]; DWORD len = 10; if (ReadMPDRegistry("color", pszTemp, &len)) { g_bDoMultiColorOutput = (stricmp(pszTemp, "yes") == 0); } } if (g_bDoMultiColorOutput) { CONSOLE_SCREEN_BUFFER_INFO info; // Save the state of the console so it can be restored hStdout = GetStdHandle(STD_OUTPUT_HANDLE); GetConsoleScreenBufferInfo(hStdout, &info); g_ConsoleAttribute = info.wAttributes; } // Check if the directory needs to be mapped on the remote machines if (!bNoDriveMapping && NeedToMap(g_pszDir, &cMapDrive, pszMapShare)) { MapDriveNode *pNode = new MapDriveNode; pNode->cDrive = cMapDrive; strcpy(pNode->pszShare, pszMapShare); pNode->pNext = g_pDriveMapList; g_pDriveMapList = pNode; } // If -getphrase was not specified, get the mpd passphrase from // the registry or use the default if (bPhraseNeeded) { if (!ReadMPDRegistry("phrase", phrase, NULL)) { strcpy(phrase, MPD_DEFAULT_PASSPHRASE); } } if (bRunLocal) { RunLocal(bDoSMP); easy_socket_finalize(); return 0; } //dbg_printf("retrieving account information\n"); if (g_bUseMPDUser) { bLogon = false; g_pszAccount[0] = '\0'; g_pszPassword[0] = '\0'; } else { if (bUsePwdFile) { bLogon = true; GetAccountAndPasswordFromFile(pszPwdFileName); } else { if (bLogon) GetAccountAndPassword(); else { char pszTemp[10] = "no"; ReadMPDRegistry("SingleUser", pszTemp, NULL); if (stricmp(pszTemp, "yes")) { if (!ReadCachedPassword()) { if (bLogonDots) { DWORD dwThreadId; HANDLE hEvent, hDotThread; hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); hDotThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)PrintDots, hEvent, 0, &dwThreadId); if (!ReadPasswordFromRegistry(g_pszAccount, g_pszPassword)) { SetEvent(hEvent); if (bCredentialsPrompt) GetAccountAndPassword(); else { printf("Error: unable to acquire the necessary user credentials to launch a job.\n"); ExitProcess(-1); } } else SetEvent(hEvent); CloseHandle(hDotThread); } else { if (!ReadPasswordFromRegistry(g_pszAccount, g_pszPassword)) { if (bCredentialsPrompt) GetAccountAndPassword(); else { printf("Error: unable to acquire the necessary user credentials to launch a job.\n"); ExitProcess(-1); } } } CachePassword(); } bLogon = true; } } } } // Figure out how many processes to launch int nProc = 0; HostNode *n = g_pHosts; if (g_pHosts == NULL) nProc = g_nHosts; while (n) { nProc += n->nSMPProcs; n = n->next; } g_nNproc = nProc; CreateJobID(pszJobID); // Set the environment variables common to all processes if (g_bNoMPI) pszEnv[0] = '\0'; else { sprintf(pszEnv, "MPICH_JOBID=%s|MPICH_NPROC=%d|MPICH_ROOTHOST=%s", pszJobID, nProc, g_pHosts->host); } // Allocate an array to hold handles to the LaunchProcess threads, sockets, ids, ranks, and forward host structures pThread = new HANDLE[nProc]; g_pProcessSocket = new SOCKET[nProc]; for (i=0; i<nProc; i++) g_pProcessSocket[i] = INVALID_SOCKET; g_pProcessLaunchId = new int[nProc]; g_pLaunchIdToRank = new int [nProc]; g_nNumProcessSockets = 0; g_pForwardHost = new ForwardHostStruct[nProc]; for (i=0; i<nProc; i++) g_pForwardHost[i].nPort = 0; // Start the IO redirection thread HANDLE hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); g_hRedirectIOListenThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)RedirectIOThread, hEvent, 0, &dwThreadID); if (g_hRedirectIOListenThread) { if (WaitForSingleObject(hEvent, 60000) != WAIT_OBJECT_0) { printf("RedirectIOThread failed to initialize\n"); return 0; } } else { printf("Unable to create RedirectIOThread, error %d\n", GetLastError()); return 0; } CloseHandle(hEvent); strncpy(g_pForwardHost[0].pszHost, g_pszIOHost, MAX_HOST_LENGTH); g_pForwardHost[0].pszHost[MAX_HOST_LENGTH-1] = '\0'; g_pForwardHost[0].nPort = g_nIOPort; //printf("io redirection: %s:%d\n", g_pForwardHost[0].pszHost, g_pForwardHost[0].nPort);fflush(stdout);#ifdef SERIALIZE_ROOT_PROCESS HANDLE hRootMutex = CreateMutex(NULL, FALSE, "MPIRunRootMutex");#endif CreateShmCliqueString(g_pHosts, pszShmCliqueString); // Launch the threads to launch the processes iproc = 0; while (g_pHosts) { nShmLow = iproc; nShmHigh = iproc + g_pHosts->nSMPProcs - 1; for (int i = 0; i<g_pHosts->nSMPProcs; i++) { MPIRunLaunchProcessArg *arg = new MPIRunLaunchProcessArg; arg->bUsePriorities = bUsePriorities; arg->nPriorityClass = nPriorityClass; arg->nPriority = nPriority; arg->bUseDebugFlag = bUseDebugFlag; arg->n = g_nNproc; sprintf(arg->pszIOHostPort, "%s:%d", g_pszIOHost, g_nIOPort); strcpy(arg->pszPassPhrase, phrase); arg->i = iproc; arg->bLogon = bLogon; if (bLogon) { strcpy(arg->pszAccount, g_pszAccount); strcpy(arg->pszPassword, g_pszPassword); } else { arg->pszAccount[0] = '\0'; arg->pszPassword[0] = '\0'; } if (strlen(g_pHosts->exe) > 0) { strncpy(arg->pszCmdLine, g_pHosts->exe, MAX_CMD_LENGTH); arg->pszCmdLine[MAX_CMD_LENGTH-1] = '\0'; } else { strncpy(arg->pszCmdLine, g_pszExe, MAX_CMD_LENGTH); arg->pszCmdLine[MAX_CMD_LENGTH-1] = '\0'; } if (strlen(g_pszArgs) > 0) { strncat(arg->pszCmdLine, " ", MAX_CMD_LENGTH - 1 - strlen(arg->pszCmdLine)); strncat(arg->pszCmdLine, g_pszArgs, MAX_CMD_LENGTH - 1 - strlen(arg->pszCmdLine)); } strcpy(arg->pszDir, g_pszDir); if (strlen(pszEnv) >= MAX_CMD_LENGTH) { printf("Warning: environment variables truncated.\n"); fflush(stdout); } strncpy(arg->pszEnv, pszEnv, MAX_CMD_LENGTH); arg->pszEnv[MAX_CMD_LENGTH-1] = '\0'; strncpy(arg->pszHost, g_pHosts->host, MAX_HOST_LENGTH); arg->pszHost[MAX_HOST_LENGTH-1] = '\0'; strcpy(arg->pszJobID, pszJobID); if (g_bNoMPI) { if (strlen(g_pszEnv) >= MAX_CMD_LENGTH) { printf("Warning: environment variables truncated.\n"); fflush(stdout); } strncpy(arg->pszEnv, g_pszEnv, MAX_CMD_LENGTH); arg->pszEnv[MAX_CMD_LENGTH-1] = '\0'; } else { if (ParseCliques(pszShmCliqueString, iproc, g_nNproc, &nCliqueCount, &pMembers) == 0) { if (nCliqueCount > 1) { CreateSingleShmCliqueString(nCliqueCount, pMembers, pszSingleShmCliqueString); if (iproc == 0) sprintf(pBuffer, "MPICH_ROOTPORT=-1|MPICH_IPROC=%d|MPICH_SHM_CLIQUES=%s", iproc, pszSingleShmCliqueString); else sprintf(pBuffer, "MPICH_ROOTPORT=%d|MPICH_IPROC=%d|MPICH_SHM_CLIQUES=%s", g_nRootPort, iproc, pszSingleShmCliqueString); } else { if (iproc == 0) sprintf(pBuffer, "MPICH_ROOTPORT=-1|MPICH_IPROC=%d|MPICH_SHM_LOW=%d|MPICH_SHM_HIGH=%d", iproc, nShmLow, nShmHigh); else sprintf(pBuffer, "MPICH_ROOTPORT=%d|MPICH_IPROC=%d|MPICH_SHM_LOW=%d|MPICH_SHM_HIGH=%d", g_nRootPort, iproc, nShmLow, nShmHigh); } if (pMembers) { delete pMembers; pMembers = NULL; } } else { if (iproc == 0) sprintf(pBuffer, "MPICH_ROOTPORT=-1|MPICH_IPROC=%d|MPICH_SHM_LOW=%d|MPICH_SHM_HIGH=%d", iproc, nShmLow, nShmHigh); else sprintf(pBuffer, "MPICH_ROOTPORT=%d|MPICH_IPROC=%d|MPICH_SHM_LOW=%d|MPICH_SHM_HIGH=%d", g_nRootPort, iproc, nShmLow, nShmHigh); } /* if (iproc == 0) sprintf(pBuffer, "MPICH_ROOTPORT=-1|MPICH_IPROC=%d|MPICH_SHM_LOW=%d|MPICH_SHM_HIGH=%d", iproc, nShmLow, nShmHigh); else sprintf(pBuffer, "MPICH_ROOTPORT=%d|MPICH_IPROC=%d|MPICH_SHM_LOW=%d|MPICH_SHM_HIGH=%d", g_nRootPort, iproc, nShmLow, nShmHigh); */ if (strlen(arg->pszEnv) > 0) strncat(arg->pszEnv, "|", MAX_CMD_LENGTH - 1 - strlen(arg->pszEnv)); if (strlen(pBuffer) + strlen(arg->pszEnv) >= MAX_CMD_LENGTH) { printf("Warning: environment variables truncated.\n"); fflush(stdout); } strncat(arg->pszEnv, pBuffer, MAX_CMD_LENGTH - 1 - strlen(arg->pszEnv)); if (strlen(g_pszEnv) > 0) { if (strlen(arg->pszEnv) + strlen(g_pszEnv) + 1 >= MAX_CMD_LENGTH) { printf("Warning: environment variables truncated.\n"); } strncat(arg->pszEnv, "|", MAX_CMD_LENGTH - 1 - strlen(arg->pszEnv)); strncat(arg->pszEnv, g_pszEnv, MAX_CMD_LENGTH - 1 - strlen(arg->pszEnv)); } } //printf("creating MPIRunLaunchProcess thread\n");fflush(stdout);#ifdef SERIALIZE_ROOT_PROCESS if (iproc == 0 && !g_bNoMPI) WaitForSingleObject(hRootMutex, INFINITE);#endif pThread[iproc] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)MPIRunLaunchProcess, arg, 0, &dwThreadID); if (pThread[iproc] == NULL) { printf("Unable to create LaunchProcess thread\n");fflush(stdout); // Signal launch threads to abort // Wait for them to return // ... insert code here // In the mean time, just exit if (g_bDoMultiColorOutput) { SetConsoleTextAttribute(GetStdHandle(STD_OUTPUT_HANDLE), g_ConsoleAttribute); }#ifdef SERIALIZE_ROOT_PROCESS if (iproc == 0 && !g_bNoMPI) { ReleaseMutex(hRootMutex); CloseHandle(hRootMutex); }#endif ExitProcess(1); } if (iproc == 0 && !g_bNoMPI) { // Wait for the root port to be valid while (g_nRootPort == 0 && (WaitForSingleObject(g_hAbortEvent, 0) != WAIT_OBJECT_0)) Sleep(200);#ifdef SERIALIZE_ROOT_PROCESS ReleaseMutex(hRootMutex); CloseHandle(hRootMutex);#endif if (g_nRootPort == 0) { // free stuff // ... <insert code here> CloseHandle(pThread[0]); delete pThread; delete g_pProcessSocket; delete g_pProcessLaunchId; delete g_pLaunchIdToRank; delete g_pForwardHost; if (g_bDoMultiColorOutput) { SetConsoleTextAttribute(GetStdHandle(STD_OUTPUT_HANDLE), g_ConsoleAttribute); } return 0; } } iproc++; } HostNode *n = g_pHosts; g_pHosts = g_pHosts->next; delete n; } //printf("Waiting for processes\n");fflush(stdout); // Wait for all the process launching threads to complete WaitForLotsOfObjects(nProc, pThread); for (i = 0; i<nProc; i++) CloseHandle(pThread[i]); delete pThread; pThread = NULL; if (WaitForSingleObject(g_hAbortEvent, 0) == WAIT_OBJECT_0) { char pszStr[100]; printf("aborting...\n");fflush(stdout); for (i=0; i<nProc; i++) { if (g_pProcessSocket[i] != INVALID_SOCKET) { sprintf(pszStr, "kill %d", g_pProcessLaunchId[i]); WriteString(g_pProcessSocket[i], pszStr); sprintf(pszStr, "freeprocess %d", g_pProcessLaunchId[i]); WriteString(g_pProcessSocket[i], pszStr); ReadStringTimeout(g_pProcessSocket[i], pszStr, g_nMPIRUN_SHORT_TIMEOUT); WriteString(g_pProcessSocket[i], "done"); easy_closesocket(g_pProcessSocket[i]); } } if (g_bUseJobHost && !g_bNoMPI) UpdateJobState("ABORTED"); ExitProcess(0); } // Note: If the user hits Ctrl-C between the above if statement and the following ResetEvent statement // nothing will happen and the user will have to hit Ctrl-C again. ResetEvent(g_hLaunchThreadsRunning); //printf("____g_hLaunchThreadsRunning event is reset, Ctrl-C should work now____\n");fflush(stdout); if (g_bUseJobHost && !g_bNoMPI) UpdateJobState("RUNNING"); //printf("Waiting for exit codes\n");fflush(stdout); // Wait for the mpds to return the exit codes of all the processes WaitForExitCommands(); delete g_pForwardHost; g_pForwardHost = NULL; // Signal the IO redirection thread to stop char ch = 0; easy_send(g_sockStopIOSignalSocket, &ch, 1); //printf("Waiting for redirection thread to exit\n");fflush(stdout); // Wait for the redirection thread to complete. Kill it if it takes too long. if (WaitForSingleObject(g_hRedirectIOListenThread, 10000) != WAIT_OBJECT_0) { //printf("Terminating the IO redirection control thread\n"); TerminateThread(g_hRedirectIOListenThread, 0); } CloseHandle(g_hRedirectIOListenThread); easy_closesocket(g_sockStopIOSignalSocket); CloseHandle(g_hAbortEvent); if (g_bUseJobHost && !g_bNoMPI) UpdateJobState("FINISHED"); if (g_bDoMultiColorOutput) { SetConsoleTextAttribute(hStdout, g_ConsoleAttribute); } easy_socket_finalize(); delete g_pProcessSocket; delete g_pProcessLaunchId; delete g_pLaunchIdToRank; while (g_pDriveMapList) { MapDriveNode *pNode = g_pDriveMapList; g_pDriveMapList = g_pDriveMapList->pNext; delete pNode; } return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -