⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 launchprocess.cpp

📁 MPICH是MPI标准的一种重要实现,提供了一系列接口函数,为并行计算的实现提供了编程环境。
💻 CPP
📖 第 1 页 / 共 2 页
字号:
#include "LaunchProcess.h"
#include <stdio.h>
#include "global.h"
#include "..\Common\MPIJobDefs.h"
#include "Translate_Error.h"
#include "mpdutil.h"
#include "mpd.h"
#include "RedirectIO.h"
#include <stdlib.h>

// Function name	: GenerateMapString
// Description	    : Formats a drive-map list as the launch argument
//                    " m='<drive>:<share>;<drive>:<share>...'".
//                    Every write is bounded by the size of the result buffer.
//                    An entry that does not fit is dropped together with all
//                    entries after it, and an error is printed.
// Return type		: char * - buffer allocated with new char[], or NULL when
//                    pNode is NULL or not even the first entry fits.
// Argument         : MapDriveNode *pNode - head of the list (walked via pNext)
static char *GenerateMapString(MapDriveNode *pNode)
{
    const size_t nBufSize = 8192;
    const char *pszFormat = " m='%c:%s";
    char *ret_val;
    size_t nUsed = 0;

    if (pNode == NULL)
        return NULL;

    ret_val = new char[nBufSize];

    for (; pNode != NULL; pNode = pNode->pNext)
    {
        // Two bytes always stay in reserve for the closing quote and the terminator.
        size_t nAvail = nBufSize - nUsed - 2;
        int nWritten = _snprintf(ret_val + nUsed, nAvail, pszFormat, pNode->cDrive, pNode->pszShare);

        // A negative result, or one that fills the whole window, means the
        // entry was truncated; the partial text is overwritten below.
        if (nWritten < 0 || (size_t)nWritten >= nAvail)
        {
            printf("ERROR: drive mapping string exceeds internal buffer size\n");
            break;
        }

        nUsed += (size_t)nWritten;
        // Every entry after the first is introduced by a separator instead of the prefix.
        pszFormat = ";%c:%s";
    }

    if (nUsed == 0)
    {
        // Nothing usable was formatted, so there is no map argument to return.
        delete [] ret_val;
        return NULL;
    }

    ret_val[nUsed] = '\'';
    ret_val[nUsed + 1] = '\0';
    return ret_val;
}

// Function name	: HostIsLocal
// Description	    : Currently always reports false. The unconditional return
//                    below comes before the name comparison, so the code
//                    after it is never executed.
// Return type		: bool
// Argument         : char *pszHost - host name, possibly with a domain extension
bool HostIsLocal(char *pszHost)
{
    char temp[100], localhost[100];
    DWORD len = 100;

    return false;

    // Unreachable: comparison of pszHost with the local computer name.
    strcpy(temp, pszHost);
    // get rid of the domain extension
    strtok(temp, ".");
    // get the local computer name
    GetComputerName(localhost, &len);
    // compare the local name to the provided name
    return (stricmp(temp, localhost) == 0);
}

// Function name	: CreateInProcessMPD
// Description	    : Empty placeholder; performs no work.
// Return type		: void
void CreateInProcessMPD()
{
}

// Function name	: ConnectToInProcessMPD
// Description	    : Placeholder that always fails. The output socket is set
//                    to INVALID_SOCKET so it cannot be mistaken for a usable handle.
// Return type		: int - always -1
// Argument         : SOCKET *sock - receives INVALID_SOCKET
int ConnectToInProcessMPD(SOCKET *sock)
{
    *sock = INVALID_SOCKET;
    return -1;
}

// Function name	: LaunchProcess// Description	    : // Return type		: void // Argument         : LaunchProcessArg *argvoid MPIRunLaunchProcess(MPIRunLaunchProcessArg *arg){    DWORD length = 100;    HANDLE hRIThread = NULL;    long error;    int nPid;    int nPort = MPD_DEFAULT_PORT;    SOCKET sock;    int launchid;    char pszStartupDB[100];    char pszStr[MAX_CMD_LENGTH+1];    char pszIOE[10];    char *dbg_str = "no";    char *pszMap = NULL;    bool bLocalStartup = false;    if (arg->bUseDebugFlag)	dbg_str = "yes";    /*    if (arg->i == 0 && HostIsLocal(arg->pszHost))    {	CreateInProcessMPD();	error = ConnectToInProcessMPD(&sock);    }    else    {	error = ConnectToMPD(arg->pszHost, nPort, arg->pszPassPhrase, &sock);    }    */    if (arg->i == 0 && HostIsLocal(arg->pszHost))	bLocalStartup = true;    error = ConnectToMPD(arg->pszHost, nPort, arg->pszPassPhrase, &sock);    //printf("MPIRunLaunchProcess:connecting to %s:%d rank 
%d\n", arg->pszHost, nPort, arg->i);fflush(stdout);    //if ((error = ConnectToMPD(arg->pszHost, nPort, arg->pszPassPhrase, &sock)) == 0)    if (error == 0)    {	if (arg->i == 0 && !g_bNoMPI)	{	    sprintf(pszStr, "dbcreate");	    if (WriteString(sock, pszStr) == SOCKET_ERROR)	    {		printf("ERROR: Unable to write '%s' to socket[%d]\n", pszStr, sock);		//ExitProcess(0);		easy_closesocket(sock);		SetEvent(g_hAbortEvent);		delete arg;		return;	    }	    // read result	    if (!ReadStringTimeout(sock, pszStartupDB, g_nMPIRUN_SHORT_TIMEOUT))	    {		printf("ERROR: ReadString failed to read the database name: error %d\n", WSAGetLastError());		//ExitProcess(0);		easy_closesocket(sock);		SetEvent(g_hAbortEvent);		delete arg;		return;	    }	    if (strnicmp(pszStartupDB, "FAIL ", 5) == 0)	    {		printf("Unable to create a database on '%s'\n%s", arg->pszHost, pszStartupDB);fflush(stdout);		//ExitProcess(0);		easy_closesocket(sock);		SetEvent(g_hAbortEvent);		delete arg;		return;	    }	    // The ordering of the arguments may seem funny because pszHost is last.	    
// I think this needs to be this way so old mpd's can still launch new mpich processes	    sprintf(pszStr, "|MPICH_EXTRA=mpd:%s:%d:%s:%s", pszStartupDB, nPort, arg->pszPassPhrase, arg->pszHost);	    strncat(arg->pszEnv, pszStr, MAX_CMD_LENGTH - 1 - strlen(arg->pszEnv));	    if (g_bUseJobHost)	    {		PutJobInDatabase(arg);	    }	}	else	{	    sprintf(pszStr, "|MPICH_EXTRA=mpd:%s:%d:%s", arg->pszHost, nPort, arg->pszPassPhrase);	    strncat(arg->pszEnv, pszStr, MAX_CMD_LENGTH - 1 - strlen(arg->pszEnv));	}		if (arg->i == 0)	    strcpy(pszIOE, "012"); // only redirect stdin to the root process	else	    strcpy(pszIOE, "12");	if (g_nNproc > FORWARD_NPROC_THRESHOLD)	{	    if (arg->i > 0)	    {		while (g_pForwardHost[(arg->i - 1)/2].nPort == 0)		    Sleep(100);		sprintf(arg->pszIOHostPort, "%s:%d", g_pForwardHost[(arg->i - 1)/2].pszHost, g_pForwardHost[(arg->i - 1)/2].nPort);		if (g_nNproc/2 > arg->i)		{		    strncpy(g_pForwardHost[arg->i].pszHost, arg->pszHost, MAX_HOST_LENGTH);		    g_pForwardHost[arg->i].pszHost[MAX_HOST_LENGTH-1] = '\0';		    sprintf(pszStr, "createforwarder host=%s forward=%s", arg->pszHost, arg->pszIOHostPort);		    WriteString(sock, pszStr);		    ReadStringTimeout(sock, pszStr, g_nMPIRUN_SHORT_TIMEOUT);		    int nTempPort = atoi(pszStr);		    if (nTempPort == -1)		    {			// If creating the forwarder fails, redirect output to the root instead			g_pForwardHost[arg->i] = g_pForwardHost[0];		    }		    else			g_pForwardHost[arg->i].nPort = nTempPort;		    //printf("forwarder %s:%d\n", g_pForwardHost[arg->i].pszHost, g_pForwardHost[arg->i].nPort);fflush(stdout);		}	    }	}	if (g_pDriveMapList)	    pszMap = GenerateMapString(g_pDriveMapList);	// LaunchProcess	//printf("MPIRunLaunchProcess:launching on %s, %s\n", arg->pszHost, arg->pszCmdLine);fflush(stdout);	if (arg->bLogon)	{	    char *pszEncoded;	    pszEncoded = EncodePassword(arg->pszPassword);	    if (strlen(arg->pszDir) > 0)	    {		if (_snprintf(pszStr, MAX_CMD_LENGTH, "launch h=%s c='%s' e='%s' 
a=%s p=%s %s=%s k=%d d='%s' g=%s", 		    arg->pszHost, arg->pszCmdLine, arg->pszEnv, arg->pszAccount, pszEncoded, 		    pszIOE, arg->pszIOHostPort, arg->i, arg->pszDir, dbg_str) < 0)		{		    printf("ERROR: command exceeds internal buffer size\n");		    easy_closesocket(sock);		    SetEvent(g_hAbortEvent);		    delete arg;		    if (pszEncoded != NULL) free(pszEncoded);		    return;		}	    }	    else	    {		if (_snprintf(pszStr, MAX_CMD_LENGTH, "launch h=%s c='%s' e='%s' a=%s p=%s %s=%s k=%d g=%s", 		    arg->pszHost, arg->pszCmdLine, arg->pszEnv, arg->pszAccount, pszEncoded, 		    pszIOE, arg->pszIOHostPort, arg->i, dbg_str) < 0)		{		    printf("ERROR: command exceeds internal buffer size\n");		    easy_closesocket(sock);		    SetEvent(g_hAbortEvent);		    delete arg;		    if (pszEncoded != NULL) free(pszEncoded);		    return;		}	    }	    if (pszEncoded != NULL) free(pszEncoded);	}	else	{	    if (strlen(arg->pszDir) > 0)	    {		if (_snprintf(pszStr, MAX_CMD_LENGTH, "launch h=%s c='%s' e='%s' %s=%s k=%d d='%s' g=%s",		    arg->pszHost, arg->pszCmdLine, arg->pszEnv, 		    pszIOE, arg->pszIOHostPort, arg->i, arg->pszDir, dbg_str) < 0)		{		    printf("ERROR: command exceeds internal buffer size\n");		    easy_closesocket(sock);		    SetEvent(g_hAbortEvent);		    delete arg;		    return;		}	    }	    else	    {		if (_snprintf(pszStr, MAX_CMD_LENGTH, "launch h=%s c='%s' e='%s' %s=%s k=%d g=%s",		    arg->pszHost, arg->pszCmdLine, arg->pszEnv, 		    pszIOE, arg->pszIOHostPort, arg->i, dbg_str) < 0)		{		    printf("ERROR: command exceeds internal buffer size\n");		    easy_closesocket(sock);		    SetEvent(g_hAbortEvent);		    delete arg;		    return;		}	    }	}	if (arg->bUsePriorities)	{	    char str[100];	    sprintf(str, " r='%d:%d'", arg->nPriorityClass, arg->nPriority);	    strcat(pszStr, str);	}	if (pszMap)	{	    strcat(pszStr, pszMap);	    delete pszMap;	}	//printf("MPIRunLaunchProcess:launch command = %s\n", pszStr);fflush(stdout);	if (bLocalStartup)	{	    // launch 
process	    nPid = -1;	}	else	{	    if (WriteString(sock, pszStr) == SOCKET_ERROR)	    {		printf("ERROR: Unable to send launch command to '%s'\r\nError %d", arg->pszHost, WSAGetLastError());		easy_closesocket(sock);		SetEvent(g_hAbortEvent);		delete arg;		return;	    }	    if (!ReadStringTimeout(sock, pszStr, g_nMPIRUN_SHORT_TIMEOUT))	    {		printf("ERROR: Unable to read the result of the launch command on '%s'\r\nError %d", arg->pszHost, WSAGetLastError());		easy_closesocket(sock);		SetEvent(g_hAbortEvent);		delete arg;		return;	    }	    launchid = atoi(pszStr);	    // save the launch id, get the pid	    sprintf(pszStr, "getpid %d", launchid);	    if (WriteString(sock, pszStr) == SOCKET_ERROR)	    {		printf("ERROR: Unable to send getpid command to '%s'\r\nError %d", arg->pszHost, WSAGetLastError());		easy_closesocket(sock);		SetEvent(g_hAbortEvent);		delete arg;		return;	    }	    // the following timeout needs to be longer than MPIRUN_SHORT_TIMEOUT because the CreateProcess command may take	    // a while to start the process if it lives in a shared directory	    if (!ReadStringTimeout(sock, pszStr, g_nMPIRUN_CREATE_PROCESS_TIMEOUT))	    {		error = WSAGetLastError();		if (error == ERROR_TIMEOUT || error == 0)		{		    printf("Launch process error: Timed out waiting for the result of the process launch command sent to host '%s'\r\n", arg->pszHost);		}		else		{		    printf("Launch process error: Unable to read the result of the getpid command on '%s'\r\nError %d", arg->pszHost, error);		    fflush(stdout);		}		printf("Attempt to launch '%s' on '%s' failed.\n", arg->pszCmdLine, arg->pszHost);		fflush(stdout);		easy_closesocket(sock);		SetEvent(g_hAbortEvent);		delete arg;		return;	    }	    nPid = atoi(pszStr);	    if (nPid == -1)	    {		sprintf(pszStr, "geterror %d", launchid);		if (WriteString(sock, pszStr) == SOCKET_ERROR)		{		    printf("ERROR: Unable to send geterror command after an unsuccessful launch on '%s'\r\nError %d", arg->pszHost, WSAGetLastError());		  
  easy_closesocket(sock);		    SetEvent(g_hAbortEvent);		    delete arg;		    return;		}		if (!ReadStringTimeout(sock, pszStr, g_nMPIRUN_SHORT_TIMEOUT))		{		    printf("ERROR: Unable to read the result of the geterror command on '%s'\r\nError %d", arg->pszHost, WSAGetLastError());		    easy_closesocket(sock);		    SetEvent(g_hAbortEvent);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -