⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 leftthread.cpp

📁 MPICH是MPI标准的一个重要实现,提供了一系列的接口函数,为并行计算的实现提供了编程环境.
💻 CPP
📖 第 1 页 / 共 2 页
字号:
#include "LeftThread.h"#include "sockets.h"#include "Command.h"#include <stdio.h>#include "global.h"#include "GetReturnThread.h"#include "LaunchMPDProcess.h"#include "LaunchNode.h"#include "GetCPUsage.h"// Function name	: LeftThread// Description	    : // Return type		: void // Argument         : LaunchMPDArg *pArgvoid LeftThread(LaunchMPDArg *pArg){	SOCKET sock, listen_sock;	WSAEVENT sock_event, listen_sock_event;	char host[100] = "";	int port = 0;	int error;	unsigned long nLocalIP, nTempIP, nGetIdentifier;	int nLocalPort, nTempPort;	CommandData Command;	MPD_CMD_HANDLE hCommand;	char pszDbsID[256], *pszDbsKey;	void *pDbsValue;	int n;	bool bPutPersistent = true;	char *pBuf;	char pShortBuffer[100];	LaunchNode *pLaunchNode = NULL;	if (error = NT_create_bind_socket(&listen_sock, &listen_sock_event, 0))	{		printf("LeftThread: create and bind listen socket failed, error %d\n", error);		ExitProcess(error);	}	if (WSAEventSelect(listen_sock, listen_sock_event, FD_ACCEPT) == SOCKET_ERROR)	{		error = WSAGetLastError();		printf("LeftThread: WSAEventSelect(FD_ACCEPT) failed for the listen socket, error %d", error);		ExitProcess(error);	}	if (listen(listen_sock, SOMAXCONN) == SOCKET_ERROR)	{		error = WSAGetLastError();		printf("LeftThread: listen failed, error %d\n", error);		ExitProcess(error);	}	NT_get_sock_info(listen_sock, host, &port);	if (pArg == NULL)	{		printf("%s\n%d\n", host, port);		fflush(stdout);	}	else	{		strcpy(pArg->pszHost, host);		pArg->nPort = port;		SetEvent(pArg->hReadyEvent);	}	while (true)	{		sock = accept(listen_sock, NULL, NULL);		if (sock != INVALID_SOCKET)			break;		error = GetLastError();		if (error == WSAEWOULDBLOCK)		{			WSAResetEvent(listen_sock_event);			WSAEventSelect(listen_sock, listen_sock_event, FD_ACCEPT);		}		Sleep(100);	}	NT_closesocket(listen_sock, listen_sock_event);	if ((sock_event = WSACreateEvent()) == WSA_INVALID_EVENT)	{		error = WSAGetLastError();		printf("LeftThread: WSACreateEvent failed, error %d\n", error);		
ExitProcess(error);	}	if (WSAEventSelect(sock, sock_event, FD_READ | FD_CLOSE) == SOCKET_ERROR)	{		error = WSAGetLastError();		printf("LeftThread: WSAEventSelect failed, error %d\n", error);		ExitProcess(error);	}	//printf("socket accepted\n");	NT_get_sock_info(sock, host, &nLocalPort);	g_List.SetMyID(host, nLocalPort);	g_List.GetMyID(&nLocalIP, &nLocalPort);	g_bLeftConnected = true;	while (true)	{		if (ReceiveBlocking(sock, sock_event, (char*)&Command.hCmd, sizeof(CommandHeader), 0))			break;		//printf("[%d.%d.%d.%d:%d:%d]\n", (int)(Command.hCmd.nSrcIP & 0xff), (int)((Command.hCmd.nSrcIP >> 8) & 0xff), (int)((Command.hCmd.nSrcIP >> 16) & 0xff),	(int)((Command.hCmd.nSrcIP >> 24) & 0xff), Command.hCmd.nSrcPort, (int)Command.hCmd.cCommand);		if (Command.hCmd.nBufferLength > CMD_BUFF_SIZE)		{			printf("Command buffer too long, length: %d, exiting\n", Command.hCmd.nBufferLength);			ExitProcess(1);		}		if (Command.hCmd.nSrcIP == nLocalIP && Command.hCmd.nSrcPort == nLocalPort)		{			// This is a command sent from myself which has traversed the entire ring.			// Either handle the command, or eat up the data and throw it away.			
switch (Command.hCmd.cCommand)			{			case MPD_CMD_HOSTS:				if (Command.hCmd.nBufferLength > 0)					ReceiveBlocking(sock, sock_event, Command.hCmd.pData->pCommandBuffer, Command.hCmd.nBufferLength, 0);				Command.hCmd.pData->pCommandBuffer[Command.hCmd.nBufferLength] = '\0';				Command.hCmd.pData->hCmd.nBufferLength = Command.hCmd.nBufferLength + 1;				MarkCommandCompleted(Command.hCmd.pData);				break;			case MPD_CMD_CPUSAGE:				if (Command.hCmd.nBufferLength > 0)					ReceiveBlocking(sock, sock_event, Command.hCmd.pData->pCommandBuffer, Command.hCmd.nBufferLength, 0);				Command.hCmd.pData->pCommandBuffer[Command.hCmd.nBufferLength] = '\0';				Command.hCmd.pData->hCmd.nBufferLength = Command.hCmd.nBufferLength + 1;				MarkCommandCompleted(Command.hCmd.pData);				break;			case MPD_CMD_PS:				if (Command.hCmd.nBufferLength > 0)					ReceiveBlocking(sock, sock_event, Command.hCmd.pData->pCommandBuffer, Command.hCmd.nBufferLength, 0);				Command.hCmd.pData->pCommandBuffer[Command.hCmd.nBufferLength] = '\0';				Command.hCmd.pData->hCmd.nBufferLength = Command.hCmd.nBufferLength + 1;				MarkCommandCompleted(Command.hCmd.pData);				break;			case MPD_CMD_DESTROY_RING:				printf("DestroyRing command received ...");fflush(stdout);				KillRemainingMPDProcesses();				printf(" Exiting\n");fflush(stdout);				//Sleep(100);				//ExitProcess(0);				ExitThread(0);				break;			case MPD_CMD_RUN_THE_RING:				// Command has finished running the loop				MarkCommandCompleted(Command.hCmd.pData);				break;			case MPD_CMD_PRINT_DATABASE:				if (Command.hCmd.nBufferLength > 0)					ReceiveBlocking(sock, sock_event, Command.hCmd.pData->pCommandBuffer, Command.hCmd.nBufferLength, 0);				Command.hCmd.pData->hCmd.nBufferLength = Command.hCmd.nBufferLength;				if (g_bDatabaseIsLocal)				{					Command.hCmd.pData->hCmd.nBufferLength = CMD_BUFF_SIZE;					g_Database.PrintStateToBuffer(Command.hCmd.pData->pCommandBuffer, &Command.hCmd.pData->hCmd.nBufferLength);				}				
MarkCommandCompleted(Command.hCmd.pData);				break;			case MPD_CMD_LAUNCH:				ReceiveBlocking(sock, sock_event, Command.pCommandBuffer, Command.hCmd.nBufferLength, 0);				pBuf = Command.pCommandBuffer;				pLaunchNode = *(LaunchNode**)pBuf;				pBuf = pBuf + sizeof(LaunchNode*);				nTempIP = *(unsigned long *)pBuf;				pBuf = pBuf + sizeof(unsigned long);				nTempPort = *(int *)pBuf;				pBuf = pBuf + sizeof(int);				if (nTempIP == nLocalIP && nTempPort == nLocalPort)				{					//printf("launch command received: '%s'\n", pBuf);fflush(stdout);					LaunchMPDProcessArg *pArg = new LaunchMPDProcessArg;					pArg->nIP = nLocalIP;					pArg->nPort = nLocalPort;					pArg->nSrcIP = Command.hCmd.nSrcIP;					pArg->nSrcPort = Command.hCmd.nSrcPort;					pArg->pszCommand = new char[strlen(pBuf)+1];					pArg->pNode = pLaunchNode;					strcpy(pArg->pszCommand, pBuf);					//printf("launching '%s'\n", pArg->pszCommand);fflush(stdout);					DWORD dwThreadID;					CloseHandle(CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)LaunchMPDProcess, pArg, 0, &dwThreadID));				}				else				{					// The launch command made it around the ring without anyone satisfying it.					// It must have bogus host:port values so just throw it away.					
char *pName;					in_addr in;					in.S_un.S_addr = nTempIP;					pName = inet_ntoa(in);					printf("Unfulfilled launch command for host: %s:%d\n", pName, nTempPort);					fflush(stdout);				}				break;				/*			case MPD_CMD_LAUNCH_RET:				ReceiveBlocking(sock, sock_event, Command.pCommandBuffer, Command.hCmd.nBufferLength, 0);				pBuf = Command.pCommandBuffer;				nTempIP = *((unsigned long *)pBuf);				pBuf = pBuf + sizeof(unsigned long);				nTempPort = *((int *)pBuf);				if (nTempIP == nLocalIP && nTempPort == nLocalPort)				{					pBuf = pBuf + sizeof(int);					pLaunchNode = *((LaunchNode **)pBuf);					pBuf = pBuf + sizeof(LaunchNode*);					pLaunchNode->Set(*((DWORD*)pBuf));				}				else				{					Command.nCommand = MPD_CMD_FORWARD;					hCommand = InsertCommand(Command);					WaitForCommand(hCommand);				}				break;				//*/			default:				if (Command.hCmd.nBufferLength > 0)				{					char *pBuffer = new char[Command.hCmd.nBufferLength];					ReceiveBlocking(sock, sock_event, pBuffer, Command.hCmd.nBufferLength, 0);					delete pBuffer;				}			}		}		else		{			switch (Command.hCmd.cCommand)			{			case MPD_CMD_ADD:				Command.nCommand = MPD_CMD_FORWARD;				pBuf = Command.pCommandBuffer;				ReceiveBlocking(sock, sock_event, pBuf, sizeof(unsigned long), 0);				nTempIP = *((unsigned long *)pBuf);				pBuf += sizeof(unsigned long);				ReceiveBlocking(sock, sock_event, pBuf, sizeof(int), 0);				nTempPort = *((int *)pBuf);				pBuf += sizeof(int);				ReceiveBlocking(sock, sock_event, pBuf, sizeof(int), 0);				n = *((int *)pBuf);				pBuf += sizeof(int);				Command.hCmd.nBufferLength = sizeof(unsigned long) + 2 * sizeof(int);								hCommand = InsertCommand(Command);				WaitForCommand(hCommand);				g_List.Add(nTempIP, nTempPort, n);				//printf("[%d.%d.%d.%d:%d:%d]\n", (int)(nTempIP & 0xff), (int)((nTempIP >> 8) & 0xff), (int)((nTempIP >> 16) & 0xff),	(int)((nTempIP >> 24) & 0xff), nTempPort, n);				break;			case MPD_CMD_REMOVE:				Command.nCommand = MPD_CMD_FORWARD;				pBuf = 
Command.pCommandBuffer;				ReceiveBlocking(sock, sock_event, pBuf, sizeof(unsigned long), 0);				nTempIP = *((unsigned long *)pBuf);				pBuf += sizeof(unsigned long);				ReceiveBlocking(sock, sock_event, pBuf, sizeof(int), 0);				nTempPort = *((int *)pBuf);				Command.hCmd.nBufferLength = sizeof(unsigned long) + sizeof(int);								hCommand = InsertCommand(Command);				WaitForCommand(hCommand);				g_List.Remove(nTempIP, nTempPort);				break;			case MPD_CMD_INCREMENT:				Command.nCommand = MPD_CMD_FORWARD;				pBuf = Command.pCommandBuffer;				ReceiveBlocking(sock, sock_event, pBuf, sizeof(unsigned long), 0);				nTempIP = *((unsigned long *)pBuf);				pBuf += sizeof(unsigned long);				ReceiveBlocking(sock, sock_event, pBuf, sizeof(int), 0);				nTempPort = *((int *)pBuf);				Command.hCmd.nBufferLength = sizeof(unsigned long) + sizeof(int);								hCommand = InsertCommand(Command);				WaitForCommand(hCommand);				g_List.Increment(nTempIP, nTempPort);				break;			case MPD_CMD_DECREMENT:				Command.nCommand = MPD_CMD_FORWARD;				pBuf = Command.pCommandBuffer;				ReceiveBlocking(sock, sock_event, pBuf, sizeof(unsigned long), 0);				nTempIP = *((unsigned long *)pBuf);				pBuf += sizeof(unsigned long);				ReceiveBlocking(sock, sock_event, pBuf, sizeof(int), 0);				nTempPort = *((int *)pBuf);				Command.hCmd.nBufferLength = sizeof(unsigned long) + sizeof(int);								hCommand = InsertCommand(Command);				WaitForCommand(hCommand);				g_List.Decrement(nTempIP, nTempPort);				break;			case MPD_CMD_ENABLE:				Command.nCommand = MPD_CMD_FORWARD;				pBuf = Command.pCommandBuffer;				ReceiveBlocking(sock, sock_event, pBuf, sizeof(unsigned long), 0);				nTempIP = *((unsigned long *)pBuf);				pBuf += sizeof(unsigned long);				ReceiveBlocking(sock, sock_event, pBuf, sizeof(int), 0);				nTempPort = *((int *)pBuf);				Command.hCmd.nBufferLength = sizeof(unsigned long) + sizeof(int);								hCommand = InsertCommand(Command);				WaitForCommand(hCommand);				g_List.Enable(nTempIP, 
nTempPort);				break;			case MPD_CMD_DISABLE:				Command.nCommand = MPD_CMD_FORWARD;				pBuf = Command.pCommandBuffer;				ReceiveBlocking(sock, sock_event, pBuf, sizeof(unsigned long), 0);				nTempIP = *((unsigned long *)pBuf);				pBuf += sizeof(unsigned long);				ReceiveBlocking(sock, sock_event, pBuf, sizeof(int), 0);				nTempPort = *((int *)pBuf);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -