📄 mpirun.cpp
字号:
bool bDoSMP = !GetOpt(argc, argv, "-tcp"); if (g_nHosts < 1) { printf("Error: must specify a number greater than 0 after the -localonly option\n"); return; } if (argc < 2) { printf("Error: not enough arguments.\n"); return; } TCHAR pszTempExe[MAX_PATH], *namepart; _tcscpy(g_pszExe, argv[1]); GetFullPathName(g_pszExe, MAX_PATH, pszTempExe, &namepart); // Quote the executable in case there are spaces in the path _stprintf(g_pszExe, TEXT("\"%s\""), pszTempExe); g_pszArgs[0] = TEXT('\0'); for (int i=2; i<argc; i++) { _tcscat(g_pszArgs, argv[i]); if (i < argc-1) _tcscat(g_pszArgs, TEXT(" ")); } RunLocal(bDoSMP); return; } else { if (ParseConfigFile(argv[1]) == PARSE_ERR_NO_FILE) { // The first argument might be an executable with the extension missing (.exe, .bat, .com, etc.) // so set things up to run one process g_nHosts = 1; TCHAR pszTempExe[MAX_PATH], *namepart; _tcscpy(g_pszExe, argv[1]); GetFullPathName(g_pszExe, MAX_PATH, pszTempExe, &namepart); // Quote the executable in case there are spaces in the path _stprintf(g_pszExe, TEXT("\"%s\""), pszTempExe); g_pszArgs[0] = TEXT('\0'); for (int i=2; i<argc; i++) { _tcscat(g_pszArgs, argv[i]); if (i < argc-1) _tcscat(g_pszArgs, TEXT(" ")); } //g_bNoMPI = true; RunLocal(true); return; } else { if ((_tcslen(g_pszArgs) > 0) && (argc > 2)) _tcscat(g_pszArgs, TEXT(" ")); for (int i=2; i<argc; i++) { _tcscat(g_pszArgs, argv[i]); if (i < argc-1) _tcscat(g_pszArgs, TEXT(" ")); } } } } TCHAR pszTempExe[MAX_PATH], *namepart; GetFullPathName(g_pszExe, MAX_PATH, pszTempExe, &namepart); // Quote the executable in case there are spaces in the path _stprintf(g_pszExe, TEXT("\"%s\""), pszTempExe); if (bLogon) GetAccountAndPassword(); else if (ReadPasswordFromRegistry(g_pszAccount, g_pszPassword)) bLogon = true; hr = CoInitializeEx(NULL, COINIT_MULTITHREADED); if (FAILED(hr)) { _tprintf(TEXT("CoInitialize() failed.\n")); PrintError(hr); return; } hr = CoInitializeSecurity(NULL, -1, NULL, NULL, //RPC_C_AUTHN_LEVEL_NONE, RPC_C_IMP_LEVEL_ANONYMOUS, NULL, EOAC_NONE, NULL); //RPC_C_AUTHN_LEVEL_NONE, RPC_C_IMP_LEVEL_IMPERSONATE, NULL, EOAC_NONE, NULL); RPC_C_AUTHN_LEVEL_CONNECT, RPC_C_IMP_LEVEL_IMPERSONATE, NULL, EOAC_NONE, NULL); //RPC_C_AUTHN_LEVEL_PKT, RPC_C_IMP_LEVEL_IMPERSONATE, NULL, EOAC_NONE, NULL); if (FAILED(hr)) { if (hr == RPC_E_TOO_LATE) printf("CoInitializeSecurity failed because it has already been set.\n"); else { char error_msg[256]; Translate_HRError(hr, error_msg); printf("CoInitializeSecurity failed\nError: %s", error_msg); } } // Figure out how many processes to launch int nProc = 0; HostNode *n = g_pHosts; if (g_pHosts == NULL) nProc = g_nHosts; while (n) { nProc += n->nSMPProcs; n = n->next; } CreateJobID(pszJobID); if (bUseMPICH2) { char pszKey[100], pBuffer[4096]; BNR_Group mpirun_group, spawned_group, joint_group; BNR_Info info; char pszEnv[4096]; //SetConnectionsLeft(nProc * 2); HANDLE hReadyEvent = CreateEvent(NULL, TRUE, FALSE, NULL); DWORD dwThreadID; HANDLE hIOThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)RedirectIOLoopThread, hReadyEvent, 0, &dwThreadID); if (WaitForSingleObject(hReadyEvent, 5000) != WAIT_OBJECT_0) { printf("Wait for hReadyEvent failed, error %d\n", GetLastError()); ExitProcess(1); } BNR_Info_create(&info); strcpy(pBuffer, g_pszIOListenHost); BNR_Info_set(info, "stdinHost", pBuffer); sprintf(pBuffer, "%d", g_nIOListenPort); BNR_Info_set(info, "stdinPort", pBuffer); strcpy(pBuffer, g_pszIOListenHost); BNR_Info_set(info, "stdoutHost", pBuffer); sprintf(pBuffer, "%d", g_nIOListenPort); BNR_Info_set(info, "stdoutPort", pBuffer); strcpy(pBuffer, g_pszIOListenHost); BNR_Info_set(info, "stderrHost", pBuffer); sprintf(pBuffer, "%d", g_nIOListenPort); BNR_Info_set(info, "stderrPort", pBuffer); g_hBNRProcessesFinishedEvent = CreateEvent(NULL, TRUE, FALSE, NULL); g_nNumBNRProcessesRemaining = nProc; BNR_Get_group(&mpirun_group); BNR_Open_group(mpirun_group, &spawned_group); //BNR_Spawn(spawned_group, nProc, g_pszExe, g_pszArgs, g_pszEnv, info, ExitBNRProcess); for (int i=0; i<nProc; i++) { if (strlen(g_pszEnv)) sprintf(pszEnv, "SHMEMKEY=%s|SHMEMGRPSIZE=%d|SHMEMGRPRANK=%d|%s", pszJobID, nProc, i, g_pszEnv); else sprintf(pszEnv, "SHMEMKEY=%s|SHMEMGRPSIZE=%d|SHMEMGRPRANK=%d", pszJobID, nProc, i); BNR_Spawn(spawned_group, 1, g_pszExe, g_pszArgs, pszEnv, info, ExitBNRProcess); } BNR_Close_group(spawned_group); BNR_Merge(mpirun_group, spawned_group, &joint_group); for (i=0; i<nProc; i++) { sprintf(pBuffer, "MPICH_JOBID=%s|MPICH_NPROC=%d|MPICH_IPROC=%d", pszJobID, nProc, i); sprintf(pszKey, "env%d", i); BNR_Put(joint_group, pszKey, pBuffer, i); } BNR_Fence(joint_group); WaitForSingleObject(g_hBNRProcessesFinishedEvent, INFINITE); //WaitForAllConnections(); BNR_Free_group(joint_group); BNR_Free_group(spawned_group); BNR_Free_group(mpirun_group); BNR_Finalize(); } else if (g_bUseBNR) { char pszKey[100], pBuffer[4096]; BNR_Group mpirun_group, spawned_group, joint_group; BNR_Info info; //SetConnectionsLeft(nProc * 2); HANDLE hReadyEvent = CreateEvent(NULL, TRUE, FALSE, NULL); DWORD dwThreadID; HANDLE hIOThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)RedirectIOLoopThread, hReadyEvent, 0, &dwThreadID); if (WaitForSingleObject(hReadyEvent, 5000) != WAIT_OBJECT_0) { printf("Wait for hReadyEvent failed, error %d\n", GetLastError()); ExitProcess(1); } BNR_Info_create(&info); strcpy(pBuffer, g_pszIOListenHost); BNR_Info_set(info, "stdinHost", pBuffer); sprintf(pBuffer, "%d", g_nIOListenPort); BNR_Info_set(info, "stdinPort", pBuffer); strcpy(pBuffer, g_pszIOListenHost); BNR_Info_set(info, "stdoutHost", pBuffer); sprintf(pBuffer, "%d", g_nIOListenPort); BNR_Info_set(info, "stdoutPort", pBuffer); strcpy(pBuffer, g_pszIOListenHost); BNR_Info_set(info, "stderrHost", pBuffer); sprintf(pBuffer, "%d", g_nIOListenPort); BNR_Info_set(info, "stderrPort", pBuffer); g_hBNRProcessesFinishedEvent = CreateEvent(NULL, TRUE, FALSE, NULL); g_nNumBNRProcessesRemaining = nProc; BNR_Get_group(&mpirun_group); BNR_Open_group(mpirun_group, &spawned_group); BNR_Spawn(spawned_group, nProc, g_pszExe, g_pszArgs, g_pszEnv, info, ExitBNRProcess); BNR_Close_group(spawned_group); BNR_Merge(mpirun_group, spawned_group, &joint_group); for (i=0; i<nProc; i++) { sprintf(pBuffer, "MPICH_JOBID=%s|MPICH_NPROC=%d|MPICH_IPROC=%d", pszJobID, nProc, i); sprintf(pszKey, "env%d", i); BNR_Put(joint_group, pszKey, pBuffer, i); } BNR_Fence(joint_group); WaitForSingleObject(g_hBNRProcessesFinishedEvent, INFINITE); //WaitForAllConnections(); BNR_Free_group(joint_group); BNR_Free_group(spawned_group); BNR_Free_group(mpirun_group); BNR_Finalize(); } else { // Set the environment variables common to all processes sprintf(pszEnv, "MPICH_JOBID=%s|MPICH_NPROC=%d|MPICH_ROOTHOST=%s", pszJobID, nProc, g_pHosts->host); // Allocate an array to hold handles to the LaunchProcess threads pThread = new HANDLE[nProc]; g_pAbortThreads = new HANDLE[nProc]; for (i=0; i<nProc; i++) g_pAbortThreads[i] = NULL; // Launch the processes while (g_pHosts) { nShmLow = iproc; nShmHigh = iproc + g_pHosts->nSMPProcs - 1; for (int i=0; i<g_pHosts->nSMPProcs; i++) { LaunchProcessArg *arg = new LaunchProcessArg; arg->i = iproc; arg->bLogon = bLogon;#ifdef UNICODE if (bLogon) { wcscpy(arg->pszAccount, g_pszAccount); wcscpy(arg->pszPassword, g_pszPassword); } if (wcslen(g_pHosts->exe) > 0) wcscpy(arg->pszCmdLine, g_pHosts->exe); else wcscpy(arg->pszCmdLine, g_pszExe); if (wcslen(g_pszArgs) > 0) { wcscat(arg->pszCmdLine, L" "); wcscat(arg->pszCmdLine, g_pszArgs); } wcscpy(arg->pszDir, pszDir); wcscpy(arg->pszEnv, pszEnv); wcscpy(arg->pszHost, g_pHosts->host); wcscpy(arg->pszJobID, pszJobID); if (iproc == 0) swprintf(pBuffer, L"MPICH_ROOTPORT=-1|MPICH_IPROC=%d|MPICH_SHM_LOW=%d|MPICH_SHM_HIGH=%d", iproc, nShmLow, nShmHigh); else swprintf(pBuffer, L"|MPICH_ROOTPORT=%d|MPICH_IPROC=%d|MPICH_SHM_LOW=%d|MPICH_SHM_HIGH=%d", g_nRootPort, iproc, nShmLow, nShmHigh); if (wcslen(arg->pszEnv) > 0) wcscat(arg->pszEnv, L"|"); wcscat(arg->pszEnv, pBuffer); if (wcslen(g_pszEnv) > 0) { wcscat(arg->pszEnv, L"|"); wcscat(arg->pszEnv, g_pszEnv); }#else WCHAR wTemp[MAX_PATH]; if (bLogon) { mbstowcs(arg->pszAccount, g_pszAccount, strlen(g_pszAccount)+1); mbstowcs(arg->pszPassword, g_pszPassword, strlen(g_pszPassword)+1); } if (strlen(g_pHosts->exe) > 0) mbstowcs(arg->pszCmdLine, g_pHosts->exe, strlen(g_pHosts->exe)+1); else mbstowcs(arg->pszCmdLine, g_pszExe, strlen(g_pszExe)+1); if (strlen(g_pszArgs) > 0) { wcscat(arg->pszCmdLine, L" "); mbstowcs(wTemp, g_pszArgs, strlen(g_pszArgs)+1); wcscat(arg->pszCmdLine, wTemp); } mbstowcs(arg->pszDir, pszDir, strlen(pszDir)+1); mbstowcs(arg->pszEnv, pszEnv, strlen(pszEnv)+1); mbstowcs(arg->pszHost, g_pHosts->host, strlen(g_pHosts->host)+1); mbstowcs(arg->pszJobID, pszJobID, strlen(pszJobID)+1); if (iproc == 0) swprintf(wTemp, L"MPICH_ROOTPORT=-1|MPICH_IPROC=%d|MPICH_SHM_LOW=%d|MPICH_SHM_HIGH=%d", iproc, nShmLow, nShmHigh); else swprintf(wTemp, L"MPICH_ROOTPORT=%d|MPICH_IPROC=%d|MPICH_SHM_LOW=%d|MPICH_SHM_HIGH=%d", g_nRootPort, iproc, nShmLow, nShmHigh); if (wcslen(arg->pszEnv) > 0) wcscat(arg->pszEnv, L"|"); wcscat(arg->pszEnv, wTemp); if (strlen(g_pszEnv) > 0) { wcscat(arg->pszEnv, L"|"); mbstowcs(wTemp, g_pszEnv, strlen(g_pszEnv)+1); wcscat(arg->pszEnv, wTemp); }#endif pThread[iproc] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)LaunchProcess, arg, 0, &dwThreadID); if (pThread[iproc] == NULL) { printf("Unable to create LaunchProcess thread\n"); ExitProcess(1); } if (iproc == 0) { // Wait for the root port to be valid while (g_nRootPort == 0) Sleep(200); } iproc++; } HostNode *n = g_pHosts; g_pHosts = g_pHosts->next; delete n; } WaitForLotsOfObjects(nProc, pThread); SetEvent(g_hAbortEvent); for (i=0; i<nProc; i++) CloseHandle(pThread[i]); delete pThread; WaitForLotsOfObjects(nProc, g_pAbortThreads); for (i=0; i<nProc; i++) CloseHandle(g_pAbortThreads[i]); delete g_pAbortThreads; CloseHandle(g_hAbortEvent); } #ifdef MULTI_COLOR_OUTPUT SetConsoleTextAttribute(hStdout, g_ConsoleAttribute);#endif CoUninitialize();}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -