⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pmi_spawn.c

📁 刚才是说明 现在是安装程序在 LINUX环境下进行编程的MPICH安装文件
💻 C
📖 第 1 页 / 共 2 页
字号:
    char pszStr[4096];    int nNproc, nIproc;    int i, j, error;    int nNumHostsNeeded = 0;    char pszHost[100];    char pszHostFile[MAX_PATH];    HostNode *pHosts = NULL;    int flag;    Spawn_struct *pSpawn = NULL;    char pszCmd[1024];    char pszDb[100];    char pszTemp[1024];    int nKeys;    char pszKey[MPICH_MAX_INFO_KEY], pszValue[MPICH_MAX_INFO_VAL];    HostNode *n;    DWORD dwThreadId;    /* should the user and password be passed in by info?*/    /* should this information be in each info allowing for multiple user credentials?*/    if (info != NULL)    {	if (MPICH_Info_get(((MPICH_Info*)info)[0], "user", 100, g_pszPMIAccount, &flag) != MPICH_SUCCESS)	{	    err_printf("Error: MPICH_Info_get('user') failed\n");	    return PMI_FAIL;	}	if (MPICH_Info_get(((MPICH_Info*)info)[0], "password", 100, g_pszPMIPassword, &flag) != MPICH_SUCCESS)	{	    err_printf("Error: MPICH_Info_get('password') failed\n");	    return PMI_FAIL;	}    }    nNproc = 0;    for (i=0; i<count; i++)    {	if (maxprocs[i] < 1)	{	    FreeHosts(pHosts);	    return PMI_FAIL;	}	nNproc += maxprocs[i];	flag = 0;	if (MPICH_Info_get(((MPICH_Info*)info)[i], "host", 100, pszHost, &flag) != MPICH_SUCCESS)	{	    err_printf("Error: MPICH_Info_get failed\n");	    FreeHosts(pHosts);	    return PMI_FAIL;	}	if (flag)	{	    /* user specified a single host*/	    HostNode *n;	    if (pHosts == NULL)		pHosts = n = (HostNode*)malloc(sizeof(HostNode));	    else	    {		n = pHosts;		while (n->pNext != NULL)		    n = n->pNext;		n->pNext = (HostNode*)malloc(sizeof(HostNode));		n = n->pNext;	    }	    for (j=0; j<maxprocs[i]; j++)	    {		n->nSMPProcs = 1;		strncpy(n->pszHost, pszHost, 100);		if (j<maxprocs[i]-1)		{		    n->pNext = (HostNode*)malloc(sizeof(HostNode));		    n = n->pNext;		}		else		    n->pNext = NULL;	    }	}	else	{	    flag = 0;	    if (MPICH_Info_get(((MPICH_Info*)info)[i], "hostfile", MAX_PATH, pszHostFile, &flag) != MPICH_SUCCESS)	    {		err_printf("Error: MPICH_Info_get failed\n");		FreeHosts(pHosts);		return PMI_FAIL;	    }	    if (flag)	    {		/* user specified a host file*/		if (!GetHostsFromFile(pszHostFile, &pHosts, maxprocs[i]))		{		    FreeHosts(pHosts);		    return PMI_FAIL;		}	    }	    else	    {		/* user did not specify any hosts*/		/* create a list of blank host nodes to be filled in later*/		nNumHostsNeeded += maxprocs[i];		if (pHosts == NULL)		    pHosts = n = (HostNode*)malloc(sizeof(HostNode));		else		{		    n = pHosts;		    while (n->pNext != NULL)			n = n->pNext;		    n->pNext = (HostNode*)malloc(sizeof(HostNode));		    n = n->pNext;		}		for (j=0; j<maxprocs[i]; j++)		{		    n->nSMPProcs = 1;		    n->pszHost[0] = '\0';		    if (j<maxprocs[i]-1)		    {			n->pNext = (HostNode*)malloc(sizeof(HostNode));			n = n->pNext;		    }		    else			n->pNext = NULL;		}	    }	}    }    /* fill in the blank host nodes*/    if (nNumHostsNeeded > 0)    {	snprintf(pszStr, 4096, "next %d", nNumHostsNeeded);	if (WriteString(g_bfdMPD, pszStr) == SOCKET_ERROR)	{	    err_printf("WriteString('%s') failed, error %d\n", pszStr, WSAGetLastError());	    FreeHosts(pHosts);	    return PMI_FAIL;	}	n = pHosts;	for (i=0; i<nNumHostsNeeded; i++)	{	    while (n->pszHost[0] != '\0')		n = n->pNext;	    if (!ReadString(g_bfdMPD, n->pszHost))	    {		err_printf("ReadString(next host) failed, error %d\n", WSAGetLastError());		FreeHosts(pHosts);		return PMI_FAIL;	    }	}    }    /* allocate a spawn structure to hold all the data structures for this spawn call*/    pSpawn = CreateSpawn_structn(nNproc);    if (pSpawn == NULL)    {	FreeHosts(pHosts);	return PMI_FAIL;    }    /* give this spawn its own connection to the mpd*/    error = ConnectToMPD(g_pszMPDHost, g_nMPDPort, g_pszMPDPhrase, &pSpawn->m_bfd);    if (error)    {	FreeHosts(pHosts);	return PMI_FAIL;    }    /* if there isn't already a host to redirect io to, create one*/    if (g_pszIOHost[0] == '\0')    {	DWORD dwThreadId;	HANDLE hEvent;	RedirectIOArg *pArg = (RedirectIOArg*)malloc(sizeof(RedirectIOArg));	pArg->hReadyEvent = hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);	pArg->m_pbfdStopIOSignalSocket = &pSpawn->m_bfdStop;	pSpawn->m_hRedirectIOThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)RedirectIOThread, pArg, 0, &dwThreadId);	if (pSpawn->m_hRedirectIOThread == NULL)	{	    err_printf("Error: Unable to create the redirect io thread, error %d\n", GetLastError());	    FreeHosts(pHosts);	    CloseHandle(hEvent);	    free(pArg);	    FreeSpawn_struct(pSpawn);	    return PMI_FAIL;	}	if (WaitForSingleObject(hEvent, 10000) == WAIT_TIMEOUT)	{	    err_printf("Error: timed out waiting for io redirection thread to initialize\n");	    FreeHosts(pHosts);	    CloseHandle(hEvent);	    FreeSpawn_struct(pSpawn);	    return PMI_FAIL;	}	CloseHandle(hEvent);    }    strncpy(pSpawn->m_pNode[0].fwd_host, g_pszIOHost, 100);    pSpawn->m_pNode[0].fwd_port = g_nIOPort;    /* create a database for the spawned processes*/    if (WriteString(pSpawn->m_bfd, "dbcreate") == SOCKET_ERROR)    {	err_printf("WriteString('dbcreate') failed, error %d\n", WSAGetLastError());	FreeHosts(pHosts);	FreeSpawn_struct(pSpawn);	return PMI_FAIL;    }    if (!ReadString(pSpawn->m_bfd, pszDb))    {	err_printf("ReadString(db) failed, error %d\n", WSAGetLastError());	FreeHosts(pHosts);	FreeSpawn_struct(pSpawn);	return PMI_FAIL;    }    /* pre-put any data provided into the spawnee's database*/    MPICH_Info_get_nkeys((MPICH_Info)preput_info, &nKeys);    for (i=0; i<nKeys; i++)    {	MPICH_Info_get_nthkey((MPICH_Info)preput_info, i, pszKey);	MPICH_Info_get((MPICH_Info)preput_info, pszKey, MPICH_MAX_INFO_VAL, pszValue, &flag);	if (flag)	{	    PMI_KVS_Put(pszDb, pszKey, pszValue);	}    }    /* launch each process*/    for (nIproc = 0; nIproc < nNproc; nIproc++)    {	/* get the host name for this process*/	GetHost(pHosts, nIproc, pszHost);	/* create the command*/	CreateCommand(count, maxprocs, cmds, argvs, nIproc, pszCmd);	/* possibly start an io forwarder*/	if ((nIproc > 0) && ((nNproc/2) > nIproc))	{	    snprintf(pszStr, 4096, "createforwarder host=%s forward=%s:%d",		pszHost, pSpawn->m_pNode[(nIproc-1)/2].fwd_host, pSpawn->m_pNode[(nIproc-1)/2].fwd_port);	    if (WriteString(pSpawn->m_bfd, pszStr) == SOCKET_ERROR)	    {		err_printf("WriteString('%s') failed, error %d\n", pszStr, WSAGetLastError());		FreeHosts(pHosts);		FreeSpawn_struct(pSpawn);		return PMI_FAIL;	    }	    if (!ReadString(pSpawn->m_bfd, pszStr))	    {		err_printf("ReadString(forwarder port) failed, error %d\n", WSAGetLastError());		FreeHosts(pHosts);		FreeSpawn_struct(pSpawn);		return PMI_FAIL;	    }	    strncpy(pSpawn->m_pNode[nIproc].fwd_host, pszHost, 100);	    pSpawn->m_pNode[nIproc].fwd_port = atoi(pszStr);	}	/* create the command line*/	snprintf(pszStr, 4096, "launch h=%s c='%s' 12=%s:%d k=%d e='PMI_SPAWN=yes|PMI_RANK=%d|PMI_SIZE=%d|PMI_KVS=%s|PMI_MPD=%s|PMI_IO=%s:%d",	    pszHost, pszCmd, 	    pSpawn->m_pNode[(nIproc-1)/2].fwd_host, pSpawn->m_pNode[(nIproc-1)/2].fwd_port,	    nIproc, nIproc, nNproc, pszDb, pszHost, 	    pSpawn->m_pNode[(nIproc-1)/2].fwd_host, pSpawn->m_pNode[(nIproc-1)/2].fwd_port);	if (strlen(g_pszPMIAccount))	{	    snprintf(pszTemp, 1024, "|PMI_USER=%s|PMI_PWD=%s' a=%s p=%s",		g_pszPMIAccount, g_pszPMIPassword, g_pszPMIAccount, g_pszPMIPassword);	    strncat(pszStr, pszTemp, 4096-strlen(pszStr));	}	else	{	    strncat(pszStr, "'", 2);	}	/* write the launch command*/	if (WriteString(pSpawn->m_bfd, pszStr) == SOCKET_ERROR)	{	    err_printf("WriteString('launch h=%s c='%s' ...') failed, error %d\n", pszHost, pszCmd, WSAGetLastError());	    FreeHosts(pHosts);	    FreeSpawn_struct(pSpawn);	    return PMI_FAIL;	}	if (!ReadString(pSpawn->m_bfd, pszStr))	{	    err_printf("ReadString(launchid) failed, error %d\n", WSAGetLastError());	    FreeHosts(pHosts);	    FreeSpawn_struct(pSpawn);	    return PMI_FAIL;	}	pSpawn->m_pNode[nIproc].launchid = atoi(pszStr);    }    FreeHosts(pHosts);    pHosts = NULL;    /* get the process ids*/    for (i=0; i<nNproc; i++)    {	snprintf(pszStr, 4096, "getpid %d", pSpawn->m_pNode[i].launchid);	if (WriteString(pSpawn->m_bfd, pszStr) == SOCKET_ERROR)	{	    err_printf("WriteString('%s') failed, error %d\n", pszStr, WSAGetLastError());	    FreeSpawn_struct(pSpawn);	    return PMI_FAIL;	}	if (!ReadString(pSpawn->m_bfd, pszStr))	{	    err_printf("ReadString(pid) failed, error %d\n", WSAGetLastError());	    FreeSpawn_struct(pSpawn);	    return PMI_FAIL;	}	pSpawn->m_pNode[i].pid = atoi(pszStr);	if (pSpawn->m_pNode[i].pid == -1)	{	    snprintf(pszStr, 4096, "geterror %d", pSpawn->m_pNode[i].launchid);	    if (WriteString(pSpawn->m_bfd, pszStr) == SOCKET_ERROR)	    {		err_printf("Error: launching process %d failed, unable to determine the error.\nWriting the request for the error message failed, error %d", i, WSAGetLastError());		FreeSpawn_struct(pSpawn);		return PMI_FAIL;	    }	    if (!ReadString(pSpawn->m_bfd, pszStr))	    {		err_printf("Error: launching process %d failed, unable to determine the error.\nReading the error message failed, error %d", i, WSAGetLastError());		FreeSpawn_struct(pSpawn);		return PMI_FAIL;	    }	    err_printf("Error: launching process %d failed, %s\n", i, pszStr);	    FreeSpawn_struct(pSpawn);	    return PMI_FAIL;	}    }    /* Start a thread to monitor the spawned processes until they all exit and all output has been redirected*/    /* Add the spawn data structure to the global list*/    WaitForSingleObject(g_hSpawnMutex, INFINITE);        pSpawn->m_pNext = g_pSpawnList;    g_pSpawnList = pSpawn;    pSpawn->m_hThread = g_hJobThreads[g_nNumJobThreads] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)SpawnWaitThread, pSpawn, 0, &dwThreadId);    if (g_hJobThreads[g_nNumJobThreads] == NULL)    {	err_printf("Error: Unable to create the job wait thread, error %d\n", GetLastError());	g_pSpawnList = pSpawn->m_pNext;	ReleaseMutex(g_hSpawnMutex);	FreeSpawn_struct(pSpawn);	return PMI_FAIL;    }    g_nNumJobThreads++;    ReleaseMutex(g_hSpawnMutex);    return PMI_SUCCESS;}static void StripArgs(int *argc, char ***argv, int n){    int i;    if (n+1 > *argc)    {	err_printf("Error: cannot strip %d args, only %d left.\n", n, *argc-1);    }    for (i=n+1; i<=*argc; i++)    {	(*argv)[i-n] = (*argv)[i];    }    *argc -= n;}static void GetMPDPassPhrase(char *phrase){    HANDLE hStdin;    DWORD dwMode;    fprintf(stderr, "mpd password: ");    fflush(stderr);        hStdin = GetStdHandle(STD_INPUT_HANDLE);    if (!GetConsoleMode(hStdin, &dwMode))	dwMode = ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT | ENABLE_MOUSE_INPUT;    SetConsoleMode(hStdin, dwMode & ~ENABLE_ECHO_INPUT);    gets(phrase);    SetConsoleMode(hStdin, dwMode);        fprintf(stderr, "\n");}int PMI_Args_to_info(int *argcp, char ***argvp, void *infop){    int nArgsToStrip;    int argc;    char **argv;    char phrase[MPD_PASSPHRASE_MAX_LENGTH + 1];/* = MPD_DEFAULT_PASSPHRASE;*/    argc = *argcp;    argv = *argvp;    while (argv[1] && (argv[1][0] == '-' || argv[1][0] == '/'))    {	nArgsToStrip = 1;	if (strncmp(&argv[1][1], "localonly", 10) == 0)	{	    MPICH_Info_set(infop, "localonly", "true");	}	else if (strncmp(&argv[1][1], "machinefile", 12) == 0)	{	    if (argc < 3)	    {		err_printf("Error: no filename specified after -machinefile option.\n");		return 0;	    }	    MPICH_Info_set(infop, "machinefile", argv[2]);	    nArgsToStrip = 2;	}	else if (strncmp(&argv[1][1], "map", 4) == 0)	{	    if (argc < 3)	    {		err_printf("Error: no drive specified after -map option.\n");		return 0;	    }	    if ((strlen(argv[2]) > 2) && argv[2][1] == ':')	    {		/* add code to read the map key and append this value to it to allow for multiple mappings */		MPICH_Info_set(infop, "map", argv[2]);	    }	    nArgsToStrip = 2;	}	else if (strncmp(&argv[1][1], "env", 4) == 0)	{	    if (argc < 3)	    {		err_printf("Error: no environment variables after -env option\n");		return 0;	    }	    MPICH_Info_set(infop, "env", argv[2]);	    nArgsToStrip = 2;	}	else if (strncmp(&argv[1][1], "logon", 6) == 0)	{	    MPICH_Info_set(infop, "logon", "true");	}	else if (strncmp(&argv[1][1], "tcp", 4) == 0)	{	    MPICH_Info_set(infop, "smp", "false");	}	else if (strncmp(&argv[1][1], "getphrase", 10) == 0)	{	    GetMPDPassPhrase(phrase);	    MPICH_Info_set(infop, "phrase", phrase);	}	else if (strncmp(&argv[1][1], "nocolor", 8) == 0)	{	    MPICH_Info_set(infop, "color", "false");	}	else if (strncmp(&argv[1][1], "nompi", 6) == 0)	{	    MPICH_Info_set(infop, "nompi", "true");	}	else if (strncmp(&argv[1][1], "nodots", 7) == 0)	{	    MPICH_Info_set(infop, "logondots", "false");	}	else if (strncmp(&argv[1][1], "nomapping", 10) == 0)	{	    MPICH_Info_set(infop, "mapping", "false");	}	else	{	    /* skip over unknown argument */	    argc--;	    argc--;	    argv++;	    argv++;	    continue;	}	StripArgs(argcp, argvp, nArgsToStrip);	argc -= nArgsToStrip;	argv = *argvp;    }    return PMI_SUCCESS;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -