📄 nt_vi.cpp
字号:
{ VipDeregisterMem(g_hViNic, vinfo->pReceiveDescriptorBuffer, vinfo->mhReceive); delete vinfo->pReceiveDescriptorBuffer; } if (vinfo->pSendDescriptorBuffer != NULL) { VipDeregisterMem(g_hViNic, vinfo->pSendDescriptorBuffer, vinfo->mhSend); delete vinfo->pSendDescriptorBuffer; } if (vinfo->pSendDesc) delete vinfo->pSendDesc; } vinfo->pSendDesc = NULL; vinfo->hVi = NULL; vinfo->hNic = NULL; vinfo->pReceiveDescriptorBuffer = NULL; vinfo->pSendDescriptorBuffer = NULL; } return 0;}// Function name : ConnectViTo// Description : // Return type : int (TRUE=1 or FALSE=0)// Argument : int nRemoteRankint ConnectViTo(int nRemoteRank){ VIP_RETURN dwStatus; VIP_VI_ATTRIBUTES Vi_RemoteAttribs; VIP_DESCRIPTOR *pRecvDesc, **pSendDesc, *pDesc; VIP_MEM_HANDLE mhSend, mhReceive; void *pSendDescriptorBuffer, *pReceiveDescriptorBuffer;//, *pDataBuffer; char localbuf[40], remotebuf[40]; VIP_NET_ADDRESS *pLocalAddress; VIP_NET_ADDRESS *pRemoteAddress; int nNumRecvDescriptors=32, nNumSendDescriptors=30; void *vp; int nSendsPerAck, nReceivesPerAck; unsigned char ViDescriminator[16]; int nViDescriminator_len; //printf("ConnectViTo: Connecting to %d\n", nRemoteRank);fflush(stdout); // Create a VI //dwStatus = VipCreateVi(g_hViNic, &default_vi_attribs, g_hViCQ, g_hViCQ, &g_hConnectToVi); // Create a VI with only the receive queue associated with the completion queue dwStatus = VipCreateVi(g_hViNic, &default_vi_attribs, NULL, g_hViCQ, &g_hConnectToVi); if (!AssertSuccess(dwStatus, "can't create VI")) return 0; // Check and insert it in g_pProcTable //LPVOID pNull = NULL;#ifdef USE_VC6_HEADERS if (InterlockedCompareExchange((void**)&g_pProcTable[nRemoteRank].vinfo.hVi, (void*)g_hConnectToVi, (void*)NULL) != NULL)#else if (InterlockedCompareExchange((LONG volatile *)&g_pProcTable[nRemoteRank].vinfo.hVi, (LONG)g_hConnectToVi, (LONG)NULL) != NULL)#endif { //printf("ConnectViTo: waiting for vinfo to be validated\n");fflush(stdout); // Connection has already been made by another thread. Destroy this one and wait for the other one to be valid VipDestroyVi(g_hConnectToVi); while (g_pProcTable[nRemoteRank].vinfo.valid == 0) Sleep(200); return 1; } // Each node will establish VI connections using the 'JobID + rank' as the descriminator sprintf((char*)ViDescriminator, "%s%d", g_pszJobID, nRemoteRank); nViDescriminator_len = strlen((char*)ViDescriminator); pLocalAddress = (VIP_NET_ADDRESS*)localbuf; pRemoteAddress = (VIP_NET_ADDRESS*)remotebuf; pLocalAddress->HostAddressLen = ADDR_LEN; vp = pLocalAddress->HostAddress + ADDR_LEN; pLocalAddress->DiscriminatorLen = nViDescriminator_len; memcpy(vp, ViDescriminator, nViDescriminator_len); // Save the descriminator for automatic reconnect //descriminator = (unsigned char*)vp; //descriminator_len = descriminator_length; if (nNumSendDescriptors & 0x1) nNumSendDescriptors++; // must be even pSendDesc = new VIP_DESCRIPTOR*[nNumSendDescriptors]; if (g_bUseBNR) { char pszKey[100]; sprintf(pszKey, "ListenHost%d", nRemoteRank); BNR_Get(g_myBNRgroup, pszKey, g_pProcTable[nRemoteRank].host); } else if (g_bUseDatabase) { char pszKey[100]; int length = NT_HOSTNAME_LEN; sprintf(pszKey, "ListenHost%d", nRemoteRank); g_Database.Get(pszKey, g_pProcTable[nRemoteRank].host, &length); } else GetProcessConnectInfo(nRemoteRank); // Get the remote host information VipNSInit(g_hViNic, 0); dwStatus = VipNSGetHostByName(g_hViNic, g_pProcTable[nRemoteRank].host, pRemoteAddress, 0); if (!AssertSuccess(dwStatus, "can't find remote address")) return 0; // Append the discriminator vp = pRemoteAddress->HostAddress + ADDR_LEN; pRemoteAddress->DiscriminatorLen = nViDescriminator_len; memcpy(vp, ViDescriminator, nViDescriminator_len); // Reserve memory for descriptors, for sending and receiving data // I should allow for smaller sizes than g_viMTU pRecvDesc = get_descriptors(g_hViNic, nNumRecvDescriptors, g_viMTU, &mhReceive, &pReceiveDescriptorBuffer); pSendDesc[0] = get_descriptors(g_hViNic, nNumSendDescriptors, g_viMTU, &mhSend, &pSendDescriptorBuffer); pDesc = (VIP_DESCRIPTOR*)(pSendDesc[0]->Control.Next.Address); for (int i=1; i<nNumSendDescriptors; i++) { pSendDesc[i] = pDesc; pDesc = (VIP_DESCRIPTOR*)(pDesc->Control.Next.Address); } // Post the receive immediately VIP_DESCRIPTOR *pTemp2, *pTemp = pRecvDesc; while (pTemp) { pTemp2 = pTemp; // Advance to the next descriptor before calling PostRecv because PostRecv modifies the Address field pTemp = (VIP_DESCRIPTOR*)(pTemp->Control.Next.Address); dwStatus = VipPostRecv(g_hConnectToVi, pTemp2, mhReceive); if (!AssertSuccess(dwStatus, "can't post receive", pTemp2)) return 0; } // Request a connection dwStatus = VipConnectRequest(g_hConnectToVi, pLocalAddress, pRemoteAddress, VITIMEOUT, &Vi_RemoteAttribs); if (!AssertSuccess(dwStatus, "connect request failed")) { MakeErrMsg(1, "VI Connection request to process %d failed\n", nRemoteRank); return 0; } // Set the user data for this connection to be the rank of the remote process VipSetUserData(g_hConnectToVi, (VIP_PVOID)nRemoteRank); // Send my rank and nSendsPerAck nSendsPerAck = nNumSendDescriptors / 2; pSendDesc[0]->Control.Control = VIP_CONTROL_OP_SENDRECV | VIP_CONTROL_IMMEDIATE; pSendDesc[0]->Control.Length = 2 * sizeof(int); pSendDesc[0]->Control.SegCount = 1; pSendDesc[0]->Control.Reserved = 0; pSendDesc[0]->Control.ImmediateData = 0; pSendDesc[0]->Data[0].Length = 2 * sizeof(int); pSendDesc[0]->Data[0].Handle = mhSend; ((int*)pSendDesc[0]->Data[0].Data.Address)[0] = g_nIproc; ((int*)pSendDesc[0]->Data[0].Data.Address)[1] = nSendsPerAck; dwStatus = VipPostSend(g_hConnectToVi, pSendDesc[0], mhSend); if (!AssertSuccess(dwStatus, "ConnectViTo:VipPostSend failed", pSendDesc[0])) return 0; dwStatus = VipSendWait(g_hConnectToVi, VITIMEOUT, &pDesc); if (!AssertSuccess(dwStatus, "ConnectViTo:VipSendWait failed", pDesc)) return 0; // Receive ack if (g_bViSingleThreaded) { while (g_nConnectGate == 0) ViWorkerThread(0); } else { // Wait for the worker thread to signal that the packet is ready to be taken out of the queue // by setting the gate to 1 while (g_nConnectGate == 0) Sleep(0); } // Remove the packet while ( (dwStatus = VipRecvDone(g_hConnectToVi, &pRecvDesc)) == VIP_NOT_DONE) Sleep(1); if (!AssertSuccess(dwStatus, "ConnectViTo:VipRecvDone failed", pRecvDesc)) { MakeErrMsg(1, "Unable to receive connect packet from process %d\n", nRemoteRank); return 0; } if (pRecvDesc->Control.ImmediateData == 0) { // Close the VI due to loss in race condition VIP_DESCRIPTOR *d; VipDisconnect(g_hConnectToVi); do { VipRecvDone(g_hConnectToVi, &d); } while (d != 0); VipDestroyVi(g_hConnectToVi); if (pReceiveDescriptorBuffer != NULL) { VipDeregisterMem(g_hViNic, pReceiveDescriptorBuffer, mhReceive); delete pReceiveDescriptorBuffer; } if (pSendDescriptorBuffer != NULL) { VipDeregisterMem(g_hViNic, pSendDescriptorBuffer, mhSend); delete pSendDescriptorBuffer; } if (pSendDesc) delete pSendDesc; } else { nReceivesPerAck = ((int*)pRecvDesc->Data[0].Data.Address)[0]; // Re-post the receive descriptor immediately pRecvDesc->Control.Control = VIP_CONTROL_OP_SENDRECV; pRecvDesc->Control.Length = g_viMTU; pRecvDesc->Control.SegCount = 1; pRecvDesc->Control.Reserved = 0; pRecvDesc->Data[0].Length = g_viMTU; pRecvDesc->Data[0].Handle = mhReceive; dwStatus = VipPostRecv(g_hConnectToVi, pRecvDesc, mhReceive); if (!AssertSuccess(dwStatus, "ConnectViTo:VipPostRecv failed", pRecvDesc)) return 0; // Store VI connection information in the proctable VI_Info *vinfo = &g_pProcTable[nRemoteRank].vinfo; memcpy(&vinfo->descriminator, ViDescriminator, nViDescriminator_len); vinfo->descriminator_len = nViDescriminator_len; vinfo->hNic = g_hViNic; vinfo->hVi = g_hConnectToVi; //vinfo->localbuf; vinfo->mhReceive = mhReceive; vinfo->mhSend = mhSend; vinfo->nCurSendIndex = 0; vinfo->nNumReceived = 0; vinfo->nNumRecvDescriptors = nNumRecvDescriptors; vinfo->nNumSendDescriptors = nNumSendDescriptors; vinfo->nNumSent = 0; vinfo->nPostedSends = 0; vinfo->nReceivesPerAck = nReceivesPerAck; vinfo->nSendsPerAck = nSendsPerAck; vinfo->nSendAcked = 0; vinfo->nSequenceNumberReceive = 0; vinfo->nSequenceNumberSend = 0; vinfo->pDesc = pDesc; //vinfo->pLocalAddress = (VIP_NET_ADDRESS*)vinfo->localbuf; vinfo->pRecvDesc = pRecvDesc; vinfo->pRemoteAddress = (VIP_NET_ADDRESS*)vinfo->remotebuf; vinfo->pSendDesc = pSendDesc; vinfo->pSendDescriptorBuffer = pSendDescriptorBuffer; vinfo->pReceiveDescriptorBuffer = pReceiveDescriptorBuffer; //vinfo->remotebuf; //vinfo->Vi_LocalAttribs = Vi_LocalAttribs; vinfo->Vi_RemoteAttribs = Vi_RemoteAttribs; initlock(&vinfo->lock); // Setting the data to valid must be last and the compiler or chip must execute this instruction last too. vinfo->valid = 1; // Increase the completion queue size every time a new connection is made g_nNumCQEntries += CQ_ENTRIES_INCREMENT; dwStatus = VipResizeCQ(g_hViCQ, g_nNumCQEntries); } // Reset g_hConnectToVi before setting the worker gate so the worker thread will not accidentally match it again g_hConnectToVi = NULL; g_nConnectGate = 0; if (!g_bViSingleThreaded) g_nWorkerGate = 1; return 1;}// Function name : ViListenThread// Description : // Return type : void void ViListenThread(){ VIP_RETURN dwStatus; VIP_VI_ATTRIBUTES Vi_RemoteAttribs; VIP_DESCRIPTOR *pRecvDesc, **pSendDesc, *pDesc; VIP_MEM_HANDLE mhSend, mhReceive; void *pSendDescriptorBuffer, *pReceiveDescriptorBuffer; char localbuf[40], remotebuf[40]; VIP_NET_ADDRESS *pLocalAddress; VIP_NET_ADDRESS *pRemoteAddress; int nNumRecvDescriptors=32, nNumSendDescriptors=30; void *vp; int nRemoteRank; int nSendsPerAck, nReceivesPerAck; while (true) { ///////////////////// // Setup a VI pLocalAddress = (VIP_NET_ADDRESS*)localbuf; pRemoteAddress = (VIP_NET_ADDRESS*)remotebuf; pLocalAddress->HostAddressLen = ADDR_LEN; vp = pLocalAddress->HostAddress + ADDR_LEN; pLocalAddress->DiscriminatorLen = g_nViDescriminator_len; memcpy(vp, g_ViDescriminator, g_nViDescriminator_len); // Save the descriminator for automatic reconnect //descriminator = (unsigned char*)vp; //descriminator_len = descriminator_length; if (nNumSendDescriptors & 0x1) nNumSendDescriptors++; // must be even pSendDesc = new VIP_DESCRIPTOR*[nNumSendDescriptors]; //dwStatus = VipCreateVi(g_hViNic, &default_vi_attribs, g_hViCQ, g_hViCQ, &g_hListenThreadVi); dwStatus = VipCreateVi(g_hViNic, &default_vi_attribs, NULL, g_hViCQ, &g_hListenThreadVi); if (!AssertSuccess(dwStatus, "can't create VI")) { nt_error("Error", 1); return; } // Reserve memory for descriptors, for sending and receiving data pRecvDesc = get_descriptors(g_hViNic, nNumRecvDescriptors, g_viMTU, &mhReceive, &pReceiveDescriptorBuffer); pSendDesc[0] = get_descriptors(g_hViNic, nNumSendDescriptors, g_viMTU, &mhSend, &pSendDescriptorBuffer); pDesc = (VIP_DESCRIPTOR*)(pSendDesc[0]->Control.Next.Address); for (int i=1; i<nNumSendDescriptors; i++) { pSendDesc[i] = pDesc; pDesc = (VIP_DESCRIPTOR*)(pDesc->Control.Next.Address); } // Post the receives immediately VIP_DESCRIPTOR *pTemp2, *pTemp = pRecvDesc; while (pTemp) { pTemp2 = pTemp; // Advance to the next descriptor before calling PostRecv because PostRecv modifies the Address field pTemp = (VIP_DESCRIPTOR*)(pTemp->Control.Next.Address); dwStatus = VipPostRecv(g_hListenThreadVi, pTemp2, mhReceive); if (!AssertSuccess(dwStatus, "ViListenThread:can't post receive", pTemp2)) { nt_error("Error", 1); return; } } //////////////////////////// // Wait for a connection VIP_CONN_HANDLE conn; dwStatus = VipConnectWait(g_hViNic, pLocalAddress, VIP_INFINITE, pRemoteAddress, &Vi_RemoteAttribs, &conn); if (!AssertSuccess(dwStatus, "ViListenThread:failed waiting for connection")) { if (g_bViClosing) { // Clean up local VI structures // ... return; } nt_error("Error", 1); return; } dwStatus = VipConnectAccept(conn, g_hListenThreadVi); if (!AssertSuccess(dwStatus, "can't accept connection")) { nt_error("Error", 1); return; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -