⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rightthread.cpp

📁 MPICH是MPI的重要研究,提供了一系列的接口函数,为并行计算的实现提供了编程环境.
💻 CPP
📖 第 1 页 / 共 2 页
字号:
#include "RightThread.h"#include "Command.h"#include "sockets.h"#include <stdio.h>#include "global.h"#include "GetReturnThread.h"#include "StringOpt.h"#include "LaunchMPDProcess.h"#include "LaunchNode.h"#include "GetCPUsage.h"// Function name	: RightThread// Description	    : // Return type		: void // Argument         : LaunchMPDArg *pArgvoid RightThread(LaunchMPDArg *pArg){	SOCKET sock;	WSAEVENT sock_event;	char host[100] = "";	int port = 0;	char buffer[CMD_BUFF_SIZE];	int error;	CommandData *pCommand;	char *pBuf;	char pShortBuffer[100];	unsigned long nLocalIP;	int nLocalPort = 0;	int nLocalSpawns;	bool bDone = false;	char *token;	char pszID[256], *pszKey, *pszValue;	int n;	bool bPersistentPut = true;	unsigned long nTempIP;	int nTempPort;	char pszLocalHost[100] = "";	LaunchNode *pLaunchNode = NULL;	gethostname(pszLocalHost, 100);	if (error = NT_create_bind_socket(&sock, &sock_event, 0))	{		printf("RightThread: create and bind socket failed, error %d\n", error);		ExitProcess(error);	}	if (pArg == NULL)	{		gets(host);		gets(buffer);		port = atoi(buffer);	}	else	{		WaitForSingleObject(pArg->pRight->hReadyEvent, INFINITE);		strcpy(host, pArg->pRight->pszHost);		port = pArg->pRight->nPort;	}	if (error = NT_connect(sock, host, port))	{		printf("RightThread: NT_connect failed for %s:%d, error %d\n", host, port, error);		ExitProcess(error);	}	//printf("Connected to %s:%d\n", host, port);	g_bRightConnected = true;	while (nLocalPort == 0)	{		g_List.GetMyID(&nLocalIP, &nLocalPort, &nLocalSpawns);		Sleep(200);	}	g_List.Add(nLocalIP, nLocalPort, nLocalSpawns);	// Send an ADD message around the ring	CommandData data;	data.hCmd.cCommand = MPD_CMD_ADD;	data.hCmd.nSrcIP = nLocalIP;	data.hCmd.nSrcPort = nLocalPort;	data.hCmd.pData = NULL;	data.hCmd.nBufferLength = sizeof(unsigned long) + 2 * sizeof(int);	pBuf = data.pCommandBuffer;	*((unsigned long *)pBuf) = nLocalIP;	pBuf += sizeof(unsigned long);	*((int *)pBuf) = nLocalPort;	pBuf += sizeof(int);	*((int *)pBuf) = nLocalSpawns;	SendBlocking(sock, (char*)&data.hCmd, sizeof(CommandHeader), 0);	SendBlocking(sock, data.pCommandBuffer, data.hCmd.nBufferLength, 0);	while (!bDone)	{		pCommand = GetNextCommand();		// Do something with the command		if (pCommand->nCommand == MPD_CMD_QUIT)		{			MarkCommandCompleted(pCommand);			break;		}		switch (pCommand->nCommand)		{		case MPD_CMD_FORWARD:			SendBlocking(sock, (char*)&pCommand->hCmd, sizeof(CommandHeader), 0);			if (pCommand->hCmd.nBufferLength > 0)				SendBlocking(sock, pCommand->pCommandBuffer, pCommand->hCmd.nBufferLength, 0);			MarkCommandCompleted(pCommand);			break;		case MPD_CMD_ADD:			token = strtok(pCommand->pCommandBuffer, ":");			pBuf = pCommand->pCommandBuffer;			if (NT_get_ip(token, (unsigned long *)pBuf))			{				sprintf(pCommand->pCommandBuffer, "Unable to resolve hostname, error %d\n", WSAGetLastError()); // <----------				pCommand->hCmd.nBufferLength = strlen(pCommand->pCommandBuffer)+1;				pCommand->bSuccess = false;				MarkCommandCompleted(pCommand);				break;			}			nTempIP = *(unsigned long *)pBuf;			pBuf += sizeof(unsigned long);			token = strtok(NULL, " \t");			*((int *)pBuf) = nTempPort = atoi(token);			pBuf += sizeof(int);			token = strtok(NULL, "\n");			*((int *)pBuf) = n = (token == NULL) ? 1 : atoi(token);			pCommand->hCmd.cCommand = MPD_CMD_ADD;			pCommand->hCmd.nSrcIP = nLocalIP;			pCommand->hCmd.nSrcPort = nLocalPort;			pCommand->hCmd.pData = NULL;			pCommand->hCmd.nBufferLength = sizeof(unsigned long) + 2 * sizeof(int);			SendBlocking(sock, (char*)&pCommand->hCmd, sizeof(CommandHeader), 0);			SendBlocking(sock, pCommand->pCommandBuffer, pCommand->hCmd.nBufferLength, 0);			g_List.Add(nTempIP, nTempPort, n);			MarkCommandCompleted(pCommand);			break;		//case MPD_CMD_REMOVE:			// Send a remove message after a node has crashed and the ring has been stitched.		//	break;		case MPD_CMD_INCREMENT:			pCommand->hCmd.cCommand = MPD_CMD_INCREMENT;			pBuf = pCommand->pCommandBuffer;			*(unsigned long *)pBuf = nLocalIP;			pBuf += sizeof(unsigned long);			*(int *)pBuf = nLocalPort;			pCommand->hCmd.nBufferLength = sizeof(unsigned long) + sizeof(int);			pCommand->hCmd.nSrcIP = nLocalIP;			pCommand->hCmd.nSrcPort = nLocalPort;			pCommand->hCmd.pData = NULL;			SendBlocking(sock, (char*)&pCommand->hCmd, sizeof(CommandHeader), 0);			SendBlocking(sock, pCommand->pCommandBuffer, pCommand->hCmd.nBufferLength, 0);			g_List.Increment(nLocalIP, nLocalPort);			MarkCommandCompleted(pCommand);			break;		case MPD_CMD_DECREMENT:			pCommand->hCmd.cCommand = MPD_CMD_DECREMENT;			pBuf = pCommand->pCommandBuffer;			*(unsigned long *)pBuf = nLocalIP;			pBuf += sizeof(unsigned long);			*(int *)pBuf = nLocalPort;			pCommand->hCmd.nBufferLength = sizeof(unsigned long) + sizeof(int);			pCommand->hCmd.nSrcIP = nLocalIP;			pCommand->hCmd.nSrcPort = nLocalPort;			pCommand->hCmd.pData = NULL;			SendBlocking(sock, (char*)&pCommand->hCmd, sizeof(CommandHeader), 0);			SendBlocking(sock, pCommand->pCommandBuffer, pCommand->hCmd.nBufferLength, 0);			g_List.Decrement(nLocalIP, nLocalPort);			MarkCommandCompleted(pCommand);			break;		case MPD_CMD_ENABLE:			token = strtok(pCommand->pCommandBuffer, ":");			pBuf = pCommand->pCommandBuffer;			if (NT_get_ip(token, (unsigned long *)pBuf))			{				sprintf(pCommand->pCommandBuffer, "Unable to resolve hostname, error %d\n", WSAGetLastError());// <----------				pCommand->hCmd.nBufferLength = strlen(pCommand->pCommandBuffer)+1;				pCommand->bSuccess = false;				MarkCommandCompleted(pCommand);				break;			}			nTempIP = *(unsigned long *)pBuf;			pBuf += sizeof(unsigned long);			token = strtok(NULL, "\n");			if (token != NULL)				*((int *)pBuf) = nTempPort = atoi(token);			else				*((int *)pBuf) = nTempPort = -1;			pCommand->hCmd.cCommand = MPD_CMD_ENABLE;			pCommand->hCmd.nSrcIP = nLocalIP;			pCommand->hCmd.nSrcPort = nLocalPort;			pCommand->hCmd.pData = NULL;			pCommand->hCmd.nBufferLength = sizeof(unsigned long) + sizeof(int);			SendBlocking(sock, (char*)&pCommand->hCmd, sizeof(CommandHeader), 0);			SendBlocking(sock, pCommand->pCommandBuffer, pCommand->hCmd.nBufferLength, 0);			g_List.Enable(nTempIP, nTempPort);			MarkCommandCompleted(pCommand);			break;		case MPD_CMD_DISABLE:			token = strtok(pCommand->pCommandBuffer, ":");			pBuf = pCommand->pCommandBuffer;			if (NT_get_ip(token, (unsigned long *)pBuf))			{				sprintf(pCommand->pCommandBuffer, "Unable to resolve hostname, error %d\n", WSAGetLastError());// <----------				pCommand->hCmd.nBufferLength = strlen(pCommand->pCommandBuffer)+1;				pCommand->bSuccess = false;				MarkCommandCompleted(pCommand);				break;			}			nTempIP = *(unsigned long *)pBuf;			pBuf += sizeof(unsigned long);			token = strtok(NULL, "\n");			if (token != NULL)				*((int *)pBuf) = nTempPort = atoi(token);			else				*((int *)pBuf) = nTempPort = -1;			pCommand->hCmd.cCommand = MPD_CMD_DISABLE;			pCommand->hCmd.nSrcIP = nLocalIP;			pCommand->hCmd.nSrcPort = nLocalPort;			pCommand->hCmd.pData = NULL;			pCommand->hCmd.nBufferLength = sizeof(unsigned long) + sizeof(int);			SendBlocking(sock, (char*)&pCommand->hCmd, sizeof(CommandHeader), 0);			SendBlocking(sock, pCommand->pCommandBuffer, pCommand->hCmd.nBufferLength, 0);			g_List.Disable(nTempIP, nTempPort);			MarkCommandCompleted(pCommand);			break;		case MPD_CMD_PUTC:			bPersistentPut = false;		case MPD_CMD_PUT:			token = strtok(pCommand->pCommandBuffer, ":");			if (token == NULL)			{				MarkCommandCompleted(pCommand);				break;			}			strcpy(pszID, token);			token = strtok(NULL, "=");			if (token == NULL)			{				MarkCommandCompleted(pCommand);				break;			}			pszKey = new char[strlen(token)+1];			strcpy(pszKey, token);			token = strtok(NULL, "\n");			if (token == NULL)			{				delete pszKey;				MarkCommandCompleted(pCommand);				break;			}			n = strlen(token) + 1;			pszValue = new char[n];			strcpy(pszValue, token);			if (g_bDatabaseIsLocal)				g_Database.Put(pszID, pszKey, pszValue, n, bPersistentPut);			else			{				pCommand->hCmd.nBufferLength = 3 * sizeof(int);				pCommand->hCmd.cCommand = bPersistentPut ? MPD_CMD_PUT : MPD_CMD_PUTC;				pCommand->hCmd.nSrcIP = nLocalIP;				pCommand->hCmd.nSrcPort = nLocalPort;				pBuf = pCommand->pCommandBuffer;				n = strlen(pszID) + 1;				*((int *)pBuf) = n;				pBuf += sizeof(int);				strcpy(pBuf, pszID);				pBuf += n;				pCommand->hCmd.nBufferLength += n;				n = strlen(pszKey) + 1;				*((int *)pBuf) = n;				pBuf += sizeof(int);				strcpy(pBuf, pszKey);				pBuf += n;				pCommand->hCmd.nBufferLength += n;				n = strlen(pszValue) + 1;				*((int *)pBuf) = n;				pBuf += sizeof(int);				strcpy(pBuf, pszValue);				pBuf += n;				pCommand->hCmd.nBufferLength += n;				SendBlocking(sock, (char*)&pCommand->hCmd, sizeof(CommandHeader), 0);				SendBlocking(sock, pCommand->pCommandBuffer, pCommand->hCmd.nBufferLength, 0);				delete pszKey;				delete pszValue;			}			bPersistentPut = true;			MarkCommandCompleted(pCommand);			break;		case MPD_CMD_GET:			token = strtok(pCommand->pCommandBuffer, ":");			if (token == NULL)			{				MarkCommandCompleted(pCommand);				break;			}			strcpy(pszID, token);			token = strtok(NULL, "\n");			if (token == NULL)			{				MarkCommandCompleted(pCommand);				break;			}			pszKey = new char[strlen(token)+1];			strcpy(pszKey, token);			pCommand->hCmd.cCommand = MPD_CMD_GET;			pCommand->hCmd.nSrcIP = nLocalIP;			pCommand->hCmd.nSrcPort = nLocalPort;			if (g_bDatabaseIsLocal)			{				GetReturnThreadArg *pArg = new GetReturnThreadArg;				pArg->pCommand = pCommand;				strcpy(pArg->pszDbsID, pszID);				pArg->pszDbsKey = pszKey;				DWORD dwThreadID;				CloseHandle(CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)GetThread, pArg, 0, &dwThreadID));			}			else			{				pCommand->hCmd.nBufferLength = 2 * sizeof(unsigned long) + 3 * sizeof(int);				pBuf = pCommand->pCommandBuffer;				*((unsigned long *)pBuf) = nLocalIP;				pBuf += sizeof(unsigned long);				*((int *)pBuf) = nLocalPort;				pBuf += sizeof(int);				*((unsigned long *)pBuf) = (unsigned long)pCommand; // nGetIdentifier is a pointer to the Command structure				pBuf += sizeof(unsigned long);				n = strlen(pszID) + 1;				pCommand->hCmd.nBufferLength += n;				*((int *)pBuf) = n;				pBuf += sizeof(int);				strcpy(pBuf, pszID);				pBuf += n;				n = strlen(pszKey) + 1;				pCommand->hCmd.nBufferLength += n;				*((int *)pBuf) = n;				pBuf += sizeof(int);				strcpy(pBuf, pszKey);				SendBlocking(sock, (char*)&pCommand->hCmd, sizeof(CommandHeader), 0);				SendBlocking(sock, pCommand->pCommandBuffer, pCommand->hCmd.nBufferLength, 0);				delete pszKey;			}			break;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -