⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nt_ipvishm_control_loop.cpp

📁 MPICH是MPI的重要研究,提供了一系列的接口函数,为并行计算的实现提供了编程环境.
💻 CPP
📖 第 1 页 / 共 2 页
字号:
#include "nt_global_cpp.h"#include <stdio.h>int g_nNumInDone = 0;long g_nNumConnected = 0;HANDLE g_hAllInDoneEvent = CreateEvent(NULL, TRUE, FALSE, NULL);HANDLE g_hOkToPassThroughDone = CreateEvent(NULL, TRUE, FALSE, NULL);HANDLE g_hNumInDoneMutex = CreateMutex(NULL, FALSE, NULL);HANDLE g_hControlLoopThread = NULL;HANDLE g_hStopControlLoopEvent = CreateEvent(NULL, TRUE, FALSE, NULL);HANDLE g_hEveryoneConnectedEvent = CreateEvent(NULL, TRUE, FALSE, NULL);bool SendAllDoneMsg(char *host, int port);// Function name	: ControlLoopClientThread// Description	    : // Return type		: void // Argument         : ControlLoopClientArg *argvoid ControlLoopClientThread(ControlLoopClientArg *arg){	char cCmd, ack=1;	DWORD ret_val = 0;	int remote_iproc, query_n, i;	SOCKET sock;	WSAEVENT sock_event;	char temp_host[NT_HOSTNAME_LEN];	sock = arg->sock;	sock_event = arg->sock_event;	delete arg;	// save and compare the result of ReceiveBlocking	if ( ret_val = ReceiveBlocking(sock, sock_event, &cCmd, 1, 0) )	{		NT_Tcp_closesocket(sock, sock_event);		nt_error_socket("Failure to read command from ControlLoopClient connection.\n", ret_val);	}	switch (cCmd)	{	case NT_TCP_CTRL_CMD_INIT_DATA_TO_ROOT:		// Receive iproc, listen port, control port, hostname, exename, and pid		if (ret_val = ReceiveBlocking(sock, sock_event, (char*)&remote_iproc, sizeof(int), 0))			nt_error_socket("ControlLoopClientThread: recv remote_iproc failed.", ret_val);		if (ret_val = ReceiveBlocking(sock, sock_event, (char*)&g_pProcTable[remote_iproc].listen_port, sizeof(int), 0))			nt_error_socket("ControlLoopClientThread: recv listen port failed.", ret_val);		if (ret_val = ReceiveBlocking(sock, sock_event, (char*)&g_pProcTable[remote_iproc].control_port, sizeof(int), 0))			nt_error_socket("ControlLoopClientThread: recv control port failed.", ret_val);		if (ret_val = ReceiveBlocking(sock, sock_event, g_pProcTable[remote_iproc].host, NT_HOSTNAME_LEN, 0))			nt_error_socket("ControlLoopClientThread: recv remote_host failed.", ret_val);		if (ret_val = ReceiveBlocking(sock, sock_event, g_pProcTable[remote_iproc].exename, NT_EXENAME_LEN, 0))			nt_error_socket("ControlLoopClientThread: recv remote_exename failed.", ret_val);		if (ret_val = ReceiveBlocking(sock, sock_event, (char*)&g_pProcTable[remote_iproc].pid, sizeof(int), 0))			nt_error_socket("ControlLoopClientThread: recv remote_pid failed.", ret_val);		if (ret_val = ReceiveBlocking(sock, sock_event, (char*)&g_pProcTable[remote_iproc].num_nics, sizeof(int), 0))			nt_error_socket("ControlLoopClientThread: recv remote_num_nics failed.", ret_val);		if (ret_val = ReceiveBlocking(sock, sock_event, (char*)&g_pProcTable[remote_iproc].nic_ip, sizeof(int)*MAX_NUM_NICS, 0))			nt_error_socket("ControlLoopClientThread: recv remote_nic_ip[4] failed.", ret_val);		g_pProcTable[remote_iproc].multinic = (g_pProcTable[remote_iproc].num_nics > 1) ? TRUE : FALSE;		if (!SetEvent(g_pProcTable[remote_iproc].hValidDataEvent))			MakeErrMsg(GetLastError(), "ControlLoopClientThread: SetEvent(hValidDataEvent[%d]) failed", remote_iproc);		//printf("iproc: %d, listen: %d, control: %d, host: %s, exe: %s, pid: %d\n", 		//	remote_iproc, g_pProcTable[remote_iproc].listen_port, g_pProcTable[remote_iproc].control_port,		//	g_pProcTable[remote_iproc].host, g_pProcTable[remote_iproc].exename, g_pProcTable[remote_iproc].pid);fflush(stdout);		InterlockedIncrement(&g_nNumConnected);		if (g_nNumConnected == g_nNproc)			SetEvent(g_hEveryoneConnectedEvent);		else		    WaitForSingleObject(g_hEveryoneConnectedEvent, INFINITE);		// Send acknowledgement		if (SendBlocking(sock, &ack, 1, 0) == SOCKET_ERROR)			nt_error_socket("ControlLoopClientThread: send ack failed.", WSAGetLastError());		//printf("Init data to root message processed for %d\n", remote_iproc);fflush(stdout);		break;	case NT_TCP_CTRL_CMD_PROCESS_CONNECT_INFO:		// Receive the rank of the process information is requested of		if (ret_val = ReceiveBlocking(sock, sock_event, (char*)&query_n, sizeof(int), 0))			nt_error_socket("ControlLoopClientThread: ReceiveBlocking query_n failed", ret_val);		// What do I do if this information is not available yet?		if (g_pProcTable[query_n].listen_port == 0)		{			if (WaitForSingleObject(g_pProcTable[query_n].hValidDataEvent, 2000*g_nNproc) != WAIT_OBJECT_0)				LogMsg("Sending invalid information for process %d\n", query_n);		}		if (g_bMultinic)		{		    bool bSent = false;		    for (i=0; i<g_pProcTable[query_n].num_nics; i++)		    {			if ((g_pProcTable[query_n].nic_ip[i] & g_nNicMask) == g_nNicNet)			{			    unsigned int a, b, c, d;			    a = ((unsigned char *)(&g_pProcTable[query_n].nic_ip[i]))[0];			    b = ((unsigned char *)(&g_pProcTable[query_n].nic_ip[i]))[1];			    c = ((unsigned char *)(&g_pProcTable[query_n].nic_ip[i]))[2];			    d = ((unsigned char *)(&g_pProcTable[query_n].nic_ip[i]))[3];			    sprintf(temp_host, "%u.%u.%u.%u", a, b, c, d);			    //printf("sending %s\n", temp_host);fflush(stdout);			    // Send the host name for the requested process			    if (SendBlocking(sock, temp_host, NT_HOSTNAME_LEN, 0) == SOCKET_ERROR)				MakeErrMsg(WSAGetLastError(), "ControlLoopClientThread: send temp_host %d failed", query_n);			    bSent = true;			    break;			}		    }		    if (!bSent)		    {			//printf("sending default host: %s\n", g_pProcTable[query_n].host);fflush(stdout);			if (SendBlocking(sock, g_pProcTable[query_n].host, NT_HOSTNAME_LEN, 0) == SOCKET_ERROR)			    MakeErrMsg(WSAGetLastError(), "ControlLoopClientThread: send host %d failed", query_n);		    }		}		else		{		    //printf("sending %s\n", g_pProcTable[query_n].host);fflush(stdout);		    // Send the host name for the requested process		    if (SendBlocking(sock, g_pProcTable[query_n].host, NT_HOSTNAME_LEN, 0) == SOCKET_ERROR)			MakeErrMsg(WSAGetLastError(), "ControlLoopClientThread: send host %d failed", query_n);		}		// Send the port for the requested process		if (SendBlocking(sock, (char*)&g_pProcTable[query_n].listen_port, sizeof(int), 0) == SOCKET_ERROR)			MakeErrMsg(WSAGetLastError(), "ControlLoopClientThread: send listen_port[%d] %d failed", query_n, g_pProcTable[query_n].listen_port);		//printf("process connect info processed for %d\n", query_n);fflush(stdout);		break;	case NT_TCP_CTRL_CMD_PROCESS_INFO:		// Receive the rank of the process information is requested of		if (ret_val = ReceiveBlocking(sock, sock_event, (char*)&query_n, sizeof(int), 0))			nt_error_socket("ControlLoopClientThread: ReceiveBlocking query_n failed", ret_val);		// Send the host name, executable name, and process id for the requested process		if (SendBlocking(sock, g_pProcTable[query_n].host, NT_HOSTNAME_LEN, 0) == SOCKET_ERROR)			MakeErrMsg(WSAGetLastError(), "ControlLoopClientThread: send host %d failed", query_n);		if (SendBlocking(sock, g_pProcTable[query_n].exename, NT_EXENAME_LEN, 0) == SOCKET_ERROR)			MakeErrMsg(WSAGetLastError(), "ControlLoopClientThread: send exename %d failed", query_n);		if (SendBlocking(sock, (char*)&g_pProcTable[query_n].pid, sizeof(int), 0) == SOCKET_ERROR)			MakeErrMsg(WSAGetLastError(), "ControlLoopClientThread: send process %d id %d failed", query_n, g_pProcTable[query_n].pid);		//printf("process info processed for %d\n", query_n);fflush(stdout);		break;	case NT_TCP_CTRL_CMD_POST_IN_DONE:		// Send acknowledgement		if (SendBlocking(sock, &ack, 1, 0) == SOCKET_ERROR)			nt_error_socket("ControlLoopClientThread: send post_in_done ack failed.", WSAGetLastError());		if (WaitForSingleObject(g_hNumInDoneMutex, INFINITE) != WAIT_OBJECT_0)			nt_error_socket("ControlLoopClientThread:POST_IN_DONE: WaitForSingleObject(g_hNumInDoneMutex) failed", GetLastError());		g_nNumInDone++;		if (g_nNumInDone == g_nNproc)		{			// Send 'all in done' messages			for (i=g_nNproc-1; i>=0; i--)			{				//printf("About to call SendAllDoneMsg for %d on %s at %d\n", 				//	i, g_pProcTable[i].host, g_pProcTable[i].control_port);fflush(stdout);				SendAllDoneMsg(g_pProcTable[i].host, g_pProcTable[i].control_port);			}			if (!ReleaseMutex(g_hNumInDoneMutex))				nt_error_socket("ControlLoopClientThread:POST_IN_DONE: ReleaseMutex(g_hNumInDoneMutex) failed", GetLastError());			if (!CloseHandle(g_hNumInDoneMutex))				nt_error_socket("ControlLoopClientThread:POST_IN_DON: CloseHandle(g_hNumInDoneMutex) failed", GetLastError());			NT_Tcp_closesocket(sock, sock_event);			//printf("post in done processed\n");fflush(stdout);			if (!SetEvent(g_hAllInDoneEvent))				nt_error_socket("ControlLoopClientThread:POST_IN_DONE: SetEvent(g_hAllInDoneEvent) failed", GetLastError());			return;		}		else		{			if (!ReleaseMutex(g_hNumInDoneMutex))				nt_error_socket("ControlLoopClientThread:POST_IN_DONE: ReleaseMutex(g_hNumInDoneMutex) failed", GetLastError());		}		//printf("post in done processed\n");fflush(stdout);		break;	case NT_TCP_CTRL_CMD_ALL_IN_DONE:		// Send acknowledgement		if (SendBlocking(sock, &ack, 1, 0) == SOCKET_ERROR)			nt_error_socket("ControlLoopClientThread: send all_in_done ack failed.", WSAGetLastError());		NT_Tcp_closesocket(sock, sock_event);		//printf("all in done processed\n");fflush(stdout);		if (!SetEvent(g_hOkToPassThroughDone))			nt_error_socket("ControlLoopClientThread:ALL_IN_DONE: SetEvent(g_hOkToPassThroughDone) failed", GetLastError());		return;		break;	case NT_TCP_CTRL_CMD_ABORT:		nt_error("request to abort received", 1);		break;	default:		nt_error("Invalid command received from ControlLoopClient connection.\n", cCmd);	}	NT_Tcp_closesocket(sock, sock_event);}// Function name	: ControlLoopThread// Description	    : // Return type		: void // Argument         : HANDLE hReadyEventvoid ControlLoopThread(HANDLE hReadyEvent){	SOCKET sock;	WSAEVENT sock_event, aEvents[2];	int error = 0;	char host[NT_HOSTNAME_LEN];	DWORD result;	SOCKET temp_socket;	WSAEVENT temp_event;	HANDLE hThread;	DWORD dwThreadID;	// create a listening socket	// The control_port field of the ProcTable is initialized to zero. Therefore the system will pick any available port when creating the socket.	// But if the user selects to use a static port, then control_port will be set to this port number.	error = NT_Tcp_create_bind_socket(&sock, &sock_event, g_pProcTable[g_nIproc].control_port);	if (error)		nt_error("ControlLoopThread: NT_Tcp_create_bind_socket failed", 1);	// associate sock_event with sock	if (WSAEventSelect(sock, sock_event, FD_ACCEPT) == SOCKET_ERROR)		nt_error_socket("ControlLoopThread: WSAEventSelect(FD_ACCEPT) failed for the control socket", WSAGetLastError());	if (listen(sock, SOMAXCONN) == SOCKET_ERROR)		nt_error_socket("ControlLoopThread: listen failed", WSAGetLastError());	// get the port and local hostname for the listening socket	error = NT_Tcp_get_sock_info(sock, host, &g_pProcTable[g_nIproc].control_port);	if (error)		nt_error_socket("ControlLoopThread: Unable to get host and port of listening socket", error);	// Signal that the control port is valid	if (!SetEvent(hReadyEvent))		nt_error_socket("ControlLoopThread: SetEvent(hReadyEvent) failed", GetLastError());	aEvents[0] = sock_event;	aEvents[1] = g_hStopControlLoopEvent;	// Loop indefinitely, waiting for remote connections or a stop signal	while (true)	{		result = WSAWaitForMultipleEvents(2, aEvents, FALSE, INFINITE, FALSE);		if ((result != WSA_WAIT_EVENT_0) && (result != WSA_WAIT_EVENT_0+1))			nt_error("ControlLoopThread: Wait for a connect event failed", result);				if (result == WSA_WAIT_EVENT_0+1)		{			closesocket(sock);			CloseHandle(g_hStopControlLoopEvent);			return;		}		temp_socket = accept(sock, NULL, NULL);		if (temp_socket != INVALID_SOCKET)		{			ControlLoopClientArg *cArg = new ControlLoopClientArg;			if ((temp_event = WSACreateEvent()) == WSA_INVALID_EVENT)				nt_error_socket("ControlLoopThread: WSACreateEvent failed", WSAGetLastError());			if (WSAEventSelect(temp_socket, temp_event, FD_READ | FD_CLOSE) == SOCKET_ERROR)				nt_error_socket("ControlLoopThread: WSAEventSelect failed", WSAGetLastError());			cArg->sock = temp_socket;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -