📄 launchprocess.cpp
字号:
#include "LaunchProcess.h"
#include <stdio.h>
#include "global.h"
#include "..\Common\MPIJobDefs.h"
#include "Translate_Error.h"
#include "mpdutil.h"
#include "mpd.h"
#include "RedirectIO.h"
#include <stdlib.h>
#include <string.h>

// GenerateMapString
// Builds the drive-mapping option that is appended to an mpd "launch"
// command, of the form:
//     " m='X:share;Y:share2'"
// from a linked list of MapDriveNode entries (cDrive, pszShare, pNext).
// Returns NULL when pNode is NULL. Otherwise returns a buffer allocated with
// new[]; the caller owns it and must release it with delete[].
// NOTE(review): MPIRunLaunchProcess currently frees the result with plain
// `delete pszMap;`, which does not match new[] -- it should be `delete[]`.
static char *GenerateMapString(MapDriveNode *pNode)
{
    static const char szPrefix[] = " m='";
    MapDriveNode *pCur;
    size_t nLength;
    char *pszResult, *pszOut;

    if (pNode == NULL)
        return NULL;

    // Pass 1: compute the exact size required. The previous version wrote
    // into a fixed 8192-byte buffer with unbounded sprintf calls, so a long
    // enough mapping list overran the heap allocation.
    nLength = (sizeof(szPrefix) - 1) + 1 /* closing quote */ + 1 /* NUL */;
    for (pCur = pNode; pCur != NULL; pCur = pCur->pNext)
    {
        const char *pszShare = (pCur->pszShare != NULL) ? pCur->pszShare : "";
        // each entry is "X:share"; every entry after the first gets a ';'
        nLength += (pCur == pNode ? 0 : 1) + 2 + strlen(pszShare);
    }

    // Pass 2: format into a buffer that is guaranteed to be large enough.
    pszResult = new char[nLength];
    pszOut = pszResult;
    for (pCur = pNode; pCur != NULL; pCur = pCur->pNext)
    {
        const char *pszShare = (pCur->pszShare != NULL) ? pCur->pszShare : "";
        pszOut += sprintf(pszOut, "%s%c:%s",
                          (pCur == pNode) ? szPrefix : ";",
                          pCur->cDrive, pszShare);
    }
    strcpy(pszOut, "'");
    return pszResult;
}

// HostIsLocal
// Reports whether pszHost names the machine mpirun is running on.
// Local detection is intentionally switched off: this always returns false.
// In MPIRunLaunchProcess a true result for rank 0 sets bLocalStartup. That
// branch only sets nPid = -1 instead of sending the "launch" command, and the
// in-process mpd helpers below are stubs. Enabling this would therefore leave
// rank 0 unlaunched.
// Define MPIRUN_ENABLE_LOCAL_HOST_CHECK to compile in the real comparison.
bool HostIsLocal(char *pszHost)
{
#ifndef MPIRUN_ENABLE_LOCAL_HOST_CHECK
    (void)pszHost;
    return false;
#else
    char szHost[100], szLocal[100];
    DWORD nLocalLen = sizeof(szLocal);
    size_t nNameLen;

    if (pszHost == NULL)
        return false;

    // Compare only the part before the first '.' (drop the domain extension).
    // strcspn + bounded memcpy replaces the old unbounded strcpy and strtok,
    // which relies on hidden static state.
    nNameLen = strcspn(pszHost, ".");
    if (nNameLen >= sizeof(szHost))
        return false;
    memcpy(szHost, pszHost, nNameLen);
    szHost[nNameLen] = '\0';

    // If the local computer name cannot be read, do not compare against an
    // uninitialized buffer.
    if (!GetComputerName(szLocal, &nLocalLen))
        return false;

    return (stricmp(szHost, szLocal) == 0);
#endif
}

// CreateInProcessMPD
// Placeholder for starting an mpd inside the mpirun process; not implemented
// (intentionally does nothing).
void CreateInProcessMPD()
{
}

// ConnectToInProcessMPD
// Placeholder for connecting to an in-process mpd; not implemented.
// Always fails: stores INVALID_SOCKET in *sock (when sock is non-NULL) and
// returns -1. SOCKET is an integer handle type, so NULL was the wrong
// sentinel to assign here.
int ConnectToInProcessMPD(SOCKET *sock)
{
    if (sock != NULL)
        *sock = INVALID_SOCKET;
    return -1;
}

// Function name : LaunchProcess
// Description : 
// Return type : void 
// Argument : LaunchProcessArg *arg
void MPIRunLaunchProcess(MPIRunLaunchProcessArg *arg)
{
    DWORD length = 100;
    HANDLE hRIThread = NULL;
    long error;
    int nPid;
    int nPort = MPD_DEFAULT_PORT;
    SOCKET sock;
    int launchid;
    char pszStartupDB[100];
    char pszStr[MAX_CMD_LENGTH+1];
    char pszIOE[10];
    char *dbg_str = "no";
    char *pszMap = NULL;
    bool bLocalStartup = false;

    if (arg->bUseDebugFlag)
        dbg_str = "yes";

    /*
    if (arg->i == 0 && HostIsLocal(arg->pszHost))
    {
        CreateInProcessMPD();
        error = ConnectToInProcessMPD(&sock);
    }
    else
    {
        error = ConnectToMPD(arg->pszHost, nPort, arg->pszPassPhrase, &sock);
    }
    */
    if (arg->i == 0 && HostIsLocal(arg->pszHost))
        bLocalStartup = true;
    error = ConnectToMPD(arg->pszHost, nPort, arg->pszPassPhrase, &sock);
    //printf("MPIRunLaunchProcess:connecting to %s:%d rank %d\n", arg->pszHost, nPort, arg->i);fflush(stdout);
    //if ((error = ConnectToMPD(arg->pszHost, nPort, arg->pszPassPhrase, &sock)) == 0)
    if (error == 0)
    {
        if 
(arg->i == 0 && !g_bNoMPI) { sprintf(pszStr, "dbcreate"); if (WriteString(sock, pszStr) == SOCKET_ERROR) { printf("ERROR: Unable to write '%s' to socket[%d]\n", pszStr, sock); //ExitProcess(0); easy_closesocket(sock); SetEvent(g_hAbortEvent); delete arg; return; } // read result if (!ReadStringTimeout(sock, pszStartupDB, g_nMPIRUN_SHORT_TIMEOUT)) { printf("ERROR: ReadString failed to read the database name: error %d\n", WSAGetLastError()); //ExitProcess(0); easy_closesocket(sock); SetEvent(g_hAbortEvent); delete arg; return; } if (strnicmp(pszStartupDB, "FAIL ", 5) == 0) { printf("Unable to create a database on '%s'\n%s", arg->pszHost, pszStartupDB);fflush(stdout); //ExitProcess(0); easy_closesocket(sock); SetEvent(g_hAbortEvent); delete arg; return; } // The ordering of the arguments may seem funny because pszHost is last. // I think this needs to be this way so old mpd's can still launch new mpich processes sprintf(pszStr, "|MPICH_EXTRA=mpd:%s:%d:%s:%s", pszStartupDB, nPort, arg->pszPassPhrase, arg->pszHost); strncat(arg->pszEnv, pszStr, MAX_CMD_LENGTH - 1 - strlen(arg->pszEnv)); if (g_bUseJobHost) { PutJobInDatabase(arg); } } else { sprintf(pszStr, "|MPICH_EXTRA=mpd:%s:%d:%s", arg->pszHost, nPort, arg->pszPassPhrase); strncat(arg->pszEnv, pszStr, MAX_CMD_LENGTH - 1 - strlen(arg->pszEnv)); } if (arg->i == 0) strcpy(pszIOE, "012"); // only redirect stdin to the root process else strcpy(pszIOE, "12"); if (g_nNproc > FORWARD_NPROC_THRESHOLD) { if (arg->i > 0) { while (g_pForwardHost[(arg->i - 1)/2].nPort == 0) Sleep(100); sprintf(arg->pszIOHostPort, "%s:%d", g_pForwardHost[(arg->i - 1)/2].pszHost, g_pForwardHost[(arg->i - 1)/2].nPort); if (g_nNproc/2 > arg->i) { strncpy(g_pForwardHost[arg->i].pszHost, arg->pszHost, MAX_HOST_LENGTH); g_pForwardHost[arg->i].pszHost[MAX_HOST_LENGTH-1] = '\0'; sprintf(pszStr, "createforwarder host=%s forward=%s", arg->pszHost, arg->pszIOHostPort); WriteString(sock, pszStr); ReadStringTimeout(sock, pszStr, g_nMPIRUN_SHORT_TIMEOUT); int 
nTempPort = atoi(pszStr); if (nTempPort == -1) { // If creating the forwarder fails, redirect output to the root instead g_pForwardHost[arg->i] = g_pForwardHost[0]; } else g_pForwardHost[arg->i].nPort = nTempPort; //printf("forwarder %s:%d\n", g_pForwardHost[arg->i].pszHost, g_pForwardHost[arg->i].nPort);fflush(stdout); } } } if (g_pDriveMapList) pszMap = GenerateMapString(g_pDriveMapList); // LaunchProcess //printf("MPIRunLaunchProcess:launching on %s, %s\n", arg->pszHost, arg->pszCmdLine);fflush(stdout); if (arg->bLogon) { char *pszEncoded; pszEncoded = EncodePassword(arg->pszPassword); if (strlen(arg->pszDir) > 0) { if (_snprintf(pszStr, MAX_CMD_LENGTH, "launch h=%s c='%s' e='%s' a=%s p=%s %s=%s k=%d d='%s' g=%s", arg->pszHost, arg->pszCmdLine, arg->pszEnv, arg->pszAccount, pszEncoded, pszIOE, arg->pszIOHostPort, arg->i, arg->pszDir, dbg_str) < 0) { printf("ERROR: command exceeds internal buffer size\n"); easy_closesocket(sock); SetEvent(g_hAbortEvent); delete arg; if (pszEncoded != NULL) free(pszEncoded); return; } } else { if (_snprintf(pszStr, MAX_CMD_LENGTH, "launch h=%s c='%s' e='%s' a=%s p=%s %s=%s k=%d g=%s", arg->pszHost, arg->pszCmdLine, arg->pszEnv, arg->pszAccount, pszEncoded, pszIOE, arg->pszIOHostPort, arg->i, dbg_str) < 0) { printf("ERROR: command exceeds internal buffer size\n"); easy_closesocket(sock); SetEvent(g_hAbortEvent); delete arg; if (pszEncoded != NULL) free(pszEncoded); return; } } if (pszEncoded != NULL) free(pszEncoded); } else { if (strlen(arg->pszDir) > 0) { if (_snprintf(pszStr, MAX_CMD_LENGTH, "launch h=%s c='%s' e='%s' %s=%s k=%d d='%s' g=%s", arg->pszHost, arg->pszCmdLine, arg->pszEnv, pszIOE, arg->pszIOHostPort, arg->i, arg->pszDir, dbg_str) < 0) { printf("ERROR: command exceeds internal buffer size\n"); easy_closesocket(sock); SetEvent(g_hAbortEvent); delete arg; return; } } else { if (_snprintf(pszStr, MAX_CMD_LENGTH, "launch h=%s c='%s' e='%s' %s=%s k=%d g=%s", arg->pszHost, arg->pszCmdLine, arg->pszEnv, pszIOE, 
arg->pszIOHostPort, arg->i, dbg_str) < 0) { printf("ERROR: command exceeds internal buffer size\n"); easy_closesocket(sock); SetEvent(g_hAbortEvent); delete arg; return; } } } if (arg->bUsePriorities) { char str[100]; sprintf(str, " r='%d:%d'", arg->nPriorityClass, arg->nPriority); strcat(pszStr, str); } if (pszMap) { strcat(pszStr, pszMap); delete pszMap; } //printf("MPIRunLaunchProcess:launch command = %s\n", pszStr);fflush(stdout); if (bLocalStartup) { // launch process nPid = -1; } else { if (WriteString(sock, pszStr) == SOCKET_ERROR) { printf("ERROR: Unable to send launch command to '%s'\r\nError %d", arg->pszHost, WSAGetLastError()); easy_closesocket(sock); SetEvent(g_hAbortEvent); delete arg; return; } if (!ReadStringTimeout(sock, pszStr, g_nMPIRUN_SHORT_TIMEOUT)) { printf("ERROR: Unable to read the result of the launch command on '%s'\r\nError %d", arg->pszHost, WSAGetLastError()); easy_closesocket(sock); SetEvent(g_hAbortEvent); delete arg; return; } launchid = atoi(pszStr); // save the launch id, get the pid sprintf(pszStr, "getpid %d", launchid); if (WriteString(sock, pszStr) == SOCKET_ERROR) { printf("ERROR: Unable to send getpid command to '%s'\r\nError %d", arg->pszHost, WSAGetLastError()); easy_closesocket(sock); SetEvent(g_hAbortEvent); delete arg; return; } // the following timeout needs to be longer than MPIRUN_SHORT_TIMEOUT because the CreateProcess command may take // a while to start the process if it lives in a shared directory if (!ReadStringTimeout(sock, pszStr, g_nMPIRUN_CREATE_PROCESS_TIMEOUT)) { error = WSAGetLastError(); if (error == ERROR_TIMEOUT || error == 0) { printf("Launch process error: Timed out waiting for the result of the process launch command sent to host '%s'\r\n", arg->pszHost); } else { printf("Launch process error: Unable to read the result of the getpid command on '%s'\r\nError %d", arg->pszHost, error); fflush(stdout); } printf("Attempt to launch '%s' on '%s' failed.\n", arg->pszCmdLine, arg->pszHost); fflush(stdout); 
easy_closesocket(sock); SetEvent(g_hAbortEvent); delete arg; return; } nPid = atoi(pszStr); if (nPid == -1) { sprintf(pszStr, "geterror %d", launchid); if (WriteString(sock, pszStr) == SOCKET_ERROR) { printf("ERROR: Unable to send geterror command after an unsuccessful launch on '%s'\r\nError %d", arg->pszHost, WSAGetLastError()); easy_closesocket(sock); SetEvent(g_hAbortEvent); delete arg; return; } if (!ReadStringTimeout(sock, pszStr, g_nMPIRUN_SHORT_TIMEOUT)) { printf("ERROR: Unable to read the result of the geterror command on '%s'\r\nError %d", arg->pszHost, WSAGetLastError()); easy_closesocket(sock); SetEvent(g_hAbortEvent);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -