⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 bnr_spawn.cpp

📁 MPICH是MPI的重要研究,提供了一系列的接口函数,为并行计算的实现提供了编程环境.
💻 CPP
字号:
#include "bnr_internal.h"int (*g_notify_fn)(BNR_Group group, int rank, int exit_code) = NULL;LONG g_nProcessesRemaining = 0;HANDLE g_hProcessFinishedThread = NULL;void ProcessFinishedThread(){	char pBuffer[100];	int nGroup, nRank;	int nExitCode;	int nRetVal;	char *token;	BNR_Group_node *pGroup;	//printf("ProcessFinishedThread started, nProc: %d\n", g_nProcessesRemaining);fflush(stdout);	while (true)	{		nRetVal = GetZString(g_hMPDEndOutputPipe, pBuffer);		if (nRetVal)		{			printf("GetZString failed, error: %d\n", nRetVal);			ExitThread(nRetVal);		}				//printf("%s\n", pBuffer);fflush(stdout);						token = strtok(pBuffer, " ");		if (token != NULL)		{			nGroup = atoi(token);			pGroup = FindBNRGroupFromInt(nGroup);			token = strtok(NULL, " ");			if (token != NULL)			{				nRank = atoi(token);				token = strtok(NULL, " ");				if (token != NULL)				{					nExitCode = atoi(token);					if (g_notify_fn != NULL)						g_notify_fn(pGroup, nRank, nExitCode);					nRetVal = InterlockedDecrement(&g_nProcessesRemaining);					if (nRetVal == 0)					{						CloseHandle(g_hProcessFinishedThread);						g_hProcessFinishedThread = NULL;						return;					}				}			}		}	}}/* not collective. * remote_group is an open BNR_Group and may be passed to Spawn  * multiple times. It is not valid until it is closed.   * BNR_Spawn will fail if remote_group is closed or uninitialized. * notify_fn is called if a process exits, and gets the * group, rank, and return code. argv and env * arrays are null terminated.  The caller's group is the * parent of the spawned processes. */MPICH_BNR_API int BNR_Spawn( BNR_Group remote_group, 						    int count, char *command, char *args, char *env, 						    BNR_Info info, int (*notify_fn)(BNR_Group group, 						    int rank, int exit_code) ){	int i;	int flag;	char pBuffer[4096], pszEnv[3072];	char pszStdinHost[101], pszStdoutHost[101], pszStderrHost[101];	int nStdinPort, nStdoutPort, nStderrPort;	int group = ((BNR_Group_node*)remote_group)->nID;	DWORD dwNumWritten;	BNR_Info_get(info, "stdoutHost", 100, pszStdoutHost, &flag);	BNR_Info_get(info, "stdoutPort", 100, pBuffer, &flag);	nStdoutPort = atoi(pBuffer);	BNR_Info_get(info, "stdinHost", 100, pszStdinHost, &flag);	BNR_Info_get(info, "stdinPort", 100, pBuffer, &flag);	nStdinPort = atoi(pBuffer);	BNR_Info_get(info, "stderrHost", 100, pszStderrHost, &flag);	BNR_Info_get(info, "stderrPort", 100, pBuffer, &flag);	nStderrPort = atoi(pBuffer);	SpawnedProcess *pProcesses = new SpawnedProcess[count];	// get the next 'count' hosts available	sprintf(pBuffer, "next %d\n", count);	WriteFile(g_hMPDPipe, pBuffer, strlen(pBuffer), &dwNumWritten, NULL);	for (i=0; i<count; i++)		GetString(g_hMPDOutputPipe, pProcesses[i].pszHost);	// Make sure the exit output thread is ready before any processes are launched	g_notify_fn = notify_fn;	if (g_nProcessesRemaining == 0 && g_hProcessFinishedThread == NULL)	{		g_nProcessesRemaining = count;		DWORD dwThreadID;		g_hProcessFinishedThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ProcessFinishedThread, NULL, 0, &dwThreadID);	}	else	{		for (i=0; i<count; i++)			InterlockedIncrement(&g_nProcessesRemaining);	}	// launch processes	for (i=0; i<count; i++)	{		sprintf(pszEnv, "MPICH_BNR_LIB=mpichbnr.dll|BNR_RANK=%d|BNR_SIZE=%d|BNR_GROUP=%d|BNR_PARENT=%d|BNR_PARENT_SIZE=%d",			i, count, group, ((BNR_Group_node*)g_bnrGroup)->nID, ((BNR_Group_node*)g_bnrGroup)->nSize);		if (strlen(env))		{			strcat(pszEnv, "|");			strcat(pszEnv, env);		}		if (i == 0)			sprintf(pBuffer, "launch h'%s'c'%s'a'%s'g'%d'r'%d'e'%s'0'%s:%d'1'%s:%d'2'%s:%d'\n", 			pProcesses[i].pszHost, command, args, group, i, pszEnv,			pszStdinHost, nStdinPort, 			pszStdoutHost, nStdoutPort, 			pszStderrHost, nStderrPort);		else			sprintf(pBuffer, "launch h'%s'c'%s'a'%s'g'%d'r'%d'e'%s'1'%s:%d'2'%s:%d'\n", 			pProcesses[i].pszHost, command, args, group, i, pszEnv,			pszStdoutHost, nStdoutPort, 			pszStderrHost, nStderrPort);		WriteFile(g_hMPDPipe, pBuffer, strlen(pBuffer), &dwNumWritten, NULL);		GetString(g_hMPDOutputPipe, pProcesses[i].pszSpawnID);	}	// get the launch ids	for (i=0; i<count; i++)	{		sprintf(pBuffer, "launchid %d\n", pProcesses[i].pszSpawnID);		WriteFile(g_hMPDPipe, pBuffer, strlen(pBuffer), &dwNumWritten, NULL);		GetString(g_hMPDOutputPipe, pProcesses[i].pszLaunchID);	}	// store the process information in the group structure	BNR_Group_node *pRemoteGroup = (BNR_Group_node*)remote_group;	SpawnedProcessNode *pProcessNode = new SpawnedProcessNode;	pProcessNode->nProc = count;	pProcessNode->pProcesses = pProcesses;	pProcessNode->pNext = pRemoteGroup->pProcessList;	pRemoteGroup->pProcessList = pProcessNode;	// Increase the size of the remote group 	((BNR_Group_node*)remote_group)->nSize += count;	return BNR_SUCCESS;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -