📄 processwait.cpp
字号:
#include <stdio.h>#include <stdlib.h>#include "LaunchProcess.h"#include "global.h"#include "mpirun.h"#include "mpdutil.h"struct ProcessWaitThreadArg{ int n; SOCKET *pSocket; int *pId; int *pRank; SOCKET sockAbort;};struct ProcessWaitAbortThreadArg{ SOCKET sockAbort; SOCKET sockStop; int n; SOCKET *pSocket;};// Function name : ProcessWaitAbort// Description : // Return type : void // Argument : ProcessWaitAbortThreadArg *pArgvoid ProcessWaitAbort(ProcessWaitAbortThreadArg *pArg){ int n, i; fd_set readset; FD_ZERO(&readset); FD_SET(pArg->sockAbort, &readset); FD_SET(pArg->sockStop, &readset); n = select(0, &readset, NULL, NULL, NULL); if (n == SOCKET_ERROR) { PrintError(WSAGetLastError(), "bselect failed\n");fflush(stdout); for (i=0; i<pArg->n; i++) { easy_closesocket(pArg->pSocket[i]); } easy_closesocket(pArg->sockAbort); easy_closesocket(pArg->sockStop); return; } if (n == 0) { printf("ProcessWaitAbort: bselect returned zero sockets available\n");fflush(stdout); for (i=0; i<pArg->n; i++) { easy_closesocket(pArg->pSocket[i]); } easy_closesocket(pArg->sockAbort); easy_closesocket(pArg->sockStop); return; } if (FD_ISSET(pArg->sockAbort, &readset)) { for (i=0; i<pArg->n; i++) { easy_send(pArg->pSocket[i], "x", 1); } } for (i=0; i<pArg->n; i++) { easy_closesocket(pArg->pSocket[i]); } easy_closesocket(pArg->sockAbort); easy_closesocket(pArg->sockStop);}// Function name : ProcessWait// Description : // Return type : void // Argument : ProcessWaitThreadArg *pArgvoid ProcessWait(ProcessWaitThreadArg *pArg){ int i, j, n; fd_set totalset, readset; char str[256]; FD_ZERO(&totalset); FD_SET(pArg->sockAbort, &totalset); for (i=0; i<pArg->n; i++) { FD_SET(pArg->pSocket[i], &totalset); } while (pArg->n) { readset = totalset; n = select(0, &readset, NULL, NULL, NULL); if (n == SOCKET_ERROR) { PrintError(WSAGetLastError(), "bselect failed\n");fflush(stdout); for (i=0, j=0; i<pArg->n; i++, j++) { while (pArg->pSocket[j] == INVALID_SOCKET) j++; easy_closesocket(pArg->pSocket[j]); pArg->pSocket[j] = INVALID_SOCKET; } return; } if (n == 0) { printf("ProcessWait: bselect returned zero sockets available");fflush(stdout); for (i=0, j=0; i<pArg->n; i++, j++) { while (pArg->pSocket[j] == INVALID_SOCKET) j++; easy_closesocket(pArg->pSocket[j]); pArg->pSocket[j] = INVALID_SOCKET; } return; } if (FD_ISSET(pArg->sockAbort, &readset)) { for (i=0; pArg->n > 0; i++) { while (pArg->pSocket[i] == INVALID_SOCKET) i++; sprintf(str, "kill %d", pArg->pId[i]); //printf("%d:%s\n", __LINE__, str);fflush(stdout); WriteString(pArg->pSocket[i], str); int nRank = pArg->pRank[i]; if (g_nNproc > FORWARD_NPROC_THRESHOLD) { if (nRank > 0 && (g_nNproc/2) > nRank) { //printf("rank %d(%d) stopping forwarder\n", nRank, g_pProcessLaunchId[i]);fflush(stdout); sprintf(str, "stopforwarder port=%d abort=yes", g_pForwardHost[nRank].nPort); WriteString(pArg->pSocket[i], str); } } sprintf(str, "freeprocess %d", pArg->pId[i]); WriteString(pArg->pSocket[i], str); ReadStringTimeout(pArg->pSocket[i], str, g_nMPIRUN_SHORT_TIMEOUT); WriteString(pArg->pSocket[i], "done"); easy_closesocket(pArg->pSocket[i]); pArg->pSocket[i] = INVALID_SOCKET; pArg->n--; } return; } for (i=0; n>0; i++) { while (pArg->pSocket[i] == INVALID_SOCKET) i++; if (FD_ISSET(pArg->pSocket[i], &readset)) { if (ReadStringTimeout(pArg->pSocket[i], str, g_nMPIRUN_SHORT_TIMEOUT)) { int nRank = pArg->pRank[i]; if (strnicmp(str, "FAIL", 4) == 0) { // get the error sprintf(str, "geterror %d", pArg->pId[i]); WriteString(pArg->pSocket[i], str); ReadStringTimeout(pArg->pSocket[i], str, g_nMPIRUN_SHORT_TIMEOUT); printf("getexitcode(rank %d) failed: %s\n", nRank, str);fflush(stdout); if (g_bUseJobHost) { UpdateJobKeyValue(nRank, "error", str); // get the time the process exited sprintf(str, "getexittime %d", pArg->pId[i]); WriteString(pArg->pSocket[i], str); ReadStringTimeout(pArg->pSocket[i], str, g_nMPIRUN_SHORT_TIMEOUT); UpdateJobKeyValue(nRank, "exittime", str); } if (easy_send(g_sockBreak, "x", 1) == SOCKET_ERROR) { printf("Hard abort.\n");fflush(stdout); ExitProcess(-1); } } else { if (g_bUseJobHost) { strtok(str, ":"); // strip the extra data from the string UpdateJobKeyValue(nRank, "exitcode", str); char *temp; if (g_bOutputExitCodes) temp = strdup(str); // get the time the process exited sprintf(str, "getexittime %d", pArg->pId[i]); WriteString(pArg->pSocket[i], str); ReadStringTimeout(pArg->pSocket[i], str, g_nMPIRUN_SHORT_TIMEOUT); UpdateJobKeyValue(nRank, "exittime", str); if (g_bOutputExitCodes) { printf("[rank %d exit code: %s, time: %s]\n", nRank, temp, str);fflush(stdout); free(temp); } } else { if (g_bOutputExitCodes) { strtok(str, ":"); // strip the extra data from the string printf("[rank %d exit code: %s]\n", nRank, str);fflush(stdout); } } if (!g_bNoMPI) { sprintf(str, "getmpifinalized %d", pArg->pId[i]); WriteString(pArg->pSocket[i], str); ReadStringTimeout(pArg->pSocket[i], str, g_nMPIRUN_SHORT_TIMEOUT); if (stricmp(str, "yes") != 0) { if (stricmp(str, "no") != 0) printf("getmpifinalized returned: %s\n", str); else { if (!g_bSuppressErrorOutput) printf("process %d exited without calling MPIFinalize\n", nRank); } fflush(stdout); easy_send(g_sockBreak, "x", 1); } } } if (g_nNproc > FORWARD_NPROC_THRESHOLD) { if (nRank > 0 && (g_nNproc/2) > nRank) { sprintf(str, "stopforwarder port=%d abort=no", g_pForwardHost[nRank].nPort); WriteString(pArg->pSocket[i], str); } } sprintf(str, "freeprocess %d", pArg->pId[i]); WriteString(pArg->pSocket[i], str); ReadStringTimeout(pArg->pSocket[i], str, g_nMPIRUN_SHORT_TIMEOUT); WriteString(pArg->pSocket[i], "done"); easy_closesocket(pArg->pSocket[i]); FD_CLR(pArg->pSocket[i], &totalset); pArg->pSocket[i] = INVALID_SOCKET; n--; pArg->n--; } else { PrintError(WSAGetLastError(), "ProcessWait:Reading the exit code for process %d failed\n", i);fflush(stdout); easy_closesocket(pArg->pSocket[i]); FD_CLR(pArg->pSocket[i], &totalset); pArg->pSocket[i] = INVALID_SOCKET; n--; pArg->n--; if (easy_send(g_sockBreak, "x", 1) == SOCKET_ERROR) { printf("Unable to abort processes.\n");fflush(stdout); ExitProcess(-1); } //return; } } } }}// Function name : WaitForExitCommands// Description : // Return type : void void WaitForExitCommands(){ bool bKillSent = false; if (g_nNumProcessSockets < FD_SETSIZE) { int i, n; fd_set totalset, readset; char str[256]; SOCKET break_sock;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -