📄 leftthread.cpp
字号:
Command.hCmd.nBufferLength = sizeof(unsigned long) + sizeof(int); hCommand = InsertCommand(Command); WaitForCommand(hCommand); g_List.Disable(nTempIP, nTempPort); break; case MPD_CMD_DELETE_ID: if (g_bDatabaseIsLocal) { ReceiveBlocking(sock, sock_event, (char*)&n, sizeof(int), 0); ReceiveBlocking(sock, sock_event, pszDbsID, n, 0); g_Database.Delete(pszDbsID); } else { Command.nCommand = MPD_CMD_FORWARD; ReceiveBlocking(sock, sock_event, Command.pCommandBuffer, Command.hCmd.nBufferLength, 0); hCommand = InsertCommand(Command); WaitForCommand(hCommand); } break; case MPD_CMD_DELETE_KEY: if (g_bDatabaseIsLocal) { ReceiveBlocking(sock, sock_event, (char*)&n, sizeof(int), 0); ReceiveBlocking(sock, sock_event, pszDbsID, n, 0); ReceiveBlocking(sock, sock_event, (char*)&n, sizeof(int), 0); pszDbsKey = new char[n]; ReceiveBlocking(sock, sock_event, pszDbsKey, n, 0); g_Database.Delete(pszDbsID, pszDbsKey); delete pszDbsKey; } else { Command.nCommand = MPD_CMD_FORWARD; ReceiveBlocking(sock, sock_event, Command.pCommandBuffer, Command.hCmd.nBufferLength, 0); hCommand = InsertCommand(Command); WaitForCommand(hCommand); } break; case MPD_CMD_PUTC: bPutPersistent = false; case MPD_CMD_PUT: if (g_bDatabaseIsLocal) { ReceiveBlocking(sock, sock_event, (char*)&n, sizeof(int), 0); ReceiveBlocking(sock, sock_event, pszDbsID, n, 0); ReceiveBlocking(sock, sock_event, (char*)&n, sizeof(int), 0); pszDbsKey = new char[n]; ReceiveBlocking(sock, sock_event, pszDbsKey, n, 0); ReceiveBlocking(sock, sock_event, (char*)&n, sizeof(int), 0); pDbsValue = new char[n]; ReceiveBlocking(sock, sock_event, (char*)pDbsValue, n, 0); g_Database.Put(pszDbsID, pszDbsKey, pDbsValue, n, bPutPersistent); } else { Command.nCommand = MPD_CMD_FORWARD; ReceiveBlocking(sock, sock_event, Command.pCommandBuffer, Command.hCmd.nBufferLength, 0); hCommand = InsertCommand(Command); WaitForCommand(hCommand); } bPutPersistent = true; break; case MPD_CMD_GET: if (g_bDatabaseIsLocal) { Command.hCmd.nBufferLength = 2 * sizeof(unsigned long) + 2 * sizeof(int); ReceiveBlocking(sock, sock_event, (char*)&nTempIP, sizeof(unsigned long), 0); ReceiveBlocking(sock, sock_event, (char*)&nTempPort, sizeof(int), 0); ReceiveBlocking(sock, sock_event, (char*)&nGetIdentifier, sizeof(unsigned long), 0); pBuf = Command.pCommandBuffer; *((unsigned long *)pBuf) = nTempIP; pBuf += sizeof(unsigned long); *((int *)pBuf) = nTempPort; pBuf += sizeof(int); *((unsigned long *)pBuf) = nGetIdentifier; pBuf += sizeof(unsigned long); ReceiveBlocking(sock, sock_event, (char*)&n, sizeof(int), 0); ReceiveBlocking(sock, sock_event, pszDbsID, n, 0); ReceiveBlocking(sock, sock_event, (char*)&n, sizeof(int), 0); pszDbsKey = new char[n]; ReceiveBlocking(sock, sock_event, pszDbsKey, n, 0); Command.hCmd.cCommand = MPD_CMD_GETRETURN; Command.hCmd.nSrcIP = nLocalIP; Command.hCmd.nSrcPort = nLocalPort; GetReturnThreadArg *pArg = new GetReturnThreadArg; pArg->command = Command; strcpy(pArg->pszDbsID, pszDbsID); pArg->pszDbsKey = pszDbsKey; DWORD dwThreadID; CloseHandle(CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)GetReturnThread, pArg, 0, &dwThreadID)); } else { Command.nCommand = MPD_CMD_FORWARD; ReceiveBlocking(sock, sock_event, Command.pCommandBuffer, Command.hCmd.nBufferLength, 0); hCommand = InsertCommand(Command); WaitForCommand(hCommand); } break; case MPD_CMD_GETRETURN: Command.nCommand = MPD_CMD_FORWARD; Command.hCmd.nBufferLength = 2 * sizeof(unsigned long) + 2 * sizeof(int); pBuf = Command.pCommandBuffer; ReceiveBlocking(sock, sock_event, pBuf, sizeof(unsigned long), 0); // IP nTempIP = *((unsigned long *)pBuf); pBuf += sizeof(unsigned long); ReceiveBlocking(sock, sock_event, pBuf, sizeof(int), 0); // Port nTempPort = *((int *)pBuf); pBuf += sizeof(int); ReceiveBlocking(sock, sock_event, pBuf, sizeof(unsigned long), 0); // GetIdentifier nGetIdentifier = *((unsigned long *)pBuf); pBuf += sizeof(unsigned long); ReceiveBlocking(sock, sock_event, (char*)&n, sizeof(int), 0); // length *((int *)pBuf) = n; pBuf += sizeof(int); if (nTempIP == nLocalIP && nTempPort == nLocalPort) { ReceiveBlocking(sock, sock_event, ((CommandData *)nGetIdentifier)->pCommandBuffer, n, 0); ((CommandData *)nGetIdentifier)->hCmd.nBufferLength = n; MarkCommandCompleted((CommandData *)nGetIdentifier); } else { ReceiveBlocking(sock, sock_event, pBuf, n, 0); // value Command.hCmd.nBufferLength += n; hCommand = InsertCommand(Command); WaitForCommand(hCommand); } break; case MPD_CMD_HOSTS: pBuf = Command.pCommandBuffer; ReceiveBlocking(sock, sock_event, pBuf, Command.hCmd.nBufferLength, 0); pBuf += Command.hCmd.nBufferLength; sprintf(pBuf, "%s:%d\n", host, nLocalPort); Command.hCmd.nBufferLength += strlen(pBuf); Command.nCommand = MPD_CMD_FORWARD; hCommand = InsertCommand(Command); WaitForCommand(hCommand); break; case MPD_CMD_CPUSAGE: pBuf = Command.pCommandBuffer; ReceiveBlocking(sock, sock_event, pBuf, Command.hCmd.nBufferLength, 0); pBuf += Command.hCmd.nBufferLength; sprintf(pBuf, "%s:%d %d ", host, nLocalPort, GetCPUsage()); Command.hCmd.nBufferLength += strlen(pBuf); Command.nCommand = MPD_CMD_FORWARD; hCommand = InsertCommand(Command); WaitForCommand(hCommand); break; case MPD_CMD_PS: pBuf = Command.pCommandBuffer; ReceiveBlocking(sock, sock_event, pBuf, Command.hCmd.nBufferLength, 0); pBuf += Command.hCmd.nBufferLength; //sprintf(pBuf, "%s:%d\n", host, nLocalPort); //PrintMPDProcessesToBuffer(pBuf+strlen(pBuf)); sprintf(pShortBuffer, "%s:%d", host, nLocalPort); PrintMPDProcessesToBuffer(pBuf, pShortBuffer); Command.hCmd.nBufferLength += strlen(pBuf); Command.nCommand = MPD_CMD_FORWARD; hCommand = InsertCommand(Command); WaitForCommand(hCommand); break; case MPD_CMD_DESTROY_RING: printf("DestroyRing command received ...");fflush(stdout); Command.nCommand = MPD_CMD_FORWARD; Command.hCmd.nBufferLength = 0; hCommand = InsertCommand(Command); WaitForCommand(hCommand); KillRemainingMPDProcesses(); printf(" Exiting\n"); ExitProcess(0); break; case MPD_CMD_RUN_THE_RING: Command.nCommand = MPD_CMD_FORWARD; Command.hCmd.nBufferLength = 0; hCommand = InsertCommand(Command); WaitForCommand(hCommand); break; case MPD_CMD_PRINT_LISTS: Command.nCommand = MPD_CMD_FORWARD; Command.hCmd.nBufferLength = 0; hCommand = InsertCommand(Command); WaitForCommand(hCommand); g_List.Print(); break; case MPD_CMD_PRINT_DATABASE: if (Command.hCmd.nBufferLength > 0) ReceiveBlocking(sock, sock_event, Command.pCommandBuffer, Command.hCmd.nBufferLength, 0); if (g_bDatabaseIsLocal) { Command.hCmd.nBufferLength = CMD_BUFF_SIZE; g_Database.PrintStateToBuffer(Command.pCommandBuffer, &Command.hCmd.nBufferLength); Command.hCmd.nBufferLength++; } Command.nCommand = MPD_CMD_FORWARD; hCommand = InsertCommand(Command); WaitForCommand(hCommand); break; case MPD_CMD_LAUNCH: ReceiveBlocking(sock, sock_event, Command.pCommandBuffer, Command.hCmd.nBufferLength, 0); pBuf = Command.pCommandBuffer; pLaunchNode = *(LaunchNode **)pBuf; pBuf = pBuf + sizeof(LaunchNode*); nTempIP = *(unsigned long *)pBuf; pBuf = pBuf + sizeof(unsigned long); nTempPort = *(int *)pBuf; pBuf = pBuf + sizeof(int); if (nTempIP == nLocalIP && nTempPort == nLocalPort) { //printf("launch command received: '%s'\n", pBuf);fflush(stdout); LaunchMPDProcessArg *pArg = new LaunchMPDProcessArg; pArg->nIP = nLocalIP; pArg->nPort = nLocalPort; pArg->nSrcIP = Command.hCmd.nSrcIP; pArg->nSrcPort = Command.hCmd.nSrcPort; pArg->pszCommand = new char[strlen(pBuf)+1]; pArg->pNode = pLaunchNode; strcpy(pArg->pszCommand, pBuf); //printf("launching '%s'\n", pArg->pszCommand);fflush(stdout); DWORD dwThreadID; CloseHandle(CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)LaunchMPDProcess, pArg, 0, &dwThreadID)); } else { //printf("forwarding launch command: '%s'\n", pBuf);fflush(stdout); Command.nCommand = MPD_CMD_FORWARD; hCommand = InsertCommand(Command); WaitForCommand(hCommand); } break; case MPD_CMD_LAUNCH_RET: ReceiveBlocking(sock, sock_event, Command.pCommandBuffer, Command.hCmd.nBufferLength, 0); pBuf = Command.pCommandBuffer; nTempIP = *((unsigned long *)pBuf); pBuf = pBuf + sizeof(unsigned long); nTempPort = *((int *)pBuf); if (nTempIP == nLocalIP && nTempPort == nLocalPort) { pBuf = pBuf + sizeof(int); pLaunchNode = *((LaunchNode **)pBuf); pBuf = pBuf + sizeof(LaunchNode*); pLaunchNode->Set(*((DWORD*)pBuf)); } else { Command.nCommand = MPD_CMD_FORWARD; hCommand = InsertCommand(Command); WaitForCommand(hCommand); } break; case MPD_CMD_LAUNCH_EXITCODE: ReceiveBlocking(sock, sock_event, Command.pCommandBuffer, Command.hCmd.nBufferLength, 0); pBuf = Command.pCommandBuffer; nTempIP = *((unsigned long *)pBuf); pBuf = pBuf + sizeof(unsigned long); nTempPort = *((int *)pBuf); if (nTempIP == nLocalIP && nTempPort == nLocalPort) { DWORD dwExitCode; int nGroup, nRank; pBuf = pBuf + sizeof(int); pLaunchNode = *((LaunchNode **)pBuf); pBuf = pBuf + sizeof(LaunchNode*); dwExitCode = *((DWORD *)pBuf); pBuf = pBuf + sizeof(DWORD); nGroup = *((int *)pBuf); pBuf = pBuf + sizeof(int); nRank = *((int *)pBuf); pLaunchNode->SetExit(nGroup, nRank, dwExitCode); } else { Command.nCommand = MPD_CMD_FORWARD; hCommand = InsertCommand(Command); WaitForCommand(hCommand); } break; case MPD_CMD_KILL: ReceiveBlocking(sock, sock_event, Command.pCommandBuffer, Command.hCmd.nBufferLength, 0); pBuf = Command.pCommandBuffer; nTempIP = *(unsigned long *)pBuf; pBuf = pBuf + sizeof(unsigned long); nTempPort = *(int *)pBuf; pBuf = pBuf + sizeof(int); n = *(int *)pBuf; if (nTempIP == nLocalIP && nTempPort == nLocalPort) KillMPDProcess(n); else { Command.nCommand = MPD_CMD_FORWARD; hCommand = InsertCommand(Command); WaitForCommand(hCommand); } break; case MPD_CMD_KILL_GROUP: ReceiveBlocking(sock, sock_event, Command.pCommandBuffer, Command.hCmd.nBufferLength, 0); pBuf = Command.pCommandBuffer; n = *(int *)pBuf; Command.nCommand = MPD_CMD_FORWARD; hCommand = InsertCommand(Command); WaitForCommand(hCommand); KillMPDProcesses(n); break; default: printf("Unknown command: %d\n", (int)Command.hCmd.cCommand);fflush(stdout); ReceiveBlocking(sock, sock_event, Command.pCommandBuffer, Command.hCmd.nBufferLength, 0); } } } NT_closesocket(sock, sock_event);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -