📄 bnr_spawn.cpp
字号:
#include "bnrimpl.h"#include "mpdutil.h"#include "mpd.h"#include "bsocket.h"#include <stdio.h>#include "mpichinfo.h"#include "redirectio.h"HANDLE g_hSpawnMutex = NULL;HANDLE g_hJobThreads[100];int g_nNumJobThreads = 0;struct Spawn_struct{ Spawn_struct(); Spawn_struct(int n); ~Spawn_struct(); struct Spawn_node { int pid; int launchid; char fwd_host[100]; int fwd_port; }; int m_nNproc; Spawn_node *m_pNode; int m_bfd; int m_bfdStop; HANDLE m_hRedirectIOThread; HANDLE m_hThread; Spawn_struct *m_pNext;};Spawn_struct::Spawn_struct(){ m_bfd = BFD_INVALID_SOCKET; m_bfdStop = BFD_INVALID_SOCKET; m_hRedirectIOThread = NULL; m_nNproc = 0; m_pNode = NULL; m_pNext = NULL; m_hThread = NULL;}Spawn_struct::Spawn_struct(int n){ Spawn_struct(); m_nNproc = n; m_pNode = new Spawn_node[n]; for (int i=0; i<n; i++) { m_pNode[i].fwd_host[0] = '\0'; m_pNode[i].fwd_port = 0; m_pNode[i].launchid = -1; m_pNode[i].pid = 0; }}Spawn_struct::~Spawn_struct(){ m_nNproc = 0; if (m_pNode) delete m_pNode; m_pNode = NULL; if (m_hRedirectIOThread != NULL) { if (m_bfdStop != BFD_INVALID_SOCKET) { beasy_send(m_bfdStop, "x", 1); if (WaitForSingleObject(m_hRedirectIOThread, 10000) == WAIT_TIMEOUT) TerminateThread(m_hRedirectIOThread, 0); } else TerminateThread(m_hRedirectIOThread, 0); CloseHandle(m_hRedirectIOThread); } m_hRedirectIOThread = NULL; if (m_bfd != BFD_INVALID_SOCKET) { beasy_closesocket(m_bfd); m_bfd = BFD_INVALID_SOCKET; } if (m_bfdStop != BFD_INVALID_SOCKET) { beasy_closesocket(m_bfdStop); m_bfdStop = BFD_INVALID_SOCKET; } m_pNext = NULL; // don't touch the m_hThread member because BNR_Finalize may be waiting on it? if (!g_bBNRFinalizeWaiting && m_hThread != NULL) { CloseHandle(m_hThread); m_hThread = NULL; }}Spawn_struct *g_pSpawnList = NULL;char g_pszIOHost[100] = "";int g_nIOPort = 0;struct HostNode{ char pszHost[100]; int nSMPProcs; HostNode *pNext;};static bool GetHostsFromFile(char *pszFileName, HostNode **ppNode, int nNumWanted){ FILE *fin; char buffer[1024] = ""; char *pChar, *pChar2; HostNode *node = NULL, *list = NULL, *cur_node; // check the parameters if ((nNumWanted < 1) || (ppNode == NULL)) return false; // open the file fin = fopen(pszFileName, "r"); if (fin == NULL) { printf("Error: unable to open file '%s', error %d\n", pszFileName, GetLastError()); return false; } // Read the host names from the file while (fgets(buffer, 1024, fin)) { pChar = buffer; // Advance over white space while (*pChar != '\0' && isspace(*pChar)) pChar++; if (*pChar == '#' || *pChar == '\0') continue; // Trim trailing white space pChar2 = &buffer[strlen(buffer)-1]; while (isspace(*pChar2) && (pChar >= pChar)) { *pChar2 = '\0'; pChar2--; } // If there is anything left on the line, consider it a host name if (strlen(pChar) > 0) { node = new HostNode; node->nSMPProcs = 1; node->pNext = NULL; // Copy the host name pChar2 = node->pszHost; while (*pChar != '\0' && !isspace(*pChar)) { *pChar2 = *pChar; pChar++; pChar2++; } *pChar2 = '\0'; pChar2 = strtok(node->pszHost, ":"); pChar2 = strtok(NULL, "\n"); if (pChar2 != NULL) { node->nSMPProcs = atoi(pChar2); if (node->nSMPProcs < 1) node->nSMPProcs = 1; } else { // Advance over white space while (*pChar != '\0' && isspace(*pChar)) pChar++; // Get the number of SMP processes if (*pChar != '\0') { node->nSMPProcs = atoi(pChar); if (node->nSMPProcs < 1) node->nSMPProcs = 1; } } if (list == NULL) { list = node; cur_node = node; } else { cur_node->pNext = node; cur_node = node; } } } fclose(fin); if (list == NULL) return false; // Allocate the first host node node = new HostNode; int num_left = nNumWanted; HostNode *n = list, *target = node; // add the nodes to the target list, cycling if necessary while (num_left) { target->pNext = NULL; strcpy(target->pszHost, n->pszHost); if (num_left <= n->nSMPProcs) { target->nSMPProcs = num_left; num_left = 0; } else { target->nSMPProcs = n->nSMPProcs; num_left = num_left - n->nSMPProcs; } if (num_left) { target->pNext = new HostNode; target = target->pNext; } n = n->pNext; if (n == NULL) n = list; } // free the list created from the file while (list) { n = list; list = list->pNext; delete n; } // add the generated list to the end of the list passed in if (*ppNode == NULL) { *ppNode = node; } else { cur_node = *ppNode; while (cur_node->pNext != NULL) cur_node = cur_node->pNext; cur_node->pNext = node; } return true;}static void FreeHosts(HostNode *pNode){ HostNode *n; while (pNode) { n = pNode; pNode = pNode->pNext; delete n; }}static void GetHost(HostNode *pList, int nRank, char *pszHost){ nRank++; while (nRank > 0) { if (pList == NULL) return; nRank = nRank - pList->nSMPProcs; if (nRank > 0) pList = pList->pNext; } if (pList == NULL) return; strcpy(pszHost, pList->pszHost);}static void CreateCommand(int count, int *maxprocs, char **cmds, char ***argvs, int nIproc, char *pszCmd){ int i = 0; char **ppArg; nIproc++; while (nIproc > 0) { if (i >= count) return; nIproc = nIproc - maxprocs[i]; if (nIproc > 0) i++; } if (i >= count) return; sprintf(pszCmd, "\"%s\"", cmds[i]); if (argvs == NULL) return; ppArg = argvs[i]; while (ppArg) { strcat(pszCmd, " "); strcat(pszCmd, *ppArg); ppArg++; }}static void RemoveSpawnThread(Spawn_struct *pSpawn){ if (g_bBNRFinalizeWaiting) return; WaitForSingleObject(g_hSpawnMutex, INFINITE); for (int i=0; i<g_nNumJobThreads; i++) { if (g_hJobThreads[i] == pSpawn->m_hThread) { g_nNumJobThreads--; g_hJobThreads[i] = g_hJobThreads[g_nNumJobThreads]; CloseHandle(pSpawn->m_hThread); pSpawn->m_hThread = NULL; break; } } ReleaseMutex(g_hSpawnMutex);}static void RemoveSpawnStruct(Spawn_struct *pSpawn){ Spawn_struct *p, *pTrailer; WaitForSingleObject(g_hSpawnMutex, INFINITE); if (pSpawn == g_pSpawnList) { g_pSpawnList = g_pSpawnList->m_pNext; ReleaseMutex(g_hSpawnMutex); return; } pTrailer = g_pSpawnList; p = g_pSpawnList->m_pNext; while (p) { if (p == pSpawn) { pTrailer->m_pNext = p->m_pNext; ReleaseMutex(g_hSpawnMutex); return; } pTrailer = pTrailer->m_pNext; p = p->m_pNext; } ReleaseMutex(g_hSpawnMutex);}void SpawnWaitThread(Spawn_struct *pSpawn){ int i,j; char pszStr[1024]; int nPid; for (i=0; i<pSpawn->m_nNproc; i++) { sprintf(pszStr, "getexitcodewait %d", pSpawn->m_pNode[i].launchid); if (WriteString(pSpawn->m_bfd, pszStr) == SOCKET_ERROR) { printf("WriteString('%s') failed, error %d\n", pszStr, WSAGetLastError()); RemoveSpawnThread(pSpawn); RemoveSpawnStruct(pSpawn); delete pSpawn; return; } } for (i=0; i<pSpawn->m_nNproc; i++) { if (!ReadString(pSpawn->m_bfd, pszStr)) { printf("ReadString(exitcode) failed, error %d\n", WSAGetLastError()); RemoveSpawnThread(pSpawn); RemoveSpawnStruct(pSpawn); delete pSpawn; return; } char *token = strtok(pszStr, ":"); if (token != NULL) { token = strtok(NULL, "\n"); if (token != NULL) { nPid = atoi(token); for (j=0; j<pSpawn->m_nNproc; j++) { if (pSpawn->m_pNode[j].pid == nPid) { if ((j > 0) && ((pSpawn->m_nNproc/2) > j))
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -