⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nt_ipvishm_priv.cpp

📁 MPICH是MPI的重要研究,提供了一系列的接口函数,为并行计算的实现提供了编程环境.
💻 CPP
📖 第 1 页 / 共 3 页
字号:
		else if (g_bUseDatabase)		{			///////////////////////////////////////////////////////////////////////			// Use InDone and PassThroughDone keys to create a barrier here.			// Then use the ThroughDone key to guarantee no more database accesses.			///////////////////////////////////////////////////////////////////////			char pValue[100];			int length = 100;			// Everyone puts an InDone message into the database			g_Database.Put("InDone", (void*)"yes", 4, false);			if (g_nIproc == 0)			{				// Process zero consumes all the InDone messages ...				for (int i=0; i<g_nNproc; i++)				{					length = 100;					g_Database.Get("InDone", pValue, &length);				}				// ... and then put a PassThroughDone message				g_Database.Put("PassThroughDone", (void*)"yes", 4);			}			// Everyone waits for the PassThroughDone message from process zero ...			length = 100;			g_Database.Get("PassThroughDone", pValue, &length);			// ... and then puts a ThroughDone message			g_bViClosing = true;			g_Database.Put("ThroughDone", (void*)"yes", 4, false);			if (g_nIproc == 0)			{				// Process zero consumes all the ThroughDone messages				for (int i=0; i<g_nNproc; i++)				{					length = 100;					g_Database.Get("ThroughDone", pValue, &length);				}				// When all the ThroughDone messages have been consumed, we can				// guarantee that there will be no more use of the database.				// So, it is safe for process zero to delete the branch in the				// database corresponding to this job.				g_Database.Delete();			}		}		else		{			// Signal that the current process is in End			SendInDoneMsg();						WaitForSingleObject(g_hOkToPassThroughDone, INFINITE);			CloseHandle(g_hOkToPassThroughDone);						if (g_nIproc == 0)			{				// Wait for everyone else to arrive here				WaitForSingleObject(g_hAllInDoneEvent, INFINITE);				CloseHandle(g_hAllInDoneEvent);			}						// Signal the control loop thread to stop			SetEvent(g_hStopControlLoopEvent);			if (g_hControlLoopThread != NULL)			{			    WaitForSingleObject(g_hControlLoopThread, MPICH_SHORT_TIMEOUT);			    CloseHandle(g_hControlLoopThread);			    g_hControlLoopThread = NULL;			}		}				if (g_hCommPortThread != NULL)		{			// Signal the communication thread to stop			DPRINTF(("process %d: MPID_NT_ipvishm_End signalling CommPortThread to exit.\n", g_nIproc));			g_nCommPortCommand = NT_COMM_CMD_EXIT;			SetEvent(g_hCommPortEvent);						// Assuming there aren't any blocking calls pending			// the CommThread should exit soon after signalling g_hCommEvent			if (WaitForSingleObject(g_hCommPortThread, MPICH_SHORT_TIMEOUT) == WAIT_TIMEOUT)			{				//nt_error("wait for CommThread to exit timed out", 1);				LogMsg(TEXT("wait for CommPortThread to exit in End timed out"));				TerminateThread(g_hCommPortThread, 0);			}						// Close all the communication sockets			for (int i=0; i<g_nNproc; i++)			{				if (g_pProcTable[i].sock_event != NULL)				{					// Close the socket					NT_Tcp_closesocket(g_pProcTable[i].sock, g_pProcTable[i].sock_event);					g_pProcTable[i].sock = INVALID_SOCKET;					g_pProcTable[i].sock_event = NULL;					CloseHandle(g_pProcTable[i].msg.ovl.hEvent);				}				CloseHandle(g_pProcTable[i].hValidDataEvent);			}		}	}	// Clean up the shared memory stuff	EndSMP();	// Clean up the VIA stuff	EndVI();	// Clean up the BNR interface	if (g_bUseBNR)		BNR_Finalize();	// Free up allocated memory	if (g_pProcTable != NULL)	{		delete g_pProcTable;		g_pProcTable = NULL;	}	if (g_bMPDFinalize)	    easy_socket_finalize();	WSACleanup();}int MPID_NT_ipvishm_exitall(char *msg, int code){    nt_error(msg, code);    return 0;}int MPID_NT_ipvishm_is_shm( int rank ){    return g_pProcTable[rank].shm;}// Function name	: nt_tcp_shm_proc_info// Description	    : fills hostname and exename with information for //                    the i'th process and returns the process id of //                    that process// Return type		: int // Argument         : int i// Argument         : char **hostname// Argument         : char **exenameint nt_ipvishm_proc_info(int i, char **hostname, char **exename){	//DPRINTF(("nt_ipvishm_proc_info called for process %d.\n", i));	// Check bounds	if ((i < 0) || (i >= g_nNproc))		return -1;	// Check to see whether the information needs to be retrieved	if (g_pProcTable[i].pid == 0)	{		if (g_bUseBNR)		{			char pszKey[100];			char pszTemp[100];			sprintf(pszKey, "ListenHost%d", i);			BNR_Get(g_myBNRgroup, pszKey, g_pProcTable[i].host);			sprintf(pszKey, "Executable%d", i);			BNR_Get(g_myBNRgroup, pszKey, g_pProcTable[i].exename);			sprintf(pszKey, "pid%d", i);			BNR_Get(g_myBNRgroup, pszKey, pszTemp);			g_pProcTable[i].pid = atoi(pszTemp);		}		else if (g_bUseDatabase)		{			char pszKey[100];			char pszTemp[100];			int length = NT_HOSTNAME_LEN;			sprintf(pszKey, "ListenHost%d", i);			g_Database.Get(pszKey, g_pProcTable[i].host, &length);			length = NT_EXENAME_LEN;			sprintf(pszKey, "Executable%d", i);			g_Database.Get(pszKey, g_pProcTable[i].exename, &length);			length = 100;			sprintf(pszKey, "pid%d", i);			g_Database.Get(pszKey, pszTemp, &length);			g_pProcTable[i].pid = atoi(pszTemp);			//length = sizeof(int);			//g_Database.Get(pszKey, &g_pProcTable[i].pid, &length);		}		else			GetProcessInfo(i);	}	// Return a pointer to the information in the proc table	// I assume the buffers will not be modified only read	*hostname = g_pProcTable[i].host;	*exename = g_pProcTable[i].exename;	return g_pProcTable[i].pid;}// Function name	: nt_error// Description	    : Prints an error message and exits// Return type		: void // Argument         : char *string// Argument         : int valuevoid nt_error(char *string, int value){	printf("Error %d, process %d:\n   %s\n", value, g_nIproc, string);fflush(stdout);    	// Signal the threads to stop and close their socket connections	DPRINTF(("process %d: nt_error signalling CommunicationThread to exit.\n", g_nIproc);fflush(stdout));	g_nCommPortCommand = NT_COMM_CMD_EXIT;	SetEvent(g_hCommPortEvent);	// Close all the communication sockets	if (g_pProcTable != NULL)	{		for (int i=0; i<g_nNproc; i++)		{			if (g_pProcTable[i].sock_event != NULL)			{				NT_Tcp_closesocket(g_pProcTable[i].sock, g_pProcTable[i].sock_event);				g_pProcTable[i].sock = INVALID_SOCKET;				g_pProcTable[i].sock_event = NULL;			}		}	}	if (g_bUseBNR)		BNR_Finalize();	WSACleanup();	ExitProcess(value);}void PrintWinSockError(int error){	HLOCAL str;	int num_bytes;	num_bytes = FormatMessage(		FORMAT_MESSAGE_FROM_SYSTEM |		FORMAT_MESSAGE_ALLOCATE_BUFFER,		0,		error,		MAKELANGID( LANG_NEUTRAL, SUBLANG_DEFAULT ),		(LPTSTR) &str,		0,0);	if (strlen((const char*)str))	    printf("%s", str);	else	    printf("\n");	LocalFree(str);}// Function name	: nt_error_socket// Description	    : Prints an error message and exits// Return type		: void // Argument         : char *string// Argument         : int valuevoid nt_error_socket(char *string, int value){	printf("Error %d, process %d:\n   %s\n   ", value, g_nIproc, string);	PrintWinSockError(value);	fflush(stdout);    	// Signal the threads to stop and close their socket connections	DPRINTF(("process %d: nt_error signalling CommunicationThread to exit.\n", g_nIproc);fflush(stdout));	g_nCommPortCommand = NT_COMM_CMD_EXIT;	SetEvent(g_hCommPortEvent);	// Close all the communication sockets	if (g_pProcTable != NULL)	{		for (int i=0; i<g_nNproc; i++)		{			if (g_pProcTable[i].sock_event != NULL)			{				NT_Tcp_closesocket(g_pProcTable[i].sock, g_pProcTable[i].sock_event);				g_pProcTable[i].sock = INVALID_SOCKET;				g_pProcTable[i].sock_event = NULL;			}		}	}	if (g_bUseBNR)		BNR_Finalize();	WSACleanup();	ExitProcess(value);}// Function name	: NT_PIbsend// Description	    : Sends the buffer to process to, establishing a connection if necessary// Return type		: int // Argument         : int type// Argument         : void *buffer// Argument         : int length// Argument         : int to// Argument         : int datatypeint NT_PIbsend(int type, void *buffer, int length, int to, int datatype){	DPRINTF(("NT_PIbsend called: %d to %d, tag: %d, length: %d\n", g_nIproc, to, type, length));	// Handle the special case of sending to oneself	if (to == g_nIproc)	{		MessageQueue::MsgQueueElement *pElement;		void *pBuf = g_MsgQueue.GetBufferToFill(type, length, g_nIproc, &pElement);		if (pBuf == NULL)			nt_error("NT_PIbsend: MessageQueue.GetBuffer failed.", 1);		memcpy(pBuf, buffer, length);		if (!g_MsgQueue.SetElementEvent(pElement))			nt_error("NT_PIbsend: MessageQueue.SetElementEvent failed", 1);		return 0;	}	// Check bounds	if (to < 0 || to >= g_nNproc)		MakeErrMsg(1, "Send out of range: %d is not between 0 and %d", to, g_nNproc);	if (g_pProcTable[to].shm)	{		NT_ShmSend(type, buffer, length, to);	}	else	{		if (g_pProcTable[to].via)		{			NT_ViSend(type, buffer, length, to);		}		else		{			if (g_pProcTable[to].sock == INVALID_SOCKET)			{				DPRINTF(("making a connection to %d\n", to));				if (!ConnectTo(to))					MakeErrMsg(1, "NT_PIbsend: Unable to connect to process %d", to);			}			/*			if (SendBlocking(g_pProcTable[to].sock, (char*)&type, sizeof(int), 0) == SOCKET_ERROR)				nt_error_socket("NT_PIbsend: send type failed.", WSAGetLastError());			if (SendBlocking(g_pProcTable[to].sock, (char*)&length, sizeof(int), 0) == SOCKET_ERROR)				nt_error_socket("NT_PIbsend: send length failed", WSAGetLastError());			if (SendBlocking(g_pProcTable[to].sock, (char*)buffer, length, 0) == SOCKET_ERROR)				nt_error_socket("NT_PIbsend: send buffer failed", WSAGetLastError());			*/			if (SendStreamBlocking(g_pProcTable[to].sock, (char*)buffer, length, type) == SOCKET_ERROR)				nt_error_socket("NT_PIbsend: send msg failed.", WSAGetLastError());		}	}	DPRINTF(("type: %d, length: %d sent to %d\n", type, length, to));	return 0;}// Function name	: NT_PInsend// Description	    : // Return type		: int // Argument         : int type// Argument         : void *buffer// Argument         : int length// Argument         : int to// Argument         : int datatype// Argument         : int *pIdint NT_PInsend(int type, void *buffer, int length, int to, int datatype, int *pId){	// Do a blocking send	NT_PIbsend(type, buffer, length, to, datatype);	// Set the handle to be finished	pId[0] = 0;	return 0;}// Function name	: NT_PIbrecv// Description	    : // Return type		: int // Argument         : int type// Argument         : void *buffer// Argument         : int length// Argument         : int datatypeint NT_PIbrecv(int type, void *buffer, int length, int datatype){	/*	DPRINTF(("NT_PIbrecv called: %d type: %d, length: %d\n", g_nIproc, type, length));		if (!g_MsgQueue.FillThisBuffer(type, buffer, &length, &g_nLastRecvFrom))	{		if (length == -1)			return MPI_ERR_COUNT;		else			nt_error("Recv:FillBuffer failed.\n", 1);	}	DPRINTF(("type: %d len: %d received from %d\n", type, length, g_nLastRecvFrom));	return 0;	/*/	int pId[10];	g_MsgQueue.PostBufferForFilling(type, buffer, length, pId);	g_MsgQueue.Wait(pId);	g_nLastRecvFrom = pId[3];	return 0;	//*/}// Function name	: NT_PInrecv// Description	    : // Return type		: int // Argument         : int type// Argument         : void *buffer// Argument         : int length// Argument         : int datatype// Argument         : int *pIdint NT_PInrecv(int type, void *buffer, int length, int datatype, int *pId){	DPRINTF(("NT_PInrecv called: %d type: %d, length: %d\n", g_nIproc, type, length));	return (g_MsgQueue.PostBufferForFilling(type, buffer, length, pId)) ? 0 : 1;}// Function name	: NT_PIwait// Description	    : // Return type		: int // Argument         : int *pIdint NT_PIwait(int *pId){	if (pId == NULL)		nt_error("wait called on invalid object", 1);	if (pId[0] == 0)		return 1;	return (g_MsgQueue.Wait(pId)) ? 1 : 0;}// Function name	: NT_PInstatus// Description	    : // Return type		: int // Argument         : int *pIdint NT_PInstatus(int *pId){	if (pId[0] == 0)		return 1;	//return (g_MsgQueue.Test(pId)) ? 1 : 0;	if (g_MsgQueue.Test(pId))		return 1;	Sleep(0);	return 0;}// Function name	: NT_PInprobe// Description	    : Returns true if a message is available with the tag 'type'// Return type		: int // Argument         : int typeint NT_PInprobe(int type){	//DPRINTF(("NT_PInprobe called.\n"));	if (g_MsgQueue.Available(type, g_nLastRecvFrom))	{		return 1;	}	Sleep(0);	return 0;}// Function name	: MPID_Wtime// Description	    : // Return type		: void // Argument         : double *tvoid MPID_Wtime(double *t){	LARGE_INTEGER nLargeInt;	QueryPerformanceCounter(&nLargeInt);	*t = double(nLargeInt.QuadPart) / (double)g_nPerfFrequency.QuadPart;}// Function name	: MPID_Wtick// Description	    : // Return type		: void // Argument         : double *tvoid MPID_Wtick(double *t){	*t = 1.0 / (double)g_nPerfFrequency.QuadPart;}// Function name	: NT_PIgimax// Description	    : Does a global max operation. Used for setting up //                    heterogeneous environments.//                    Needed when MPID_HAS_HETERO is defined.//                    What it is supposed to do, I have no idea.// Return type		: int // Argument         : void *val// Argument         : int n// Argument         : int work// Argument         : int procset#ifdef MPID_HAS_HETEROint NT_PIgimax(void *val, int n, int work, int procset){	DPRINTF(("NT_PIgimax called.\n"));    return -1;}#endif

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -