⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 leftthread.cpp

📁 MPICH是MPI标准的重要实现,提供了一系列的接口函数,为并行计算的实现提供了编程环境.
💻 CPP
📖 第 1 页 / 共 2 页
字号:
				Command.hCmd.nBufferLength = sizeof(unsigned long) + sizeof(int);								hCommand = InsertCommand(Command);				WaitForCommand(hCommand);				g_List.Disable(nTempIP, nTempPort);				break;			case MPD_CMD_DELETE_ID:				if (g_bDatabaseIsLocal)				{					ReceiveBlocking(sock, sock_event, (char*)&n, sizeof(int), 0);					ReceiveBlocking(sock, sock_event, pszDbsID, n, 0);					g_Database.Delete(pszDbsID);				}				else				{					Command.nCommand = MPD_CMD_FORWARD;					ReceiveBlocking(sock, sock_event, Command.pCommandBuffer, Command.hCmd.nBufferLength, 0);					hCommand = InsertCommand(Command);					WaitForCommand(hCommand);				}				break;			case MPD_CMD_DELETE_KEY:				if (g_bDatabaseIsLocal)				{					ReceiveBlocking(sock, sock_event, (char*)&n, sizeof(int), 0);					ReceiveBlocking(sock, sock_event, pszDbsID, n, 0);					ReceiveBlocking(sock, sock_event, (char*)&n, sizeof(int), 0);					pszDbsKey = new char[n];					ReceiveBlocking(sock, sock_event, pszDbsKey, n, 0);					g_Database.Delete(pszDbsID, pszDbsKey);					delete pszDbsKey;				}				else				{					Command.nCommand = MPD_CMD_FORWARD;					ReceiveBlocking(sock, sock_event, Command.pCommandBuffer, Command.hCmd.nBufferLength, 0);					hCommand = InsertCommand(Command);					WaitForCommand(hCommand);				}				break;			case MPD_CMD_PUTC:				bPutPersistent = false;			case MPD_CMD_PUT:				if (g_bDatabaseIsLocal)				{					ReceiveBlocking(sock, sock_event, (char*)&n, sizeof(int), 0);					ReceiveBlocking(sock, sock_event, pszDbsID, n, 0);					ReceiveBlocking(sock, sock_event, (char*)&n, sizeof(int), 0);					pszDbsKey = new char[n];					ReceiveBlocking(sock, sock_event, pszDbsKey, n, 0);					ReceiveBlocking(sock, sock_event, (char*)&n, sizeof(int), 0);					pDbsValue = new char[n];					ReceiveBlocking(sock, sock_event, (char*)pDbsValue, n, 0);					g_Database.Put(pszDbsID, pszDbsKey, pDbsValue, n, bPutPersistent);				}				else				{					Command.nCommand = MPD_CMD_FORWARD;					ReceiveBlocking(sock, sock_event, Command.pCommandBuffer, 
Command.hCmd.nBufferLength, 0);					hCommand = InsertCommand(Command);					WaitForCommand(hCommand);				}				bPutPersistent = true;				break;			case MPD_CMD_GET:				if (g_bDatabaseIsLocal)				{					Command.hCmd.nBufferLength = 2 * sizeof(unsigned long) + 2 * sizeof(int);					ReceiveBlocking(sock, sock_event, (char*)&nTempIP, sizeof(unsigned long), 0);					ReceiveBlocking(sock, sock_event, (char*)&nTempPort, sizeof(int), 0);					ReceiveBlocking(sock, sock_event, (char*)&nGetIdentifier, sizeof(unsigned long), 0);					pBuf = Command.pCommandBuffer;					*((unsigned long *)pBuf) = nTempIP;					pBuf += sizeof(unsigned long);					*((int *)pBuf) = nTempPort;					pBuf += sizeof(int);					*((unsigned long *)pBuf) = nGetIdentifier;					pBuf += sizeof(unsigned long);					ReceiveBlocking(sock, sock_event, (char*)&n, sizeof(int), 0);					ReceiveBlocking(sock, sock_event, pszDbsID, n, 0);					ReceiveBlocking(sock, sock_event, (char*)&n, sizeof(int), 0);					pszDbsKey = new char[n];					ReceiveBlocking(sock, sock_event, pszDbsKey, n, 0);					Command.hCmd.cCommand = MPD_CMD_GETRETURN;					Command.hCmd.nSrcIP = nLocalIP;					Command.hCmd.nSrcPort = nLocalPort;									GetReturnThreadArg *pArg = new GetReturnThreadArg;					pArg->command = Command;					strcpy(pArg->pszDbsID, pszDbsID);					pArg->pszDbsKey = pszDbsKey;					DWORD dwThreadID;					CloseHandle(CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)GetReturnThread, pArg, 0, &dwThreadID));				}				else				{					Command.nCommand = MPD_CMD_FORWARD;					ReceiveBlocking(sock, sock_event, Command.pCommandBuffer, Command.hCmd.nBufferLength, 0);					hCommand = InsertCommand(Command);					WaitForCommand(hCommand);				}				break;			case MPD_CMD_GETRETURN:				Command.nCommand = MPD_CMD_FORWARD;				Command.hCmd.nBufferLength = 2 * sizeof(unsigned long) + 2 * sizeof(int);				pBuf = Command.pCommandBuffer;				ReceiveBlocking(sock, sock_event, pBuf, sizeof(unsigned long), 0); // IP				nTempIP = *((unsigned long *)pBuf);				pBuf += sizeof(unsigned 
long);				ReceiveBlocking(sock, sock_event, pBuf, sizeof(int), 0); // Port				nTempPort = *((int *)pBuf);				pBuf += sizeof(int);				ReceiveBlocking(sock, sock_event, pBuf, sizeof(unsigned long), 0); // GetIdentifier				nGetIdentifier = *((unsigned long *)pBuf);				pBuf += sizeof(unsigned long);				ReceiveBlocking(sock, sock_event, (char*)&n, sizeof(int), 0); // length				*((int *)pBuf) = n;				pBuf += sizeof(int);				if (nTempIP == nLocalIP && nTempPort == nLocalPort)				{					ReceiveBlocking(sock, sock_event, ((CommandData *)nGetIdentifier)->pCommandBuffer, n, 0);					((CommandData *)nGetIdentifier)->hCmd.nBufferLength = n;					MarkCommandCompleted((CommandData *)nGetIdentifier);				}				else				{					ReceiveBlocking(sock, sock_event, pBuf, n, 0); // value					Command.hCmd.nBufferLength += n;									hCommand = InsertCommand(Command);					WaitForCommand(hCommand);				}				break;			case MPD_CMD_HOSTS:				pBuf = Command.pCommandBuffer;				ReceiveBlocking(sock, sock_event, pBuf, Command.hCmd.nBufferLength, 0);				pBuf += Command.hCmd.nBufferLength;				sprintf(pBuf, "%s:%d\n", host, nLocalPort);				Command.hCmd.nBufferLength += strlen(pBuf);				Command.nCommand = MPD_CMD_FORWARD;				hCommand = InsertCommand(Command);				WaitForCommand(hCommand);				break;			case MPD_CMD_CPUSAGE:				pBuf = Command.pCommandBuffer;				ReceiveBlocking(sock, sock_event, pBuf, Command.hCmd.nBufferLength, 0);				pBuf += Command.hCmd.nBufferLength;				sprintf(pBuf, "%s:%d %d ", host, nLocalPort, GetCPUsage());				Command.hCmd.nBufferLength += strlen(pBuf);				Command.nCommand = MPD_CMD_FORWARD;				hCommand = InsertCommand(Command);				WaitForCommand(hCommand);				break;			case MPD_CMD_PS:				pBuf = Command.pCommandBuffer;				ReceiveBlocking(sock, sock_event, pBuf, Command.hCmd.nBufferLength, 0);				pBuf += Command.hCmd.nBufferLength;				//sprintf(pBuf, "%s:%d\n", host, nLocalPort);				//PrintMPDProcessesToBuffer(pBuf+strlen(pBuf));				sprintf(pShortBuffer, "%s:%d", host, nLocalPort);				
PrintMPDProcessesToBuffer(pBuf, pShortBuffer);				Command.hCmd.nBufferLength += strlen(pBuf);				Command.nCommand = MPD_CMD_FORWARD;				hCommand = InsertCommand(Command);				WaitForCommand(hCommand);				break;			case MPD_CMD_DESTROY_RING:				printf("DestroyRing command received ...");fflush(stdout);				Command.nCommand = MPD_CMD_FORWARD;				Command.hCmd.nBufferLength = 0;				hCommand = InsertCommand(Command);				WaitForCommand(hCommand);				KillRemainingMPDProcesses();				printf(" Exiting\n");				ExitProcess(0);				break;			case MPD_CMD_RUN_THE_RING:				Command.nCommand = MPD_CMD_FORWARD;				Command.hCmd.nBufferLength = 0;				hCommand = InsertCommand(Command);				WaitForCommand(hCommand);				break;			case MPD_CMD_PRINT_LISTS:				Command.nCommand = MPD_CMD_FORWARD;				Command.hCmd.nBufferLength = 0;				hCommand = InsertCommand(Command);				WaitForCommand(hCommand);				g_List.Print();				break;			case MPD_CMD_PRINT_DATABASE:				if (Command.hCmd.nBufferLength > 0)					ReceiveBlocking(sock, sock_event, Command.pCommandBuffer, Command.hCmd.nBufferLength, 0);				if (g_bDatabaseIsLocal)				{					Command.hCmd.nBufferLength = CMD_BUFF_SIZE;					g_Database.PrintStateToBuffer(Command.pCommandBuffer, &Command.hCmd.nBufferLength);					Command.hCmd.nBufferLength++;				}				Command.nCommand = MPD_CMD_FORWARD;				hCommand = InsertCommand(Command);				WaitForCommand(hCommand);				break;			case MPD_CMD_LAUNCH:				ReceiveBlocking(sock, sock_event, Command.pCommandBuffer, Command.hCmd.nBufferLength, 0);				pBuf = Command.pCommandBuffer;				pLaunchNode = *(LaunchNode **)pBuf;				pBuf = pBuf + sizeof(LaunchNode*);				nTempIP = *(unsigned long *)pBuf;				pBuf = pBuf + sizeof(unsigned long);				nTempPort = *(int *)pBuf;				pBuf = pBuf + sizeof(int);				if (nTempIP == nLocalIP && nTempPort == nLocalPort)				{					//printf("launch command received: '%s'\n", pBuf);fflush(stdout);					LaunchMPDProcessArg *pArg = new LaunchMPDProcessArg;					pArg->nIP = nLocalIP;					pArg->nPort = nLocalPort;		
			pArg->nSrcIP = Command.hCmd.nSrcIP;					pArg->nSrcPort = Command.hCmd.nSrcPort;					pArg->pszCommand = new char[strlen(pBuf)+1];					pArg->pNode = pLaunchNode;					strcpy(pArg->pszCommand, pBuf);					//printf("launching '%s'\n", pArg->pszCommand);fflush(stdout);					DWORD dwThreadID;					CloseHandle(CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)LaunchMPDProcess, pArg, 0, &dwThreadID));				}				else				{					//printf("forwarding launch command: '%s'\n", pBuf);fflush(stdout);					Command.nCommand = MPD_CMD_FORWARD;					hCommand = InsertCommand(Command);					WaitForCommand(hCommand);				}				break;			case MPD_CMD_LAUNCH_RET:				ReceiveBlocking(sock, sock_event, Command.pCommandBuffer, Command.hCmd.nBufferLength, 0);				pBuf = Command.pCommandBuffer;				nTempIP = *((unsigned long *)pBuf);				pBuf = pBuf + sizeof(unsigned long);				nTempPort = *((int *)pBuf);				if (nTempIP == nLocalIP && nTempPort == nLocalPort)				{					pBuf = pBuf + sizeof(int);					pLaunchNode = *((LaunchNode **)pBuf);					pBuf = pBuf + sizeof(LaunchNode*);					pLaunchNode->Set(*((DWORD*)pBuf));				}				else				{					Command.nCommand = MPD_CMD_FORWARD;					hCommand = InsertCommand(Command);					WaitForCommand(hCommand);				}				break;			case MPD_CMD_LAUNCH_EXITCODE:				ReceiveBlocking(sock, sock_event, Command.pCommandBuffer, Command.hCmd.nBufferLength, 0);				pBuf = Command.pCommandBuffer;				nTempIP = *((unsigned long *)pBuf);				pBuf = pBuf + sizeof(unsigned long);				nTempPort = *((int *)pBuf);				if (nTempIP == nLocalIP && nTempPort == nLocalPort)				{					DWORD dwExitCode;					int nGroup, nRank;					pBuf = pBuf + sizeof(int);					pLaunchNode = *((LaunchNode **)pBuf);					pBuf = pBuf + sizeof(LaunchNode*);					dwExitCode = *((DWORD *)pBuf);					pBuf = pBuf + sizeof(DWORD);					nGroup = *((int *)pBuf);					pBuf = pBuf + sizeof(int);					nRank = *((int *)pBuf);					pLaunchNode->SetExit(nGroup, nRank, dwExitCode);				}				else				{					Command.nCommand = MPD_CMD_FORWARD;					hCommand = 
InsertCommand(Command);					WaitForCommand(hCommand);				}				break;			case MPD_CMD_KILL:				ReceiveBlocking(sock, sock_event, Command.pCommandBuffer, Command.hCmd.nBufferLength, 0);				pBuf = Command.pCommandBuffer;				nTempIP = *(unsigned long *)pBuf;				pBuf = pBuf + sizeof(unsigned long);				nTempPort = *(int *)pBuf;				pBuf = pBuf + sizeof(int);				n = *(int *)pBuf;				if (nTempIP == nLocalIP && nTempPort == nLocalPort)					KillMPDProcess(n);				else				{					Command.nCommand = MPD_CMD_FORWARD;					hCommand = InsertCommand(Command);					WaitForCommand(hCommand);				}				break;			case MPD_CMD_KILL_GROUP:				ReceiveBlocking(sock, sock_event, Command.pCommandBuffer, Command.hCmd.nBufferLength, 0);				pBuf = Command.pCommandBuffer;				n = *(int *)pBuf;				Command.nCommand = MPD_CMD_FORWARD;				hCommand = InsertCommand(Command);				WaitForCommand(hCommand);				KillMPDProcesses(n);				break;			default:				printf("Unknown command: %d\n", (int)Command.hCmd.cCommand);fflush(stdout);				ReceiveBlocking(sock, sock_event, Command.pCommandBuffer, Command.hCmd.nBufferLength, 0);			}		}	}	NT_closesocket(sock, sock_event);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -