⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nt_ipvishm_comport.cpp

📁 MPICH是MPI的重要研究,提供了一系列的接口函数,为并行计算的实现提供了编程环境.
💻 CPP
📖 第 1 页 / 共 2 页
字号:
#include "nt_global_cpp.h"
#include "bnrfunctions.h"
#include <stdio.h>
#include <stdlib.h>

int g_NumCommPortThreads = 2;
#define EXIT_WORKER_KEY	-1
HANDLE g_hCommPortThread, g_hCommPort;
HANDLE g_hCommPortEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
HANDLE g_hAddSocketMutex = CreateMutex(NULL, FALSE, NULL);
int g_nCommPortCommand;

// Function name	: PostSocketRead
// Description	    : Posts one overlapped ReadFile of nBytes bytes into pDest on the
//                    socket of process-table entry dwKey, using that entry's
//                    msg.ovl (offsets zeroed first) and msg.nRead.
// Return type		: BOOL - TRUE if ReadFile succeeded or the read is pending
//                    (ERROR_IO_PENDING); FALSE otherwise, with *pError set to
//                    GetLastError().
// Argument         : DWORD dwKey   - index into g_pProcTable
// Argument         : void *pDest   - destination of the read
// Argument         : DWORD nBytes  - number of bytes to request
// Argument         : int *pError   - receives the last error when ReadFile returns FALSE
static BOOL PostSocketRead(DWORD dwKey, void *pDest, DWORD nBytes, int *pError)
{
	g_pProcTable[dwKey].msg.ovl.Offset = 0;
	g_pProcTable[dwKey].msg.ovl.OffsetHigh = 0;
	if (ReadFile((HANDLE)g_pProcTable[dwKey].sock, pDest, nBytes,
		&g_pProcTable[dwKey].msg.nRead, &g_pProcTable[dwKey].msg.ovl))
		return TRUE;

	*pError = GetLastError();
	return (*pError == ERROR_IO_PENDING) ? TRUE : FALSE;
}

// Function name	: PostBufferRead
// Description	    : Posts a read for the part of the message body that has not been
//                    received yet (msg.nRemaining bytes at offset
//                    msg.length - msg.nRemaining of msg.buffer). If the system
//                    reports ERROR_NO_SYSTEM_RESOURCES the request is retried with
//                    successively halved sizes; the completion handler re-posts
//                    for whatever is still outstanding.
// Return type		: void
// Argument         : DWORD dwKey - index into g_pProcTable
static void PostBufferRead(DWORD dwKey)
{
	int error = 0;
	char *pDest = &(((char*)g_pProcTable[dwKey].msg.buffer)[g_pProcTable[dwKey].msg.length - g_pProcTable[dwKey].msg.nRemaining]);

	if (PostSocketRead(dwKey, pDest, g_pProcTable[dwKey].msg.nRemaining, &error))
		return;

	if (error != ERROR_NO_SYSTEM_RESOURCES)
	{
		MakeErrMsg(error, "CommPortWorkerThread:Post read(buffer[%d]) from socket %d failed", g_pProcTable[dwKey].msg.length, dwKey);
		return;
	}

	// Back off: keep halving the request until one is accepted.  The loop stops
	// as soon as a read is posted (so a successful retry is never reported as a
	// failure) and never issues a zero-byte request.
	int n = g_pProcTable[dwKey].msg.nRemaining / 2;
	while (n > 0)
	{
		if (PostSocketRead(dwKey, pDest, n, &error))
			return;
		if (error != ERROR_NO_SYSTEM_RESOURCES)
		{
			// n is the size of the request that was actually rejected
			MakeErrMsg(error, "CommPortWorkerThread:Post read(buffer[%d]) from socket %d failed", n, dwKey);
			return;
		}
		n = n / 2;
	}
	MakeErrMsg(1, "Not enough system resources available to post a read from socket %d\n", dwKey);
}

// Function name	: CommPortWorkerThread
// Description	    : Completion-port worker loop.  Each dequeued completion carries the
//                    process-table index of a socket in its completion key.  Per
//                    socket, msg.state steps through
//                    NT_MSG_READING_TAG -> NT_MSG_READING_LENGTH -> NT_MSG_READING_BUFFER
//                    and back; after every completion the next overlapped read is
//                    posted.  A completion with key EXIT_WORKER_KEY ends the thread;
//                    a zero-byte completion closes the socket.
// Return type		: void 
void CommPortWorkerThread()
{
	DWORD dwKey, nBytes;
	OVERLAPPED *p_Ovl;
	int error;

	while (true)
	{
		// Reset the outputs every pass: when the call fails without dequeuing a
		// packet it leaves nBytes/dwKey untouched, and p_Ovl == NULL is the only
		// way to detect that.
		dwKey = 0;
		nBytes = 0;
		p_Ovl = NULL;

		if (GetQueuedCompletionStatus(g_hCommPort, &nBytes, &dwKey, &p_Ovl, INFINITE))
		{
			DPRINTF(("COMMPORT::%d bytes on socket %d\n", nBytes, dwKey));
			if (dwKey == (DWORD)EXIT_WORKER_KEY)
				ExitThread(0);
			if (nBytes)
			{
				g_pProcTable[dwKey].msg.nRemaining -= nBytes;
				switch(g_pProcTable[dwKey].msg.state)
				{
				case NT_MSG_READING_TAG:
					if (g_pProcTable[dwKey].msg.nRemaining)
					{
						// Partial tag: read the rest of it
						if (!PostSocketRead(dwKey,
							(char*)&g_pProcTable[dwKey].msg.tag + sizeof(int) - g_pProcTable[dwKey].msg.nRemaining,
							g_pProcTable[dwKey].msg.nRemaining, &error))
							MakeErrMsg(error, "CommPortWorkerThread:Post read(tag) from socket %d failed", dwKey);
					}
					else
					{
						// Tag complete: update the state before posting the next
						// read, as the original code does
						g_pProcTable[dwKey].msg.state = NT_MSG_READING_LENGTH;
						g_pProcTable[dwKey].msg.nRemaining = sizeof(int);
						if (!PostSocketRead(dwKey, (char*)&g_pProcTable[dwKey].msg.length, sizeof(int), &error))
							MakeErrMsg(error, "CommPortWorkerThread:Post read(length) from socket %d failed", dwKey);
					}
					break;
				case NT_MSG_READING_LENGTH:
					if (g_pProcTable[dwKey].msg.nRemaining)
					{
						// Partial length: read the rest of it
						if (!PostSocketRead(dwKey,
							(char*)&g_pProcTable[dwKey].msg.length + sizeof(int) - g_pProcTable[dwKey].msg.nRemaining,
							g_pProcTable[dwKey].msg.nRemaining, &error))
							MakeErrMsg(error, "CommPortWorkerThread:Post read(length) from socket %d failed", dwKey);
					}
					else
					{
						// Header complete: obtain the destination buffer from the
						// message queue and start reading the body.
						// NOTE(review): if msg.length is 0 this posts a zero-byte read,
						// whose completion would take the "close socket" branch
						// below - confirm senders never transmit empty bodies.
						g_pProcTable[dwKey].msg.buffer = g_MsgQueue.GetBufferToFill(g_pProcTable[dwKey].msg.tag, g_pProcTable[dwKey].msg.length, dwKey, &g_pProcTable[dwKey].msg.pElement);
						g_pProcTable[dwKey].msg.nRemaining = g_pProcTable[dwKey].msg.length;
						g_pProcTable[dwKey].msg.state = NT_MSG_READING_BUFFER;
						PostBufferRead(dwKey);
					}
					break;
				case NT_MSG_READING_BUFFER:
					if (g_pProcTable[dwKey].msg.nRemaining)
					{
						// Body not complete yet: read the remainder
						PostBufferRead(dwKey);
					}
					else
					{
						// Body complete: signal the queue element, then start over
						// with the tag of the next message.
						g_MsgQueue.SetElementEvent(g_pProcTable[dwKey].msg.pElement);
						g_pProcTable[dwKey].msg.state = NT_MSG_READING_TAG;
						g_pProcTable[dwKey].msg.nRemaining = sizeof(int);
						if (!PostSocketRead(dwKey, (char*)&g_pProcTable[dwKey].msg.tag, sizeof(int), &error))
							MakeErrMsg(error, "CommPortWorkerThread:Post read(tag) from socket %d failed", dwKey);
					}
					break;
				default:
					break;
				}
			}
			else
			{
				// Zero bytes transferred: close the socket and clear its table entry
				NT_Tcp_closesocket(g_pProcTable[dwKey].sock, g_pProcTable[dwKey].sock_event);
				g_pProcTable[dwKey].sock = INVALID_SOCKET;
				g_pProcTable[dwKey].sock_event = NULL;
			}
		}
		else
		{
			// Capture the error code before any other call can overwrite it
			error = GetLastError();
			if (!g_bInNT_ipvishm_End)
			{
				// dwKey identifies a socket only if a packet was actually dequeued
				// (p_Ovl != NULL); otherwise report the failure without a host name.
				if (p_Ovl != NULL && dwKey < (DWORD)g_nNproc && dwKey != (DWORD)g_nIproc && strlen(g_pProcTable[dwKey].host))
				{
					MakeErrMsg(error, "GetQueuedCompletionStatus failed for socket %d connected to host '%s'", dwKey, g_pProcTable[dwKey].host);
				}
				else
				{
					nt_error_socket("GetQueuedCompletionStatus failed", error);
				}
			}
		}
	}
}

// Function name	: CommPortThread
// Description	    : 
// Return type		: void 
// Argument         : HANDLE hReadyEvent
void CommPortThread(HANDLE hReadyEvent)
{
	SOCKET listen_socket;
	HANDLE ahEvent[2];				// array of events to wait on
	int error = 0, num_handles=2;
	SOCKET temp_socket;
	WSAEVENT temp_event;
	DWORD ret_val;
	int remote_iproc;
	int i;
	BOOL opt;
	char add_socket_ack;
	DWORD dwThreadID;
	HANDLE *hWorkers;

	ahEvent[0] = g_hCommPortEvent;

	// create a listening socket
	if (error = NT_Tcp_create_bind_socket(&listen_socket, &ahEvent[1]))
		nt_error_socket("CommPortThread: NT_Tcp_create_bind_socket failed", error);

	// associate listen_socket_event with listen_socket
	if (WSAEventSelect(listen_socket, ahEvent[1], FD_ACCEPT) == SOCKET_ERROR)
		nt_error("CommPortThread: WSAEventSelect failed for listen_socket", 1);

	if (listen(listen_socket, 
SOMAXCONN) == SOCKET_ERROR)		nt_error_socket("CommPortThread: listen failed", WSAGetLastError());	// get the port and local hostname for the listening socket	if (error = NT_Tcp_get_sock_info(listen_socket, g_pProcTable[g_nIproc].host, &g_pProcTable[g_nIproc].listen_port))		nt_error_socket("CommPortThread: Unable to get host and port of listening socket", error);	// Create the completion port	g_hCommPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, g_NumCommPortThreads);	if (g_hCommPort == NULL)		nt_error_socket("CommPortThread: CreateIoCompletionPort failed", GetLastError());	hWorkers = new HANDLE[g_NumCommPortThreads];	// Start the completion port threads	for (i=0; i<g_NumCommPortThreads; i++)	{	    //HANDLE hWorkerThread;	    hWorkers[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)CommPortWorkerThread, NULL, NT_THREAD_STACK_SIZE, &dwThreadID);	    if (hWorkers[i] == NULL)		nt_error_socket("CommPortThread: CreateThread(CommPortWorkerThread) failed", GetLastError());	    //CloseHandle(hWorkerThread);	}	// Signal that the port number is valid	if (!SetEvent(hReadyEvent))		nt_error_socket("CommPortThread: SetEvent(hReadyEvent) failed", GetLastError());	while (true)	{		ret_val = WaitForMultipleObjects(num_handles, ahEvent, FALSE, INFINITE);		if (ret_val != WAIT_OBJECT_0 && ret_val != WAIT_OBJECT_0+1)		{			nt_error_socket("CommPortThread: Wait failed", GetLastError());			return;		}		// Event[0] is the event used by other threads in this process to communicate with this thread		if (WaitForSingleObject(ahEvent[0], 0) == WAIT_OBJECT_0)		{			switch (g_nCommPortCommand)			{			case NT_COMM_CMD_EXIT:				DPRINTF(("process %d: Exit command.\n", g_nIproc));				for (i=0; i<g_NumCommPortThreads; i++)					PostQueuedCompletionStatus(g_hCommPort, 0, EXIT_WORKER_KEY, NULL);				WaitForMultipleObjects(g_NumCommPortThreads, hWorkers, TRUE, 5000);				for (i=0; i<g_NumCommPortThreads; i++)				    CloseHandle(hWorkers[i]);				delete hWorkers;				CloseHandle(g_hAddSocketMutex);	
			CloseHandle(g_hCommPortEvent); 				CloseHandle(g_hCommPort);				closesocket(listen_socket);				WSACloseEvent(ahEvent[1]);				ExitThread(0);				break;			default:				nt_error("Invalid command sent to CommPortThread", g_nCommPortCommand);				break;			}		}		// Event[1] is the listen socket event, which is signalled when other processes whish to establish a socket connection with this process		if (WaitForSingleObject(ahEvent[1], 0) == WAIT_OBJECT_0)		{			///DPRINTF(("process %d: listen_socket signalled.\n", g_nIproc));			i=0;			opt = TRUE;			// Something in my code is causing the listen_socket event to fail to be reset by the accept call.			// For now I manually reset it here.			WSAResetEvent(ahEvent[1]);			temp_socket = accept(listen_socket, NULL, NULL);			if (temp_socket != INVALID_SOCKET)			{				// Create an event and associate it with the newly accepted socket				//if (setsockopt(temp_socket, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(BOOL)) == SOCKET_ERROR)					//nt_error_socket("setsockopt failed", WSAGetLastError());				if (setsockopt(temp_socket, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(BOOL)) == SOCKET_ERROR)				{				    error = WSAGetLastError();				    if (error == WSAENOBUFS)				    {					Sleep(250);					if (setsockopt(temp_socket, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(BOOL)) == SOCKET_ERROR)					{					    error = WSAGetLastError();					    if (error == WSAENOBUFS)					    {						Sleep(250);						if (setsockopt(temp_socket, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(BOOL)) == SOCKET_ERROR)						{						    error = WSAGetLastError();						    if (error != WSAENOBUFS)							nt_error_socket("setsockopt failed in CommPortThread", error);						}					    }					    else						nt_error_socket("setsockopt failed in CommPortThread", error);					}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -