📄 processwait.cpp
字号:
//printf("waiting for %d processes\n", g_nNumProcessSockets);fflush(stdout); /* for (i=0; i<g_nNumProcessSockets; i++) { printf("socket[%d] = id %d\n", g_pProcessSocket[i], g_pProcessLaunchId[i]); } fflush(stdout); */ MakeLoop(&break_sock, &g_sockBreak); SetEvent(g_hBreakReadyEvent); // allow a break to happen if the user has already hit Ctrl-C FD_ZERO(&totalset); FD_SET(break_sock, &totalset); for (i=0; i<g_nNumProcessSockets; i++) { FD_SET(g_pProcessSocket[i], &totalset); } while (g_nNumProcessSockets) { readset = totalset; n = select(0, &readset, NULL, NULL, NULL); if (n == SOCKET_ERROR) { PrintError(WSAGetLastError(), "WaitForExitCommands: bselect failed\n");fflush(stdout); for (i=0; g_nNumProcessSockets > 0; i++) { while (g_pProcessSocket[i] == INVALID_SOCKET) i++; //printf("closing socket [%d]: %s\n", i, bto_string(g_pProcessSocket[i]));fflush(stdout); easy_closesocket(g_pProcessSocket[i]); g_pProcessSocket[i] = INVALID_SOCKET; g_nNumProcessSockets--; } return; } if (n == 0) { printf("WaitForExitCommands: bselect returned zero sockets available\n");fflush(stdout); for (i=0; g_nNumProcessSockets > 0; i++) { while (g_pProcessSocket[i] == INVALID_SOCKET) i++; //printf("closing socket [%d]: %s\n", i, bto_string(g_pProcessSocket[i]));fflush(stdout); easy_closesocket(g_pProcessSocket[i]); g_pProcessSocket[i] = INVALID_SOCKET; g_nNumProcessSockets--; } return; } else { if (FD_ISSET(break_sock, &readset)) { int num_read = easy_receive(break_sock, str, 1); if (num_read == 0 || num_read == SOCKET_ERROR) { FD_CLR(break_sock, &totalset); } else { if (!bKillSent) { printf("Sending kill commands to launched processes\n");fflush(stdout); for (int j=0, i=0; i<g_nNumProcessSockets; i++, j++) { while (g_pProcessSocket[j] == INVALID_SOCKET) j++; sprintf(str, "kill %d", g_pProcessLaunchId[j]); //printf("%d:%s\n", __LINE__, str);fflush(stdout); //printf("kill %d (id[%d])\n", g_pProcessLaunchId[j], j);fflush(stdout); if (WriteString(g_pProcessSocket[j], str) == SOCKET_ERROR) { printf("writing kill command failed\n");fflush(stdout); } } bKillSent = true; } } n--; } for (i=0; n>0; i++) { while (g_pProcessSocket[i] == INVALID_SOCKET) i++; if (FD_ISSET(g_pProcessSocket[i], &readset)) { if (ReadStringTimeout(g_pProcessSocket[i], str, g_nMPIRUN_SHORT_TIMEOUT)) { int nRank = g_pLaunchIdToRank[i]; if (strnicmp(str, "FAIL", 4) == 0) { sprintf(str, "geterror %d", g_pProcessLaunchId[i]); WriteString(g_pProcessSocket[i], str); ReadStringTimeout(g_pProcessSocket[i], str, g_nMPIRUN_SHORT_TIMEOUT); printf("getexitcode(rank %d) failed: %s\n", nRank, str);fflush(stdout); if (g_bUseJobHost) { UpdateJobKeyValue(nRank, "error", str); // get the time the process exited sprintf(str, "getexittime %d", g_pProcessLaunchId[i]); WriteString(g_pProcessSocket[i], str); ReadStringTimeout(g_pProcessSocket[i], str, g_nMPIRUN_SHORT_TIMEOUT); UpdateJobKeyValue(nRank, "exittime", str); } if (easy_send(g_sockBreak, "x", 1) == SOCKET_ERROR) { printf("Aborting.\n");fflush(stdout); ExitProcess(-1); } } else { //printf("[%d] ExitProcess: %s\n", nRank, str);fflush(stdout); if (g_bUseJobHost) { strtok(str, ":"); // strip the extra data from the string UpdateJobKeyValue(nRank, "exitcode", str); char *temp; if (g_bOutputExitCodes) temp = strdup(str); // get the time the process exited sprintf(str, "getexittime %d", g_pProcessLaunchId[i]); WriteString(g_pProcessSocket[i], str); ReadStringTimeout(g_pProcessSocket[i], str, g_nMPIRUN_SHORT_TIMEOUT); UpdateJobKeyValue(nRank, "exittime", str); if (g_bOutputExitCodes) { printf("[rank %d exit code: %s, time: %s]\n", nRank, temp, str);fflush(stdout); free(temp); } } else { if (g_bOutputExitCodes) { strtok(str, ":"); // strip the extra data from the string printf("[rank %d exit code: %s]\n", nRank, str);fflush(stdout); } } if (!g_bNoMPI) { sprintf(str, "getmpifinalized %d", g_pProcessLaunchId[i]); WriteString(g_pProcessSocket[i], str); ReadStringTimeout(g_pProcessSocket[i], str, g_nMPIRUN_SHORT_TIMEOUT); if (stricmp(str, "yes") != 0) { if (stricmp(str, "no") != 0) printf("getmpifinalized returned: %s\n", str); else { if (!g_bSuppressErrorOutput) printf("process %d exited without calling MPIFinalize\n", nRank); } fflush(stdout); easy_send(g_sockBreak, "x", 1); } } } if (g_nNproc > FORWARD_NPROC_THRESHOLD) { if (nRank > 0 && (g_nNproc/2) > nRank) { //printf("rank %d(%d) stopping forwarder\n", nRank, g_pProcessLaunchId[i]);fflush(stdout); sprintf(str, "stopforwarder port=%d abort=no", g_pForwardHost[nRank].nPort); WriteString(g_pProcessSocket[i], str); } } sprintf(str, "freeprocess %d", g_pProcessLaunchId[i]); WriteString(g_pProcessSocket[i], str); ReadStringTimeout(g_pProcessSocket[i], str, g_nMPIRUN_SHORT_TIMEOUT); WriteString(g_pProcessSocket[i], "done"); //printf("closing socket [%d]: %s\n", i, bto_string(g_pProcessSocket[i]));fflush(stdout); FD_CLR(g_pProcessSocket[i], &totalset); easy_closesocket(g_pProcessSocket[i]); g_pProcessSocket[i] = INVALID_SOCKET; //printf("closing socket [%d]\n", i);fflush(stdout); n--; g_nNumProcessSockets--; //printf("(E:%d)", g_pProcessLaunchId[i]);fflush(stdout); } else { if (WSAGetLastError() != 0) PrintError(WSAGetLastError(), "WaitForExitCommands:Reading the exit code for process %d failed.\n", i); else { printf("WaitForExitCommands:Reading the exit code for process %d failed.\n", i); fflush(stdout); } FD_CLR(g_pProcessSocket[i], &totalset); sprintf(str, "kill %d", g_pProcessLaunchId[i]); WriteString(g_pProcessSocket[i], str); WriteString(g_pProcessSocket[i], "done"); //printf("closing socket [%d]: %s\n", i, bto_string(g_pProcessSocket[i]));fflush(stdout); easy_closesocket(g_pProcessSocket[i]); g_pProcessSocket[i] = INVALID_SOCKET; //printf("closing socket [%d]\n", i);fflush(stdout); n--; g_nNumProcessSockets--; if (easy_send(g_sockBreak, "x", 1) == SOCKET_ERROR) { printf("Unable to abort processes.\n");fflush(stdout); ExitProcess(-1); } } } } } } easy_closesocket(g_sockBreak); g_sockBreak = INVALID_SOCKET; delete g_pProcessSocket; delete g_pProcessLaunchId; delete g_pLaunchIdToRank; g_pProcessSocket = NULL; g_pProcessLaunchId = NULL; g_pLaunchIdToRank = NULL; } else { DWORD dwThreadID; int num = (g_nNumProcessSockets / (FD_SETSIZE-1)) + 1; HANDLE *hThread = new HANDLE[num]; SOCKET *pAbortsock = new SOCKET[num]; SOCKET sockStop; ProcessWaitThreadArg *arg = new ProcessWaitThreadArg[num]; ProcessWaitAbortThreadArg *arg2 = new ProcessWaitAbortThreadArg; int i; for (i=0; i<num; i++) { if (i == num-1) arg[i].n = g_nNumProcessSockets % (FD_SETSIZE-1); else arg[i].n = (FD_SETSIZE-1); arg[i].pSocket = &g_pProcessSocket[i*(FD_SETSIZE-1)]; arg[i].pId = &g_pProcessLaunchId[i*(FD_SETSIZE-1)]; arg[i].pRank = &g_pLaunchIdToRank[i*(FD_SETSIZE-1)]; MakeLoop(&arg[i].sockAbort, &pAbortsock[i]); } for (i=0; i<num; i++) { hThread[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ProcessWait, &arg[i], 0, &dwThreadID); } MakeLoop(&arg2->sockAbort, &g_sockBreak); MakeLoop(&arg2->sockStop, &sockStop); HANDLE hWaitAbortThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ProcessWaitAbort, arg2, 0, &dwThreadID); SetEvent(g_hBreakReadyEvent); WaitForMultipleObjects(num, hThread, TRUE, INFINITE); for (i=0; i<num; i++) CloseHandle(hThread[i]); delete hThread; delete arg; easy_send(sockStop, "x", 1); easy_closesocket(sockStop); WaitForSingleObject(hWaitAbortThread, 10000); delete pAbortsock; delete arg2; CloseHandle(hWaitAbortThread); easy_closesocket(g_sockBreak); g_sockBreak = INVALID_SOCKET; delete g_pProcessSocket; delete g_pProcessLaunchId; delete g_pLaunchIdToRank; g_pProcessSocket = NULL; g_pProcessLaunchId = NULL; g_pLaunchIdToRank = NULL; } //printf("WaitForExitCommands returning\n");fflush(stdout);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -