⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nt_vi.cpp

📁 MPICH是MPI的重要研究,提供了一系列的接口函数,为并行计算的实现提供了编程环境.
💻 CPP
📖 第 1 页 / 共 5 页
字号:
		// Receive nRemoteRank and nReceivesPerAck		if (g_bViSingleThreaded)		{			while (g_nListenGate == 0)				ViWorkerThread(0);		}		else		{			while (g_nListenGate == 0)				Sleep(0);		}		while ( (dwStatus = VipRecvDone(g_hListenThreadVi, &pRecvDesc)) == VIP_NOT_DONE)			Sleep(1);		if (!AssertSuccess(dwStatus, "ViListenThread:VipRecvDone failed", pRecvDesc))		{			nt_error("Error", 1);			return;		}		nRemoteRank = ((int*)pRecvDesc->Data[0].Data.Address)[0];		nReceivesPerAck = ((int*)pRecvDesc->Data[0].Data.Address)[1];		if (nRemoteRank < 0 || nRemoteRank >= g_nNproc)			MakeErrMsg(1, "Invalid rank received on new VI: %d", nRemoteRank);		// Re-post the receive descriptor		pRecvDesc->Control.Control = VIP_CONTROL_OP_SENDRECV;		pRecvDesc->Control.Length = g_viMTU;		pRecvDesc->Control.SegCount = 1;		pRecvDesc->Control.Reserved = 0;		pRecvDesc->Data[0].Length = g_viMTU;		pRecvDesc->Data[0].Handle = mhReceive;		dwStatus = VipPostRecv(g_hListenThreadVi, pRecvDesc, mhReceive);		if (!AssertSuccess(dwStatus, "ViListenThread:VipPostRecv failed", pRecvDesc))		{			nt_error("Error", 1);			return;		}		// Set the user data for this connection to be the rank of the remote process		VipSetUserData(g_hListenThreadVi, (VIP_PVOID)nRemoteRank);		// Insert VI into proc table		bool bSetupConnection;#ifdef USE_VC6_HEADERS		if (InterlockedCompareExchange((void**)&g_pProcTable[nRemoteRank].vinfo.hVi, (void*)g_hListenThreadVi, (void*)NULL) == NULL)#else		if (InterlockedCompareExchange((LONG volatile *)&g_pProcTable[nRemoteRank].vinfo.hVi, (LONG)g_hListenThreadVi, (LONG)NULL) == NULL)#endif			bSetupConnection = true;		else		{			// Two connections have been made simultaneously			// One must be left up and the other must be disconnected			if (nRemoteRank > g_nIproc)			{				// If the remote rank is higer, reject the new connection and keep the existing				bSetupConnection = false;				// Send ack=0				pSendDesc[0]->Control.Control = VIP_CONTROL_OP_SENDRECV | VIP_CONTROL_IMMEDIATE;				pSendDesc[0]->Control.Length = 0;				pSendDesc[0]->Control.SegCount = 0;				pSendDesc[0]->Control.Reserved = 0;				pSendDesc[0]->Control.ImmediateData = 0; // Ack stored in immediate data				dwStatus = VipPostSend(g_hListenThreadVi, pSendDesc[0], mhSend);				if (!AssertSuccess(dwStatus, "ViListenThread:VipPostSend failed", pSendDesc[0]))				{					nt_error("Error", 1);					return;				}				dwStatus = VipSendWait(g_hListenThreadVi, VITIMEOUT, &pDesc);				if (!AssertSuccess(dwStatus, "ViListenThread:VipSendWait failed", pDesc))				{					nt_error("Error", 1);					return;				}			}			else			{				// If the remote rank is lower, destroy the existing connection and accept the new				CloseVi(&g_pProcTable[nRemoteRank].vinfo);				bSetupConnection = true;			}		}		if (bSetupConnection)		{			nSendsPerAck = nNumSendDescriptors / 2;			// Send ack=1			pSendDesc[0]->Control.Control = VIP_CONTROL_OP_SENDRECV | VIP_CONTROL_IMMEDIATE;			pSendDesc[0]->Control.Length = sizeof(int);			pSendDesc[0]->Control.SegCount = 1;			pSendDesc[0]->Control.Reserved = 0;			pSendDesc[0]->Control.ImmediateData = 1; // Ack stored in immediate data			pSendDesc[0]->Data[0].Length = sizeof(int);			pSendDesc[0]->Data[0].Handle = mhSend;			((int*)pSendDesc[0]->Data[0].Data.Address)[0] = nSendsPerAck;			dwStatus = VipPostSend(g_hListenThreadVi, pSendDesc[0], mhSend);			if (!AssertSuccess(dwStatus, "ViListenThread:VipPostSend failed", pSendDesc[0]))			{				nt_error("Error", 1);				return;			}			dwStatus = VipSendWait(g_hListenThreadVi, VITIMEOUT, &pDesc);			if (!AssertSuccess(dwStatus, "ViListenThread:VipSendWait failed", pDesc))			{				nt_error("Error", 1);				return;			}			// Store VI connection information in the proctable			VI_Info *vinfo = &g_pProcTable[nRemoteRank].vinfo;			memcpy(&vinfo->descriminator, g_ViDescriminator, g_nViDescriminator_len);			vinfo->descriminator_len = g_nViDescriminator_len;			vinfo->hNic = g_hViNic;			vinfo->hVi = g_hListenThreadVi;			//vinfo->localbuf;			vinfo->mhReceive = mhReceive;			vinfo->mhSend = mhSend;			vinfo->nCurSendIndex = 0;			vinfo->nNumReceived = 0;			vinfo->nNumRecvDescriptors = nNumRecvDescriptors;			vinfo->nNumSendDescriptors = nNumSendDescriptors;			vinfo->nNumSent = 0;			vinfo->nPostedSends = 0;			vinfo->nReceivesPerAck = nReceivesPerAck;			vinfo->nSendsPerAck = nSendsPerAck;			vinfo->nSendAcked = 0;			vinfo->nSequenceNumberReceive = 0;			vinfo->nSequenceNumberSend = 0;			//vinfo->pDataBuffer = pDataBuffer;			vinfo->pDesc = pDesc;			//vinfo->pLocalAddress = (VIP_NET_ADDRESS*)vinfo->localbuf;			vinfo->pRecvDesc = pRecvDesc;			vinfo->pRemoteAddress = (VIP_NET_ADDRESS*)vinfo->remotebuf;			vinfo->pSendDesc = pSendDesc;			vinfo->pSendDescriptorBuffer = pSendDescriptorBuffer;			vinfo->pReceiveDescriptorBuffer = pReceiveDescriptorBuffer;			//vinfo->remotebuf;			//vinfo->Vi_LocalAttribs = Vi_LocalAttribs;			vinfo->Vi_RemoteAttribs = Vi_RemoteAttribs;			initlock(&vinfo->lock);			// Setting the data to valid must be last			vinfo->valid = 1;			// Increase the completion queue size every time a new connection is made			g_nNumCQEntries += CQ_ENTRIES_INCREMENT;			dwStatus = VipResizeCQ(g_hViCQ, g_nNumCQEntries);		}		else		{			//printf("VI already in proctable %d\n", nRemoteRank);fflush(stdout);		}		g_hListenThreadVi = NULL;		g_nListenGate = 0;		if (!g_bViSingleThreaded)			g_nWorkerGate = 1;	}}// Function name	: HashViPointer// Description	    : // Return type		: int // Argument         : VIP_VI_HANDLE pint HashViPointer(VIP_VI_HANDLE p){	int index;	if (p == NULL)		nt_error("Hashing NULL VI handle", 1);		if (VipGetUserData != NULL)	{		index = (int)VipGetUserData(p);		if (g_pProcTable[index].vinfo.hVi == p)			return index;	}	else	{		// For now, just search for the handle		for (int i=0; i<g_nNproc; i++)		{			if ( g_pProcTable[i].via && (g_pProcTable[i].vinfo.hVi == p) )				return i;		}	}	if (p == g_hListenThreadVi)	{		g_nListenGate = 1;		if (g_bViSingleThreaded)			return -1;		while (g_nWorkerGate == 0)			Sleep(0);		g_nWorkerGate = 0;	}	else	if (p == g_hConnectToVi)	{		g_nConnectGate = 1;		if (g_bViSingleThreaded)			return -1;		while (g_nWorkerGate == 0)			Sleep(0);		g_nWorkerGate = 0;	}	else		MakeErrMsg(1, "HashViPointer: VI_HANDLE(%x) not found in g_pProcTable", p);	return -1;}// Function name	: ViWorkerThread// Description	    : // Return type		: void int ViWorkerThread(int bRepeating){	VIP_VI_HANDLE hVi;	VIP_BOOLEAN bRecvQ;	VIP_RETURN dwStatus;	int index;	VI_Info *vinfo;	NT_Message *message;	do	{		if (!bRepeating)		{			// Poll once and return if no packet is available			if ((dwStatus = VipCQDone(g_hViCQ, &hVi, &bRecvQ)) == VIP_NOT_DONE)				return 0;			if (!AssertSuccess(dwStatus, "ViWorkerThread:VipCQDone failed"))			{				if (g_bViClosing)					return 0;				nt_error("Error", 1);				return 0;			}		}		else		{			// Wait for a packet by either polling or a wait function			if (g_bViUsePolling)			{				while ((dwStatus = VipCQDone(g_hViCQ, &hVi, &bRecvQ)) == VIP_NOT_DONE)					Sleep(0);				if (!AssertSuccess(dwStatus, "ViWorkerThread:VipCQDone failed"))				{					if (g_bViClosing)						return 0;					nt_error("Error", 1);					return 0;				}			}			else			{				dwStatus = VipCQWait(g_hViCQ, VIP_INFINITE, &hVi, &bRecvQ);				if (!AssertSuccess(dwStatus, "ViWorkerThread:VipCQWait failed"))				{					if (g_bViClosing)						return 0;					nt_error("Error", 1);					return 0;				}			}		}		index = HashViPointer(hVi);		if (index == -1)		{			//printf("HashViPointer returned -1\n");fflush(stdout);			continue;		}		vinfo = &g_pProcTable[index].vinfo;		if (bRecvQ)		{			// Packet ready in the receive queue			while ( (dwStatus = VipRecvDone(vinfo->hVi, &vinfo->pRecvDesc)) == VIP_NOT_DONE )				Sleep(0);			if (!AssertSuccess(dwStatus, "ViWorkerThread:VipRecvDone failed", vinfo->pRecvDesc))			{				if (g_bViClosing)					return 0;				nt_error("Error", 1);				return 0;			}			// Zero length messages are assumed to be ack packets.			// In the future, I will probably check the immediate data to determine the packet type			if (vinfo->pRecvDesc->Control.Length == 0)			{				// Ack packet received				InterlockedIncrement(&vinfo->nSendAcked);				vinfo->nSequenceNumberReceive = vinfo->pRecvDesc->Control.ImmediateData;			}			else			{				// Data packet received				int datalen;				message = &g_pProcTable[index].msg;				if (message->state == NT_MSG_READING_TAG)				{					// This is the first packet in a message.					// Peel off the tag, length, and as much of the data as is available					message->tag = ((int*)(vinfo->pRecvDesc->Data[0].Data.Address))[0];					message->length = ((int*)(vinfo->pRecvDesc->Data[0].Data.Address))[1];					message->buffer = g_MsgQueue.GetBufferToFill(message->tag, message->length, index, &message->pElement);					datalen = vinfo->pRecvDesc->Control.Length - (2 * sizeof(int));					if (datalen > 0)					{						memcpy(							message->buffer, 							&((int*)(vinfo->pRecvDesc->Data[0].Data.Address))[2], 							datalen);						message->nRemaining = message->length - datalen;					}					if (message->nRemaining)						message->state = NT_MSG_READING_BUFFER;					else					{						message->state = NT_MSG_READING_TAG;						g_MsgQueue.SetElementEvent(message->pElement);					}				}				else				{					// This is next packet containing only data for the current message					datalen = vinfo->pRecvDesc->Control.Length;					memcpy(						&(((char*)message->buffer)[message->length - message->nRemaining]),						vinfo->pRecvDesc->Data[0].Data.Address,						datalen);					message->nRemaining -= datalen;					if (message->nRemaining == 0)					{						message->state = NT_MSG_READING_TAG;						g_MsgQueue.SetElementEvent(message->pElement);					}				}			}			// Re-post the receive			vinfo->pRecvDesc->Control.Control = VIP_CONTROL_OP_SENDRECV;			vinfo->pRecvDesc->Control.Length = g_viMTU;			vinfo->pRecvDesc->Control.SegCount = 1;			vinfo->pRecvDesc->Control.Reserved = 0;			vinfo->pRecvDesc->Data[0].Length = g_viMTU;			vinfo->pRecvDesc->Data[0].Handle = vinfo->mhReceive;			dwStatus = VipPostRecv(vinfo->hVi, vinfo->pRecvDesc, vinfo->mhReceive);			if (!AssertSuccess(dwStatus, "ViWorkerThread:VipPostRecv failed", vinfo->pRecvDesc))			{				nt_error("Error", 1);				return 0;			}						// Send ack if necessary			vinfo->nNumReceived++;			if (vinfo->nNumReceived % vinfo->nReceivesPerAck == 0)				ViSendAck(vinfo);		}		else		{			// Packet ready in the send queue			printf("There shouldn't be any send completion messages\n");fflush(stdout);		}	}while (bRepeating);	return 1;}// Function name	: PollViQueue// Description	    : // Return type		: void void PollViQueue(){	//*	if (!ViWorkerThread(0))		Sleep(0);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -