📄 bnr_spawn.cpp
字号:
{ sprintf(pszStr, "stopforwarder host=%s port=%d abort=no", pSpawn->m_pNode[j].fwd_host, pSpawn->m_pNode[j].fwd_port); if (WriteString(pSpawn->m_bfd, pszStr) == SOCKET_ERROR) { printf("WriteString('%s') failed, error %d\n", pszStr, WSAGetLastError()); RemoveSpawnThread(pSpawn); RemoveSpawnStruct(pSpawn); delete pSpawn; return; } } sprintf(pszStr, "freeprocess %d", pSpawn->m_pNode[j].launchid); if (WriteString(pSpawn->m_bfd, pszStr) == SOCKET_ERROR) { printf("WriteString('%s') failed, error %d\n", pszStr, WSAGetLastError()); RemoveSpawnThread(pSpawn); RemoveSpawnStruct(pSpawn); delete pSpawn; return; } } } } } } if (pSpawn->m_bfdStop != BFD_INVALID_SOCKET) { // tell the redirection thread to stop pszStr[0] = 0; beasy_send(pSpawn->m_bfdStop, pszStr, 1); } if (pSpawn->m_hRedirectIOThread != NULL) { if (WaitForSingleObject(pSpawn->m_hRedirectIOThread, 10000) != WAIT_OBJECT_0) { TerminateThread(pSpawn->m_hRedirectIOThread, 0); } CloseHandle(pSpawn->m_hRedirectIOThread); pSpawn->m_hRedirectIOThread = NULL; } WriteString(pSpawn->m_bfd, "done"); beasy_closesocket(pSpawn->m_bfd); RemoveSpawnThread(pSpawn); RemoveSpawnStruct(pSpawn); delete pSpawn;}int BNR_Spawn_multiple(int count, char **cmds, char ***argvs, int *maxprocs, void *info, int *errors, bool_t *same_domain, void *preput_info){ char pszStr[4096]; int nNproc, nIproc; int i, j, error; int nNumHostsNeeded = 0; char pszHost[100]; char pszHostFile[MAX_PATH]; HostNode *pHosts = NULL; int flag; Spawn_struct *pSpawn = NULL; char pszCmd[1024]; char pszDb[100]; char pszTemp[1024]; int nKeys; char pszKey[MPICH_MAX_INFO_KEY], pszValue[MPICH_MAX_INFO_VAL]; // should the user and password be passed in by info? // should this information be in each info allowing for multiple user credentials? if (info != NULL) { if (MPICH_Info_get(((MPICH_Info*)info)[0], "user", 100, g_pszBNRAccount, &flag) != MPICH_SUCCESS) { printf("Error: MPICH_Info_get('user') failed\n"); return BNR_FAIL; } if (MPICH_Info_get(((MPICH_Info*)info)[0], "password", 100, g_pszBNRPassword, &flag) != MPICH_SUCCESS) { printf("Error: MPICH_Info_get('password') failed\n"); return BNR_FAIL; } } nNproc = 0; for (i=0; i<count; i++) { if (maxprocs[i] < 1) { FreeHosts(pHosts); return BNR_FAIL; } nNproc += maxprocs[i]; flag = 0; if (MPICH_Info_get(((MPICH_Info*)info)[i], "host", 100, pszHost, &flag) != MPICH_SUCCESS) { printf("Error: MPICH_Info_get failed\n"); FreeHosts(pHosts); return BNR_FAIL; } if (flag) { // user specified a single host HostNode *n; if (pHosts == NULL) pHosts = n = new HostNode; else { n = pHosts; while (n->pNext != NULL) n = n->pNext; n->pNext = new HostNode; n = n->pNext; } for (j=0; j<maxprocs[i]; j++) { n->nSMPProcs = 1; strcpy(n->pszHost, pszHost); if (j<maxprocs[i]-1) { n->pNext = new HostNode; n = n->pNext; } else n->pNext = NULL; } } else { flag = 0; if (MPICH_Info_get(((MPICH_Info*)info)[i], "hostfile", MAX_PATH, pszHostFile, &flag) != MPICH_SUCCESS) { printf("Error: MPICH_Info_get failed\n"); FreeHosts(pHosts); return BNR_FAIL; } if (flag) { // user specified a host file if (!GetHostsFromFile(pszHostFile, &pHosts, maxprocs[i])) { FreeHosts(pHosts); return BNR_FAIL; } } else { // user did not specify any hosts // create a list of blank host nodes to be filled in later nNumHostsNeeded += maxprocs[i]; HostNode *n; if (pHosts == NULL) pHosts = n = new HostNode; else { n = pHosts; while (n->pNext != NULL) n = n->pNext; n->pNext = new HostNode; n = n->pNext; } for (j=0; j<maxprocs[i]; j++) { n->nSMPProcs = 1; n->pszHost[0] = '\0'; if (j<maxprocs[i]-1) { n->pNext = new HostNode; n = n->pNext; } else n->pNext = NULL; } } } } // fill in the blank host nodes if (nNumHostsNeeded > 0) { sprintf(pszStr, "next %d", nNumHostsNeeded); if (WriteString(g_bfdMPD, pszStr) == SOCKET_ERROR) { printf("WriteString('%s') failed, error %d\n", pszStr, WSAGetLastError()); FreeHosts(pHosts); return BNR_FAIL; } HostNode *n = pHosts; for (i=0; i<nNumHostsNeeded; i++) { while (n->pszHost[0] != '\0') n = n->pNext; if (!ReadString(g_bfdMPD, n->pszHost)) { printf("ReadString(next host) failed, error %d\n", WSAGetLastError()); FreeHosts(pHosts); return BNR_FAIL; } } } // allocate a spawn structure to hold all the data structures for this spawn call pSpawn = new Spawn_struct(nNproc); if (pSpawn == NULL) { FreeHosts(pHosts); return BNR_FAIL; } // give this spawn its own connection to the mpd error = ConnectToMPD(g_pszMPDHost, g_nMPDPort, g_pszMPDPhrase, &pSpawn->m_bfd); if (error) { FreeHosts(pHosts); return BNR_FAIL; } // if there isn't already a host to redirect io to, create one if (g_pszIOHost[0] == '\0') { DWORD dwThreadId; HANDLE hEvent; RedirectIOArg *pArg = new RedirectIOArg; pArg->hReadyEvent = hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); pArg->m_pbfdStopIOSignalSocket = &pSpawn->m_bfdStop; pSpawn->m_hRedirectIOThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)RedirectIOThread, pArg, 0, &dwThreadId); if (pSpawn->m_hRedirectIOThread == NULL) { printf("Error: Unable to create the redirect io thread, error %d\n", GetLastError()); FreeHosts(pHosts); CloseHandle(hEvent); delete pArg; delete pSpawn; return BNR_FAIL; } if (WaitForSingleObject(hEvent, 10000) == WAIT_TIMEOUT) { printf("Error: timed out waiting for io redirection thread to initialize\n"); FreeHosts(pHosts); CloseHandle(hEvent); delete pSpawn; return BNR_FAIL; } CloseHandle(hEvent); } strcpy(pSpawn->m_pNode[0].fwd_host, g_pszIOHost); pSpawn->m_pNode[0].fwd_port = g_nIOPort; // create a database for the spawned processes if (WriteString(pSpawn->m_bfd, "dbcreate") == SOCKET_ERROR) { printf("WriteString('dbcreate') failed, error %d\n", WSAGetLastError()); FreeHosts(pHosts); delete pSpawn; return BNR_FAIL; } if (!ReadString(pSpawn->m_bfd, pszDb)) { printf("ReadString(db) failed, error %d\n", WSAGetLastError()); FreeHosts(pHosts); delete pSpawn; return BNR_FAIL; } // pre-put any data provided into the spawnee's database MPICH_Info_get_nkeys((MPICH_Info)preput_info, &nKeys); for (i=0; i<nKeys; i++) { MPICH_Info_get_nthkey((MPICH_Info)preput_info, i, pszKey); MPICH_Info_get((MPICH_Info)preput_info, pszKey, MPICH_MAX_INFO_VAL, pszValue, &flag); if (flag) { BNR_KM_Put(pszDb, pszKey, pszValue); } } // launch each process for (nIproc = 0; nIproc < nNproc; nIproc++) { // get the host name for this process GetHost(pHosts, nIproc, pszHost); // create the command CreateCommand(count, maxprocs, cmds, argvs, nIproc, pszCmd); // possibly start an io forwarder if ((nIproc > 0) && ((nNproc/2) > nIproc)) { sprintf(pszStr, "createforwarder host=%s forward=%s:%d", pszHost, pSpawn->m_pNode[(nIproc-1)/2].fwd_host, pSpawn->m_pNode[(nIproc-1)/2].fwd_port); if (WriteString(pSpawn->m_bfd, pszStr) == SOCKET_ERROR) { printf("WriteString('%s') failed, error %d\n", pszStr, WSAGetLastError()); FreeHosts(pHosts); delete pSpawn; return BNR_FAIL; } if (!ReadString(pSpawn->m_bfd, pszStr)) { printf("ReadString(forwarder port) failed, error %d\n", WSAGetLastError()); FreeHosts(pHosts); delete pSpawn; return BNR_FAIL; } strcpy(pSpawn->m_pNode[nIproc].fwd_host, pszHost); pSpawn->m_pNode[nIproc].fwd_port = atoi(pszStr); } // create the command line sprintf(pszStr, "launch h=%s c='%s' 12=%s:%d k=%d e='BNR_SPAWN=yes|BNR_RANK=%d|BNR_SIZE=%d|BNR_DB=%s|BNR_MPD=%s|BNR_IO=%s:%d", pszHost, pszCmd, pSpawn->m_pNode[(nIproc-1)/2].fwd_host, pSpawn->m_pNode[(nIproc-1)/2].fwd_port, nIproc, nIproc, nNproc, pszDb, pszHost, pSpawn->m_pNode[(nIproc-1)/2].fwd_host, pSpawn->m_pNode[(nIproc-1)/2].fwd_port); if (strlen(g_pszBNRAccount)) { sprintf(pszTemp, "|BNR_USER=%s|BNR_PWD=%s' a=%s p=%s", g_pszBNRAccount, g_pszBNRPassword, g_pszBNRAccount, g_pszBNRPassword); strcat(pszStr, pszTemp); } else strcat(pszStr, "'"); // write the launch command if (WriteString(pSpawn->m_bfd, pszStr) == SOCKET_ERROR) { printf("WriteString('launch h=%s c='%s' ...') failed, error %d\n", pszHost, pszCmd, WSAGetLastError()); FreeHosts(pHosts); delete pSpawn; return BNR_FAIL; } if (!ReadString(pSpawn->m_bfd, pszStr)) { printf("ReadString(launchid) failed, error %d\n", WSAGetLastError()); FreeHosts(pHosts); delete pSpawn; return BNR_FAIL; } pSpawn->m_pNode[nIproc].launchid = atoi(pszStr); } FreeHosts(pHosts); pHosts = NULL; // get the process ids for (i=0; i<nNproc; i++) { sprintf(pszStr, "getpid %d", pSpawn->m_pNode[i].launchid); if (WriteString(pSpawn->m_bfd, pszStr) == SOCKET_ERROR) { printf("WriteString('%s') failed, error %d\n", pszStr, WSAGetLastError()); delete pSpawn; return BNR_FAIL; } if (!ReadString(pSpawn->m_bfd, pszStr)) { printf("ReadString(pid) failed, error %d\n", WSAGetLastError()); delete pSpawn; return BNR_FAIL; } pSpawn->m_pNode[i].pid = atoi(pszStr); if (pSpawn->m_pNode[i].pid == -1) { sprintf(pszStr, "geterror %d", pSpawn->m_pNode[i].launchid); if (WriteString(pSpawn->m_bfd, pszStr) == SOCKET_ERROR) { printf("Error: launching process %d failed, unable to determine the error.\nWriting the request for the error message failed, error %d", i, WSAGetLastError()); delete pSpawn; return BNR_FAIL; } if (!ReadString(pSpawn->m_bfd, pszStr)) { printf("Error: launching process %d failed, unable to determine the error.\nReading the error message failed, error %d", i, WSAGetLastError()); delete pSpawn; return BNR_FAIL; } printf("Error: launching process %d failed, %s\n", i, pszStr); delete pSpawn; return BNR_FAIL; } } // Start a thread to monitor the spawned processes until they all exit and all output has been redirected // Add the spawn data structure to the global list WaitForSingleObject(g_hSpawnMutex, INFINITE); pSpawn->m_pNext = g_pSpawnList; g_pSpawnList = pSpawn; DWORD dwThreadId; pSpawn->m_hThread = g_hJobThreads[g_nNumJobThreads] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)SpawnWaitThread, pSpawn, 0, &dwThreadId); if (g_hJobThreads[g_nNumJobThreads] == NULL) { printf("Error: Unable to create the job wait thread, error %d\n", GetLastError()); g_pSpawnList = pSpawn->m_pNext; ReleaseMutex(g_hSpawnMutex); delete pSpawn; return BNR_FAIL; } g_nNumJobThreads++; ReleaseMutex(g_hSpawnMutex); return BNR_SUCCESS;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -