// mpirun.cpp
if (strlen(g_pszEnv) >= MAX_CMD_LENGTH) { printf("Warning: environment variables truncated.\n"); fflush(stdout); } strncpy(arg->pszEnv, g_pszEnv, MAX_CMD_LENGTH); arg->pszEnv[MAX_CMD_LENGTH-1] = '\0'; } else { sprintf(pBuffer, "PMI_RANK=%d|PMI_SHM_LOW=%d|PMI_SHM_HIGH=%d", iproc, nShmLow, nShmHigh); if (strlen(arg->pszEnv) > 0) strncat(arg->pszEnv, "|", MAX_CMD_LENGTH - 1 - strlen(arg->pszEnv)); if (strlen(pBuffer) + strlen(arg->pszEnv) >= MAX_CMD_LENGTH) { printf("Warning: environment variables truncated.\n"); fflush(stdout); } strncat(arg->pszEnv, pBuffer, MAX_CMD_LENGTH - 1 - strlen(arg->pszEnv)); if (strlen(g_pszEnv) > 0) { if (strlen(arg->pszEnv) + strlen(g_pszEnv) + 1 >= MAX_CMD_LENGTH) { printf("Warning: environment variables truncated.\n"); } strncat(arg->pszEnv, "|", MAX_CMD_LENGTH - 1 - strlen(arg->pszEnv)); strncat(arg->pszEnv, g_pszEnv, MAX_CMD_LENGTH - 1 - strlen(arg->pszEnv)); } } //printf("creating MPIRunLaunchProcess thread\n");fflush(stdout); pThread[iproc] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)MPIRunLaunchProcess, arg, 0, &dwThreadID); if (pThread[iproc] == NULL) { printf("Unable to create LaunchProcess thread\n");fflush(stdout); // Signal launch threads to abort // Wait for them to return // ... 
insert code here // In the mean time, just exit if (g_bDoMultiColorOutput) { SetConsoleTextAttribute(GetStdHandle(STD_OUTPUT_HANDLE), g_ConsoleAttribute); } ExitProcess(1); } iproc++; } HostNode *n = g_pHosts; g_pHosts = g_pHosts->next; delete n; } //printf("Waiting for processes\n");fflush(stdout); // Wait for all the process launching threads to complete WaitForLotsOfObjects(nProc, pThread); for (i = 0; i<nProc; i++) CloseHandle(pThread[i]); delete pThread; pThread = NULL; if (WaitForSingleObject(g_hAbortEvent, 0) == WAIT_OBJECT_0) { char pszStr[100]; for (i=0; i<nProc; i++) { if (g_pProcessSocket[i] != INVALID_SOCKET) { sprintf(pszStr, "kill %d", g_pProcessLaunchId[i]); WriteString(g_pProcessSocket[i], pszStr); if (!UnmapDrives(g_pProcessSocket[i])) { printf("Drive unmappings failed\n"); } sprintf(pszStr, "freeprocess %d", g_pProcessLaunchId[i]); WriteString(g_pProcessSocket[i], pszStr); WriteString(g_pProcessSocket[i], "done"); easy_closesocket(g_pProcessSocket[i]); } } ExitProcess(0); } // Note: If the user hits Ctrl-C between the above if statement and the following ResetEvent statement // nothing will happen and the user will have to hit Ctrl-C again. ResetEvent(g_hLaunchThreadsRunning); //printf("Waiting for exit codes\n");fflush(stdout); // Wait for the mpds to return the exit codes of all the processes WaitForExitCommands(); delete g_pForwardHost; g_pForwardHost = NULL; // Signal the IO redirection thread to stop char ch = 0; easy_send(g_sockStopIOSignalSocket, &ch, 1); //printf("Waiting for redirection thread to exit\n");fflush(stdout); // Wait for the redirection thread to complete. Kill it if it takes too long. 
if (WaitForSingleObject(g_hRedirectIOListenThread, 10000) != WAIT_OBJECT_0) { //printf("Terminating the IO redirection control thread\n"); TerminateThread(g_hRedirectIOListenThread, 0); } CloseHandle(g_hRedirectIOListenThread); easy_closesocket(g_sockStopIOSignalSocket); CloseHandle(g_hAbortEvent); if (g_bDoMultiColorOutput) { SetConsoleTextAttribute(hStdout, g_ConsoleAttribute); } DestroyPMIDatabase(pmi_host, pmi_port, phrase, pmi_kvsname); easy_socket_finalize(); delete g_pProcessSocket; delete g_pProcessLaunchId; delete g_pLaunchIdToRank; while (g_pDriveMapList) { MapDriveNode *pNode = g_pDriveMapList; g_pDriveMapList = g_pDriveMapList->pNext; delete pNode; } return 0;}struct ProcessWaitAbortThreadArg{ SOCKET sockAbort; SOCKET sockStop; int n; SOCKET *pSocket;};// Function name : ProcessWaitAbort// Description : // Return type : void // Argument : ProcessWaitAbortThreadArg *pArgvoid ProcessWaitAbort(ProcessWaitAbortThreadArg *pArg){ int n, i; fd_set readset; FD_ZERO(&readset); FD_SET(pArg->sockAbort, &readset); FD_SET(pArg->sockStop, &readset); n = select(0, &readset, NULL, NULL, NULL); if (n == SOCKET_ERROR) { printf("bselect failed, error %d\n", WSAGetLastError());fflush(stdout); for (i=0; i<pArg->n; i++) { easy_closesocket(pArg->pSocket[i]); } easy_closesocket(pArg->sockAbort); easy_closesocket(pArg->sockStop); return; } if (n == 0) { printf("ProcessWaitAbort: bselect returned zero sockets available\n");fflush(stdout); for (i=0; i<pArg->n; i++) { easy_closesocket(pArg->pSocket[i]); } easy_closesocket(pArg->sockAbort); easy_closesocket(pArg->sockStop); return; } if (FD_ISSET(pArg->sockAbort, &readset)) { for (i=0; i<pArg->n; i++) { easy_send(pArg->pSocket[i], "x", 1); } } for (i=0; i<pArg->n; i++) { easy_closesocket(pArg->pSocket[i]); } easy_closesocket(pArg->sockAbort); easy_closesocket(pArg->sockStop);}// Function name : UnmapDrives// Description : // Return type : bool // Argument : int sockbool UnmapDrives(SOCKET sock){ char pszStr[256]; if 
(g_pDriveMapList && !g_bNoDriveMapping) { MapDriveNode *pNode = g_pDriveMapList; while (pNode) { sprintf(pszStr, "unmap drive=%c", pNode->cDrive); if (WriteString(sock, pszStr) == SOCKET_ERROR) { printf("ERROR: Unable to send unmap command, Error %d", WSAGetLastError()); easy_closesocket(sock); SetEvent(g_hAbortEvent); return false; } if (!ReadString(sock, pszStr)) { printf("ERROR: Unable to read the result of unmap command, Error %d", WSAGetLastError()); easy_closesocket(sock); SetEvent(g_hAbortEvent); return false; } if (stricmp(pszStr, "SUCCESS")) { printf("ERROR: Unable to unmap %c: %s\r\n%s", pNode->cDrive, pNode->pszShare, pszStr); easy_closesocket(sock); SetEvent(g_hAbortEvent); return false; } pNode = pNode->pNext; } } return true;}struct ProcessWaitThreadArg{ int n; SOCKET *pSocket; int *pId; int *pRank; SOCKET sockAbort;};// Function name : ProcessWait// Description : // Return type : void // Argument : ProcessWaitThreadArg *pArgvoid ProcessWait(ProcessWaitThreadArg *pArg){ int i, j, n; fd_set totalset, readset; char str[256]; FD_ZERO(&totalset); FD_SET(pArg->sockAbort, &totalset); for (i=0; i<pArg->n; i++) { FD_SET(pArg->pSocket[i], &totalset); } while (pArg->n) { readset = totalset; n = select(0, &readset, NULL, NULL, NULL); if (n == SOCKET_ERROR) { printf("select failed, error %d\n", WSAGetLastError());fflush(stdout); for (i=0, j=0; i<pArg->n; i++, j++) { while (pArg->pSocket[j] == INVALID_SOCKET) j++; easy_closesocket(pArg->pSocket[j]); } return; } if (n == 0) { printf("WaitForExitCommands: select returned zero sockets available");fflush(stdout); for (i=0, j=0; i<pArg->n; i++, j++) { while (pArg->pSocket[j] == INVALID_SOCKET) j++; easy_closesocket(pArg->pSocket[j]); } return; } if (FD_ISSET(pArg->sockAbort, &readset)) { for (i=0; pArg->n > 0; i++) { while (pArg->pSocket[i] == INVALID_SOCKET) i++; sprintf(str, "kill %d", pArg->pId[i]); WriteString(pArg->pSocket[i], str); int nRank = pArg->pRank[i]; if (g_nNproc > FORWARD_NPROC_THRESHOLD) { if (nRank > 
0 && (g_nNproc/2) > nRank) { //printf("rank %d(%d) stopping forwarder\n", nRank, g_pProcessLaunchId[i]);fflush(stdout); sprintf(str, "stopforwarder port=%d abort=yes", g_pForwardHost[nRank].nPort); WriteString(pArg->pSocket[i], str); } } sprintf(str, "freeprocess %d", pArg->pId[i]); WriteString(pArg->pSocket[i], str); WriteString(pArg->pSocket[i], "done"); easy_closesocket(pArg->pSocket[i]); pArg->pSocket[i] = INVALID_SOCKET; pArg->n--; } return; } for (i=0; n>0; i++) { while (pArg->pSocket[i] == INVALID_SOCKET) i++; if (FD_ISSET(pArg->pSocket[i], &readset)) { if (!ReadString(pArg->pSocket[i], str)) { printf("Unable to read the result of the getexitcodewait command for process %d, error %d", i, WSAGetLastError());fflush(stdout); return; } int nRank = pArg->pRank[i]; if (g_nNproc > FORWARD_NPROC_THRESHOLD) { if (nRank > 0 && (g_nNproc/2) > nRank) { sprintf(str, "stopforwarder port=%d abort=no", g_pForwardHost[nRank].nPort); WriteString(pArg->pSocket[i], str); } } UnmapDrives(pArg->pSocket[i]); sprintf(str, "freeprocess %d", pArg->pId[i]); WriteString(pArg->pSocket[i], str); WriteString(pArg->pSocket[i], "done"); easy_closesocket(pArg->pSocket[i]); FD_CLR(pArg->pSocket[i], &totalset); pArg->pSocket[i] = INVALID_SOCKET; n--; pArg->n--; } } }}// Function name : WaitForExitCommands// Description : // Return type : void void WaitForExitCommands(){ if (g_nNumProcessSockets < FD_SETSIZE) { int i, n; fd_set totalset, readset; char str[256]; SOCKET break_sock; MakeLoop(&break_sock, &g_sockBreak); SetEvent(g_hBreakReadyEvent); FD_ZERO(&totalset); FD_SET(break_sock, &totalset); for (i=0; i<g_nNumProcessSockets; i++) { FD_SET(g_pProcessSocket[i], &totalset); } while (g_nNumProcessSockets) { readset = totalset; n = select(0, &readset, NULL, NULL, NULL); if (n == SOCKET_ERROR) { printf("WaitForExitCommands: select failed, error %d\n", WSAGetLastError());fflush(stdout); for (i=0; g_nNumProcessSockets > 0; i++) { while (g_pProcessSocket[i] == INVALID_SOCKET) i++; 
easy_closesocket(g_pProcessSocket[i]); g_nNumProcessSockets--; } return; } if (n == 0) { printf("WaitForExitCommands: select returned zero sockets available\n");fflush(stdout); for (i=0; g_nNumProcessSockets > 0; i++) { while (g_pProcessSocket[i] == INVALID_SOCKET) i++; easy_closesocket(g_pProcessSocket[i]); g_nNumProcessSockets--; } return; } else { if (FD_ISSET(break_sock, &readset)) { int num_read = easy_receive(break_sock, str, 1); if (num_read == 0 || num_read == SOCKET_ERROR) { FD_CLR(break_sock, &totalset); } else { printf("Sending kill commands to launched processes\n");fflush(stdout); for (int j=0, i=0; i<g_nNumProcessSockets; i++, j++) { while (g_pProcessSocket[j] == INVALID_SOCKET) j++; sprintf(str, "kill %d", g_pProcessLaunchId[j]); //printf("%s\n", str);fflush(stdout); WriteString(g_pProcessSocket[j], str); } } n--; } for (i=0; n>0; i++) { while (g_pProcessSocket[i] == INVALID_SOCKET) i++; if (FD_ISSET(g_pProcessSocket[i], &readset)) { if (!ReadString(g_pProcessSocket[i], str)) { printf("Unable to read the result of the getexitcodewait command for process %d, error %d", i, WSAGetLastError());fflush(stdout); return; } int nRank = g_pLaunchIdToRank[i]; if (g_nNproc > FORWARD_NPROC_THRESHOLD) { if (nRank > 0 && (g_nNproc/2) > nRank) { //printf("rank %d(%d) stopping forwarder\n", nRank, g_pProcessLaunchId[i]);fflush(stdout); sprintf(str, "stopforwarder port=%d abort=no", g_pForwardHost[nRank].nPort); WriteString(g_pProcessSocket[i], str); } } UnmapDrives(g_pProcessSocket[i]); sprintf(str, "freeprocess %d", g_pProcessLaunchId[i]); WriteString(g_pProcessSocket[i], str); WriteString(g_pProcessSocket[i], "done"); easy_closesocket(g_pProcessSocket[i]); FD_CLR(g_pProcessSocket[i], &totalset); g_pProcessSocket[i] = INVALID_SOCKET; n--; g_nNumProcessSockets--; //printf("(E:%d)", g_pProcessLaunchId[i]);fflush(stdout); } } } } easy_closesocket(g_sockBreak); g_sockBreak = INVALID_SOCKET; delete g_pProcessSocket; delete g_pProcessLaunchId; delete g_pLaunchIdToRank; 
g_pProcessSocket = NULL; g_pProcessLaunchId = NULL; g_pLaunchIdToRank = NULL; } else { DWORD dwThreadID; int num = (g_nNumProcessSockets / (FD_SETSIZE-1)) + 1; HANDLE *hThread = new HANDLE[num]; SOCKET *pAbortsock = new SOCKET[num]; SOCKET sockStop; ProcessWaitThreadArg *arg = new ProcessWaitThreadArg[num]; ProcessWaitAbortThreadArg *arg2 = new ProcessWaitAbortThreadArg; int i; for (i=0; i<num; i++) { if (i == num-1) arg[i].n = g_nNumProcessSockets % (FD_SETSIZE-1); else arg[i].n = (FD_SETSIZE-1); arg[i].pSocket = &g_pProcessSocket[i*(FD_SETSIZE-1)]; arg[i].pId = &g_pProcessLaunchId[i*(FD_SETSIZE-1)]; arg[i].pRank = &g_pLaunchIdToRank[i*(FD_SETSIZE-1)]; MakeLoop(&arg[i].sockAbort, &pAbortsock[i]); } for (i=0; i<num; i++) { hThread[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ProcessWait, &arg[i], 0, &dwThreadID); } MakeLoop(&arg2->sockAbort, &g_sockBreak); MakeLoop(&arg2->sockStop, &sockStop); HANDLE hWaitAbortThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ProcessWaitAbort, arg2, 0, &dwThreadID); SetEvent(g_hBreakReadyEvent); WaitForMultipleObjects(num, hThread, TRUE, INFINITE); for (i=0; i<num; i++) CloseHandle(hThread[i]); delete hThread; delete arg; easy_send(sockStop, "x", 1); easy_closesocket(sockStop); WaitForSingleObject(hWaitAbortThread, 10000); delete pAbortsock; delete arg2; CloseHandle(hWaitAbortThread); easy_closesocket(g_sockBreak); g_sockBreak = INVALID_SOCKET; delete g_pProcessSocket; delete g_pProcessLaunchId; delete g_pLaunchIdToRank; g_pProcessSocket = NULL; g_pProcessLaunchId = NULL; g_pLaunchIdToRank = NULL; } //printf("WaitForExitCommands returning\n");fflush(stdout);}
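/*
 * Keyboard shortcuts of the web code viewer this listing was captured from
 * (not part of the program):
 *   Copy code:          Ctrl + C
 *   Search code:        Ctrl + F
 *   Full screen:        F11
 *   Toggle theme:       Ctrl + Shift + D
 *   Show shortcuts:     ?
 *   Increase font size: Ctrl + =
 *   Decrease font size: Ctrl + -
 */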