⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nt_vi.cpp

📁 MPICH是MPI标准的一个重要实现，提供了一系列接口函数，为并行计算的实现提供了编程环境。
💻 CPP
📖 第 1 页 / 共 5 页
字号:
			{				VipDeregisterMem(g_hViNic, vinfo->pReceiveDescriptorBuffer, vinfo->mhReceive);				delete vinfo->pReceiveDescriptorBuffer;			}			if (vinfo->pSendDescriptorBuffer != NULL)			{				VipDeregisterMem(g_hViNic, vinfo->pSendDescriptorBuffer, vinfo->mhSend);				delete vinfo->pSendDescriptorBuffer;			}						if (vinfo->pSendDesc)				delete vinfo->pSendDesc;		}		vinfo->pSendDesc = NULL;		vinfo->hVi = NULL;		vinfo->hNic = NULL;		vinfo->pReceiveDescriptorBuffer = NULL;		vinfo->pSendDescriptorBuffer = NULL;	}	return 0;}// Function name	: ConnectViTo// Description	    : // Return type		: int (TRUE=1 or FALSE=0)// Argument         : int nRemoteRankint ConnectViTo(int nRemoteRank){	VIP_RETURN			dwStatus;	VIP_VI_ATTRIBUTES	Vi_RemoteAttribs;	VIP_DESCRIPTOR		*pRecvDesc, **pSendDesc, *pDesc;	VIP_MEM_HANDLE		mhSend, mhReceive;	void				*pSendDescriptorBuffer, *pReceiveDescriptorBuffer;//, *pDataBuffer;	char				localbuf[40], remotebuf[40];	VIP_NET_ADDRESS		*pLocalAddress;	VIP_NET_ADDRESS		*pRemoteAddress;	int					nNumRecvDescriptors=32, nNumSendDescriptors=30;	void				*vp;	int					nSendsPerAck, nReceivesPerAck;	unsigned char		ViDescriminator[16];	int					nViDescriminator_len;	//printf("ConnectViTo: Connecting to %d\n", nRemoteRank);fflush(stdout);	// Create a VI	//dwStatus = VipCreateVi(g_hViNic, &default_vi_attribs, g_hViCQ, g_hViCQ, &g_hConnectToVi);	// Create a VI with only the receive queue associated with the completion queue	dwStatus = VipCreateVi(g_hViNic, &default_vi_attribs, NULL, g_hViCQ, &g_hConnectToVi);	if (!AssertSuccess(dwStatus, "can't create VI"))		return 0;	// Check and insert it in g_pProcTable	//LPVOID pNull = NULL;#ifdef USE_VC6_HEADERS	if (InterlockedCompareExchange((void**)&g_pProcTable[nRemoteRank].vinfo.hVi, (void*)g_hConnectToVi, (void*)NULL) != NULL)#else	if (InterlockedCompareExchange((LONG volatile *)&g_pProcTable[nRemoteRank].vinfo.hVi, (LONG)g_hConnectToVi, (LONG)NULL) != NULL)#endif	{		//printf("ConnectViTo: waiting for vinfo to be 
validated\n");fflush(stdout);		// Connection has already been made by another thread.  Destroy this one and wait for the other one to be valid		VipDestroyVi(g_hConnectToVi);		while (g_pProcTable[nRemoteRank].vinfo.valid == 0)			Sleep(200);		return 1;	}	// Each node will establish VI connections using the 'JobID + rank' as the descriminator	sprintf((char*)ViDescriminator, "%s%d", g_pszJobID, nRemoteRank);	nViDescriminator_len = strlen((char*)ViDescriminator);	pLocalAddress  = (VIP_NET_ADDRESS*)localbuf;	pRemoteAddress = (VIP_NET_ADDRESS*)remotebuf;	pLocalAddress->HostAddressLen = ADDR_LEN;	vp = pLocalAddress->HostAddress + ADDR_LEN;	pLocalAddress->DiscriminatorLen = nViDescriminator_len;	memcpy(vp, ViDescriminator, nViDescriminator_len);	// Save the descriminator for automatic reconnect	//descriminator = (unsigned char*)vp;	//descriminator_len = descriminator_length;		if (nNumSendDescriptors & 0x1)		nNumSendDescriptors++; // must be even		pSendDesc = new VIP_DESCRIPTOR*[nNumSendDescriptors];	if (g_bUseBNR)	{		char pszKey[100];		sprintf(pszKey, "ListenHost%d", nRemoteRank);		BNR_Get(g_myBNRgroup, pszKey, g_pProcTable[nRemoteRank].host);	}	else if (g_bUseDatabase)	{		char pszKey[100];		int length = NT_HOSTNAME_LEN;		sprintf(pszKey, "ListenHost%d", nRemoteRank);		g_Database.Get(pszKey, g_pProcTable[nRemoteRank].host, &length);	}	else		GetProcessConnectInfo(nRemoteRank);	// Get the remote host information	VipNSInit(g_hViNic, 0);	dwStatus = VipNSGetHostByName(g_hViNic, g_pProcTable[nRemoteRank].host, pRemoteAddress, 0);	if (!AssertSuccess(dwStatus, "can't find remote address"))		return 0;	// Append the discriminator	vp = pRemoteAddress->HostAddress + ADDR_LEN;	pRemoteAddress->DiscriminatorLen = nViDescriminator_len;	memcpy(vp, ViDescriminator, nViDescriminator_len);	// Reserve memory for descriptors, for sending and receiving data	// I should allow for smaller sizes than g_viMTU	pRecvDesc = get_descriptors(g_hViNic, nNumRecvDescriptors, g_viMTU, &mhReceive, 
&pReceiveDescriptorBuffer);	pSendDesc[0] = get_descriptors(g_hViNic, nNumSendDescriptors, g_viMTU, &mhSend, &pSendDescriptorBuffer);	pDesc = (VIP_DESCRIPTOR*)(pSendDesc[0]->Control.Next.Address);		for (int i=1; i<nNumSendDescriptors; i++)	{		pSendDesc[i] = pDesc;		pDesc = (VIP_DESCRIPTOR*)(pDesc->Control.Next.Address);	}		// Post the receive immediately	VIP_DESCRIPTOR *pTemp2, *pTemp = pRecvDesc;	while (pTemp)	{		pTemp2 = pTemp;		// Advance to the next descriptor before calling PostRecv because PostRecv modifies the Address field		pTemp = (VIP_DESCRIPTOR*)(pTemp->Control.Next.Address);		dwStatus = VipPostRecv(g_hConnectToVi, pTemp2, mhReceive);		if (!AssertSuccess(dwStatus, "can't post receive", pTemp2))			return 0;	}	// Request a connection	dwStatus = VipConnectRequest(g_hConnectToVi, pLocalAddress, pRemoteAddress, VITIMEOUT, &Vi_RemoteAttribs);	if (!AssertSuccess(dwStatus, "connect request failed"))	{		MakeErrMsg(1, "VI Connection request to process %d failed\n", nRemoteRank);		return 0;	}	// Set the user data for this connection to be the rank of the remote process	VipSetUserData(g_hConnectToVi, (VIP_PVOID)nRemoteRank);	// Send my rank and nSendsPerAck	nSendsPerAck = nNumSendDescriptors / 2;	pSendDesc[0]->Control.Control = VIP_CONTROL_OP_SENDRECV | VIP_CONTROL_IMMEDIATE;	pSendDesc[0]->Control.Length = 2 * sizeof(int);	pSendDesc[0]->Control.SegCount = 1;	pSendDesc[0]->Control.Reserved = 0;	pSendDesc[0]->Control.ImmediateData = 0;	pSendDesc[0]->Data[0].Length = 2 * sizeof(int);	pSendDesc[0]->Data[0].Handle = mhSend;	((int*)pSendDesc[0]->Data[0].Data.Address)[0] = g_nIproc;	((int*)pSendDesc[0]->Data[0].Data.Address)[1] = nSendsPerAck;	dwStatus = VipPostSend(g_hConnectToVi, pSendDesc[0], mhSend);	if (!AssertSuccess(dwStatus, "ConnectViTo:VipPostSend failed", pSendDesc[0]))		return 0;	dwStatus = VipSendWait(g_hConnectToVi, VITIMEOUT, &pDesc);	if (!AssertSuccess(dwStatus, "ConnectViTo:VipSendWait failed", pDesc))		return 0;	// Receive ack	if (g_bViSingleThreaded)	{		
while (g_nConnectGate == 0)			ViWorkerThread(0);	}	else	{		// Wait for the worker thread to signal that the packet is ready to be taken out of the queue		// by setting the gate to 1		while (g_nConnectGate == 0)			Sleep(0);	}	// Remove the packet	while ( (dwStatus = VipRecvDone(g_hConnectToVi, &pRecvDesc)) == VIP_NOT_DONE)		Sleep(1);	if (!AssertSuccess(dwStatus, "ConnectViTo:VipRecvDone failed", pRecvDesc))	{		MakeErrMsg(1, "Unable to receive connect packet from process %d\n", nRemoteRank);		return 0;	}	if (pRecvDesc->Control.ImmediateData == 0)	{		// Close the VI due to loss in race condition		VIP_DESCRIPTOR *d;		VipDisconnect(g_hConnectToVi);		do 		{			VipRecvDone(g_hConnectToVi, &d);		} while (d != 0);		VipDestroyVi(g_hConnectToVi);		if (pReceiveDescriptorBuffer != NULL)		{			VipDeregisterMem(g_hViNic, pReceiveDescriptorBuffer, mhReceive);			delete pReceiveDescriptorBuffer;		}		if (pSendDescriptorBuffer != NULL)		{			VipDeregisterMem(g_hViNic, pSendDescriptorBuffer, mhSend);			delete pSendDescriptorBuffer;		}				if (pSendDesc)			delete pSendDesc;	}	else	{		nReceivesPerAck = ((int*)pRecvDesc->Data[0].Data.Address)[0];		// Re-post the receive descriptor immediately		pRecvDesc->Control.Control = VIP_CONTROL_OP_SENDRECV;		pRecvDesc->Control.Length = g_viMTU;		pRecvDesc->Control.SegCount = 1;		pRecvDesc->Control.Reserved = 0;		pRecvDesc->Data[0].Length = g_viMTU;		pRecvDesc->Data[0].Handle = mhReceive;		dwStatus = VipPostRecv(g_hConnectToVi, pRecvDesc, mhReceive);		if (!AssertSuccess(dwStatus, "ConnectViTo:VipPostRecv failed", pRecvDesc))			return 0;				// Store VI connection information in the proctable		VI_Info *vinfo = &g_pProcTable[nRemoteRank].vinfo;		memcpy(&vinfo->descriminator, ViDescriminator, nViDescriminator_len);		vinfo->descriminator_len = nViDescriminator_len;		vinfo->hNic = g_hViNic;		vinfo->hVi = g_hConnectToVi;		//vinfo->localbuf;		vinfo->mhReceive = mhReceive;		vinfo->mhSend = mhSend;		vinfo->nCurSendIndex = 0;		vinfo->nNumReceived = 0;		
vinfo->nNumRecvDescriptors = nNumRecvDescriptors;		vinfo->nNumSendDescriptors = nNumSendDescriptors;		vinfo->nNumSent = 0;		vinfo->nPostedSends = 0;		vinfo->nReceivesPerAck = nReceivesPerAck;		vinfo->nSendsPerAck = nSendsPerAck;		vinfo->nSendAcked = 0;		vinfo->nSequenceNumberReceive = 0;		vinfo->nSequenceNumberSend = 0;		vinfo->pDesc = pDesc;		//vinfo->pLocalAddress = (VIP_NET_ADDRESS*)vinfo->localbuf;		vinfo->pRecvDesc = pRecvDesc;		vinfo->pRemoteAddress = (VIP_NET_ADDRESS*)vinfo->remotebuf;		vinfo->pSendDesc = pSendDesc;		vinfo->pSendDescriptorBuffer = pSendDescriptorBuffer;		vinfo->pReceiveDescriptorBuffer = pReceiveDescriptorBuffer;		//vinfo->remotebuf;		//vinfo->Vi_LocalAttribs = Vi_LocalAttribs;		vinfo->Vi_RemoteAttribs = Vi_RemoteAttribs;		initlock(&vinfo->lock);		// Setting the data to valid must be last and the compiler or chip must execute this instruction last too.		vinfo->valid = 1;		// Increase the completion queue size every time a new connection is made		g_nNumCQEntries += CQ_ENTRIES_INCREMENT;		dwStatus = VipResizeCQ(g_hViCQ, g_nNumCQEntries);	}	// Reset g_hConnectToVi before setting the worker gate so the worker thread will not accidentally match it again	g_hConnectToVi = NULL;	g_nConnectGate = 0;	if (!g_bViSingleThreaded)		g_nWorkerGate = 1;	return 1;}// Function name	: ViListenThread// Description	    : // Return type		: void void ViListenThread(){	VIP_RETURN			dwStatus;	VIP_VI_ATTRIBUTES	Vi_RemoteAttribs;	VIP_DESCRIPTOR		*pRecvDesc, **pSendDesc, *pDesc;	VIP_MEM_HANDLE		mhSend, mhReceive;	void				*pSendDescriptorBuffer, *pReceiveDescriptorBuffer;	char				localbuf[40], remotebuf[40];	VIP_NET_ADDRESS		*pLocalAddress;	VIP_NET_ADDRESS		*pRemoteAddress;	int					nNumRecvDescriptors=32, nNumSendDescriptors=30;	void				*vp;	int					nRemoteRank;	int					nSendsPerAck, nReceivesPerAck;	while (true)	{		/////////////////////		// Setup a VI		pLocalAddress  = (VIP_NET_ADDRESS*)localbuf;		pRemoteAddress = (VIP_NET_ADDRESS*)remotebuf;		
pLocalAddress->HostAddressLen = ADDR_LEN;		vp = pLocalAddress->HostAddress + ADDR_LEN;		pLocalAddress->DiscriminatorLen = g_nViDescriminator_len;		memcpy(vp, g_ViDescriminator, g_nViDescriminator_len);			// Save the descriminator for automatic reconnect		//descriminator = (unsigned char*)vp;		//descriminator_len = descriminator_length;		if (nNumSendDescriptors & 0x1)			nNumSendDescriptors++; // must be even		pSendDesc = new VIP_DESCRIPTOR*[nNumSendDescriptors];		//dwStatus = VipCreateVi(g_hViNic, &default_vi_attribs, g_hViCQ, g_hViCQ, &g_hListenThreadVi);		dwStatus = VipCreateVi(g_hViNic, &default_vi_attribs, NULL, g_hViCQ, &g_hListenThreadVi);		if (!AssertSuccess(dwStatus, "can't create VI"))		{			nt_error("Error", 1);			return;		}		// Reserve memory for descriptors, for sending and receiving data		pRecvDesc = get_descriptors(g_hViNic, nNumRecvDescriptors, g_viMTU, &mhReceive, &pReceiveDescriptorBuffer);		pSendDesc[0] = get_descriptors(g_hViNic, nNumSendDescriptors, g_viMTU, &mhSend, &pSendDescriptorBuffer);		pDesc = (VIP_DESCRIPTOR*)(pSendDesc[0]->Control.Next.Address);		for (int i=1; i<nNumSendDescriptors; i++)		{			pSendDesc[i] = pDesc;			pDesc = (VIP_DESCRIPTOR*)(pDesc->Control.Next.Address);		}		// Post the receives immediately		VIP_DESCRIPTOR *pTemp2, *pTemp = pRecvDesc;		while (pTemp)		{			pTemp2 = pTemp;			// Advance to the next descriptor before calling PostRecv because PostRecv modifies the Address field			pTemp = (VIP_DESCRIPTOR*)(pTemp->Control.Next.Address);			dwStatus = VipPostRecv(g_hListenThreadVi, pTemp2, mhReceive);			if (!AssertSuccess(dwStatus, "ViListenThread:can't post receive", pTemp2))			{				nt_error("Error", 1);				return;			}		}		////////////////////////////		// Wait for a connection		VIP_CONN_HANDLE conn;		dwStatus = VipConnectWait(g_hViNic, pLocalAddress, VIP_INFINITE, pRemoteAddress, &Vi_RemoteAttribs, &conn);		if (!AssertSuccess(dwStatus, "ViListenThread:failed waiting for connection"))		{			if (g_bViClosing)			{				// Clean up local 
VI structures				// ...				return;			}			nt_error("Error", 1);			return;		}		dwStatus = VipConnectAccept(conn, g_hListenThreadVi);		if (!AssertSuccess(dwStatus, "can't accept connection"))		{			nt_error("Error", 1);			return;		}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -