⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nt_vi.cpp

📁 MPICH是MPI的重要研究,提供了一系列的接口函数,为并行计算的实现提供了编程环境.
💻 CPP
📖 第 1 页 / 共 5 页
字号:
	/*/	// My trials show that polling more than once before sleeping only 	// decreases performance.The lock function shows the exact opposite.  	// Maybe I need to slim down the ViWorkerThread to return faster.	for (int i=0; i<10; i++)	{		if (ViWorkerThread(0))			return;	}	Sleep(0);	//*/}// Function name	: LoadViFunctions// Description	    : // Return type		: bool bool LoadViFunctions(){	HMODULE hViLib;	char pszLibrary[1024];		if (!GetEnvironmentVariable("MPICH_VI_LIB", pszLibrary, 1024))		strcpy(pszLibrary, "vipl.dll");	// First initialize everythting to NULL	VipOpenNic = NULL;	VipCloseNic = NULL;	VipQueryNic = NULL;	VipRegisterMem = NULL;	VipDeregisterMem = NULL;	VipQueryMem = NULL;	VipSetMemAttributes = NULL;	VipErrorCallback = NULL;	VipQuerySystemManagementInfo = NULL;	VipCreatePtag = NULL;	VipDestroyPtag = NULL;	VipCreateVi = NULL;	VipDestroyVi = NULL;	VipQueryVi = NULL;	VipSetViAttributes = NULL;	VipPostSend = NULL;	VipSendDone = NULL;	VipSendWait = NULL;	VipSendNotify = NULL;	VipPostRecv = NULL;	VipRecvDone = NULL;	VipRecvWait = NULL;	VipRecvNotify = NULL;	VipConnectWait = NULL;	VipConnectAccept = NULL;	VipConnectReject = NULL;	VipConnectRequest = NULL;	VipDisconnect = NULL;	VipCreateCQ = NULL;	VipDestroyCQ = NULL;	VipResizeCQ = NULL;	VipCQDone = NULL;	VipCQWait = NULL;	VipCQNotify = NULL;	VipNSInit = NULL;	VipNSGetHostByName = NULL;	VipNSGetHostByAddr = NULL;	VipNSShutdown = NULL;	VipConnectPeerRequest = NULL;	VipConnectPeerDone = NULL;	VipConnectPeerWait = NULL;	VipAddTagCQ = NULL;	VipRemoveTagCQ = NULL;	VipPostDeferredSends = NULL;	VipGetUserData = NULL;	VipSetUserData = NULL;	hViLib = LoadLibrary(pszLibrary);	if (hViLib == NULL)		return false;	// Add code to check if the return values are NULL ...	VipOpenNic = (VIP_RETURN (VI_CALL *)(const char *, void **))GetProcAddress(hViLib, "VipOpenNic");	if (VipOpenNic == NULL) DPRINTF(("VipOpenNic == NULL\n"));	VipCloseNic = (VIP_RETURN (VI_CALL *)(void *))GetProcAddress(hViLib, "VipCloseNic");	if (VipCloseNic == NULL) DPRINTF(("VipCloseNic == NULL\n"));	VipQueryNic = (VIP_RETURN (VI_CALL *)(void *, struct _VIP_NIC_ATTRIBUTES *))GetProcAddress(hViLib, "VipQueryNic");	if (VipQueryNic == NULL) DPRINTF(("VipQueryNic == NULL\n"));	VipRegisterMem = (VIP_RETURN (VI_CALL *)(void *, void *, unsigned long, struct _VIP_MEM_ATTRIBUTES *, unsigned __int32 *))GetProcAddress(hViLib, "VipRegisterMem");	if (VipRegisterMem == NULL) DPRINTF(("VipRegisterMem == NULL\n"));	VipDeregisterMem = (VIP_RETURN (VI_CALL *)(void *, void *, unsigned __int32))GetProcAddress(hViLib, "VipDeregisterMem");	if (VipDeregisterMem == NULL) DPRINTF(("VipDeregisterMem == NULL\n"));	VipQueryMem = (VIP_RETURN (VI_CALL *)(void *, void *, unsigned __int32, struct _VIP_MEM_ATTRIBUTES *))GetProcAddress(hViLib, "VipQueryMem");	if (VipQueryMem == NULL) DPRINTF(("VipQueryMem == NULL\n"));	VipSetMemAttributes = (VIP_RETURN (VI_CALL *)(void *, void *, unsigned __int32, struct _VIP_MEM_ATTRIBUTES *))GetProcAddress(hViLib, "VipSetMemAttributes");	if (VipSetMemAttributes == NULL) DPRINTF(("VipSetMemAttributes == NULL\n"));	VipErrorCallback = (VIP_RETURN (VI_CALL *)(void *, void *, void (VI_CALL *)(void *, struct _VIP_ERROR_DESCRIPTOR *)))GetProcAddress(hViLib, "VipErrorCallback");	if (VipErrorCallback == NULL) DPRINTF(("VipErrorCallback == NULL\n"));	VipQuerySystemManagementInfo = (VIP_RETURN (VI_CALL *)(void *, unsigned long, void *))GetProcAddress(hViLib, "VipQuerySystemManagementInfo");	if (VipQuerySystemManagementInfo == NULL) DPRINTF(("VipQuerySystemManagementInfo == NULL\n"));	VipCreatePtag = (VIP_RETURN (VI_CALL *)(void *, void **))GetProcAddress(hViLib, "VipCreatePtag");	if (VipCreatePtag == NULL) DPRINTF(("VipCreatePtag == NULL\n"));	VipDestroyPtag = (VIP_RETURN (VI_CALL *)(void *, void *))GetProcAddress(hViLib, "VipDestroyPtag");	if (VipDestroyPtag == NULL) DPRINTF(("VipDestroyPtag == NULL\n"));	VipCreateVi = (VIP_RETURN (VI_CALL *)(void *, struct _VIP_VI_ATTRIBUTES *, void *, void *, void **))GetProcAddress(hViLib, "VipCreateVi");	if (VipCreateVi == NULL) DPRINTF(("VipCreateVi == NULL\n"));	VipDestroyVi = (VIP_RETURN (VI_CALL *)(void *))GetProcAddress(hViLib, "VipDestroyVi");	if (VipDestroyVi == NULL) DPRINTF(("VipDestroyVi == NULL\n"));	VipQueryVi = (VIP_RETURN (VI_CALL *)(void *, VIP_VI_STATE *, struct _VIP_VI_ATTRIBUTES *, int *, int *))GetProcAddress(hViLib, "VipQueryVi");	if (VipQueryVi == NULL) DPRINTF(("VipQueryVi == NULL\n"));	VipSetViAttributes = (VIP_RETURN (VI_CALL *)(void *, struct _VIP_VI_ATTRIBUTES *))GetProcAddress(hViLib, "VipSetViAttributes");	if (VipSetViAttributes == NULL) DPRINTF(("VipSetViAttributes == NULL\n"));	VipPostSend = (VIP_RETURN (VI_CALL *)(void *, struct _VIP_DESCRIPTOR *, unsigned __int32))GetProcAddress(hViLib, "VipPostSend");	if (VipPostSend == NULL) DPRINTF(("VipPostSend == NULL\n"));	VipSendDone = (VIP_RETURN (VI_CALL *)(void *, struct _VIP_DESCRIPTOR **))GetProcAddress(hViLib, "VipSendDone");	if (VipSendDone == NULL) DPRINTF(("VipSendDone == NULL\n"));	VipSendWait = (VIP_RETURN (VI_CALL *)(void *, unsigned long, struct _VIP_DESCRIPTOR **))GetProcAddress(hViLib, "VipSendWait");	if (VipSendWait == NULL) DPRINTF(("VipSendWait == NULL\n"));	VipSendNotify = (VIP_RETURN (VI_CALL *)(void *, void *, void (VI_CALL *)(void *, void *, void *, struct _VIP_DESCRIPTOR *)))GetProcAddress(hViLib, "VipSendNotify");	if (VipSendNotify == NULL) DPRINTF(("VipSendNotify == NULL\n"));	VipPostRecv = (VIP_RETURN (VI_CALL *)(void *, struct _VIP_DESCRIPTOR *, unsigned __int32))GetProcAddress(hViLib, "VipPostRecv");	if (VipPostRecv == NULL) DPRINTF(("VipPostRecv == NULL\n"));	VipRecvDone = (VIP_RETURN (VI_CALL *)(void *, struct _VIP_DESCRIPTOR **))GetProcAddress(hViLib, "VipRecvDone");	if (VipRecvDone == NULL) DPRINTF(("VipRecvDone == NULL\n"));	VipRecvWait = (VIP_RETURN (VI_CALL *)(void *, unsigned long, struct _VIP_DESCRIPTOR **))GetProcAddress(hViLib, "VipRecvWait");	if (VipRecvWait == NULL) DPRINTF(("VipRecvWait == NULL\n"));	VipRecvNotify = (VIP_RETURN (VI_CALL *)(void *, void *, void (_cdecl *)(void *, void *, void *, struct _VIP_DESCRIPTOR *)))GetProcAddress(hViLib, "VipRecvNotify");	if (VipRecvNotify == NULL) DPRINTF(("VipRecvNotify == NULL\n"));	VipConnectWait = (VIP_RETURN (VI_CALL *)(void *, struct _VIP_NET_ADDRESS *, unsigned long, struct _VIP_NET_ADDRESS *, struct _VIP_VI_ATTRIBUTES *, void **))GetProcAddress(hViLib, "VipConnectWait");	if (VipConnectWait == NULL) DPRINTF(("VipConnectWait == NULL\n"));	VipConnectAccept = (VIP_RETURN (VI_CALL *)(void *, void *))GetProcAddress(hViLib, "VipConnectAccept");	if (VipConnectAccept == NULL) DPRINTF(("VipConnectAccept == NULL\n"));	VipConnectReject = (VIP_RETURN (VI_CALL *)(void *))GetProcAddress(hViLib, "VipConnectReject");	if (VipConnectReject == NULL) DPRINTF(("VipConnectReject == NULL\n"));	VipConnectRequest = (VIP_RETURN (VI_CALL *)(void *, struct _VIP_NET_ADDRESS *, struct _VIP_NET_ADDRESS *, unsigned long, struct _VIP_VI_ATTRIBUTES *))GetProcAddress(hViLib, "VipConnectRequest");	if (VipConnectRequest == NULL) DPRINTF(("VipConnectRequest == NULL\n"));	VipDisconnect = (VIP_RETURN (VI_CALL *)(void *))GetProcAddress(hViLib, "VipDisconnect");	if (VipDisconnect == NULL) DPRINTF(("VipDisconnect == NULL\n"));	VipCreateCQ = (VIP_RETURN (VI_CALL *)(void *, unsigned long, void **))GetProcAddress(hViLib, "VipCreateCQ");	if (VipCreateCQ == NULL) DPRINTF(("VipCreateCQ == NULL\n"));	VipDestroyCQ = (VIP_RETURN (VI_CALL *)(void *))GetProcAddress(hViLib, "VipDestroyCQ");	if (VipDestroyCQ == NULL) DPRINTF(("VipDestroyCQ == NULL\n"));	VipResizeCQ = (VIP_RETURN (VI_CALL *)(void *, unsigned long))GetProcAddress(hViLib, "VipResizeCQ");	if (VipResizeCQ == NULL) DPRINTF(("VipResizeCQ == NULL\n"));	VipCQDone = (VIP_RETURN (VI_CALL *)(void *, void **, int *))GetProcAddress(hViLib, "VipCQDone");	if (VipCQDone == NULL) DPRINTF(("VipCQDone == NULL\n"));	VipCQWait = (VIP_RETURN (VI_CALL *)(void *, unsigned long, void **, int *))GetProcAddress(hViLib, "VipCQWait");	if (VipCQWait == NULL) DPRINTF(("VipCQWait == NULL\n"));	VipCQNotify = (VIP_RETURN (VI_CALL *)(void *, void *, void (VI_CALL *)(void *, void *, void *, int)))GetProcAddress(hViLib, "VipCQNotify");	if (VipCQNotify == NULL) DPRINTF(("VipCQNotify == NULL\n"));	VipNSInit = (VIP_RETURN (VI_CALL *)(void *, void *))GetProcAddress(hViLib, "VipNSInit");	if (VipNSInit == NULL) DPRINTF(("VipNSInit == NULL\n"));	VipNSGetHostByName = (VIP_RETURN (VI_CALL *)(void *, char *, struct _VIP_NET_ADDRESS *, unsigned long))GetProcAddress(hViLib, "VipNSGetHostByName");	if (VipNSGetHostByName == NULL) DPRINTF(("VipNSGetHostByName == NULL\n"));	VipNSGetHostByAddr = (VIP_RETURN (VI_CALL *)(void *, struct _VIP_NET_ADDRESS *, char *, unsigned long *))GetProcAddress(hViLib, "VipNSGetHostByAddr");	if (VipNSGetHostByAddr == NULL) DPRINTF(("VipNSGetHostByAddr == NULL\n"));	VipNSShutdown = (VIP_RETURN (VI_CALL *)(void *))GetProcAddress(hViLib, "VipNSShutdown");	if (VipNSShutdown == NULL) DPRINTF(("VipNSShutdown == NULL\n"));	VipConnectPeerRequest = (VIP_RETURN (VI_CALL *)(void *, struct _VIP_NET_ADDRESS *, struct _VIP_NET_ADDRESS *, unsigned long))GetProcAddress(hViLib, "VipConnectPeerRequest");	if (VipConnectPeerRequest == NULL) DPRINTF(("VipConnectPeerRequest == NULL\n"));	VipConnectPeerDone = (VIP_RETURN (VI_CALL *)(void *, struct _VIP_VI_ATTRIBUTES *))GetProcAddress(hViLib, "VipConnectPeerDone");	if (VipConnectPeerDone == NULL) DPRINTF(("VipConnectPeerDone == NULL\n"));	VipConnectPeerWait = (VIP_RETURN (VI_CALL *)(void *, struct _VIP_VI_ATTRIBUTES *))GetProcAddress(hViLib, "VipConnectPeerWait");	if (VipConnectPeerWait == NULL) DPRINTF(("VipConnectPeerWait == NULL\n"));	VipAddTagCQ = (VIP_RETURN (VI_CALL *)(void *, void **, unsigned long, unsigned long))GetProcAddress(hViLib, "VipAddTagCQ");	if (VipAddTagCQ == NULL) DPRINTF(("VipAddTagCQ == NULL\n"));	VipRemoveTagCQ = (VIP_RETURN (VI_CALL *)(void *, void *, unsigned long))GetProcAddress(hViLib, "VipRemoveTagCQ");	if (VipRemoveTagCQ == NULL) DPRINTF(("VipRemoveTagCQ == NULL\n"));	VipPostDeferredSends = (VIP_RETURN (VI_CALL *)(void *, int, int *))GetProcAddress(hViLib, "VipPostDeferredSends");	if (VipPostDeferredSends == NULL) DPRINTF(("VipPostDeferredSends == NULL\n"));	// Non-standard VIA calls	// Giganet	VipGetUserData = (void * (VI_CALL *)(void *))GetProcAddress(hViLib, "VipGetUserData");	if (VipGetUserData == NULL) DPRINTF(("VipGetUserData == NULL\n"));	VipSetUserData = (void (VI_CALL *)(void *, void *))GetProcAddress(hViLib, "VipSetUserData");	if (VipSetUserData == NULL) DPRINTF(("VipSetUserData == NULL\n"));	// Servernet	// GWizz	return true;}// Function name	: InitVI// Description	    : // Return type		: void bool InitVI(){	DWORD dwThreadID;	VIP_RETURN dwStatus;	char pszTemp[1024];	VIP_NIC_ATTRIBUTES nicAttribs;	int nCount=0, *pMembers = NULL;	try{	if (!LoadViFunctions())		return false;	}catch(...)	{		nt_error("Exception thrown in LoadViFunctions caught in InitVi", 1);		return false;	}	// Determine whether to use polling methods or not	if (GetEnvironmentVariable("MPICH_VI_USE_POLLING", pszTemp, 100))		g_bViUsePolling = true;	// Determine which processes this process can reach by VI connections	if (GetEnvironmentVariable("MPICH_VI_CLICKS", pszTemp, 100) == 0 && GetEnvironmentVariable("MPICH_VI_CLIQUES", pszTemp, 100) == 0)		return false; // If none, then there is no need to continue	if (ParseCliques(pszTemp, g_nIproc, g_nNproc, &nCount, &pMembers))	{		nt_error("Unable to parse the VI cliques", 1);		return false;	}	for (int i=0; i<nCount; i++)	{		if ( (pMembers[i] >= 0) && (pMembers[i] < g_nNproc) )		{			g_pProcTable[pMembers[i]].via = 1;			g_pProcTable[pMembers[i]].vinfo.hVi = NULL;			g_pProcTable[pMembers[i]].vinfo.valid = 0;		}	}	if (pMembers != NULL)		delete pMembers;	// Open the network interface card and save the handle	// TODO: What if there are multiple nics?	char pszNic[100];	sprintf(pszNic, "%s0", g_pszNicBaseName);	dwStatus = VipOpenNic(pszNic, &g_hViNic);	if (!AssertSuccess(dwStatus, "InitVI:can't open nic"))	{		printf("VipOpenNic failed\n");fflush(stdout);	}	// Set the global descriminator used to accept VI connections	sprintf((char*)g_ViDescriminator, "%s%d", g_pszJobID, g_nIproc);	g_nViDescriminator_len = strlen((char*)g_ViDescriminator);	// Determine and save the maximum transmission unit	if (VipQueryNic(g_hViNic, &nicAttribs) == VIP_SUCCESS)	{		if (nicAttribs.MaxTransferSize < DESIRED_PACKET_LENGTH)		{			g_viMTU = nicAttribs.MaxTransferSize;			default_vi_attribs.MaxTransferSize = nicAttribs.MaxTransferSize;		}		else		{			g_viMTU = DESIRED_PACKET_LENGTH;			default_vi_attribs.MaxTransferSize = DESIRED_PACKET_LENGTH;		}	}	//*	// The code will work without the callback function but it is necessary	// to detect catastrophic network closures, ie. a remote process dies.	// Set the error callback function	dwStatus = VipErrorCallback(g_hViNic, NULL, ErrorCallbackFunction);	if (!AssertSuccess(dwStatus, "InitVI:VipErrorCallback failed"))	{		printf("VipErrorCallback failed\n");fflush(stdout);	}	//*/	// Create a global completion queue for all VI connections to share	dwStatus = VipCreateCQ(g_hViNic, INITIAL_NUM_CQ_ENTRIES, &g_hViCQ);	if (!AssertSuccess(dwStatus, "InitVI:VipCreateCQ failed"))	{		printf("VipCreateCQ failed\n");fflush(stdout);	}	// Create a thread to wait for VI connections	g_hViListenThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ViListenThread, NULL, NT_THREAD_STACK_SIZE, &dwThreadID);	if (g_hViListenThread == NULL)	{		printf("CreateThread(ViListenThread) failed: %d\n", GetLastError());fflush(stdout);	}	pszTemp[0] = '\0';	GetEnvironmentVariable("MPICH_VI_SINGLETHREAD", pszTemp, 100);	if (pszTemp[0] == '1')	{		// Set the poll function so the via device will run single threaded.		g_MsgQueue.SetProgressFunction(PollViQueue);		g_bViSingleThreaded = true;	}	else	{		// Create a worker thread to eagerly drain messages from all open VI connections		g_hViWorkerThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ViWorkerThread, (LPVOID)1, NT_THREAD_STACK_SIZE, &dwThreadID);		if (g_hViWorkerThread == NULL)		{			printf("CreateThread(ViWorkerThread) failed: %d\n", GetLastError());fflush(stdout);		}	}	return true;}// Function name	: EndVI// Description	    : // Return type		: void void EndVI(){	VIP_RETURN dwStatus;	// Remove the error callback function	if (g_hViNic != NULL)		VipErrorCallback(g_hViNic, 0, NULL);		// Close all VI connections	for (int i=0; i<g_nNproc; i++)	{		if (g_pProcTable[i].via)			CloseVi(&g_pProcTable[i].vinfo);	}		// Destroy the completion queue	if (g_hViCQ != NULL)	{		dwStatus = VipDestroyCQ(g_hViCQ);		AssertSuccess(dwStatus, "EndFI:VipDestroyCQ failed in EndVI");	}	// Terminate the threads	if (g_hViListenThread)		TerminateThread(g_hViListenThread, 0);	WaitForSingleObject(g_hViListenThread, 1000);	CloseHandle(g_hViListenThread);	if (g_hViWorkerThread)		TerminateThread(g_hViWorkerThread, 0);	WaitForSingleObject(g_hViWorkerThread, 1000);	CloseHandle(g_hViWorkerThread);	ClosedVINode *n = g_pClosedViList;	while (n != NULL)	{		g_pClosedViList = n;		n = n->pNext;		delete g_pClosedViList;	}	g_pClosedViList = NULL;}// Function name	: NT_ViSend// Description	    : // Return type		: void // Argument         : int type// Argument         : void *buffer// Argument         : unsigned int length// Argument         : int tovoid NT_ViSend(int type, void *buffer, unsigned int length, int to){	if (g_pProcTable[to].vinfo.hVi == NULL)		ConnectViTo(to);	if (!ViSendFirstPacket(&g_pProcTable[to].vinfo, buffer, length, type))		nt_error("ViSendFirstPacket failed", 1);	if (length)	{		if (!ViSendMsg(&g_pProcTable[to].vinfo, buffer, length))			nt_error("ViSendMsg failed", 1);	}	// Uncomment this if you want to guarantee that messages are out of the local buffers before send returns.	// All this really does is slow down performance.	//ViFlushPackets(&g_pProcTable[to].vinfo); }// These numbers are experimentally generated.// I don't know how to generate then dynamically.#define VI_STREAM_MIN	0x1000#define VI_STREAM_MIN_N	12#define VI_STREAM_MAX	0x400000

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -