📄 rightthread.cpp
字号:
#include "RightThread.h"#include "Command.h"#include "sockets.h"#include <stdio.h>#include "global.h"#include "GetReturnThread.h"#include "StringOpt.h"#include "LaunchMPDProcess.h"#include "LaunchNode.h"#include "GetCPUsage.h"// Function name : RightThread// Description : // Return type : void // Argument : LaunchMPDArg *pArgvoid RightThread(LaunchMPDArg *pArg){ SOCKET sock; WSAEVENT sock_event; char host[100] = ""; int port = 0; char buffer[CMD_BUFF_SIZE]; int error; CommandData *pCommand; char *pBuf; char pShortBuffer[100]; unsigned long nLocalIP; int nLocalPort = 0; int nLocalSpawns; bool bDone = false; char *token; char pszID[256], *pszKey, *pszValue; int n; bool bPersistentPut = true; unsigned long nTempIP; int nTempPort; char pszLocalHost[100] = ""; LaunchNode *pLaunchNode = NULL; gethostname(pszLocalHost, 100); if (error = NT_create_bind_socket(&sock, &sock_event, 0)) { printf("RightThread: create and bind socket failed, error %d\n", error); ExitProcess(error); } if (pArg == NULL) { gets(host); gets(buffer); port = atoi(buffer); } else { WaitForSingleObject(pArg->pRight->hReadyEvent, INFINITE); strcpy(host, pArg->pRight->pszHost); port = pArg->pRight->nPort; } if (error = NT_connect(sock, host, port)) { printf("RightThread: NT_connect failed for %s:%d, error %d\n", host, port, error); ExitProcess(error); } //printf("Connected to %s:%d\n", host, port); g_bRightConnected = true; while (nLocalPort == 0) { g_List.GetMyID(&nLocalIP, &nLocalPort, &nLocalSpawns); Sleep(200); } g_List.Add(nLocalIP, nLocalPort, nLocalSpawns); // Send an ADD message around the ring CommandData data; data.hCmd.cCommand = MPD_CMD_ADD; data.hCmd.nSrcIP = nLocalIP; data.hCmd.nSrcPort = nLocalPort; data.hCmd.pData = NULL; data.hCmd.nBufferLength = sizeof(unsigned long) + 2 * sizeof(int); pBuf = data.pCommandBuffer; *((unsigned long *)pBuf) = nLocalIP; pBuf += sizeof(unsigned long); *((int *)pBuf) = nLocalPort; pBuf += sizeof(int); *((int *)pBuf) = nLocalSpawns; SendBlocking(sock, (char*)&data.hCmd, sizeof(CommandHeader), 0); SendBlocking(sock, data.pCommandBuffer, data.hCmd.nBufferLength, 0); while (!bDone) { pCommand = GetNextCommand(); // Do something with the command if (pCommand->nCommand == MPD_CMD_QUIT) { MarkCommandCompleted(pCommand); break; } switch (pCommand->nCommand) { case MPD_CMD_FORWARD: SendBlocking(sock, (char*)&pCommand->hCmd, sizeof(CommandHeader), 0); if (pCommand->hCmd.nBufferLength > 0) SendBlocking(sock, pCommand->pCommandBuffer, pCommand->hCmd.nBufferLength, 0); MarkCommandCompleted(pCommand); break; case MPD_CMD_ADD: token = strtok(pCommand->pCommandBuffer, ":"); pBuf = pCommand->pCommandBuffer; if (NT_get_ip(token, (unsigned long *)pBuf)) { sprintf(pCommand->pCommandBuffer, "Unable to resolve hostname, error %d\n", WSAGetLastError()); // <---------- pCommand->hCmd.nBufferLength = strlen(pCommand->pCommandBuffer)+1; pCommand->bSuccess = false; MarkCommandCompleted(pCommand); break; } nTempIP = *(unsigned long *)pBuf; pBuf += sizeof(unsigned long); token = strtok(NULL, " \t"); *((int *)pBuf) = nTempPort = atoi(token); pBuf += sizeof(int); token = strtok(NULL, "\n"); *((int *)pBuf) = n = (token == NULL) ? 1 : atoi(token); pCommand->hCmd.cCommand = MPD_CMD_ADD; pCommand->hCmd.nSrcIP = nLocalIP; pCommand->hCmd.nSrcPort = nLocalPort; pCommand->hCmd.pData = NULL; pCommand->hCmd.nBufferLength = sizeof(unsigned long) + 2 * sizeof(int); SendBlocking(sock, (char*)&pCommand->hCmd, sizeof(CommandHeader), 0); SendBlocking(sock, pCommand->pCommandBuffer, pCommand->hCmd.nBufferLength, 0); g_List.Add(nTempIP, nTempPort, n); MarkCommandCompleted(pCommand); break; //case MPD_CMD_REMOVE: // Send a remove message after a node has crashed and the ring has been stitched. // break; case MPD_CMD_INCREMENT: pCommand->hCmd.cCommand = MPD_CMD_INCREMENT; pBuf = pCommand->pCommandBuffer; *(unsigned long *)pBuf = nLocalIP; pBuf += sizeof(unsigned long); *(int *)pBuf = nLocalPort; pCommand->hCmd.nBufferLength = sizeof(unsigned long) + sizeof(int); pCommand->hCmd.nSrcIP = nLocalIP; pCommand->hCmd.nSrcPort = nLocalPort; pCommand->hCmd.pData = NULL; SendBlocking(sock, (char*)&pCommand->hCmd, sizeof(CommandHeader), 0); SendBlocking(sock, pCommand->pCommandBuffer, pCommand->hCmd.nBufferLength, 0); g_List.Increment(nLocalIP, nLocalPort); MarkCommandCompleted(pCommand); break; case MPD_CMD_DECREMENT: pCommand->hCmd.cCommand = MPD_CMD_DECREMENT; pBuf = pCommand->pCommandBuffer; *(unsigned long *)pBuf = nLocalIP; pBuf += sizeof(unsigned long); *(int *)pBuf = nLocalPort; pCommand->hCmd.nBufferLength = sizeof(unsigned long) + sizeof(int); pCommand->hCmd.nSrcIP = nLocalIP; pCommand->hCmd.nSrcPort = nLocalPort; pCommand->hCmd.pData = NULL; SendBlocking(sock, (char*)&pCommand->hCmd, sizeof(CommandHeader), 0); SendBlocking(sock, pCommand->pCommandBuffer, pCommand->hCmd.nBufferLength, 0); g_List.Decrement(nLocalIP, nLocalPort); MarkCommandCompleted(pCommand); break; case MPD_CMD_ENABLE: token = strtok(pCommand->pCommandBuffer, ":"); pBuf = pCommand->pCommandBuffer; if (NT_get_ip(token, (unsigned long *)pBuf)) { sprintf(pCommand->pCommandBuffer, "Unable to resolve hostname, error %d\n", WSAGetLastError());// <---------- pCommand->hCmd.nBufferLength = strlen(pCommand->pCommandBuffer)+1; pCommand->bSuccess = false; MarkCommandCompleted(pCommand); break; } nTempIP = *(unsigned long *)pBuf; pBuf += sizeof(unsigned long); token = strtok(NULL, "\n"); if (token != NULL) *((int *)pBuf) = nTempPort = atoi(token); else *((int *)pBuf) = nTempPort = -1; pCommand->hCmd.cCommand = MPD_CMD_ENABLE; pCommand->hCmd.nSrcIP = nLocalIP; pCommand->hCmd.nSrcPort = nLocalPort; pCommand->hCmd.pData = NULL; pCommand->hCmd.nBufferLength = sizeof(unsigned long) + sizeof(int); SendBlocking(sock, (char*)&pCommand->hCmd, sizeof(CommandHeader), 0); SendBlocking(sock, pCommand->pCommandBuffer, pCommand->hCmd.nBufferLength, 0); g_List.Enable(nTempIP, nTempPort); MarkCommandCompleted(pCommand); break; case MPD_CMD_DISABLE: token = strtok(pCommand->pCommandBuffer, ":"); pBuf = pCommand->pCommandBuffer; if (NT_get_ip(token, (unsigned long *)pBuf)) { sprintf(pCommand->pCommandBuffer, "Unable to resolve hostname, error %d\n", WSAGetLastError());// <---------- pCommand->hCmd.nBufferLength = strlen(pCommand->pCommandBuffer)+1; pCommand->bSuccess = false; MarkCommandCompleted(pCommand); break; } nTempIP = *(unsigned long *)pBuf; pBuf += sizeof(unsigned long); token = strtok(NULL, "\n"); if (token != NULL) *((int *)pBuf) = nTempPort = atoi(token); else *((int *)pBuf) = nTempPort = -1; pCommand->hCmd.cCommand = MPD_CMD_DISABLE; pCommand->hCmd.nSrcIP = nLocalIP; pCommand->hCmd.nSrcPort = nLocalPort; pCommand->hCmd.pData = NULL; pCommand->hCmd.nBufferLength = sizeof(unsigned long) + sizeof(int); SendBlocking(sock, (char*)&pCommand->hCmd, sizeof(CommandHeader), 0); SendBlocking(sock, pCommand->pCommandBuffer, pCommand->hCmd.nBufferLength, 0); g_List.Disable(nTempIP, nTempPort); MarkCommandCompleted(pCommand); break; case MPD_CMD_PUTC: bPersistentPut = false; case MPD_CMD_PUT: token = strtok(pCommand->pCommandBuffer, ":"); if (token == NULL) { MarkCommandCompleted(pCommand); break; } strcpy(pszID, token); token = strtok(NULL, "="); if (token == NULL) { MarkCommandCompleted(pCommand); break; } pszKey = new char[strlen(token)+1]; strcpy(pszKey, token); token = strtok(NULL, "\n"); if (token == NULL) { delete pszKey; MarkCommandCompleted(pCommand); break; } n = strlen(token) + 1; pszValue = new char[n]; strcpy(pszValue, token); if (g_bDatabaseIsLocal) g_Database.Put(pszID, pszKey, pszValue, n, bPersistentPut); else { pCommand->hCmd.nBufferLength = 3 * sizeof(int); pCommand->hCmd.cCommand = bPersistentPut ? MPD_CMD_PUT : MPD_CMD_PUTC; pCommand->hCmd.nSrcIP = nLocalIP; pCommand->hCmd.nSrcPort = nLocalPort; pBuf = pCommand->pCommandBuffer; n = strlen(pszID) + 1; *((int *)pBuf) = n; pBuf += sizeof(int); strcpy(pBuf, pszID); pBuf += n; pCommand->hCmd.nBufferLength += n; n = strlen(pszKey) + 1; *((int *)pBuf) = n; pBuf += sizeof(int); strcpy(pBuf, pszKey); pBuf += n; pCommand->hCmd.nBufferLength += n; n = strlen(pszValue) + 1; *((int *)pBuf) = n; pBuf += sizeof(int); strcpy(pBuf, pszValue); pBuf += n; pCommand->hCmd.nBufferLength += n; SendBlocking(sock, (char*)&pCommand->hCmd, sizeof(CommandHeader), 0); SendBlocking(sock, pCommand->pCommandBuffer, pCommand->hCmd.nBufferLength, 0); delete pszKey; delete pszValue; } bPersistentPut = true; MarkCommandCompleted(pCommand); break; case MPD_CMD_GET: token = strtok(pCommand->pCommandBuffer, ":"); if (token == NULL) { MarkCommandCompleted(pCommand); break; } strcpy(pszID, token); token = strtok(NULL, "\n"); if (token == NULL) { MarkCommandCompleted(pCommand); break; } pszKey = new char[strlen(token)+1]; strcpy(pszKey, token); pCommand->hCmd.cCommand = MPD_CMD_GET; pCommand->hCmd.nSrcIP = nLocalIP; pCommand->hCmd.nSrcPort = nLocalPort; if (g_bDatabaseIsLocal) { GetReturnThreadArg *pArg = new GetReturnThreadArg; pArg->pCommand = pCommand; strcpy(pArg->pszDbsID, pszID); pArg->pszDbsKey = pszKey; DWORD dwThreadID; CloseHandle(CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)GetThread, pArg, 0, &dwThreadID)); } else { pCommand->hCmd.nBufferLength = 2 * sizeof(unsigned long) + 3 * sizeof(int); pBuf = pCommand->pCommandBuffer; *((unsigned long *)pBuf) = nLocalIP; pBuf += sizeof(unsigned long); *((int *)pBuf) = nLocalPort; pBuf += sizeof(int); *((unsigned long *)pBuf) = (unsigned long)pCommand; // nGetIdentifier is a pointer to the Command structure pBuf += sizeof(unsigned long); n = strlen(pszID) + 1; pCommand->hCmd.nBufferLength += n; *((int *)pBuf) = n; pBuf += sizeof(int); strcpy(pBuf, pszID); pBuf += n; n = strlen(pszKey) + 1; pCommand->hCmd.nBufferLength += n; *((int *)pBuf) = n; pBuf += sizeof(int); strcpy(pBuf, pszKey); SendBlocking(sock, (char*)&pCommand->hCmd, sizeof(CommandHeader), 0); SendBlocking(sock, pCommand->pCommandBuffer, pCommand->hCmd.nBufferLength, 0); delete pszKey; } break;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -