📄 nt_ipvishm_comport.cpp
字号:
#include "nt_global_cpp.h"
#include "bnrfunctions.h"
#include <stdio.h>
#include <stdlib.h>

int g_NumCommPortThreads = 2;
#define EXIT_WORKER_KEY -1
HANDLE g_hCommPortThread, g_hCommPort;
HANDLE g_hCommPortEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
HANDLE g_hAddSocketMutex = CreateMutex(NULL, FALSE, NULL);
int g_nCommPortCommand;

// Function name	: IssueSocketRead
// Description	    : Zeroes the offset fields of the OVERLAPPED structure kept in
//                    g_pProcTable[dwKey].msg and issues a single ReadFile of nLength
//                    bytes into pBuffer on that entry's socket.
// Return type		: int  (ERROR_SUCCESS if ReadFile returned TRUE, otherwise GetLastError())
// Argument         : DWORD dwKey     - index into g_pProcTable (the completion key)
// Argument         : void *pBuffer   - destination of the read
// Argument         : DWORD nLength   - number of bytes to request
static int IssueSocketRead(DWORD dwKey, void *pBuffer, DWORD nLength)
{
    g_pProcTable[dwKey].msg.ovl.Offset = 0;
    g_pProcTable[dwKey].msg.ovl.OffsetHigh = 0;
    if (ReadFile((HANDLE)g_pProcTable[dwKey].sock, pBuffer, nLength,
                 &g_pProcTable[dwKey].msg.nRead, &g_pProcTable[dwKey].msg.ovl))
        return ERROR_SUCCESS;
    return GetLastError();
}

// Function name	: PostHeaderRead
// Description	    : Posts a read for (part of) one of the fixed size header fields
//                    and reports any failure other than ERROR_IO_PENDING through
//                    MakeErrMsg.
// Return type		: void
// Argument         : DWORD dwKey            - index into g_pProcTable
// Argument         : void *pBuffer          - where the next header bytes belong
// Argument         : DWORD nLength          - number of header bytes still missing
// Argument         : const char *pszField   - field name used in the error text ("tag" or "length")
static void PostHeaderRead(DWORD dwKey, void *pBuffer, DWORD nLength, const char *pszField)
{
    int error = IssueSocketRead(dwKey, pBuffer, nLength);
    if (error != ERROR_SUCCESS && error != ERROR_IO_PENDING)
        MakeErrMsg(error, "CommPortWorkerThread:Post read(%s) from socket %d failed", pszField, dwKey);
}

// Function name	: PostBufferRead
// Description	    : Posts a read for (part of) the message body.  While ReadFile
//                    fails with ERROR_NO_SYSTEM_RESOURCES the request size is halved
//                    and the read is retried; the remainder is picked up by later
//                    completions because the caller tracks msg.nRemaining.
// Return type		: void
// Argument         : DWORD dwKey     - index into g_pProcTable
// Argument         : void *pBuffer   - where the next body bytes belong
// Argument         : int nLength     - number of body bytes still missing
static void PostBufferRead(DWORD dwKey, void *pBuffer, int nLength)
{
    int nRequest = nLength;
    int error = IssueSocketRead(dwKey, pBuffer, nRequest);

    while (error == ERROR_NO_SYSTEM_RESOURCES)
    {
        // Halve only after a failed attempt, so an accepted read is never
        // followed by a spurious "not enough resources" report.
        nRequest /= 2;
        if (nRequest == 0)
        {
            MakeErrMsg(1, "Not enough system resources available to post a read from socket %d\n", dwKey);
            // Whether or not MakeErrMsg returns, stop here: a zero byte read would
            // complete with nBytes == 0, which the worker treats as a closed socket.
            return;
        }
        error = IssueSocketRead(dwKey, pBuffer, nRequest);
    }

    if (error != ERROR_SUCCESS && error != ERROR_IO_PENDING)
        MakeErrMsg(error, "CommPortWorkerThread:Post read(buffer[%d]) from socket %d failed", nRequest, dwKey);
}

// Function name	: CommPortWorkerThread
// Description	    : Completion port worker.  Each completion packet carries the
//                    g_pProcTable index of a connection as its key.  The per
//                    connection state machine reads a tag (sizeof(int) bytes), then
//                    a length (sizeof(int) bytes), then 'length' bytes into a buffer
//                    obtained from g_MsgQueue, signals the queue element and starts
//                    over with the next tag.  A packet whose key is EXIT_WORKER_KEY
//                    terminates the thread.
// Return type		: void 
void CommPortWorkerThread()
{
    DWORD dwKey, nBytes;
    OVERLAPPED *p_Ovl;
    int error;

    while (true)
    {
        // Reset the outputs so that a failed call which dequeues nothing cannot
        // leave values from a previous iteration (or garbage) behind.
        dwKey = 0;
        nBytes = 0;
        p_Ovl = NULL;

        if (GetQueuedCompletionStatus(g_hCommPort, &nBytes, &dwKey, &p_Ovl, INFINITE))
        {
            DPRINTF(("COMMPORT::%d bytes on socket %d\n", nBytes, dwKey));
            if (dwKey == (DWORD)EXIT_WORKER_KEY)
                ExitThread(0);

            if (nBytes)
            {
                //printf("COMMPORT::%d bytes on socket %d\n", nBytes, dwKey);fflush(stdout);
                g_pProcTable[dwKey].msg.nRemaining -= nBytes;

                // In every case the state and nRemaining are updated before the
                // next read is posted, exactly as in the original ordering.
                switch (g_pProcTable[dwKey].msg.state)
                {
                case NT_MSG_READING_TAG:
                    if (g_pProcTable[dwKey].msg.nRemaining)
                    {
                        // Tag only partially received: ask for the missing tail.
                        PostHeaderRead(dwKey,
                            (char*)&g_pProcTable[dwKey].msg.tag + sizeof(int) - g_pProcTable[dwKey].msg.nRemaining,
                            g_pProcTable[dwKey].msg.nRemaining, "tag");
                    }
                    else
                    {
                        g_pProcTable[dwKey].msg.state = NT_MSG_READING_LENGTH;
                        g_pProcTable[dwKey].msg.nRemaining = sizeof(int);
                        PostHeaderRead(dwKey, (char*)&g_pProcTable[dwKey].msg.length, sizeof(int), "length");
                    }
                    break;

                case NT_MSG_READING_LENGTH:
                    if (g_pProcTable[dwKey].msg.nRemaining)
                    {
                        // Length only partially received: ask for the missing tail.
                        PostHeaderRead(dwKey,
                            (char*)&g_pProcTable[dwKey].msg.length + sizeof(int) - g_pProcTable[dwKey].msg.nRemaining,
                            g_pProcTable[dwKey].msg.nRemaining, "length");
                    }
                    else
                    {
                        g_pProcTable[dwKey].msg.buffer = g_MsgQueue.GetBufferToFill(
                            g_pProcTable[dwKey].msg.tag, g_pProcTable[dwKey].msg.length, dwKey,
                            &g_pProcTable[dwKey].msg.pElement);
                        g_pProcTable[dwKey].msg.nRemaining = g_pProcTable[dwKey].msg.length;
                        g_pProcTable[dwKey].msg.state = NT_MSG_READING_BUFFER;
                        PostBufferRead(dwKey, g_pProcTable[dwKey].msg.buffer, g_pProcTable[dwKey].msg.length);
                    }
                    break;

                case NT_MSG_READING_BUFFER:
                    if (g_pProcTable[dwKey].msg.nRemaining)
                    {
                        // Body only partially received: continue right after the
                        // bytes that have already arrived.
                        PostBufferRead(dwKey,
                            (char*)g_pProcTable[dwKey].msg.buffer
                                + (g_pProcTable[dwKey].msg.length - g_pProcTable[dwKey].msg.nRemaining),
                            g_pProcTable[dwKey].msg.nRemaining);
                    }
                    else
                    {
                        // Message complete: signal its queue element and wait for the next tag.
                        g_MsgQueue.SetElementEvent(g_pProcTable[dwKey].msg.pElement);
                        g_pProcTable[dwKey].msg.state = NT_MSG_READING_TAG;
                        g_pProcTable[dwKey].msg.nRemaining = sizeof(int);
                        PostHeaderRead(dwKey, (char*)&g_pProcTable[dwKey].msg.tag, sizeof(int), "tag");
                    }
                    break;

                default:
                    break;
                }
            }
            else
            {
                // A completion that transferred no bytes: close the socket and
                // mark the table entry as having no socket/event.
                NT_Tcp_closesocket(g_pProcTable[dwKey].sock, g_pProcTable[dwKey].sock_event);
                g_pProcTable[dwKey].sock = INVALID_SOCKET;
                g_pProcTable[dwKey].sock_event = NULL;
            }
        }
        else
        {
            // Capture the error code before anything else can overwrite it.
            error = GetLastError();
            if (!g_bInNT_ipvishm_End)
            {
                // dwKey is only meaningful when a packet was actually dequeued
                // (p_Ovl != NULL); only then may it be used as a table index.
                if (p_Ovl != NULL &&
                    dwKey < (DWORD)g_nNproc &&
                    dwKey != (DWORD)g_nIproc &&
                    g_pProcTable[dwKey].host[0] != '\0')
                {
                    MakeErrMsg(error, "GetQueuedCompletionStatus failed for socket %d connected to host '%s'", dwKey, g_pProcTable[dwKey].host);
                }
                else
                {
                    nt_error_socket("GetQueuedCompletionStatus failed", error);
                }
            }
        }
    }
}

// Function name	: CommPortThread
// Description	    : 
// Return type		: void 
// Argument         : HANDLE hReadyEvent
void CommPortThread(HANDLE hReadyEvent)
{
    SOCKET listen_socket;
    HANDLE ahEvent[2]; // array of events to wait on
    int error = 0, num_handles=2;
    SOCKET temp_socket;
    WSAEVENT temp_event;
    DWORD ret_val;
    int remote_iproc;
    int i;
    BOOL opt;
    char add_socket_ack;
    DWORD dwThreadID;
    HANDLE *hWorkers;

    ahEvent[0] = g_hCommPortEvent;

    // create a listening socket
    if (error = NT_Tcp_create_bind_socket(&listen_socket, &ahEvent[1]))
        nt_error_socket("CommPortThread: NT_Tcp_create_bind_socket failed", error);

    // associate listen_socket_event with listen_socket
    if (WSAEventSelect(listen_socket, ahEvent[1], FD_ACCEPT) == SOCKET_ERROR)
        nt_error("CommPortThread: WSAEventSelect failed for listen_socket", 1);

    if (listen(listen_socket, SOMAXCONN) == SOCKET_ERROR)
        nt_error_socket("CommPortThread: listen failed", WSAGetLastError());

    // get the port and local hostname for the listening socket
    if (error = NT_Tcp_get_sock_info(listen_socket, g_pProcTable[g_nIproc].host, &g_pProcTable[g_nIproc].listen_port))
        nt_error_socket("CommPortThread: Unable to get host and port of listening socket", error);

    // Create the completion port
    g_hCommPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, g_NumCommPortThreads);
    if (g_hCommPort == NULL)
        nt_error_socket("CommPortThread: CreateIoCompletionPort failed", GetLastError());

    hWorkers = new HANDLE[g_NumCommPortThreads];

    // Start the completion port threads
    for (i=0; i<g_NumCommPortThreads; i++)
    {
        //HANDLE hWorkerThread;
        hWorkers[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)CommPortWorkerThread, NULL, NT_THREAD_STACK_SIZE, &dwThreadID);
        if (hWorkers[i] == NULL)
nt_error_socket("CommPortThread: CreateThread(CommPortWorkerThread) failed", GetLastError()); //CloseHandle(hWorkerThread); } // Signal that the port number is valid if (!SetEvent(hReadyEvent)) nt_error_socket("CommPortThread: SetEvent(hReadyEvent) failed", GetLastError()); while (true) { ret_val = WaitForMultipleObjects(num_handles, ahEvent, FALSE, INFINITE); if (ret_val != WAIT_OBJECT_0 && ret_val != WAIT_OBJECT_0+1) { nt_error_socket("CommPortThread: Wait failed", GetLastError()); return; } // Event[0] is the event used by other threads in this process to communicate with this thread if (WaitForSingleObject(ahEvent[0], 0) == WAIT_OBJECT_0) { switch (g_nCommPortCommand) { case NT_COMM_CMD_EXIT: DPRINTF(("process %d: Exit command.\n", g_nIproc)); for (i=0; i<g_NumCommPortThreads; i++) PostQueuedCompletionStatus(g_hCommPort, 0, EXIT_WORKER_KEY, NULL); WaitForMultipleObjects(g_NumCommPortThreads, hWorkers, TRUE, 5000); for (i=0; i<g_NumCommPortThreads; i++) CloseHandle(hWorkers[i]); delete hWorkers; CloseHandle(g_hAddSocketMutex); CloseHandle(g_hCommPortEvent); CloseHandle(g_hCommPort); closesocket(listen_socket); WSACloseEvent(ahEvent[1]); ExitThread(0); break; default: nt_error("Invalid command sent to CommPortThread", g_nCommPortCommand); break; } } // Event[1] is the listen socket event, which is signalled when other processes whish to establish a socket connection with this process if (WaitForSingleObject(ahEvent[1], 0) == WAIT_OBJECT_0) { ///DPRINTF(("process %d: listen_socket signalled.\n", g_nIproc)); i=0; opt = TRUE; // Something in my code is causing the listen_socket event to fail to be reset by the accept call. // For now I manually reset it here. 
WSAResetEvent(ahEvent[1]); temp_socket = accept(listen_socket, NULL, NULL); if (temp_socket != INVALID_SOCKET) { // Create an event and associate it with the newly accepted socket //if (setsockopt(temp_socket, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(BOOL)) == SOCKET_ERROR) //nt_error_socket("setsockopt failed", WSAGetLastError()); if (setsockopt(temp_socket, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(BOOL)) == SOCKET_ERROR) { error = WSAGetLastError(); if (error == WSAENOBUFS) { Sleep(250); if (setsockopt(temp_socket, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(BOOL)) == SOCKET_ERROR) { error = WSAGetLastError(); if (error == WSAENOBUFS) { Sleep(250); if (setsockopt(temp_socket, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(BOOL)) == SOCKET_ERROR) { error = WSAGetLastError(); if (error != WSAENOBUFS) nt_error_socket("setsockopt failed in CommPortThread", error); } } else nt_error_socket("setsockopt failed in CommPortThread", error); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -