⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nt_ipvishm_priv.cpp

📁 MPICH是MPI的重要研究,提供了一系列的接口函数,为并行计算的实现提供了编程环境.
💻 CPP
📖 第 1 页 / 共 3 页
字号:
	if (strlen(g_pszHostName) < 1)	{		if (gethostname(g_pszHostName, NT_HOSTNAME_LEN) == SOCKET_ERROR)		{			err = WSAGetLastError();			if (err == WSAEFAULT)				AbortInit(err, "Cannot handle hostnames longer than 100 characters");			else				AbortInit(err, "gethostname failed");		}		// Convert the host name to an ip string to make connection establishment more robust		NT_Tcp_get_ip_string(g_pszHostName, g_pszHostName);	}	if (g_bUseBNR)	{		char pBuffer[100];		if (MPID_MyWorldRank == 0)			BNR_Put(g_myBNRgroup, "MPICH_ROOT", g_pszHostName, -1);		BNR_Fence(g_myBNRgroup);		BNR_Get(g_myBNRgroup, "MPICH_ROOT", pBuffer);		SetEnvironmentVariable("MPICH_ROOT", pBuffer);		// Remove this line later		sprintf(pszIproc, "%d", MPID_MyWorldRank);	}	// Read in the variables passed in the environment	if (GetEnvironmentVariable("MPICH_JOBID", g_pszJobID, 100) == 0)		g_bMPIRunUsed = false;	g_bUseDatabase = false;	if (GetEnvironmentVariable("MPICH_DBS", pszTemp, 100))	{		char *token;		token = strtok(pszTemp, ":");		if (token != NULL)			strcpy(pszDBSHost, token);		token = strtok(NULL, " \n");		if (token != NULL)			strcpy(pszDBSPort, token);		g_bUseDatabase = true;	}	else	{		if ( GetEnvironmentVariable("MPICH_DBS_HOST", pszDBSHost, 100) && 			 GetEnvironmentVariable("MPICH_DBS_PORT", pszDBSPort, 10) )		{			g_bUseDatabase = true;		}	}	if (g_bUseDatabase)	{		char pszEnv[1024];		int length = 1024;		g_Database.SetID(g_pszJobID);		g_Database.Init();		if (GetEnvironmentVariable("MPICH_IPROC", pszIproc, 10) == 0)		{			// If there is no iproc variable in the environment then get a 			// generic environment from the dbs server.			g_Database.Get("env", pszEnv, &length);			SetEnvironmentString(pszEnv);			GetEnvironmentVariable("MPICH_IPROC", pszIproc, 10);		}		else		{			if (GetEnvironmentVariable("MPICH_NPROC", pszNproc, 10) == 0)			{				// If there is an iproc but no nproc envrionment variable				// then get the environment specific to this process from 				// the dbs server.				char pszEnvKey[100];				sprintf(pszEnvKey, "env%d", atoi(pszIproc));				g_Database.Get(pszEnvKey, pszEnv, &length);				SetEnvironmentString(pszEnv);			}			// If there is an iproc and nproc environment variable then get 			// nothing from the dbs server.		}	}	else	{		if (GetEnvironmentVariable("MPICH_IPROC", pszIproc, 10) == 0)		{			// If an application is run without MPIRun then it is the first 			// and only process			strcpy(pszIproc, "0");			g_bMPIRunUsed = false;		}		if (GetEnvironmentVariable("MPICH_ROOT", pszTemp, 100))		{			char *token;			token = strtok(pszTemp, ":");			if (token != NULL)				strcpy(g_pszRootHostName, token);			token = strtok(NULL, " \n");			if (token != NULL)				strcpy(pszRootPort, token);		}		else		{			if (GetEnvironmentVariable("MPICH_ROOTHOST", g_pszRootHostName, 100) == 0)			{				unsigned long size = 100;				GetComputerName(g_pszRootHostName, &size);				g_bMPIRunUsed = false;			}			if (GetEnvironmentVariable("MPICH_ROOTPORT", pszRootPort, 10) == 0)			{				strcpy(pszRootPort, "-1");				g_bMPIRunUsed = false;			}		}		g_nRootPort = atoi(pszRootPort);		GetEnvironmentVariable("MPICH_EXTRA", pszExtra, 100);	}	if (GetEnvironmentVariable("MPICH_NPROC", pszNproc, 10) == 0)	{		// If an application is run without MPIRun then it is the only process		strcpy(pszNproc, "1");		g_bMPIRunUsed = false;	}	if (GetEnvironmentVariable("MPICH_NUMCOMMPORTS", pszTemp, 100))		g_NumCommPortThreads = atoi(pszTemp);	if (GetEnvironmentVariable("MPICH_NOCOMMPORT", pszTemp, 100))		bCommPortAvailable = false;	MPID_MyWorldRank = g_nIproc = atoi(pszIproc);	MPID_MyWorldSize = g_nNproc = atoi(pszNproc);	// Save the high performance counter frequency	QueryPerformanceFrequency(&g_nPerfFrequency);	if (g_nIproc == 0)		ClearLog();	if (g_nNproc < 1)		AbortInit(1, "Invalid number of processes: %d", g_nNproc);	g_pProcTable = new NT_ipvishm_ProcEntry[g_nNproc];	if (g_pProcTable == NULL)		AbortInit(1, "Unable to allocate memory for the proc table in MPID_Init");	for (i=0; i<g_nNproc; i++)	{		g_pProcTable[i].exename[0] = '\0';		g_pProcTable[i].host[0] = '\0';		g_pProcTable[i].listen_port = 0;		g_pProcTable[i].control_port = 0;		g_pProcTable[i].pid = 0;		g_pProcTable[i].sock = INVALID_SOCKET;		g_pProcTable[i].sock_event = NULL;		g_pProcTable[i].hConnectLock = NULL;		g_pProcTable[i].hValidDataEvent = CreateEvent(NULL, TRUE, FALSE, NULL);		g_pProcTable[i].shm = 0;		g_pProcTable[i].via = 0;		g_pProcTable[i].msg.ovl.hEvent = NULL;		g_pProcTable[i].msg.state = NT_MSG_READING_TAG;		g_pProcTable[i].multinic = FALSE;		g_pProcTable[i].num_nics = 0;	}	g_pProcTable[g_nIproc].num_nics = GetLocalIPs(g_pProcTable[g_nIproc].nic_ip, MAX_NUM_NICS);	if (g_pProcTable[g_nIproc].num_nics > 1)	    g_pProcTable[g_nIproc].multinic = TRUE;	char pszNetMask[50];	if (GetEnvironmentVariable("MPICH_NETMASK", pszNetMask, 50))	{	    char *token = strtok(pszNetMask, "/");	    if (token != NULL)	    {		token = strtok(NULL, "\n");		if (token != NULL)		{		    g_nNicNet = GetIP(pszNetMask);		    g_nNicMask = GetMask(token);		    g_bMultinic = true;		}	    }	}	else	{	    g_nNicNet = 0;	    g_nNicMask = 0;	    g_bMultinic = false;	}	bool bFixedPortUsed = false;	if (g_nRootPort > 0 && g_nIproc == 0)	{		g_pProcTable[0].control_port = g_nRootPort;		// If a specific port was provided through the environment then		// don't write the port out to a file.		bFixedPortUsed = true; 	}	// The executable name is the full path to the executable	HMODULE hModule = GetModuleHandle(NULL);	if (!GetModuleFileName(hModule, 			g_pProcTable[g_nIproc].exename, NT_EXENAME_LEN))		strcpy(g_pProcTable[g_nIproc].exename, "unknown.exe");	strcpy(g_pProcTable[g_nIproc].host, g_pszHostName);	g_pProcTable[g_nIproc].pid = (long)GetCurrentProcessId();	// If all the processes can reach each other through shared memory then there is	// no need to create the socket completion port threads.	int nNumShmQueues = GetShmemClique();	if (nNumShmQueues == g_nNproc)		bCommPortAvailable = false;	/*	// If all the processes can reach each other through VI's then there is	// no need to create the socket completion port threads.	GetViClique();	if (everyonecantalkvi)		bCommPortAvailable = false;	//*/	if (bCommPortAvailable) // If there is no completion port available (Win9x) then socket communication is not available	{		DWORD dwThreadID;		HANDLE hReadyEvent = CreateEvent(NULL, TRUE, FALSE, NULL);		// Start the communication thread		hReadyEvent = CreateEvent(NULL, TRUE, FALSE, NULL);		if (hReadyEvent == NULL)			AbortInit(1, "Unable to create an event in MPID_Init");		g_hCommPortThread = CreateThread(			NULL, 0, 			(LPTHREAD_START_ROUTINE) CommPortThread,			hReadyEvent,			NT_THREAD_STACK_SIZE, &dwThreadID);		if (g_hCommPortThread == NULL)			AbortInit(GetLastError(), "Unable to spawn CommPortThread");		if (WaitForSingleObject(hReadyEvent, MPICH_SHORT_TIMEOUT) == WAIT_TIMEOUT)			AbortInit(1, "Communication thread setup timed out");		CloseHandle(hReadyEvent);	}	else		g_hCommPortThread = NULL;	if (g_bUseBNR)	{		char pszKey[100], pszValue[MAX_PATH];		sprintf(pszKey, "ListenPort%d", g_nIproc);		sprintf(pszValue, "%d", g_pProcTable[g_nIproc].listen_port);		BNR_Put(g_myBNRgroup, pszKey, pszValue, -1);		sprintf(pszKey, "ListenHost%d", g_nIproc);		strcpy(pszValue, g_pProcTable[g_nIproc].host);		BNR_Put(g_myBNRgroup, pszKey, pszValue, -1);		sprintf(pszKey, "Executable%d", g_nIproc);		strcpy(pszValue, g_pProcTable[g_nIproc].exename);		BNR_Put(g_myBNRgroup, pszKey, pszValue, -1);		sprintf(pszKey, "pid%d", g_nIproc);		sprintf(pszValue, "%d", g_pProcTable[g_nIproc].pid);		BNR_Put(g_myBNRgroup, pszKey, pszValue, -1);		// Put anything for VI ???	}	else if (g_bUseDatabase)	{		char pszKey[100], pszValue[MAX_PATH];		sprintf(pszKey, "ListenPort%d", g_nIproc);		sprintf(pszValue, "%d", g_pProcTable[g_nIproc].listen_port);		g_Database.Put(pszKey, pszValue, strlen(pszValue)+1);		sprintf(pszKey, "ListenHost%d", g_nIproc);		strcpy(pszValue, g_pProcTable[g_nIproc].host);		g_Database.Put(pszKey, pszValue, strlen(pszValue)+1);		sprintf(pszKey, "Executable%d", g_nIproc);		strcpy(pszValue, g_pProcTable[g_nIproc].exename);		g_Database.Put(pszKey, pszValue, strlen(pszValue)+1);		sprintf(pszKey, "pid%d", g_nIproc);		sprintf(pszValue, "%d", g_pProcTable[g_nIproc].pid);		g_Database.Put(pszKey, pszValue, strlen(pszValue)+1);		//g_Database.Put(pszKey, &g_pProcTable[g_nIproc].pid, sizeof(int));		// Put anything for VI ???	}	else	{		DWORD dwThreadID;		HANDLE hReadyEvent = CreateEvent(NULL, TRUE, FALSE, NULL);		// Start the control thread		ResetEvent(hReadyEvent);		g_hControlLoopThread = CreateThread(			NULL, 0, 			(LPTHREAD_START_ROUTINE) ControlLoopThread,			hReadyEvent,			NT_THREAD_STACK_SIZE, &dwThreadID);		if (g_hControlLoopThread == NULL)			AbortInit(GetLastError(), "Unable to spawn ControlLoopThread");		if (WaitForSingleObject(hReadyEvent, MPICH_SHORT_TIMEOUT) == WAIT_TIMEOUT)			AbortInit(1, "Control thread setup timed out");				if (g_nIproc == 0)		{			ResetEvent(hReadyEvent);						// Why do I use a global variable instead of just using 			// g_pProcTable[0].control_port?			g_nRootPort = g_pProcTable[0].control_port;						if (g_bMPIRunUsed && !bFixedPortUsed)			{				if (strnicmp(pszExtra, "shm:", 4) == 0)				{					// Write the port number to the temporary memory mapped file 					// described by pszExtra					HANDLE hMapping;					LONG *pMapping;										SECURITY_ATTRIBUTES saAttr;					saAttr.nLength = sizeof(SECURITY_ATTRIBUTES);					saAttr.lpSecurityDescriptor = NULL;					saAttr.bInheritHandle = FALSE;										// Create a mapping from the page file					hMapping = CreateFileMapping(						INVALID_HANDLE_VALUE,						&saAttr, //NULL,						PAGE_READWRITE,						0, sizeof(LONG),						&pszExtra[4]);										if (hMapping == NULL)						AbortInit(GetLastError(), "Unable to create a memory mapping for inter-process communication");					if (GetLastError() != ERROR_ALREADY_EXISTS)						AbortInit(1, "MPIRun has not created the memory mapping to place the root port number in");										// Map the file and save the pointer to the base of the mapped file					pMapping = (LONG *)MapViewOfFile(						hMapping,						FILE_MAP_WRITE,						0,0,						sizeof(LONG));										if (pMapping == NULL)						AbortInit(GetLastError(), "Unable to memory map the view of the ipc file");										// Write the listening port to the ipc shared memory file					*pMapping = g_pProcTable[0].control_port;										// Wait for the launcher to read the data before closing the mapping					while (*pMapping != 0)						Sleep(200);										UnmapViewOfFile(pMapping);					CloseHandle(hMapping);				}				else if (strnicmp(pszExtra, "mpd:", 4) == 0)				{					// use mpd to get the root port back to mpirun					if (!PutRootPortInMPDDatabase(&pszExtra[4], g_pProcTable[0].control_port, g_pszJobID))					    AbortInit(-1, "Unable to put the root listening port in the mpd database");					g_bMPDFinalize = true;				}				else				{					// Write the port number to the temporary file 					// described by pszExtra					char str[100];					DWORD num_written;					HANDLE hFile = CreateFile(pszExtra, GENERIC_WRITE, 						FILE_SHARE_READ, NULL, OPEN_EXISTING, 0, NULL);					if (hFile == INVALID_HANDLE_VALUE)					{						int error = GetLastError();						LogMsg("CreateFile failed: error %d, file '%s'\n", error, pszExtra);						AbortInit(error, "CreateFile failed: %s", pszExtra);					}					sprintf(str, "%d\n", g_pProcTable[0].control_port);					if (!WriteFile(hFile, str, strlen(str)+2, &num_written, NULL))					{						int error = GetLastError();						LogMsg("WriteFile failed of the root control port, Error: %d", error);						CloseHandle(hFile);						AbortInit(error, "WriteFile(%s, root port) failed", pszExtra);					}					CloseHandle(hFile);				}			}						SendInitDataToRoot();						// Wait for g_hControlLoop(Client)Thread(s) to signal that all 			// processes have connected. For now, this is set to INFINITE.  			// What value would be appropriate to give the processes time to 			// launch and connect back to the root?			WaitForSingleObject(g_hEveryoneConnectedEvent, INFINITE);					}		else		{		    if (strnicmp(pszExtra, "mpd:", 4) == 0)		    {			if (!ParseMPDString(&pszExtra[4]))			    AbortInit(-1, "Unable to parse the mpd host and port\n");			easy_socket_init();			g_bMPDFinalize = true;		    }			// Send the root process or the database server information so it			// can inform other processes how to connect to this process			SendInitDataToRoot();		}		CloseHandle(hReadyEvent);	}	pszTemp[0] = '\0';	GetEnvironmentVariable("MPICH_SINGLETHREAD", pszTemp, 100);	if (pszTemp[0] == '1')	{		SetEnvironmentVariable("MPICH_SHM_SINGLETHREAD", "1");		SetEnvironmentVariable("MPICH_VI_SINGLETHREAD", "1");	}	// Initialize the shared memory stuff	try {	InitSMP();	}catch(...)	{	    nt_error("exception thrown in InitSMP caught in Init", 1);	}	try{	// Initialize the VIA stuff	if (InitVI())	{		pszTemp[0] = '\0';		pszTemp[1] = '\0';		GetEnvironmentVariable("MPICH_SHM_SINGLETHREAD", pszTemp, 100);		GetEnvironmentVariable("MPICH_VI_SINGLETHREAD", &pszTemp[1], 99);		if (pszTemp[0] == '1' && pszTemp[1] == '1' && g_pShmemQueue)			g_MsgQueue.SetProgressFunction(PollShmemAndViQueues);	}	}catch(...)	{		nt_error("exception thrown in InitVi caught in Init", 1);	}	}catch(...)	{		nt_error("Exception caught in Init.", 1);	}}// Function name	: MPID_NT_ipvishm_End// Description	    : Finishes any outstanding IO, closes all connections, and waits for //                    everyone else to finish?// Return type		: void void MPID_NT_ipvishm_End(){	//DPRINTF(("MPID_NT_ipvishm_End() called.\n"));	g_bInNT_ipvishm_End = true;	if (g_bMPDFinalize)	    UpdateMPIFinalizedInMPD();    if (g_nNproc > 1)	{		if (g_bUseBNR)		{			char pKey[100], pValue[100];			sprintf(pKey, "InDone%d", g_nIproc);			BNR_Put(g_myBNRgroup, pKey, "yes", 0);			BNR_Fence(g_myBNRgroup);			if (g_nIproc == 0)			{				for (int i=0; i<g_nNproc; i++)				{					sprintf(pKey, "InDone%d", i);					BNR_Get(g_myBNRgroup, pKey, pValue);				}				BNR_Put(g_myBNRgroup, "AllDone", "yes", -1);			}			BNR_Fence(g_myBNRgroup);			BNR_Get(g_myBNRgroup, "AllDone", pValue);		}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -