📄 redirectio.c
字号:
if (child_abort_bfd == BFD_INVALID_SOCKET) SetEvent(g_hListenReleasedEvent); else { beasy_send(child_abort_bfd, "x", 1); beasy_closesocket(child_abort_bfd); } beasy_closesocket(abort_bfd); if (hChildThread != NULL) CloseHandle(hChildThread);}void RedirectIOThread(RedirectIOArg *pArg){ int client_bfd, signal_bfd, child_abort_bfd = BFD_INVALID_SOCKET; int bfdListen; int n, i; int bfdStopIOSignalSocket; char pBuffer[1024]; DWORD num_read, num_written; HANDLE hStdout, hStderr, hOut; int nRank; char cType; int nDatalen; BOOL bDeleteOnEmpty = FALSE; DWORD dwThreadId; HANDLE hChildThread = NULL; bfd_set total_set, readset; int bfdActive[FD_SETSIZE]; int nActive = 0; hStdout = GetStdHandle(STD_OUTPUT_HANDLE); hStderr = GetStdHandle(STD_ERROR_HANDLE); /* Create a listener*/ if (beasy_create(&g_bfdListen, ADDR_ANY, INADDR_ANY) == SOCKET_ERROR) { int error = WSAGetLastError(); err_printf("RedirectIOThread: beasy_create listen socket failed: error %d\n", error);fflush(stdout); bsocket_finalize(); ExitProcess(error); } blisten(g_bfdListen, 5); beasy_get_sock_info(g_bfdListen, g_pszIOHost, &g_nIOPort); /* Connect a stop socket to myself*/ if (beasy_create(pArg->m_pbfdStopIOSignalSocket, ADDR_ANY, INADDR_ANY) == SOCKET_ERROR) { int error = WSAGetLastError(); err_printf("beasy_create(m_bfdStopIOSignalSocket) failed, error %d\n", error);fflush(stdout); ExitProcess(error); } if (beasy_connect(*pArg->m_pbfdStopIOSignalSocket, g_pszIOHost, g_nIOPort) == SOCKET_ERROR) { int error = WSAGetLastError(); err_printf("beasy_connect(m_bfdStopIOSignalSocket, %s, %d) failed, error %d\n", g_pszIOHost, g_nIOPort, error);fflush(stdout); ExitProcess(error); } bfdStopIOSignalSocket = *pArg->m_pbfdStopIOSignalSocket; /* Accept the connection from myself*/ signal_bfd = beasy_accept(g_bfdListen); if (signal_bfd == BFD_INVALID_SOCKET) { int error = WSAGetLastError(); err_printf("beasy_accept failed, error %d\n", error); ExitProcess(error); } SetEvent(pArg->hReadyEvent); free(pArg); pArg = NULL; nActive = 0; BFD_ZERO(&total_set); BFD_SET(g_bfdListen, &total_set); BFD_SET(signal_bfd, &total_set); bfdListen = g_bfdListen; while (TRUE) { readset = total_set; n = bselect(0, &readset, NULL, NULL, NULL); if (n == SOCKET_ERROR) { err_printf("RedirectIOThread: bselect failed, error %d\n", WSAGetLastError());fflush(stdout); break; } if (n == 0) { dbg_printf("RedirectIOThread: bselect returned zero sockets available\n");fflush(stdout); break; } else { if (BFD_ISSET(signal_bfd, &readset)) { char c; num_read = beasy_receive(signal_bfd, &c, 1); if (num_read == 1) { if (c == 0) { if (child_abort_bfd != BFD_INVALID_SOCKET) beasy_send(child_abort_bfd, &c, 1); if (nActive == 0) { if (hChildThread != NULL) WaitForSingleObject(hChildThread, 10000); break; } bDeleteOnEmpty = TRUE; } } else { if (num_read == SOCKET_ERROR) dbg_printf("Error: redirect IO signal socket closed, exiting\n"); else err_printf("Error: error reading redirect IO signal socket, error %d\n", WSAGetLastError()); fflush(stdout); break; } n--; } if (bfdListen != BFD_INVALID_SOCKET && BFD_ISSET(bfdListen, &readset)) { if ((nActive + 3) >= FD_SETSIZE) { int temp_bfd; MakeLoop(&temp_bfd, &child_abort_bfd); if (temp_bfd == BFD_INVALID_SOCKET || child_abort_bfd == BFD_INVALID_SOCKET) { err_printf("Critical error: Unable to create a socket\n");fflush(stdout); break; } hChildThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)RedirectIOThread2, (LPVOID)temp_bfd, 0, &dwThreadId); if (hChildThread == NULL) { err_printf("Critical error: Unable to create an io thread\n");fflush(stdout); break; } BFD_CLR(bfdListen, &total_set); bfdListen = BFD_INVALID_SOCKET; /*dbg_printf("started new IO redirection thread\n");fflush(stdout);*/ } else { client_bfd = beasy_accept(bfdListen); if (client_bfd == BFD_INVALID_SOCKET) { int error = WSAGetLastError(); err_printf("RedirectIOThread: baccept failed: %d\n", error);fflush(stdout); break; } if (beasy_receive(client_bfd, &cType, sizeof(char)) == SOCKET_ERROR) return; if (cType == 0) { HANDLE hThread; DWORD dwThreadID; hThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)RedirectStdin, (void*)client_bfd, 0, &dwThreadID); if (hThread == NULL) { err_printf("Critical error: Standard input redirection thread creation failed. error %d\n", GetLastError());fflush(stdout); } else CloseHandle(hThread); } else { bfdActive[nActive] = client_bfd; BFD_SET(client_bfd, &total_set); nActive++; /*dbg_printf("(+%d:%d)", nActive, bget_fd(client_bfd));fflush(stdout);*/ } } n--; } if (n > 0) { for (i=0; n > 0; i++) { if (BFD_ISSET(bfdActive[i], &readset)) { char pTemp[sizeof(int)+sizeof(char)+sizeof(int)]; num_read = beasy_receive(bfdActive[i], pTemp, sizeof(int)+sizeof(char)+sizeof(int)); if (num_read == SOCKET_ERROR || num_read == 0) { /*dbg_printf("(-%d:%d)", nActive, bget_fd(bfdActive[i]));fflush(stdout);*/ BFD_CLR(bfdActive[i], &total_set); beasy_closesocket(bfdActive[i]); nActive--; bfdActive[i] = bfdActive[nActive]; i--; } else { nDatalen = *(int*)pTemp; cType = pTemp[sizeof(int)]; nRank = *(int*)&pTemp[sizeof(int)+sizeof(char)]; /*dbg_printf("\nreceiving %d bytes from %d of type %d\n", nDatalen, nRank, (int)cType);fflush(stdout);*/ num_read = beasy_receive(bfdActive[i], pBuffer, nDatalen); if (num_read == SOCKET_ERROR || num_read == 0) { BFD_CLR(bfdActive[i], &total_set); beasy_closesocket(bfdActive[i]); nActive--; bfdActive[i] = bfdActive[nActive]; i--; /* if (num_read == SOCKET_ERROR) err_printf("(err-%d)", nActive); else dbg_printf("(-%d)", nActive); fflush(stdout); */ } else { hOut = (cType == 1) ? hStdout : hStderr; hOut = hStdout; /* if (g_bDoMultiColorOutput) { WaitForSingleObject(g_hConsoleOutputMutex, INFINITE); SetConsoleTextAttribute(hOut, aConsoleColorAttribute[nRank%NUM_OUTPUT_COLORS]); //dbg_printf("(%d)", bget_fd(bfdActive[i]));fflush(stdout); if (WriteFile(hOut, pBuffer, num_read, &num_written, NULL)) FlushFileBuffers(hOut); else { err_printf("*** output lost ***\n");fflush(stdout); } SetConsoleTextAttribute(hOut, g_ConsoleAttribute); ReleaseMutex(g_hConsoleOutputMutex); } else { */ if (!WriteFile(hOut, pBuffer, num_read, &num_written, NULL)) { err_printf("*** output lost ***\n");fflush(stdout); } /*}*/ } } n--; } } } if (bDeleteOnEmpty && nActive == 0) { if (hChildThread != NULL) { WaitForSingleObject(hChildThread, 10000); CloseHandle(hChildThread); hChildThread = NULL; } break; } } } if (child_abort_bfd != BFD_INVALID_SOCKET) { /*dbg_printf("signalling child threads to shut down\n");fflush(stdout);*/ beasy_send(child_abort_bfd, "x", 1); WaitForSingleObject(g_hListenReleasedEvent, 10000); beasy_closesocket(g_bfdListen); } else if (bfdListen != BFD_INVALID_SOCKET) { /*dbg_printf("closing listen socket\n");fflush(stdout);*/ beasy_closesocket(bfdListen); } for (i=0; i<nActive; i++) { /*dbg_printf("closing io socket %d\n", i);fflush(stdout);*/ beasy_closesocket(bfdActive[i]); } /*dbg_printf("closing signal socket\n");fflush(stdout);*/ beasy_closesocket(signal_bfd); if (hChildThread != NULL) CloseHandle(hChildThread); /*dbg_printf("RedirectIOThread exiting\n");fflush(stdout);*/}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -