📄 pmi_spawn.c
字号:
char pszStr[4096]; int nNproc, nIproc; int i, j, error; int nNumHostsNeeded = 0; char pszHost[100]; char pszHostFile[MAX_PATH]; HostNode *pHosts = NULL; int flag; Spawn_struct *pSpawn = NULL; char pszCmd[1024]; char pszDb[100]; char pszTemp[1024]; int nKeys; char pszKey[MPICH_MAX_INFO_KEY], pszValue[MPICH_MAX_INFO_VAL]; HostNode *n; DWORD dwThreadId; /* should the user and password be passed in by info?*/ /* should this information be in each info allowing for multiple user credentials?*/ if (info != NULL) { if (MPICH_Info_get(((MPICH_Info*)info)[0], "user", 100, g_pszPMIAccount, &flag) != MPICH_SUCCESS) { err_printf("Error: MPICH_Info_get('user') failed\n"); return PMI_FAIL; } if (MPICH_Info_get(((MPICH_Info*)info)[0], "password", 100, g_pszPMIPassword, &flag) != MPICH_SUCCESS) { err_printf("Error: MPICH_Info_get('password') failed\n"); return PMI_FAIL; } } nNproc = 0; for (i=0; i<count; i++) { if (maxprocs[i] < 1) { FreeHosts(pHosts); return PMI_FAIL; } nNproc += maxprocs[i]; flag = 0; if (MPICH_Info_get(((MPICH_Info*)info)[i], "host", 100, pszHost, &flag) != MPICH_SUCCESS) { err_printf("Error: MPICH_Info_get failed\n"); FreeHosts(pHosts); return PMI_FAIL; } if (flag) { /* user specified a single host*/ HostNode *n; if (pHosts == NULL) pHosts = n = (HostNode*)malloc(sizeof(HostNode)); else { n = pHosts; while (n->pNext != NULL) n = n->pNext; n->pNext = (HostNode*)malloc(sizeof(HostNode)); n = n->pNext; } for (j=0; j<maxprocs[i]; j++) { n->nSMPProcs = 1; strncpy(n->pszHost, pszHost, 100); if (j<maxprocs[i]-1) { n->pNext = (HostNode*)malloc(sizeof(HostNode)); n = n->pNext; } else n->pNext = NULL; } } else { flag = 0; if (MPICH_Info_get(((MPICH_Info*)info)[i], "hostfile", MAX_PATH, pszHostFile, &flag) != MPICH_SUCCESS) { err_printf("Error: MPICH_Info_get failed\n"); FreeHosts(pHosts); return PMI_FAIL; } if (flag) { /* user specified a host file*/ if (!GetHostsFromFile(pszHostFile, &pHosts, maxprocs[i])) { FreeHosts(pHosts); return PMI_FAIL; } } else { /* user did not specify any hosts*/ /* create a list of blank host nodes to be filled in later*/ nNumHostsNeeded += maxprocs[i]; if (pHosts == NULL) pHosts = n = (HostNode*)malloc(sizeof(HostNode)); else { n = pHosts; while (n->pNext != NULL) n = n->pNext; n->pNext = (HostNode*)malloc(sizeof(HostNode)); n = n->pNext; } for (j=0; j<maxprocs[i]; j++) { n->nSMPProcs = 1; n->pszHost[0] = '\0'; if (j<maxprocs[i]-1) { n->pNext = (HostNode*)malloc(sizeof(HostNode)); n = n->pNext; } else n->pNext = NULL; } } } } /* fill in the blank host nodes*/ if (nNumHostsNeeded > 0) { snprintf(pszStr, 4096, "next %d", nNumHostsNeeded); if (WriteString(g_bfdMPD, pszStr) == SOCKET_ERROR) { err_printf("WriteString('%s') failed, error %d\n", pszStr, WSAGetLastError()); FreeHosts(pHosts); return PMI_FAIL; } n = pHosts; for (i=0; i<nNumHostsNeeded; i++) { while (n->pszHost[0] != '\0') n = n->pNext; if (!ReadString(g_bfdMPD, n->pszHost)) { err_printf("ReadString(next host) failed, error %d\n", WSAGetLastError()); FreeHosts(pHosts); return PMI_FAIL; } } } /* allocate a spawn structure to hold all the data structures for this spawn call*/ pSpawn = CreateSpawn_structn(nNproc); if (pSpawn == NULL) { FreeHosts(pHosts); return PMI_FAIL; } /* give this spawn its own connection to the mpd*/ error = ConnectToMPD(g_pszMPDHost, g_nMPDPort, g_pszMPDPhrase, &pSpawn->m_bfd); if (error) { FreeHosts(pHosts); return PMI_FAIL; } /* if there isn't already a host to redirect io to, create one*/ if (g_pszIOHost[0] == '\0') { DWORD dwThreadId; HANDLE hEvent; RedirectIOArg *pArg = (RedirectIOArg*)malloc(sizeof(RedirectIOArg)); pArg->hReadyEvent = hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); pArg->m_pbfdStopIOSignalSocket = &pSpawn->m_bfdStop; pSpawn->m_hRedirectIOThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)RedirectIOThread, pArg, 0, &dwThreadId); if (pSpawn->m_hRedirectIOThread == NULL) { err_printf("Error: Unable to create the redirect io thread, error %d\n", GetLastError()); FreeHosts(pHosts); CloseHandle(hEvent); free(pArg); FreeSpawn_struct(pSpawn); return PMI_FAIL; } if (WaitForSingleObject(hEvent, 10000) == WAIT_TIMEOUT) { err_printf("Error: timed out waiting for io redirection thread to initialize\n"); FreeHosts(pHosts); CloseHandle(hEvent); FreeSpawn_struct(pSpawn); return PMI_FAIL; } CloseHandle(hEvent); } strncpy(pSpawn->m_pNode[0].fwd_host, g_pszIOHost, 100); pSpawn->m_pNode[0].fwd_port = g_nIOPort; /* create a database for the spawned processes*/ if (WriteString(pSpawn->m_bfd, "dbcreate") == SOCKET_ERROR) { err_printf("WriteString('dbcreate') failed, error %d\n", WSAGetLastError()); FreeHosts(pHosts); FreeSpawn_struct(pSpawn); return PMI_FAIL; } if (!ReadString(pSpawn->m_bfd, pszDb)) { err_printf("ReadString(db) failed, error %d\n", WSAGetLastError()); FreeHosts(pHosts); FreeSpawn_struct(pSpawn); return PMI_FAIL; } /* pre-put any data provided into the spawnee's database*/ MPICH_Info_get_nkeys((MPICH_Info)preput_info, &nKeys); for (i=0; i<nKeys; i++) { MPICH_Info_get_nthkey((MPICH_Info)preput_info, i, pszKey); MPICH_Info_get((MPICH_Info)preput_info, pszKey, MPICH_MAX_INFO_VAL, pszValue, &flag); if (flag) { PMI_KVS_Put(pszDb, pszKey, pszValue); } } /* launch each process*/ for (nIproc = 0; nIproc < nNproc; nIproc++) { /* get the host name for this process*/ GetHost(pHosts, nIproc, pszHost); /* create the command*/ CreateCommand(count, maxprocs, cmds, argvs, nIproc, pszCmd); /* possibly start an io forwarder*/ if ((nIproc > 0) && ((nNproc/2) > nIproc)) { snprintf(pszStr, 4096, "createforwarder host=%s forward=%s:%d", pszHost, pSpawn->m_pNode[(nIproc-1)/2].fwd_host, pSpawn->m_pNode[(nIproc-1)/2].fwd_port); if (WriteString(pSpawn->m_bfd, pszStr) == SOCKET_ERROR) { err_printf("WriteString('%s') failed, error %d\n", pszStr, WSAGetLastError()); FreeHosts(pHosts); FreeSpawn_struct(pSpawn); return PMI_FAIL; } if (!ReadString(pSpawn->m_bfd, pszStr)) { err_printf("ReadString(forwarder port) failed, error %d\n", WSAGetLastError()); FreeHosts(pHosts); FreeSpawn_struct(pSpawn); return PMI_FAIL; } strncpy(pSpawn->m_pNode[nIproc].fwd_host, pszHost, 100); pSpawn->m_pNode[nIproc].fwd_port = atoi(pszStr); } /* create the command line*/ snprintf(pszStr, 4096, "launch h=%s c='%s' 12=%s:%d k=%d e='PMI_SPAWN=yes|PMI_RANK=%d|PMI_SIZE=%d|PMI_KVS=%s|PMI_MPD=%s|PMI_IO=%s:%d", pszHost, pszCmd, pSpawn->m_pNode[(nIproc-1)/2].fwd_host, pSpawn->m_pNode[(nIproc-1)/2].fwd_port, nIproc, nIproc, nNproc, pszDb, pszHost, pSpawn->m_pNode[(nIproc-1)/2].fwd_host, pSpawn->m_pNode[(nIproc-1)/2].fwd_port); if (strlen(g_pszPMIAccount)) { snprintf(pszTemp, 1024, "|PMI_USER=%s|PMI_PWD=%s' a=%s p=%s", g_pszPMIAccount, g_pszPMIPassword, g_pszPMIAccount, g_pszPMIPassword); strncat(pszStr, pszTemp, 4096-strlen(pszStr)); } else { strncat(pszStr, "'", 2); } /* write the launch command*/ if (WriteString(pSpawn->m_bfd, pszStr) == SOCKET_ERROR) { err_printf("WriteString('launch h=%s c='%s' ...') failed, error %d\n", pszHost, pszCmd, WSAGetLastError()); FreeHosts(pHosts); FreeSpawn_struct(pSpawn); return PMI_FAIL; } if (!ReadString(pSpawn->m_bfd, pszStr)) { err_printf("ReadString(launchid) failed, error %d\n", WSAGetLastError()); FreeHosts(pHosts); FreeSpawn_struct(pSpawn); return PMI_FAIL; } pSpawn->m_pNode[nIproc].launchid = atoi(pszStr); } FreeHosts(pHosts); pHosts = NULL; /* get the process ids*/ for (i=0; i<nNproc; i++) { snprintf(pszStr, 4096, "getpid %d", pSpawn->m_pNode[i].launchid); if (WriteString(pSpawn->m_bfd, pszStr) == SOCKET_ERROR) { err_printf("WriteString('%s') failed, error %d\n", pszStr, WSAGetLastError()); FreeSpawn_struct(pSpawn); return PMI_FAIL; } if (!ReadString(pSpawn->m_bfd, pszStr)) { err_printf("ReadString(pid) failed, error %d\n", WSAGetLastError()); FreeSpawn_struct(pSpawn); return PMI_FAIL; } pSpawn->m_pNode[i].pid = atoi(pszStr); if (pSpawn->m_pNode[i].pid == -1) { snprintf(pszStr, 4096, "geterror %d", pSpawn->m_pNode[i].launchid); if (WriteString(pSpawn->m_bfd, pszStr) == SOCKET_ERROR) { err_printf("Error: launching process %d failed, unable to determine the error.\nWriting the request for the error message failed, error %d", i, WSAGetLastError()); FreeSpawn_struct(pSpawn); return PMI_FAIL; } if (!ReadString(pSpawn->m_bfd, pszStr)) { err_printf("Error: launching process %d failed, unable to determine the error.\nReading the error message failed, error %d", i, WSAGetLastError()); FreeSpawn_struct(pSpawn); return PMI_FAIL; } err_printf("Error: launching process %d failed, %s\n", i, pszStr); FreeSpawn_struct(pSpawn); return PMI_FAIL; } } /* Start a thread to monitor the spawned processes until they all exit and all output has been redirected*/ /* Add the spawn data structure to the global list*/ WaitForSingleObject(g_hSpawnMutex, INFINITE); pSpawn->m_pNext = g_pSpawnList; g_pSpawnList = pSpawn; pSpawn->m_hThread = g_hJobThreads[g_nNumJobThreads] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)SpawnWaitThread, pSpawn, 0, &dwThreadId); if (g_hJobThreads[g_nNumJobThreads] == NULL) { err_printf("Error: Unable to create the job wait thread, error %d\n", GetLastError()); g_pSpawnList = pSpawn->m_pNext; ReleaseMutex(g_hSpawnMutex); FreeSpawn_struct(pSpawn); return PMI_FAIL; } g_nNumJobThreads++; ReleaseMutex(g_hSpawnMutex); return PMI_SUCCESS;}static void StripArgs(int *argc, char ***argv, int n){ int i; if (n+1 > *argc) { err_printf("Error: cannot strip %d args, only %d left.\n", n, *argc-1); } for (i=n+1; i<=*argc; i++) { (*argv)[i-n] = (*argv)[i]; } *argc -= n;}static void GetMPDPassPhrase(char *phrase){ HANDLE hStdin; DWORD dwMode; fprintf(stderr, "mpd password: "); fflush(stderr); hStdin = GetStdHandle(STD_INPUT_HANDLE); if (!GetConsoleMode(hStdin, &dwMode)) dwMode = ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT | ENABLE_MOUSE_INPUT; SetConsoleMode(hStdin, dwMode & ~ENABLE_ECHO_INPUT); gets(phrase); SetConsoleMode(hStdin, dwMode); fprintf(stderr, "\n");}int PMI_Args_to_info(int *argcp, char ***argvp, void *infop){ int nArgsToStrip; int argc; char **argv; char phrase[MPD_PASSPHRASE_MAX_LENGTH + 1];/* = MPD_DEFAULT_PASSPHRASE;*/ argc = *argcp; argv = *argvp; while (argv[1] && (argv[1][0] == '-' || argv[1][0] == '/')) { nArgsToStrip = 1; if (strncmp(&argv[1][1], "localonly", 10) == 0) { MPICH_Info_set(infop, "localonly", "true"); } else if (strncmp(&argv[1][1], "machinefile", 12) == 0) { if (argc < 3) { err_printf("Error: no filename specified after -machinefile option.\n"); return 0; } MPICH_Info_set(infop, "machinefile", argv[2]); nArgsToStrip = 2; } else if (strncmp(&argv[1][1], "map", 4) == 0) { if (argc < 3) { err_printf("Error: no drive specified after -map option.\n"); return 0; } if ((strlen(argv[2]) > 2) && argv[2][1] == ':') { /* add code to read the map key and append this value to it to allow for multiple mappings */ MPICH_Info_set(infop, "map", argv[2]); } nArgsToStrip = 2; } else if (strncmp(&argv[1][1], "env", 4) == 0) { if (argc < 3) { err_printf("Error: no environment variables after -env option\n"); return 0; } MPICH_Info_set(infop, "env", argv[2]); nArgsToStrip = 2; } else if (strncmp(&argv[1][1], "logon", 6) == 0) { MPICH_Info_set(infop, "logon", "true"); } else if (strncmp(&argv[1][1], "tcp", 4) == 0) { MPICH_Info_set(infop, "smp", "false"); } else if (strncmp(&argv[1][1], "getphrase", 10) == 0) { GetMPDPassPhrase(phrase); MPICH_Info_set(infop, "phrase", phrase); } else if (strncmp(&argv[1][1], "nocolor", 8) == 0) { MPICH_Info_set(infop, "color", "false"); } else if (strncmp(&argv[1][1], "nompi", 6) == 0) { MPICH_Info_set(infop, "nompi", "true"); } else if (strncmp(&argv[1][1], "nodots", 7) == 0) { MPICH_Info_set(infop, "logondots", "false"); } else if (strncmp(&argv[1][1], "nomapping", 10) == 0) { MPICH_Info_set(infop, "mapping", "false"); } else { /* skip over unknown argument */ argc--; argc--; argv++; argv++; continue; } StripArgs(argcp, argvp, nArgsToStrip); argc -= nArgsToStrip; argv = *argvp; } return PMI_SUCCESS;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -