📄 nt_ipvishm_comport.cpp
字号:
} else nt_error_socket("setsockopt failed in CommPortThread", error); } if ((temp_event = WSACreateEvent()) == WSA_INVALID_EVENT) nt_error_socket("WSACreateEvent failed after accepting socket", WSAGetLastError()); if (WSAEventSelect(temp_socket, temp_event, FD_READ | FD_CLOSE) == SOCKET_ERROR) nt_error_socket("WSAEventSelect failed after accepting socket", WSAGetLastError()); // Receive the rank of the remote process if (ret_val = ReceiveBlocking(temp_socket, temp_event, (char*)&remote_iproc, sizeof(int), 0)) nt_error_socket("ReceiveBlocking remote_iproc failed after accepting socket", ret_val); if (remote_iproc >= 0 && remote_iproc < g_nNproc) { if (WaitForSingleObject(g_hAddSocketMutex, 5000) == WAIT_TIMEOUT) MakeErrMsg(1, "Accept connection attempt failed, wait for AddSocketMutex timed out"); if (g_pProcTable[remote_iproc].hConnectLock == NULL) { g_pProcTable[remote_iproc].hConnectLock = CreateMutex(NULL, FALSE, NULL); } ReleaseMutex(g_hAddSocketMutex); if (WaitForSingleObject(g_pProcTable[remote_iproc].hConnectLock, 0) == WAIT_OBJECT_0) { if (g_pProcTable[remote_iproc].sock == INVALID_SOCKET) { add_socket_ack = 1; if (SendBlocking(temp_socket, &add_socket_ack, 1, 0) == SOCKET_ERROR) MakeErrMsg(WSAGetLastError(), "send add_socket_ack(1) failed for socket %d", remote_iproc); // Insert the information in g_pProcTable g_pProcTable[remote_iproc].sock_event = temp_event; g_pProcTable[remote_iproc].sock = temp_socket; // Associate the socket with the completion port if (CreateIoCompletionPort((HANDLE)temp_socket, g_hCommPort, remote_iproc, g_NumCommPortThreads) == NULL) nt_error_socket("Unable to associate completion port with socket", GetLastError()); // Post the first read from the socket g_pProcTable[remote_iproc].msg.ovl.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); if (g_pProcTable[remote_iproc].msg.ovl.hEvent == NULL) MakeErrMsg(GetLastError(), "CommPortThread:CreateEvent failed for %d event", remote_iproc); g_pProcTable[remote_iproc].msg.state = NT_MSG_READING_TAG; g_pProcTable[remote_iproc].msg.nRemaining = sizeof(int); g_pProcTable[remote_iproc].msg.ovl.Offset = 0; g_pProcTable[remote_iproc].msg.ovl.OffsetHigh = 0; g_pProcTable[remote_iproc].msg.ovl.Internal = 0; g_pProcTable[remote_iproc].msg.ovl.InternalHigh = 0; if (!ReadFile((HANDLE)temp_socket, &(g_pProcTable[remote_iproc].msg.tag), sizeof(int), &(g_pProcTable[remote_iproc].msg.nRead), &(g_pProcTable[remote_iproc].msg.ovl))) { int error = GetLastError(); if (error != ERROR_IO_PENDING) MakeErrMsg(error, "CommPortThread:First posted read from socket %d failed", remote_iproc); } DPRINTF(("process %d: socket accepted and inserted in location %d, no race condition\n", g_nIproc, remote_iproc)); } else { add_socket_ack = 0; if (SendBlocking(temp_socket, &add_socket_ack, 1, 0) == SOCKET_ERROR) MakeErrMsg(WSAGetLastError(), "send add_socket_ack(0) failed for socket %d", remote_iproc); NT_Tcp_closesocket(temp_socket, temp_event); DPRINTF(("process %d: socket closed, valid socket already in location %d", g_nIproc, remote_iproc)); } ReleaseMutex(g_pProcTable[remote_iproc].hConnectLock); } else { if (g_nIproc > remote_iproc) { add_socket_ack = 1; if (SendBlocking(temp_socket, &add_socket_ack, 1, 0) == SOCKET_ERROR) MakeErrMsg(WSAGetLastError(), "send add_socket_ack(1) failed for socket %d", remote_iproc); // Insert the information in g_pProcTable g_pProcTable[remote_iproc].sock_event = temp_event; g_pProcTable[remote_iproc].sock = temp_socket; // Associate the socket with the completion port if (CreateIoCompletionPort((HANDLE)temp_socket, g_hCommPort, remote_iproc, g_NumCommPortThreads) == NULL) nt_error_socket("Unable to associate completion port with socket", GetLastError()); // Post the first read from the socket g_pProcTable[remote_iproc].msg.ovl.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); if (g_pProcTable[remote_iproc].msg.ovl.hEvent == NULL) MakeErrMsg(GetLastError(), "CommPortThread:CreateEvent failed for %d event", remote_iproc); g_pProcTable[remote_iproc].msg.state = NT_MSG_READING_TAG; g_pProcTable[remote_iproc].msg.nRemaining = sizeof(int); g_pProcTable[remote_iproc].msg.ovl.Offset = 0; g_pProcTable[remote_iproc].msg.ovl.OffsetHigh = 0; g_pProcTable[remote_iproc].msg.ovl.Internal = 0; g_pProcTable[remote_iproc].msg.ovl.InternalHigh = 0; if (!ReadFile((HANDLE)temp_socket, &(g_pProcTable[remote_iproc].msg.tag), sizeof(int), &(g_pProcTable[remote_iproc].msg.nRead), &(g_pProcTable[remote_iproc].msg.ovl))) { int error = GetLastError(); if (error != ERROR_IO_PENDING) MakeErrMsg(error, "CommPortThread:First posted read from socket %d failed", remote_iproc); } DPRINTF(("process %d: %d > %d, socket accepted and inserted in location %d\n", g_nIproc, g_nIproc, remote_iproc, remote_iproc)); } else { add_socket_ack = 0; if (SendBlocking(temp_socket, &add_socket_ack, 1, 0) == SOCKET_ERROR) MakeErrMsg(1, "send add_socket_ack(0) failed for socket %d", remote_iproc); NT_Tcp_closesocket(temp_socket, temp_event); DPRINTF(("process %d: socket closed, %d > %d", g_nIproc, g_nIproc, remote_iproc)); } } } else { MakeErrMsg(1, "CommPortThread: Process out of range, remote_iproc: %d\n", remote_iproc); return; } } else { error = WSAGetLastError(); if (error != WSAEWOULDBLOCK) { nt_error_socket("CommPortThread: accept failed", error); return; } } } }}// Function name : ConnectTo// Description : // Return type : int // Argument : int remote_iprocint ConnectTo(int remote_iproc){ SOCKET temp_socket; WSAEVENT temp_event; char ack = 0; int ret_val; BOOL opt = TRUE; int optval; int i=0; HOSTENT *hostEnt; unsigned long nic_addr = INADDR_ANY; int error;#ifdef USE_LINGER_SOCKOPT struct linger linger;#endif if (remote_iproc < 0 || remote_iproc >= g_nNproc) { MakeErrMsg(1, "ConnectTo failed, invalid remote process rank: %d\n", remote_iproc); return 0; } // acquire the global lock if (WaitForSingleObject(g_hAddSocketMutex, 5000) == WAIT_TIMEOUT) MakeErrMsg(1, "ConnectTo %d failed, wait for AddSocketMutex timed out", remote_iproc); // if the socket already exists return true if (g_pProcTable[remote_iproc].sock != INVALID_SOCKET) { ReleaseMutex(g_hAddSocketMutex); return 1; } // else create an individual lock for this connection if (g_pProcTable[remote_iproc].hConnectLock == NULL) { g_pProcTable[remote_iproc].hConnectLock = CreateMutex(NULL, FALSE, NULL); } // now that the individual lock is guaranteed to exist, release the global lock ReleaseMutex(g_hAddSocketMutex); // wait for the individual lock if (WaitForSingleObject(g_pProcTable[remote_iproc].hConnectLock, 5000) == WAIT_TIMEOUT) MakeErrMsg(1, "ConnectTo %d failed, wait for hConnectLock timed out", remote_iproc); // check to see if the socket has already been established if (g_pProcTable[remote_iproc].sock != INVALID_SOCKET) { ReleaseMutex(g_pProcTable[remote_iproc].hConnectLock); return 1; } // get the info necessary to connect to the remote rank if (g_bUseBNR) { char pszKey[100], pszValue[100]; sprintf(pszKey, "ListenHost%d", remote_iproc); BNR_Get(g_myBNRgroup, pszKey, g_pProcTable[remote_iproc].host); sprintf(pszKey, "ListenPort%d", remote_iproc); BNR_Get(g_myBNRgroup, pszKey, pszValue); g_pProcTable[remote_iproc].listen_port = atoi(pszValue); } else if (g_bUseDatabase) { char pszKey[100], pszValue[100]; int length = NT_HOSTNAME_LEN; sprintf(pszKey, "ListenHost%d", remote_iproc); g_Database.Get(pszKey, g_pProcTable[remote_iproc].host, &length); sprintf(pszKey, "ListenPort%d", remote_iproc); length = 100; g_Database.Get(pszKey, pszValue, &length); g_pProcTable[remote_iproc].listen_port = atoi(pszValue); } else GetProcessConnectInfo(remote_iproc); hostEnt = gethostbyname(g_pProcTable[remote_iproc].host); if (hostEnt != NULL) nic_addr = *((unsigned long*)hostEnt->h_addr_list[0]); // Create a socket and connect to process 'remote_iproc' // create the event temp_event = WSACreateEvent(); if (temp_event == WSA_INVALID_EVENT) nt_error_socket("WSACreateEvent failed in ConnectTo", WSAGetLastError()); // create the socket temp_socket = WSASocket(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); if (temp_socket == INVALID_SOCKET) nt_error_socket("socket failed in ConnectTo", WSAGetLastError()); optval = 32*1024; setsockopt(temp_socket, SOL_SOCKET, SO_RCVBUF, (char*)&optval, sizeof(int)); optval = 32*1024; setsockopt(temp_socket, SOL_SOCKET, SO_SNDBUF, (char*)&optval, sizeof(int)); DPRINTF(("connecting to %s on %d\n", g_pProcTable[remote_iproc].host, g_pProcTable[remote_iproc].listen_port)); if (ret_val = NT_Tcp_connect(temp_socket, g_pProcTable[remote_iproc].host, g_pProcTable[remote_iproc].listen_port)) MakeErrMsg(ret_val, "NT_Tcp_connect failed in ConnectTo(%s:%d)", g_pProcTable[remote_iproc].host, g_pProcTable[remote_iproc].listen_port); if (setsockopt(temp_socket, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(BOOL)) == SOCKET_ERROR) { error = WSAGetLastError(); if (error == WSAENOBUFS) { Sleep(250); if (setsockopt(temp_socket, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(BOOL)) == SOCKET_ERROR) { error = WSAGetLastError(); if (error == WSAENOBUFS) { Sleep(250); if (setsockopt(temp_socket, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(BOOL)) == SOCKET_ERROR) { error = WSAGetLastError(); if (error != WSAENOBUFS) nt_error_socket("setsockopt failed in ConnectTo", error); } } else nt_error_socket("setsockopt failed in ConnectTo", error); } } else nt_error_socket("setsockopt failed in ConnectTo", error); }#ifdef USE_LINGER_SOCKOPT /* Set the linger on close option */ linger.l_onoff = 1 ; linger.l_linger = 60; setsockopt(temp_socket, SOL_SOCKET, SO_LINGER, (char*)&linger, sizeof(linger));#endif if (WSAEventSelect(temp_socket, temp_event, FD_READ | FD_CLOSE) == SOCKET_ERROR) nt_error_socket("WSAEventSelect failed in ConnectTo", WSAGetLastError()); // Send my rank so the remote side knows who is connecting if (SendBlocking(temp_socket, (char*)&g_nIproc, sizeof(int), 0) == SOCKET_ERROR) nt_error_socket("send g_nIproc failed in ConnectTo", WSAGetLastError()); // Receive an ack determining whether the connection was added to the list or not if (ret_val = ReceiveBlocking(temp_socket, temp_event, &ack, 1, 0)) MakeErrMsg(ret_val, "ConnectTo failed to receive ack for socket %d", remote_iproc); if (ack == 1) { // Insert the socket in the proc table g_pProcTable[remote_iproc].sock = temp_socket; g_pProcTable[remote_iproc].sock_event = temp_event; // Associate the socket with the completion port if (CreateIoCompletionPort((HANDLE)temp_socket, g_hCommPort, remote_iproc, g_NumCommPortThreads) == NULL) nt_error_socket("Unable to associate completion port with socket", GetLastError()); // Post the first read from the socket g_pProcTable[remote_iproc].msg.ovl.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); if (g_pProcTable[remote_iproc].msg.ovl.hEvent == NULL) MakeErrMsg(GetLastError(), "ConnectTo:CreateEvent failed for event[%d]", remote_iproc); g_pProcTable[remote_iproc].msg.state = NT_MSG_READING_TAG; g_pProcTable[remote_iproc].msg.nRemaining = sizeof(int); g_pProcTable[remote_iproc].msg.ovl.Offset = 0; g_pProcTable[remote_iproc].msg.ovl.OffsetHigh = 0; if (!ReadFile((HANDLE)temp_socket, &(g_pProcTable[remote_iproc].msg.tag), sizeof(int), &(g_pProcTable[remote_iproc].msg.nRead), &(g_pProcTable[remote_iproc].msg.ovl))) { int error = GetLastError(); if (error != ERROR_IO_PENDING) MakeErrMsg(error, "ConnectTo:First posted read from socket %d failed", remote_iproc); } DPRINTF(("process %d: established connection to %d\n", g_nIproc, remote_iproc)); } else { // The listener determined this side to be the loser in a race condition // So close the socket and wait for the socket created in another thread // to be inserted in the proc table. DPRINTF(("process %d: connection rejected for rank %d, waiting for connection to be established\n", g_nIproc, remote_iproc)); NT_Tcp_closesocket(temp_socket, temp_event); while (g_pProcTable[remote_iproc].sock == INVALID_SOCKET) Sleep(100); } ReleaseMutex(g_pProcTable[remote_iproc].hConnectLock); return 1;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -