📄 nt_vi.cpp
字号:
/*/ // My trials show that polling more than once before sleeping only // decreases performance.The lock function shows the exact opposite. // Maybe I need to slim down the ViWorkerThread to return faster. for (int i=0; i<10; i++) { if (ViWorkerThread(0)) return; } Sleep(0); //*/}// Function name : LoadViFunctions// Description : // Return type : bool bool LoadViFunctions(){ HMODULE hViLib; char pszLibrary[1024]; if (!GetEnvironmentVariable("MPICH_VI_LIB", pszLibrary, 1024)) strcpy(pszLibrary, "vipl.dll"); // First initialize everythting to NULL VipOpenNic = NULL; VipCloseNic = NULL; VipQueryNic = NULL; VipRegisterMem = NULL; VipDeregisterMem = NULL; VipQueryMem = NULL; VipSetMemAttributes = NULL; VipErrorCallback = NULL; VipQuerySystemManagementInfo = NULL; VipCreatePtag = NULL; VipDestroyPtag = NULL; VipCreateVi = NULL; VipDestroyVi = NULL; VipQueryVi = NULL; VipSetViAttributes = NULL; VipPostSend = NULL; VipSendDone = NULL; VipSendWait = NULL; VipSendNotify = NULL; VipPostRecv = NULL; VipRecvDone = NULL; VipRecvWait = NULL; VipRecvNotify = NULL; VipConnectWait = NULL; VipConnectAccept = NULL; VipConnectReject = NULL; VipConnectRequest = NULL; VipDisconnect = NULL; VipCreateCQ = NULL; VipDestroyCQ = NULL; VipResizeCQ = NULL; VipCQDone = NULL; VipCQWait = NULL; VipCQNotify = NULL; VipNSInit = NULL; VipNSGetHostByName = NULL; VipNSGetHostByAddr = NULL; VipNSShutdown = NULL; VipConnectPeerRequest = NULL; VipConnectPeerDone = NULL; VipConnectPeerWait = NULL; VipAddTagCQ = NULL; VipRemoveTagCQ = NULL; VipPostDeferredSends = NULL; VipGetUserData = NULL; VipSetUserData = NULL; hViLib = LoadLibrary(pszLibrary); if (hViLib == NULL) return false; // Add code to check if the return values are NULL ... VipOpenNic = (VIP_RETURN (VI_CALL *)(const char *, void **))GetProcAddress(hViLib, "VipOpenNic"); if (VipOpenNic == NULL) DPRINTF(("VipOpenNic == NULL\n")); VipCloseNic = (VIP_RETURN (VI_CALL *)(void *))GetProcAddress(hViLib, "VipCloseNic"); if (VipCloseNic == NULL) DPRINTF(("VipCloseNic == NULL\n")); VipQueryNic = (VIP_RETURN (VI_CALL *)(void *, struct _VIP_NIC_ATTRIBUTES *))GetProcAddress(hViLib, "VipQueryNic"); if (VipQueryNic == NULL) DPRINTF(("VipQueryNic == NULL\n")); VipRegisterMem = (VIP_RETURN (VI_CALL *)(void *, void *, unsigned long, struct _VIP_MEM_ATTRIBUTES *, unsigned __int32 *))GetProcAddress(hViLib, "VipRegisterMem"); if (VipRegisterMem == NULL) DPRINTF(("VipRegisterMem == NULL\n")); VipDeregisterMem = (VIP_RETURN (VI_CALL *)(void *, void *, unsigned __int32))GetProcAddress(hViLib, "VipDeregisterMem"); if (VipDeregisterMem == NULL) DPRINTF(("VipDeregisterMem == NULL\n")); VipQueryMem = (VIP_RETURN (VI_CALL *)(void *, void *, unsigned __int32, struct _VIP_MEM_ATTRIBUTES *))GetProcAddress(hViLib, "VipQueryMem"); if (VipQueryMem == NULL) DPRINTF(("VipQueryMem == NULL\n")); VipSetMemAttributes = (VIP_RETURN (VI_CALL *)(void *, void *, unsigned __int32, struct _VIP_MEM_ATTRIBUTES *))GetProcAddress(hViLib, "VipSetMemAttributes"); if (VipSetMemAttributes == NULL) DPRINTF(("VipSetMemAttributes == NULL\n")); VipErrorCallback = (VIP_RETURN (VI_CALL *)(void *, void *, void (VI_CALL *)(void *, struct _VIP_ERROR_DESCRIPTOR *)))GetProcAddress(hViLib, "VipErrorCallback"); if (VipErrorCallback == NULL) DPRINTF(("VipErrorCallback == NULL\n")); VipQuerySystemManagementInfo = (VIP_RETURN (VI_CALL *)(void *, unsigned long, void *))GetProcAddress(hViLib, "VipQuerySystemManagementInfo"); if (VipQuerySystemManagementInfo == NULL) DPRINTF(("VipQuerySystemManagementInfo == NULL\n")); VipCreatePtag = (VIP_RETURN (VI_CALL *)(void *, void **))GetProcAddress(hViLib, "VipCreatePtag"); if (VipCreatePtag == NULL) DPRINTF(("VipCreatePtag == NULL\n")); VipDestroyPtag = (VIP_RETURN (VI_CALL *)(void *, void *))GetProcAddress(hViLib, "VipDestroyPtag"); if (VipDestroyPtag == NULL) DPRINTF(("VipDestroyPtag == NULL\n")); VipCreateVi = (VIP_RETURN (VI_CALL *)(void *, struct _VIP_VI_ATTRIBUTES *, void *, void *, void **))GetProcAddress(hViLib, "VipCreateVi"); if (VipCreateVi == NULL) DPRINTF(("VipCreateVi == NULL\n")); VipDestroyVi = (VIP_RETURN (VI_CALL *)(void *))GetProcAddress(hViLib, "VipDestroyVi"); if (VipDestroyVi == NULL) DPRINTF(("VipDestroyVi == NULL\n")); VipQueryVi = (VIP_RETURN (VI_CALL *)(void *, VIP_VI_STATE *, struct _VIP_VI_ATTRIBUTES *, int *, int *))GetProcAddress(hViLib, "VipQueryVi"); if (VipQueryVi == NULL) DPRINTF(("VipQueryVi == NULL\n")); VipSetViAttributes = (VIP_RETURN (VI_CALL *)(void *, struct _VIP_VI_ATTRIBUTES *))GetProcAddress(hViLib, "VipSetViAttributes"); if (VipSetViAttributes == NULL) DPRINTF(("VipSetViAttributes == NULL\n")); VipPostSend = (VIP_RETURN (VI_CALL *)(void *, struct _VIP_DESCRIPTOR *, unsigned __int32))GetProcAddress(hViLib, "VipPostSend"); if (VipPostSend == NULL) DPRINTF(("VipPostSend == NULL\n")); VipSendDone = (VIP_RETURN (VI_CALL *)(void *, struct _VIP_DESCRIPTOR **))GetProcAddress(hViLib, "VipSendDone"); if (VipSendDone == NULL) DPRINTF(("VipSendDone == NULL\n")); VipSendWait = (VIP_RETURN (VI_CALL *)(void *, unsigned long, struct _VIP_DESCRIPTOR **))GetProcAddress(hViLib, "VipSendWait"); if (VipSendWait == NULL) DPRINTF(("VipSendWait == NULL\n")); VipSendNotify = (VIP_RETURN (VI_CALL *)(void *, void *, void (VI_CALL *)(void *, void *, void *, struct _VIP_DESCRIPTOR *)))GetProcAddress(hViLib, "VipSendNotify"); if (VipSendNotify == NULL) DPRINTF(("VipSendNotify == NULL\n")); VipPostRecv = (VIP_RETURN (VI_CALL *)(void *, struct _VIP_DESCRIPTOR *, unsigned __int32))GetProcAddress(hViLib, "VipPostRecv"); if (VipPostRecv == NULL) DPRINTF(("VipPostRecv == NULL\n")); VipRecvDone = (VIP_RETURN (VI_CALL *)(void *, struct _VIP_DESCRIPTOR **))GetProcAddress(hViLib, "VipRecvDone"); if (VipRecvDone == NULL) DPRINTF(("VipRecvDone == NULL\n")); VipRecvWait = (VIP_RETURN (VI_CALL *)(void *, unsigned long, struct _VIP_DESCRIPTOR **))GetProcAddress(hViLib, "VipRecvWait"); if (VipRecvWait == NULL) DPRINTF(("VipRecvWait == NULL\n")); VipRecvNotify = (VIP_RETURN (VI_CALL *)(void *, void *, void (_cdecl *)(void *, void *, void *, struct _VIP_DESCRIPTOR *)))GetProcAddress(hViLib, "VipRecvNotify"); if (VipRecvNotify == NULL) DPRINTF(("VipRecvNotify == NULL\n")); VipConnectWait = (VIP_RETURN (VI_CALL *)(void *, struct _VIP_NET_ADDRESS *, unsigned long, struct _VIP_NET_ADDRESS *, struct _VIP_VI_ATTRIBUTES *, void **))GetProcAddress(hViLib, "VipConnectWait"); if (VipConnectWait == NULL) DPRINTF(("VipConnectWait == NULL\n")); VipConnectAccept = (VIP_RETURN (VI_CALL *)(void *, void *))GetProcAddress(hViLib, "VipConnectAccept"); if (VipConnectAccept == NULL) DPRINTF(("VipConnectAccept == NULL\n")); VipConnectReject = (VIP_RETURN (VI_CALL *)(void *))GetProcAddress(hViLib, "VipConnectReject"); if (VipConnectReject == NULL) DPRINTF(("VipConnectReject == NULL\n")); VipConnectRequest = (VIP_RETURN (VI_CALL *)(void *, struct _VIP_NET_ADDRESS *, struct _VIP_NET_ADDRESS *, unsigned long, struct _VIP_VI_ATTRIBUTES *))GetProcAddress(hViLib, "VipConnectRequest"); if (VipConnectRequest == NULL) DPRINTF(("VipConnectRequest == NULL\n")); VipDisconnect = (VIP_RETURN (VI_CALL *)(void *))GetProcAddress(hViLib, "VipDisconnect"); if (VipDisconnect == NULL) DPRINTF(("VipDisconnect == NULL\n")); VipCreateCQ = (VIP_RETURN (VI_CALL *)(void *, unsigned long, void **))GetProcAddress(hViLib, "VipCreateCQ"); if (VipCreateCQ == NULL) DPRINTF(("VipCreateCQ == NULL\n")); VipDestroyCQ = (VIP_RETURN (VI_CALL *)(void *))GetProcAddress(hViLib, "VipDestroyCQ"); if (VipDestroyCQ == NULL) DPRINTF(("VipDestroyCQ == NULL\n")); VipResizeCQ = (VIP_RETURN (VI_CALL *)(void *, unsigned long))GetProcAddress(hViLib, "VipResizeCQ"); if (VipResizeCQ == NULL) DPRINTF(("VipResizeCQ == NULL\n")); VipCQDone = (VIP_RETURN (VI_CALL *)(void *, void **, int *))GetProcAddress(hViLib, "VipCQDone"); if (VipCQDone == NULL) DPRINTF(("VipCQDone == NULL\n")); VipCQWait = (VIP_RETURN (VI_CALL *)(void *, unsigned long, void **, int *))GetProcAddress(hViLib, "VipCQWait"); if (VipCQWait == NULL) DPRINTF(("VipCQWait == NULL\n")); VipCQNotify = (VIP_RETURN (VI_CALL *)(void *, void *, void (VI_CALL *)(void *, void *, void *, int)))GetProcAddress(hViLib, "VipCQNotify"); if (VipCQNotify == NULL) DPRINTF(("VipCQNotify == NULL\n")); VipNSInit = (VIP_RETURN (VI_CALL *)(void *, void *))GetProcAddress(hViLib, "VipNSInit"); if (VipNSInit == NULL) DPRINTF(("VipNSInit == NULL\n")); VipNSGetHostByName = (VIP_RETURN (VI_CALL *)(void *, char *, struct _VIP_NET_ADDRESS *, unsigned long))GetProcAddress(hViLib, "VipNSGetHostByName"); if (VipNSGetHostByName == NULL) DPRINTF(("VipNSGetHostByName == NULL\n")); VipNSGetHostByAddr = (VIP_RETURN (VI_CALL *)(void *, struct _VIP_NET_ADDRESS *, char *, unsigned long *))GetProcAddress(hViLib, "VipNSGetHostByAddr"); if (VipNSGetHostByAddr == NULL) DPRINTF(("VipNSGetHostByAddr == NULL\n")); VipNSShutdown = (VIP_RETURN (VI_CALL *)(void *))GetProcAddress(hViLib, "VipNSShutdown"); if (VipNSShutdown == NULL) DPRINTF(("VipNSShutdown == NULL\n")); VipConnectPeerRequest = (VIP_RETURN (VI_CALL *)(void *, struct _VIP_NET_ADDRESS *, struct _VIP_NET_ADDRESS *, unsigned long))GetProcAddress(hViLib, "VipConnectPeerRequest"); if (VipConnectPeerRequest == NULL) DPRINTF(("VipConnectPeerRequest == NULL\n")); VipConnectPeerDone = (VIP_RETURN (VI_CALL *)(void *, struct _VIP_VI_ATTRIBUTES *))GetProcAddress(hViLib, "VipConnectPeerDone"); if (VipConnectPeerDone == NULL) DPRINTF(("VipConnectPeerDone == NULL\n")); VipConnectPeerWait = (VIP_RETURN (VI_CALL *)(void *, struct _VIP_VI_ATTRIBUTES *))GetProcAddress(hViLib, "VipConnectPeerWait"); if (VipConnectPeerWait == NULL) DPRINTF(("VipConnectPeerWait == NULL\n")); VipAddTagCQ = (VIP_RETURN (VI_CALL *)(void *, void **, unsigned long, unsigned long))GetProcAddress(hViLib, "VipAddTagCQ"); if (VipAddTagCQ == NULL) DPRINTF(("VipAddTagCQ == NULL\n")); VipRemoveTagCQ = (VIP_RETURN (VI_CALL *)(void *, void *, unsigned long))GetProcAddress(hViLib, "VipRemoveTagCQ"); if (VipRemoveTagCQ == NULL) DPRINTF(("VipRemoveTagCQ == NULL\n")); VipPostDeferredSends = (VIP_RETURN (VI_CALL *)(void *, int, int *))GetProcAddress(hViLib, "VipPostDeferredSends"); if (VipPostDeferredSends == NULL) DPRINTF(("VipPostDeferredSends == NULL\n")); // Non-standard VIA calls // Giganet VipGetUserData = (void * (VI_CALL *)(void *))GetProcAddress(hViLib, "VipGetUserData"); if (VipGetUserData == NULL) DPRINTF(("VipGetUserData == NULL\n")); VipSetUserData = (void (VI_CALL *)(void *, void *))GetProcAddress(hViLib, "VipSetUserData"); if (VipSetUserData == NULL) DPRINTF(("VipSetUserData == NULL\n")); // Servernet // GWizz return true;}// Function name : InitVI// Description : // Return type : void bool InitVI(){ DWORD dwThreadID; VIP_RETURN dwStatus; char pszTemp[1024]; VIP_NIC_ATTRIBUTES nicAttribs; int nCount=0, *pMembers = NULL; try{ if (!LoadViFunctions()) return false; }catch(...) { nt_error("Exception thrown in LoadViFunctions caught in InitVi", 1); return false; } // Determine whether to use polling methods or not if (GetEnvironmentVariable("MPICH_VI_USE_POLLING", pszTemp, 100)) g_bViUsePolling = true; // Determine which processes this process can reach by VI connections if (GetEnvironmentVariable("MPICH_VI_CLICKS", pszTemp, 100) == 0 && GetEnvironmentVariable("MPICH_VI_CLIQUES", pszTemp, 100) == 0) return false; // If none, then there is no need to continue if (ParseCliques(pszTemp, g_nIproc, g_nNproc, &nCount, &pMembers)) { nt_error("Unable to parse the VI cliques", 1); return false; } for (int i=0; i<nCount; i++) { if ( (pMembers[i] >= 0) && (pMembers[i] < g_nNproc) ) { g_pProcTable[pMembers[i]].via = 1; g_pProcTable[pMembers[i]].vinfo.hVi = NULL; g_pProcTable[pMembers[i]].vinfo.valid = 0; } } if (pMembers != NULL) delete pMembers; // Open the network interface card and save the handle // TODO: What if there are multiple nics? char pszNic[100]; sprintf(pszNic, "%s0", g_pszNicBaseName); dwStatus = VipOpenNic(pszNic, &g_hViNic); if (!AssertSuccess(dwStatus, "InitVI:can't open nic")) { printf("VipOpenNic failed\n");fflush(stdout); } // Set the global descriminator used to accept VI connections sprintf((char*)g_ViDescriminator, "%s%d", g_pszJobID, g_nIproc); g_nViDescriminator_len = strlen((char*)g_ViDescriminator); // Determine and save the maximum transmission unit if (VipQueryNic(g_hViNic, &nicAttribs) == VIP_SUCCESS) { if (nicAttribs.MaxTransferSize < DESIRED_PACKET_LENGTH) { g_viMTU = nicAttribs.MaxTransferSize; default_vi_attribs.MaxTransferSize = nicAttribs.MaxTransferSize; } else { g_viMTU = DESIRED_PACKET_LENGTH; default_vi_attribs.MaxTransferSize = DESIRED_PACKET_LENGTH; } } //* // The code will work without the callback function but it is necessary // to detect catastrophic network closures, ie. a remote process dies. // Set the error callback function dwStatus = VipErrorCallback(g_hViNic, NULL, ErrorCallbackFunction); if (!AssertSuccess(dwStatus, "InitVI:VipErrorCallback failed")) { printf("VipErrorCallback failed\n");fflush(stdout); } //*/ // Create a global completion queue for all VI connections to share dwStatus = VipCreateCQ(g_hViNic, INITIAL_NUM_CQ_ENTRIES, &g_hViCQ); if (!AssertSuccess(dwStatus, "InitVI:VipCreateCQ failed")) { printf("VipCreateCQ failed\n");fflush(stdout); } // Create a thread to wait for VI connections g_hViListenThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ViListenThread, NULL, NT_THREAD_STACK_SIZE, &dwThreadID); if (g_hViListenThread == NULL) { printf("CreateThread(ViListenThread) failed: %d\n", GetLastError());fflush(stdout); } pszTemp[0] = '\0'; GetEnvironmentVariable("MPICH_VI_SINGLETHREAD", pszTemp, 100); if (pszTemp[0] == '1') { // Set the poll function so the via device will run single threaded. g_MsgQueue.SetProgressFunction(PollViQueue); g_bViSingleThreaded = true; } else { // Create a worker thread to eagerly drain messages from all open VI connections g_hViWorkerThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ViWorkerThread, (LPVOID)1, NT_THREAD_STACK_SIZE, &dwThreadID); if (g_hViWorkerThread == NULL) { printf("CreateThread(ViWorkerThread) failed: %d\n", GetLastError());fflush(stdout); } } return true;}// Function name : EndVI// Description : // Return type : void void EndVI(){ VIP_RETURN dwStatus; // Remove the error callback function if (g_hViNic != NULL) VipErrorCallback(g_hViNic, 0, NULL); // Close all VI connections for (int i=0; i<g_nNproc; i++) { if (g_pProcTable[i].via) CloseVi(&g_pProcTable[i].vinfo); } // Destroy the completion queue if (g_hViCQ != NULL) { dwStatus = VipDestroyCQ(g_hViCQ); AssertSuccess(dwStatus, "EndFI:VipDestroyCQ failed in EndVI"); } // Terminate the threads if (g_hViListenThread) TerminateThread(g_hViListenThread, 0); WaitForSingleObject(g_hViListenThread, 1000); CloseHandle(g_hViListenThread); if (g_hViWorkerThread) TerminateThread(g_hViWorkerThread, 0); WaitForSingleObject(g_hViWorkerThread, 1000); CloseHandle(g_hViWorkerThread); ClosedVINode *n = g_pClosedViList; while (n != NULL) { g_pClosedViList = n; n = n->pNext; delete g_pClosedViList; } g_pClosedViList = NULL;}// Function name : NT_ViSend// Description : // Return type : void // Argument : int type// Argument : void *buffer// Argument : unsigned int length// Argument : int tovoid NT_ViSend(int type, void *buffer, unsigned int length, int to){ if (g_pProcTable[to].vinfo.hVi == NULL) ConnectViTo(to); if (!ViSendFirstPacket(&g_pProcTable[to].vinfo, buffer, length, type)) nt_error("ViSendFirstPacket failed", 1); if (length) { if (!ViSendMsg(&g_pProcTable[to].vinfo, buffer, length)) nt_error("ViSendMsg failed", 1); } // Uncomment this if you want to guarantee that messages are out of the local buffers before send returns. // All this really does is slow down performance. //ViFlushPackets(&g_pProcTable[to].vinfo); }// These numbers are experimentally generated.// I don't know how to generate then dynamically.#define VI_STREAM_MIN 0x1000#define VI_STREAM_MIN_N 12#define VI_STREAM_MAX 0x400000
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -