📄 nt_vi.cpp
字号:
// Receive nRemoteRank and nReceivesPerAck if (g_bViSingleThreaded) { while (g_nListenGate == 0) ViWorkerThread(0); } else { while (g_nListenGate == 0) Sleep(0); } while ( (dwStatus = VipRecvDone(g_hListenThreadVi, &pRecvDesc)) == VIP_NOT_DONE) Sleep(1); if (!AssertSuccess(dwStatus, "ViListenThread:VipRecvDone failed", pRecvDesc)) { nt_error("Error", 1); return; } nRemoteRank = ((int*)pRecvDesc->Data[0].Data.Address)[0]; nReceivesPerAck = ((int*)pRecvDesc->Data[0].Data.Address)[1]; if (nRemoteRank < 0 || nRemoteRank >= g_nNproc) MakeErrMsg(1, "Invalid rank received on new VI: %d", nRemoteRank); // Re-post the receive descriptor pRecvDesc->Control.Control = VIP_CONTROL_OP_SENDRECV; pRecvDesc->Control.Length = g_viMTU; pRecvDesc->Control.SegCount = 1; pRecvDesc->Control.Reserved = 0; pRecvDesc->Data[0].Length = g_viMTU; pRecvDesc->Data[0].Handle = mhReceive; dwStatus = VipPostRecv(g_hListenThreadVi, pRecvDesc, mhReceive); if (!AssertSuccess(dwStatus, "ViListenThread:VipPostRecv failed", pRecvDesc)) { nt_error("Error", 1); return; } // Set the user data for this connection to be the rank of the remote process VipSetUserData(g_hListenThreadVi, (VIP_PVOID)nRemoteRank); // Insert VI into proc table bool bSetupConnection;#ifdef USE_VC6_HEADERS if (InterlockedCompareExchange((void**)&g_pProcTable[nRemoteRank].vinfo.hVi, (void*)g_hListenThreadVi, (void*)NULL) == NULL)#else if (InterlockedCompareExchange((LONG volatile *)&g_pProcTable[nRemoteRank].vinfo.hVi, (LONG)g_hListenThreadVi, (LONG)NULL) == NULL)#endif bSetupConnection = true; else { // Two connections have been made simultaneously // One must be left up and the other must be disconnected if (nRemoteRank > g_nIproc) { // If the remote rank is higer, reject the new connection and keep the existing bSetupConnection = false; // Send ack=0 pSendDesc[0]->Control.Control = VIP_CONTROL_OP_SENDRECV | VIP_CONTROL_IMMEDIATE; pSendDesc[0]->Control.Length = 0; pSendDesc[0]->Control.SegCount = 0; pSendDesc[0]->Control.Reserved = 0; pSendDesc[0]->Control.ImmediateData = 0; // Ack stored in immediate data dwStatus = VipPostSend(g_hListenThreadVi, pSendDesc[0], mhSend); if (!AssertSuccess(dwStatus, "ViListenThread:VipPostSend failed", pSendDesc[0])) { nt_error("Error", 1); return; } dwStatus = VipSendWait(g_hListenThreadVi, VITIMEOUT, &pDesc); if (!AssertSuccess(dwStatus, "ViListenThread:VipSendWait failed", pDesc)) { nt_error("Error", 1); return; } } else { // If the remote rank is lower, destroy the existing connection and accept the new CloseVi(&g_pProcTable[nRemoteRank].vinfo); bSetupConnection = true; } } if (bSetupConnection) { nSendsPerAck = nNumSendDescriptors / 2; // Send ack=1 pSendDesc[0]->Control.Control = VIP_CONTROL_OP_SENDRECV | VIP_CONTROL_IMMEDIATE; pSendDesc[0]->Control.Length = sizeof(int); pSendDesc[0]->Control.SegCount = 1; pSendDesc[0]->Control.Reserved = 0; pSendDesc[0]->Control.ImmediateData = 1; // Ack stored in immediate data pSendDesc[0]->Data[0].Length = sizeof(int); pSendDesc[0]->Data[0].Handle = mhSend; ((int*)pSendDesc[0]->Data[0].Data.Address)[0] = nSendsPerAck; dwStatus = VipPostSend(g_hListenThreadVi, pSendDesc[0], mhSend); if (!AssertSuccess(dwStatus, "ViListenThread:VipPostSend failed", pSendDesc[0])) { nt_error("Error", 1); return; } dwStatus = VipSendWait(g_hListenThreadVi, VITIMEOUT, &pDesc); if (!AssertSuccess(dwStatus, "ViListenThread:VipSendWait failed", pDesc)) { nt_error("Error", 1); return; } // Store VI connection information in the proctable VI_Info *vinfo = &g_pProcTable[nRemoteRank].vinfo; memcpy(&vinfo->descriminator, g_ViDescriminator, g_nViDescriminator_len); vinfo->descriminator_len = g_nViDescriminator_len; vinfo->hNic = g_hViNic; vinfo->hVi = g_hListenThreadVi; //vinfo->localbuf; vinfo->mhReceive = mhReceive; vinfo->mhSend = mhSend; vinfo->nCurSendIndex = 0; vinfo->nNumReceived = 0; vinfo->nNumRecvDescriptors = nNumRecvDescriptors; vinfo->nNumSendDescriptors = nNumSendDescriptors; vinfo->nNumSent = 0; vinfo->nPostedSends = 0; vinfo->nReceivesPerAck = nReceivesPerAck; vinfo->nSendsPerAck = nSendsPerAck; vinfo->nSendAcked = 0; vinfo->nSequenceNumberReceive = 0; vinfo->nSequenceNumberSend = 0; //vinfo->pDataBuffer = pDataBuffer; vinfo->pDesc = pDesc; //vinfo->pLocalAddress = (VIP_NET_ADDRESS*)vinfo->localbuf; vinfo->pRecvDesc = pRecvDesc; vinfo->pRemoteAddress = (VIP_NET_ADDRESS*)vinfo->remotebuf; vinfo->pSendDesc = pSendDesc; vinfo->pSendDescriptorBuffer = pSendDescriptorBuffer; vinfo->pReceiveDescriptorBuffer = pReceiveDescriptorBuffer; //vinfo->remotebuf; //vinfo->Vi_LocalAttribs = Vi_LocalAttribs; vinfo->Vi_RemoteAttribs = Vi_RemoteAttribs; initlock(&vinfo->lock); // Setting the data to valid must be last vinfo->valid = 1; // Increase the completion queue size every time a new connection is made g_nNumCQEntries += CQ_ENTRIES_INCREMENT; dwStatus = VipResizeCQ(g_hViCQ, g_nNumCQEntries); } else { //printf("VI already in proctable %d\n", nRemoteRank);fflush(stdout); } g_hListenThreadVi = NULL; g_nListenGate = 0; if (!g_bViSingleThreaded) g_nWorkerGate = 1; }}// Function name : HashViPointer// Description : // Return type : int // Argument : VIP_VI_HANDLE pint HashViPointer(VIP_VI_HANDLE p){ int index; if (p == NULL) nt_error("Hashing NULL VI handle", 1); if (VipGetUserData != NULL) { index = (int)VipGetUserData(p); if (g_pProcTable[index].vinfo.hVi == p) return index; } else { // For now, just search for the handle for (int i=0; i<g_nNproc; i++) { if ( g_pProcTable[i].via && (g_pProcTable[i].vinfo.hVi == p) ) return i; } } if (p == g_hListenThreadVi) { g_nListenGate = 1; if (g_bViSingleThreaded) return -1; while (g_nWorkerGate == 0) Sleep(0); g_nWorkerGate = 0; } else if (p == g_hConnectToVi) { g_nConnectGate = 1; if (g_bViSingleThreaded) return -1; while (g_nWorkerGate == 0) Sleep(0); g_nWorkerGate = 0; } else MakeErrMsg(1, "HashViPointer: VI_HANDLE(%x) not found in g_pProcTable", p); return -1;}// Function name : ViWorkerThread// Description : // Return type : void int ViWorkerThread(int bRepeating){ VIP_VI_HANDLE hVi; VIP_BOOLEAN bRecvQ; VIP_RETURN dwStatus; int index; VI_Info *vinfo; NT_Message *message; do { if (!bRepeating) { // Poll once and return if no packet is available if ((dwStatus = VipCQDone(g_hViCQ, &hVi, &bRecvQ)) == VIP_NOT_DONE) return 0; if (!AssertSuccess(dwStatus, "ViWorkerThread:VipCQDone failed")) { if (g_bViClosing) return 0; nt_error("Error", 1); return 0; } } else { // Wait for a packet by either polling or a wait function if (g_bViUsePolling) { while ((dwStatus = VipCQDone(g_hViCQ, &hVi, &bRecvQ)) == VIP_NOT_DONE) Sleep(0); if (!AssertSuccess(dwStatus, "ViWorkerThread:VipCQDone failed")) { if (g_bViClosing) return 0; nt_error("Error", 1); return 0; } } else { dwStatus = VipCQWait(g_hViCQ, VIP_INFINITE, &hVi, &bRecvQ); if (!AssertSuccess(dwStatus, "ViWorkerThread:VipCQWait failed")) { if (g_bViClosing) return 0; nt_error("Error", 1); return 0; } } } index = HashViPointer(hVi); if (index == -1) { //printf("HashViPointer returned -1\n");fflush(stdout); continue; } vinfo = &g_pProcTable[index].vinfo; if (bRecvQ) { // Packet ready in the receive queue while ( (dwStatus = VipRecvDone(vinfo->hVi, &vinfo->pRecvDesc)) == VIP_NOT_DONE ) Sleep(0); if (!AssertSuccess(dwStatus, "ViWorkerThread:VipRecvDone failed", vinfo->pRecvDesc)) { if (g_bViClosing) return 0; nt_error("Error", 1); return 0; } // Zero length messages are assumed to be ack packets. // In the future, I will probably check the immediate data to determine the packet type if (vinfo->pRecvDesc->Control.Length == 0) { // Ack packet received InterlockedIncrement(&vinfo->nSendAcked); vinfo->nSequenceNumberReceive = vinfo->pRecvDesc->Control.ImmediateData; } else { // Data packet received int datalen; message = &g_pProcTable[index].msg; if (message->state == NT_MSG_READING_TAG) { // This is the first packet in a message. // Peel off the tag, length, and as much of the data as is available message->tag = ((int*)(vinfo->pRecvDesc->Data[0].Data.Address))[0]; message->length = ((int*)(vinfo->pRecvDesc->Data[0].Data.Address))[1]; message->buffer = g_MsgQueue.GetBufferToFill(message->tag, message->length, index, &message->pElement); datalen = vinfo->pRecvDesc->Control.Length - (2 * sizeof(int)); if (datalen > 0) { memcpy( message->buffer, &((int*)(vinfo->pRecvDesc->Data[0].Data.Address))[2], datalen); message->nRemaining = message->length - datalen; } if (message->nRemaining) message->state = NT_MSG_READING_BUFFER; else { message->state = NT_MSG_READING_TAG; g_MsgQueue.SetElementEvent(message->pElement); } } else { // This is next packet containing only data for the current message datalen = vinfo->pRecvDesc->Control.Length; memcpy( &(((char*)message->buffer)[message->length - message->nRemaining]), vinfo->pRecvDesc->Data[0].Data.Address, datalen); message->nRemaining -= datalen; if (message->nRemaining == 0) { message->state = NT_MSG_READING_TAG; g_MsgQueue.SetElementEvent(message->pElement); } } } // Re-post the receive vinfo->pRecvDesc->Control.Control = VIP_CONTROL_OP_SENDRECV; vinfo->pRecvDesc->Control.Length = g_viMTU; vinfo->pRecvDesc->Control.SegCount = 1; vinfo->pRecvDesc->Control.Reserved = 0; vinfo->pRecvDesc->Data[0].Length = g_viMTU; vinfo->pRecvDesc->Data[0].Handle = vinfo->mhReceive; dwStatus = VipPostRecv(vinfo->hVi, vinfo->pRecvDesc, vinfo->mhReceive); if (!AssertSuccess(dwStatus, "ViWorkerThread:VipPostRecv failed", vinfo->pRecvDesc)) { nt_error("Error", 1); return 0; } // Send ack if necessary vinfo->nNumReceived++; if (vinfo->nNumReceived % vinfo->nReceivesPerAck == 0) ViSendAck(vinfo); } else { // Packet ready in the send queue printf("There shouldn't be any send completion messages\n");fflush(stdout); } }while (bRepeating); return 1;}// Function name : PollViQueue// Description : // Return type : void void PollViQueue(){ //* if (!ViWorkerThread(0)) Sleep(0);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -